2017-10-16 16 views
1

Je ces modules en cours d'exécution (tous locaux):ne peut pas produire à Kafka à partir du code Python

  • ZooKeeper
  • Kafka serveur
  • Kafka Consumer
  • script Python

Dans le script il y a un send() appel:

producer = KafkaProducer(bootstrap_servers=['localhost:9092']) 
producer.send('test', 'entry1') 

toutes les 15 secondes environ. Presque tous les send() rendements d'appel sur le Kafka serveur ces erreurs:

[2017-10-16 18:59:10,953] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor) 
kafka.common.KafkaException: Wrong request type 16 
at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:64) 
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50) 
at kafka.network.Processor.read(SocketServer.scala:450) 
at kafka.network.Processor.run(SocketServer.scala:340) 
at java.lang.Thread.run(Thread.java:748) 
[2017-10-16 18:59:11,158] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) 
[2017-10-16 18:59:11,162] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) 
[2017-10-16 18:59:11,162] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor) 
kafka.common.KafkaException: Wrong request type 18 
at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:64) 
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50) 
at kafka.network.Processor.read(SocketServer.scala:450) 
at kafka.network.Processor.run(SocketServer.scala:340) 
at java.lang.Thread.run(Thread.java:748) 

Il est un presque, parce que tous les 5 menuets (plus ou moins) l'entrée n'obtient le droit à la consommation, mais les erreurs retour.

Merci pour l'aide

+0

Quel client Kafka utilisez-vous, et quelle version de Kafka? Avez-vous essayé celui-ci? https://github.com/confluentinc/confluent-kafka-python –

Répondre

0

Il ressemble à la version de Kafka n'est pas compatible avec la version du client Python - s'il vous plaît les vérifier. Liste des codes pour les opérations pourrait être trouvé here.

+0

Cela pourrait être la raison, le Spark avec Kafka sur Python a toutes sortes de limitations de version. Actuellement, j'ai décidé de ne pas utiliser Kafka. Si je décide de l'entrer à nouveau dans mon architecture, je vais utiliser ces conseils et mettre à jour le problème. –