J'ai réécrit un noeud de producteur Kafka dans node-red en utilisant nodejs. J'ai remarqué que Kafka10 ajoutait une fonctionnalité permettant de joindre un horodatage aux messages envoyés aux producteurs, mais je n'ai trouvé que des exemples java. Comment puis-je ajouter un horodatage aux messages à l'aide de nodejs?Comment ajouter un horodatage à kafka 10 messages avec nodejs
Dans ce qui suit, je rapport comment le producteur est mis en œuvre nodejs:
function kafkaAdvancedNode(config) {
RED.nodes.createNode(this,config);
var topic = config.topic;
var partition = config.partition;
var clusterZookeeper = config.zkquorum;
var debug = (config.debug == "debug");
var node = this;
var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;
var topics = config.topics;
var client = new Client(clusterZookeeper);
try {
this.on("input", function(msg) {
var payloads = [];
// check if multiple topics
if (topics.indexOf(",") > -1){
var topicArry = topics.split(',');
for (i = 0; i < topicArry.length; i++) {
payloads.push({topic: topicArry[i], messages: msg.payload});
}
}
else {
if(partition == "" || !partition)
payloads = [{topic: topics, messages: msg.payload}];
else
payloads = [{topic: topics, messages: msg.payload, partition: partition}];
}
producer.send(payloads, function(err, data){
if (err){
node.error(err);
}
node.log("Message Sent: " + data);
});
});
}
catch(e) {
node.error(e);
}
var producer = new HighLevelProducer(client);
this.status({fill:"green",shape:"dot",text:"connected to "+clusterZookeeper});
}
sur Google, je trouve ceci: link
mais en fait je ne comprends pas comment intégrer dans mon code (ce qui est Avro?)
Malheureusement, sur Raspberry pi, je reçois une erreur en essayant de l'installer. lien -> http://imgur.com/a/idV4U –
Je vois, vous n'avez pas mentionné cela fonctionne sur un pi de framboise. Cette erreur est probablement due à la bibliothèque librdkafka intégrée écrite en C. J'ai également écrit un module JavaScript pur nœud-red-contrib-confluent pour une utilisation sur des dispositifs embarqués comme le rPI. Il utilise le proxy REST Confluent pour pub/sub à Kafka. Cela définirait automatiquement l'horodatage en tant que temps d'ingestion. Il fonctionne également mieux à travers les pare-feu et sur Internet car il utilise le protocole HTTP jusqu'au proxy et limite le protocole Kafka à l'intérieur du centre de données/cloud. –
Pas bon pour moi, mon horodatage est encapsulé dans les données du capteur qui sont JSON-LD –