J'utilise Kafka 2.12 et le module kafka-python en tant que client Kafka. Je suis en train de tester d'un simple producteur:Kafka produce.send n'envoie jamais le message
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092')
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
Lorsque ce processus est instancié, le message est jamais reçu par le consommateur
Si je tire la chasse producteur et changer linger_ms param (ce qui en fait la synchronisation), le message est envoyé et lu par le consommateur:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
producer.flush()
dans les versions précédentes de Kafka, il y avait les queue.buffering.max.ms param pour indiquer combien de temps le producteur attendra envoyer les messages dans la file d'attente, mais n'est pas présent dans la dernière version (kafka-python 1 .3.3). Comment pourrais-je le spécifier dans les versions plus récentes de Kafka pour garder ma communication asynchrone?
Merci!