J'ai créé un script python raw_tweets_stream.py
pour diffuser des données twitter en utilisant twitter api. Les données json de twitter sont envoyées au producteur kafka en utilisant le script ci-dessous.Impossible d'envoyer des événements json tweets à Kafka sujet/producteur en utilisant la ligne de commande kafka
`python raw_tweets_stream.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list localhost:2181 --topic raw_json_tweets`
raw_json_tweets
est le sujet de kafka créé pour ces tweets. Le script python raw_tweets_stream.py
fonctionne très bien mais il génère une erreur en l'envoyant au producteur kafka. J'utilise le bac à sable Hortonworks HDP 2.3.1 et je me suis assuré que le zookeeper et le kafka sont démarrés.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic raw_json_tweets
Topic:raw_json_tweets PartitionCount:1 ReplicationFactor:1 Configs:
Topic: raw_json_tweets Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Erreur:
[2016-08-25 22:36:26,212] ERROR Failed to send requests for topics raw_json_tweets with correlation ids in [57,64] (kafka.producer.async.DefaultEventHandler)
[2016-08-25 22:36:26,213] ERROR Error in handling batch of 131 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2016-08-25 22:36:27,217] WARN Fetching topic metadata with correlation id 65 for topics [Set(json_tweets1)] from broker [BrokerEndPoint(0,localhost,2181)] failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.CoreUtils$.read(CoreUtils.scala:193)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Mise à jour: Solution
- Nous sommes allés à Ambari Services et avons modifié le répertoire des journaux Kafka en
/tmp/kafka-logs
. Modification du script d'origine pour inclure le port et le nom d'hôte corrects.
python raw_tweets_stream.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic raw_json_tweets
vérifié que les événements sont envoyés au sujet de kafka à l'aide des consommateurs de la console.
/usr/hdp/2.3.0.0-2557/kafka/bin/kafka-console-consumer.sh -zookeeper sandbox.hortonworks.com:2181 -topic raw_json_tweets -from-beginning
Merci de nous avoir signalé que @Binary Nerd. J'ai mis à jour le courtier de kafka avec le bon port 9092 mais jetant toujours l'erreur. Voici une partie de l'erreur -> [2016-08-26 13: 24: 12,718] ERREUR Échec de l'assemblage des messages par sujet, partition due à: récupérer les métadonnées de rubrique pour les rubriques [Set (raw_json_tweets)] du courtier [ArrayBuffer (BrokerEndPoint (0, localhost, 9092))] a échoué (kafka.producer.async.DefaultEventHandler) .. java.nio.channels.ClosedChannelException – gkc123
Selon la documentation de Hortonworks, si vous utilisez Ambari, le port par défaut est '6667', peut-être essayez cela - https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_secure-kafka-ambari/content/ch_secure-kafka-config-options.html –
Vous avez raison. Le port correct pour hortonworks est 6667 (vérifié aussi en allant à Ambari Services). Modification du script pour inclure le port correct '6667' et le nom d'hôte complet' sandbox.hortonworks.com' .. travaillé comme un charme. 'python raw_tweets_stream.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic raw_json_tweets' – gkc123