2017-06-10 1 views
1

J'essaye d'écrire un client en tant que producteur. J'ai suivi des exemples pour créer un nouveau projet de console Win32. J'ai trouvé l'API ne fonctionne pas pour moi à moins que j'ajoute la fonction getline() à la fin de mon programme.Comment utiliser l'API de production KAFKA dans librdkafka avec le code C++ sur windows

Si je supprime getline(), la méthode produit (..) renvoie toujours le résultat du succès. Cependant, je ne vois aucune réponse dans la fenêtre de commande de kafka-console-consumer

Je suis un peu confus. Est-ce correct? Comment envoyer un message sans utiliser getline()? Quelqu'un sait?

J'ai trouvé pourquoi cela ne fonctionne pas. Il semble trop rapide pour supprimer l'objet producteur provoquant le producteur ne peut pas envoyer de messages à courtier.

Lorsque j'ajoute sleep 1000 entre la méthode de production et la suppression d'un objet producteur, le producteur peut envoyer un message correctement. Donc, la question est Comment envoyer un message immédiatement. Comment puis-je m'assurer que ces messages ont bien été envoyés avant de détruire l'objet producteur?

Comment résoudre ce problème, en fait je n'aime pas ajouter sleep() dans mon code source.

win10 + vs2015 + kafka_2.10-0.9.0.1 + Zookeeper-3.4.6 + librdkafka S'il vous plaît consulter le code suivant

// kafka_test_win32_nomfc.cpp 
// 

#include "stdafx.h" 
#include <iostream> 
#include "librdkafka/rdkafkacpp.h" 


int static producer_1() 
{ 
    std::string brokers = "127.0.0.1"; 
    std::string errstr; 
    std::string topic_str = "linli"; 
    std::string mode; 
    std::string debug; 
    int32_t partition = RdKafka::Topic::PARTITION_UA; 
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; 
    bool do_conf_dump = false; 
    int opt; 
    // MyHashPartitionerCb hash_partitioner; 
    int use_ccb = 0; 

    /* 
    * Create configuration objects 
    */ 
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); 
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); 

    conf->set("metadata.broker.list", brokers, errstr); 

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); 
    if (!producer) { 
     std::cerr << "Failed to create producer: " << errstr << std::endl; 
     exit(1); 
    } 

    std::cout << "% Created producer " << producer->name() << std::endl; 

    /* 
    * Create topic handle. 
    */ 
    RdKafka::Topic *topic = NULL; 
    if (!topic_str.empty()) { 
     topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr); 
     if (!topic) { 
      std::cerr << "Failed to create topic: " << errstr << std::endl; 
      exit(1); 
     } 
    } 

    RdKafka::ErrorCode resp = producer->produce(topic, partition, 
     RdKafka::Producer::RK_MSG_COPY /* Copy payload */, 
     const_cast<char *>("hello worlf"), 11, 
     NULL, NULL); 

    delete topic; 
    delete producer; 
    return 0; 
} 


int static producer_2() 
{ 
    std::string brokers = "127.0.0.1"; 
    std::string errstr; 
    std::string topic_str = "linli"; 
    std::string mode; 
    std::string debug; 
    int32_t partition = RdKafka::Topic::PARTITION_UA; 
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; 
    bool do_conf_dump = false; 
    int opt; 
    // MyHashPartitionerCb hash_partitioner; 
    int use_ccb = 0; 

    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); 
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); 

    conf->set("metadata.broker.list", brokers, errstr); 

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); 
    if (!producer) { 
     std::cerr << "Failed to create producer: " << errstr << std::endl; 
     exit(1); 
    } 

    std::cout << "% Created producer " << producer->name() << std::endl; 

    RdKafka::ErrorCode resp = producer->produce(topic_str, partition, 
     RdKafka::Producer::RK_MSG_COPY /* Copy payload */, 
     (void *)"hi", 2, 
     NULL, 0, 0, NULL); 



    std::string errs(RdKafka::err2str(resp)); 
    std::cout << errs << std::endl; 
    //producer->poll(0); 


    delete producer; 

    return 0; 
} 


int main() 
{ 

    producer_2(); 

    return 0; 
} 

Répondre

1

L'API de produits librdkafka() (à la fois C et C++) est asynchrone, votre message sera initialement placé dans la file d'attente interne du producteur et seulement plus tard (voir la propriété de configuration queue.buffering.max.ms - par défaut 1 seconde) avec d'autres messages dans un lot de messages (MessageSet) et envoyé au courtier à partir d'un thread d'arrière-plan. Votre programme appelle produce() et se ferme rapidement, bien avant que le thread de producteur d'arrière-plan ait eu la chance d'envoyer le message au courtier, et encore moins de recevoir un accusé de réception du courtier.

Pour vous assurer que tous les messages en attente ont bien été envoyés, appelez le flush() avant de mettre fin à votre demande. Si votre application est de longue durée, vous devez appeler le poll() à intervalles réguliers pour signaler les rappels que vous avez enregistrés.

+0

Nous vous remercions de votre rediffusion. En fait, je veux intégrer le producteur dans une application de serveur socket multi-thread. Je suis prêt à mettre l'objet producteur dans le thread de travail socket, de sorte que le producteur ne travaille pas sur un thread de longue vie. D'une autre manière, j'ai essayé la méthode flush() que vous avez mentionnée, je l'ajoute entre la méthode produce et la méthode delete, mais le kafka-console-consumer ne peut toujours pas avoir d'accusé de réception. Cela prouve que flush() ne peut pas s'assurer que vos messages sont envoyés avant la fin de l'application. – CodeOverflow

+0

Si le producteur doit être dans un thread de longue vie, l'intégration deviendra très complexe car je dois écrire un nouveau pool de threads pour gérer les threads du producteur. Y a-t-il de meilleures solutions? – CodeOverflow

+0

librdkafka est thread-safe, vous pouvez (et devez) utiliser la même instance Producer à partir de plusieurs threads d'application. Ce que vous faites généralement, c'est laisser un thread d'application appeler produire(), et créer un thread d'application distinct (ou utiliser la boucle du thread principal) pour interroger() pour servir les rapports de livraison et autres rappels. – Edenhill