2017-09-11 4 views
0

Je suis en train d'utiliser le partitionnement clé avec kafka Highlevelproducer, sur 4 partitions sujet Je le code ci-dessous:nodejs et kafka partitionneur clé

var kafka = require('kafka-node'), 
     HighLevelProducer = kafka.HighLevelProducer, 
     client = new kafka.Client(Host+":"+Port,client_id), 
     producer = new HighLevelProducer(client,{partitionerType: 3}); 
. 
. 
    theKey = theKey+1; 
    if (theKey > Nb_key) { 
     theKey = 0; 
    } 
    var payloads = [ 
     { topic: Topic, key: theKey, messages: JSON.stringify({"hello": "world", "Timestamp": +timestamp}) } 
     ]; 
     producer.send(payloads, function (err, data) { 
     }); 

J'ai vérifié que les messages ont la valeur de clé correcte, mais toujours , tous les messages sont envoyés à la partition 1. Voyez-vous quelque chose de mal avec ce que je fais?

Merci

Répondre

1

En regardant le code, je pense avoir compris le problème. baseProducer effectue un hachage sur la clé sous forme de chaîne et le résultat est toujours le même, donc toujours la même partition. J'ai écrit mon propre partitionneur personnalisé, en faisant un simple modulo sur la clé avec le nb de partitions, cela fonctionne parfaitement. Voir le code ci-dessous:

var MyPartitioner = function (partitions, key) { 
    key = key || '0'; 
    var index = parseInt(key) % partitions.length; 
    return partitions[index]; 
    }; 
    var kafka = require('kafka-node'), 
     HighLevelProducer = kafka.HighLevelProducer, 
     client = new kafka.Client(Host+":"+Port,client_id), 
     producer = new HighLevelProducer(client,{requireAcks: 0,partitionerType: 4},MyPartitioner); 
     producer.on('ready', function() { 
     timer.setInterval(send_data, '',SendIntervalInUsecs+'u'); 
    });