2017-10-19 19 views
0

Le rappel Producer.send fournit un objet message. message.offset() renvoie souvent 0 dans ce qui semble être un bug.confluent-python kafka producteur envoyer callback message.offset() renvoie 0

Ceci utilise: bibliothèque confluente-kafka python version 0.11.0 librdkafka: stable 0.11.0 (en bouteille), HEAD. Installé via Mac OS Homebrew

Le programme de test simple:

import confluent_kafka 
import timeit 


def delivery_callback(error, message): 
    print("delivery_callback. error={}. message={}".format(error, message)) 
    print("message.topic={}".format(message.topic())) 
    print("message.timestamp={}".format(message.timestamp())) 
    print("message.key={}".format(message.key())) 
    print("message.value={}".format(message.value())) 
    print("message.partition={}".format(message.partition())) 
    print("message.offset={}".format(message.offset())) 


def produce_string_messages(kafka_producer, topic_name, num_messages): 
    start_time = timeit.default_timer() 

    for i in range(num_messages): 
     kafka_producer.produce(topic_name, value="cf-k test. v{}".format(i), on_delivery=delivery_callback) 

    elapsed = timeit.default_timer() - start_time 
    print("completed producing messages. They are queued for delivery. elapsed={}. elapsed/msg={}".format(elapsed, elapsed/num_messages)) 


if __name__ == "__main__": 
    print("starting") 

    conf = { 
     'bootstrap.servers': "kafka-broker-1:9092" 
    } 

    kafka_producer = confluent_kafka.Producer(conf) 

    print("opened KafkaProducer") 
    produce_string_messages(kafka_producer, "my-string-topic", 3) 

    print("flushing...") 
    kafka_producer.flush() 

    print("exiting") 

produit:

starting 
opened KafkaProducer 
completed producing messages. They are queued for delivery. elapsed=0.000994920730591. elapsed/msg=0.00033164024353 
flushing... 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v0 
message.partition=0 
message.offset=0 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v1 
message.partition=0 
message.offset=0 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v2 
message.partition=0 
message.offset=24 
exiting 

Notez que message.offset() est égal à zéro pour les deux premiers messages et pour le troisième non nul. Si je lance ce programme de test à nouveau qui envoie trois messages, le troisième message.offset incréments par 3. Cela ressemble juste un bug où message.offset() souvent incorrectement renvoie 0.

Répondre

0

Pour des performances [1] raisonne le rapport de livraison fournit seulement un décalage valide pour le dernier message d'un lot produit. Cela peut être modifié pour fournir des compensations adéquates pour tous les messages du lot en définissant la propriété de configuration au niveau du sujet produce.offset.report true, comme ceci:

p = confluent_kafka.Producer({'bootstrap.servers': ..., 
           'default.topic.config': { 'produce.offset.report': True } }) 

Nous allons changer la valeur par défaut pour être vrai dans une version future de le client Python.

[1]: Il évite un balayage linéaire des messages dans le lot, mais l'impact sur les performances est minuscule et non pertinent en Python, donc rien à craindre.

+0

Parfait. Merci! En tant qu'auteur principal, pouvez-vous répondre à celui-ci: https://stackoverflow.com/questions/44732214/apt-get-install-librdkafka1-fails-on-debian-9-x-due-to-libssl-dependency – clay