J'essaye de mettre en place un pipeline de données simple d'un producteur de console Kafka au système de fichiers Hadoop (HDFS). Je travaille sur une machine virtuelle Ubuntu 64 bits et j'ai créé des utilisateurs séparés pour Hadoop et Kafka, comme l'ont suggéré les guides que j'ai suivis. Consommer l'entrée produite dans Kafka avec une console de consommation fonctionne et le HDFS semble être opérationnel.EOFException de Kafka dans Flume
Maintenant, je veux utiliser Flume pour canaliser l'entrée dans le HDFS. J'utilise le fichier de configuration suivante:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 2000
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = hdfs://flume/kafka/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
Maintenant, quand je lance Flume avec le je reçois la même exception dans la sortie de la console encore et encore commande
bin/flume-ng agent --conf ./conf -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n tier1
suivant:
2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:467)] Completed connection to node 2147483647
2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.common.network.Selector.poll(Selector.java:307)] Connection with Ubuntu-Sandbox/127.0.1.1 disconnected
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:529)
at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
La seule façon d'arrêter Flume est de tuer le processus Java. Je pensais que cela pourrait avoir quelque chose à voir avec les utilisateurs séparés pour Hadoop et Kafka, mais même quand tout est exécuté avec l'utilisateur Kafka, j'obtiens le même résultat. Je n'ai pas non plus trouvé quoi que ce soit concernant la méthode EOFException en ligne, ce qui est étrange étant donné que je viens de suivre les guides "Getting Started" et que j'ai utilisé de jolies configurations standard pour tout.
Peut-être que cela a quelque chose à voir avec la ligne précédente ("Ubuntu-Sandbox/127.0.1.1 déconnecté") et donc la configuration de ma machine virtuelle?
Toute aide est fortement appréciée!
Merci pour le conseil, Robin. Je me suis familiarisé avec Confluent et il semble définitivement rendre tout plus facile. Cependant, encore une fois je ne peux pas l'obtenir pour écrire des données de Kafka au HDFS en suivant simplement le guide Quickstart ... Cette fois je ne reçois même pas d'exception, le processus "connect-standalone" ne finira pas et le dossier dans le HDFS - en dépit d'être créé - est vide ... C'est vraiment frustrant! – stefanS