2017-07-14 5 views
0

J'utilise Kafka 2.12 et le module kafka-python en tant que client Kafka. Je suis en train de tester d'un simple producteur:Kafka produce.send n'envoie jamais le message

class Producer(Process): 
daemon = True 
def run(self): 
    producer = KafkaProducer(bootstrap_servers='kafka:9092') 
    print("Sending messages...") 
    producer.send('topic', json.dumps(message).encode('utf-8')) 

Lorsque ce processus est instancié, le message est jamais reçu par le consommateur

Si je tire la chasse producteur et changer linger_ms param (ce qui en fait la synchronisation), le message est envoyé et lu par le consommateur:

class Producer(Process): 
daemon = True 
def run(self): 
    producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10) 
    print("Sending messages...") 
    producer.send('topic', json.dumps(message).encode('utf-8')) 
    producer.flush() 

dans les versions précédentes de Kafka, il y avait les queue.buffering.max.ms param pour indiquer combien de temps le producteur attendra envoyer les messages dans la file d'attente, mais n'est pas présent dans la dernière version (kafka-python 1 .3.3). Comment pourrais-je le spécifier dans les versions plus récentes de Kafka pour garder ma communication asynchrone?

Merci!

Répondre

0

Comme vous l'avez observé, les messages sont mis en file d'attente pour l'envoi asynchrone, et il n'y a aucune garantie qu'il sera envoyé immédiatement. Donc, si vous voulez forcer le message à être envoyé au courtier, vous devez appeler explicitement producer.flush() qui bloquera jusqu'à ce que le message soit envoyé (bien que flush() ne garantit pas les acks).

Je suis assez sûr que la queue.buffering.max.ms a été remplacé param par linger_ms: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer

Vous utilisez déjà que dans votre exemple param de travail.