2016-10-20 3 views
1

J'utilise HighlevelProducer et HighlevelConsumer pour envoyer et recevoir des messages. Le HighlevelConsumer est configuré avec autoCommit = false car je veux commettre des Messages seulement quand il a été produit avec succès. Le problème est, que le premier message n'est jamais vraiment engagé.Ordre de validation incorrect lors de l'utilisation de autoCommit = false dans HighlevelConsumer

Exemple:

  • Envoyer des messages 1-10.
  • Recevoir un message 1
  • Recevoir un message 2
  • Message 2 Engagez
  • ...
  • recevoir des messages 10
  • le message de validation 10
  • le message de validation 1

Si je redémarrer mon client, tous les messages de 1 à 10 sont traités à nouveau. Seulement si j'envoie de nouveaux messages au consommateur, les anciens messages sont commis. Cela arrive pour n'importe quel nombre de messages.

Mon code se lit comme suit:

var kafka = require('kafka-node'), 
    HighLevelConsumer = kafka.HighLevelConsumer, 
    client = new kafka.Client("localhost:2181/"); 
consumer = new HighLevelConsumer(
    client, 
    [ 
     { topic: 'mytopic' } 
    ], 
    { 
     groupId: 'my-group', 
     id: "my-consumer-1", 
     autoCommit: false 
    } 
); 

consumer.on('message', function (message) { 
    console.log("consume: " + message.offset); 
    consumer.commit(function (err, data) { 
     console.log("commited:" + message.offset); 
    }); 
    console.log("consumed:" + message.offset); 
}); 

process.on('SIGINT', function() { 
    consumer.close(true, function() { 
     process.exit(); 
    }); 
}); 

process.on('exit', function() { 
    consumer.close(true, function() { 
     process.exit(); 
    }); 
}); 
var messages = 10; 
var kafka = require('kafka-node'), 
    HighLevelProducer = kafka.HighLevelProducer, 
    client = new kafka.Client("localhost:2181/"); 
var producer = new HighLevelProducer(client, { partitionerType: 2, requireAcks: 1 }); 

producer.on('error', function (err) { console.log(err) }); 
producer.on('ready', function() { 
    for (i = 0; i < messages; i++) { 
     payloads = [{ topic: 'mytopic', messages: "" }]; 
     producer.send(payloads, function (err, data) { 
      err ? console.log(i + "err", err) : console.log(i + "data", data); 
     }); 
    } 
}); 

Est-ce que je fais quelque chose de mal ou est-ce un bogue dans kafka-nœud?

+0

Copie possible de [Pourquoi commitAsync ne parvient pas à commettre les 2 premiers décalages] (http://stackoverflow.com/questions/37794718/why-commitasync-fails-to-commit-the-first-2-offsets) –

Répondre

0

commit un message A 2 est une validation implicite du message 1.

Comme vous commits sont faites de manière asynchrone et validation de message 1 et 2 messages sont fait rapidement après l'autre (c.-à commettre 2 se produit avant la le consommateur a envoyé la validation de 1), le premier commit ne se fera pas explicitement et seulement une seule validation du message 2 sera envoyée.