2017-10-19 27 views
0

J'essaye de mettre en place un pipeline de données simple d'un producteur de console Kafka au système de fichiers Hadoop (HDFS). Je travaille sur une machine virtuelle Ubuntu 64 bits et j'ai créé des utilisateurs séparés pour Hadoop et Kafka, comme l'ont suggéré les guides que j'ai suivis. Consommer l'entrée produite dans Kafka avec une console de consommation fonctionne et le HDFS semble être opérationnel.EOFException de Kafka dans Flume

Maintenant, je veux utiliser Flume pour canaliser l'entrée dans le HDFS. J'utilise le fichier de configuration suivante:

tier1.sources = source1 
tier1.channels = channel1 
tier1.sinks = sink1 

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource 
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181 
tier1.sources.source1.topic = test 
tier1.sources.source1.groupId = flume 
tier1.sources.source1.channels = channel1 
tier1.sources.source1.interceptors = i1 
tier1.sources.source1.interceptors.i1.type = timestamp 
tier1.sources.source1.kafka.consumer.timeout.ms = 2000 

tier1.channels.channel1.type = memory 
tier1.channels.channel1.capacity = 10000 
tier1.channels.channel1.transactionCapacity = 1000 

tier1.sinks.sink1.type = hdfs 
tier1.sinks.sink1.hdfs.path = hdfs://flume/kafka/%{topic}/%y-%m-%d 
tier1.sinks.sink1.hdfs.rollInterval = 5 
tier1.sinks.sink1.hdfs.rollSize = 0 
tier1.sinks.sink1.hdfs.rollCount = 0 
tier1.sinks.sink1.hdfs.fileType = DataStream 
tier1.sinks.sink1.channel = channel1 

Maintenant, quand je lance Flume avec le je reçois la même exception dans la sortie de la console encore et encore commande

bin/flume-ng agent --conf ./conf -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n tier1 

suivant:

2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:467)] Completed connection to node 2147483647 
2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.common.network.Selector.poll(Selector.java:307)] Connection with Ubuntu-Sandbox/127.0.1.1 disconnected 
java.io.EOFException 
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) 
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153) 
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134) 
    at org.apache.kafka.common.network.Selector.poll(Selector.java:286) 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) 
    at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:529) 
    at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83) 
    at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71) 
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 

La seule façon d'arrêter Flume est de tuer le processus Java. Je pensais que cela pourrait avoir quelque chose à voir avec les utilisateurs séparés pour Hadoop et Kafka, mais même quand tout est exécuté avec l'utilisateur Kafka, j'obtiens le même résultat. Je n'ai pas non plus trouvé quoi que ce soit concernant la méthode EOFException en ligne, ce qui est étrange étant donné que je viens de suivre les guides "Getting Started" et que j'ai utilisé de jolies configurations standard pour tout.

Peut-être que cela a quelque chose à voir avec la ligne précédente ("Ubuntu-Sandbox/127.0.1.1 déconnecté") et donc la configuration de ma machine virtuelle?

Toute aide est fortement appréciée!

Répondre

0

Avez-vous envisagé d'utiliser Kafka Connect (partie d'Apache Kafka) et le HDFS connector à la place? Ceci est généralement vu pour avoir remplacé Flume. Il est facile à utiliser, avec une configuration similaire à celle de Flume.

+0

Merci pour le conseil, Robin. Je me suis familiarisé avec Confluent et il semble définitivement rendre tout plus facile. Cependant, encore une fois je ne peux pas l'obtenir pour écrire des données de Kafka au HDFS en suivant simplement le guide Quickstart ... Cette fois je ne reçois même pas d'exception, le processus "connect-standalone" ne finira pas et le dossier dans le HDFS - en dépit d'être créé - est vide ... C'est vraiment frustrant! – stefanS