0

J'ai réécrit un noeud de producteur Kafka dans node-red en utilisant nodejs. J'ai remarqué que Kafka10 ajoutait une fonctionnalité permettant de joindre un horodatage aux messages envoyés aux producteurs, mais je n'ai trouvé que des exemples java. Comment puis-je ajouter un horodatage aux messages à l'aide de nodejs?Comment ajouter un horodatage à kafka 10 messages avec nodejs

Dans ce qui suit, je rapport comment le producteur est mis en œuvre nodejs:

function kafkaAdvancedNode(config) { 
    RED.nodes.createNode(this,config); 
    var topic = config.topic; 
    var partition = config.partition; 
    var clusterZookeeper = config.zkquorum; 
    var debug = (config.debug == "debug"); 
    var node = this; 
    var kafka = require('kafka-node'); 
    var HighLevelProducer = kafka.HighLevelProducer; 
    var Client = kafka.Client; 
    var topics = config.topics; 
    var client = new Client(clusterZookeeper); 

    try { 
     this.on("input", function(msg) { 
      var payloads = []; 

      // check if multiple topics 
      if (topics.indexOf(",") > -1){ 
       var topicArry = topics.split(','); 

       for (i = 0; i < topicArry.length; i++) { 
        payloads.push({topic: topicArry[i], messages: msg.payload}); 
       } 
      } 
      else { 
       if(partition == "" || !partition) 
        payloads = [{topic: topics, messages: msg.payload}]; 
       else 
        payloads = [{topic: topics, messages: msg.payload, partition: partition}]; 
      } 

      producer.send(payloads, function(err, data){ 
       if (err){ 
        node.error(err); 
       } 
       node.log("Message Sent: " + data); 
      }); 
     }); 
    } 
    catch(e) { 
     node.error(e); 
    } 
    var producer = new HighLevelProducer(client); 
    this.status({fill:"green",shape:"dot",text:"connected to "+clusterZookeeper}); 
} 

sur Google, je trouve ceci: link

mais en fait je ne comprends pas comment intégrer dans mon code (ce qui est Avro?)

Répondre

0

Je ne pense pas que kafka-node supporte le protocole Kafka 0.10 donc il est peu probable qu'il supporte l'horodatage des messages. Pour cette raison (et d'autres), j'ai écrit quelque chose de similaire en utilisant node-rdkafka et je l'ai publié sous le nom node-red-contrib-rdkafka. MISE À JOUR: Je viens de publier une nouvelle version (0.2.0) de node-red-contrib-rdkafka qui prend en charge soit 0.10 "time event" ou "processing time" timestamps ainsi que des sujets dynamiques, des clés, et partitions.

+0

Malheureusement, sur Raspberry pi, je reçois une erreur en essayant de l'installer. lien -> http://imgur.com/a/idV4U –

+0

Je vois, vous n'avez pas mentionné cela fonctionne sur un pi de framboise. Cette erreur est probablement due à la bibliothèque librdkafka intégrée écrite en C. J'ai également écrit un module JavaScript pur nœud-red-contrib-confluent pour une utilisation sur des dispositifs embarqués comme le rPI. Il utilise le proxy REST Confluent pour pub/sub à Kafka. Cela définirait automatiquement l'horodatage en tant que temps d'ingestion. Il fonctionne également mieux à travers les pare-feu et sur Internet car il utilise le protocole HTTP jusqu'au proxy et limite le protocole Kafka à l'intérieur du centre de données/cloud. –

+0

Pas bon pour moi, mon horodatage est encapsulé dans les données du capteur qui sont JSON-LD –