0

J'essaye de réaliser une transaction dans un Kafka Processor pour m'assurer que je ne retraite pas deux fois le même message. Étant donné un message (A) j'ai besoin de créer une liste de messages qui seront produits sur un autre sujet dans une transaction et je veux commettre le message original (A) dans la même transaction. De la documentation j'ai trouvé la méthode ProducersendOffsetsToTransaction qui semble pouvoir commettre un décalage dans une transaction seulement si elle réussit. Voici le code dans la méthode process() de mon Processor:KafkaProducer sendOffsetsToTransaction besoin de décalage + 1 pour valider le décalage actuel

producer.beginTransaction() 
    val topicPartition = new TopicPartition(this.context().topic(), this.context().partition()) 
    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset()) 
    val map    = Map(topicPartition -> offsetAndMetadata).asJava 
    producer.sendOffsetsToTransaction(map, "consumer-group-id") 
    items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value))) 
    producer.commitTransaction() 
    throw new RuntimeException("expected exception") 

Unfortunatly avec ce code (qui ne évidemment à chaque exécution) le message traité (A) est retraitée chaque fois que je re-démarrer l'application après l'exception .

je parviens à faire fonctionne en ajoutant un +1 au décalage retourné par this.context().offset() et la redéfinition du val offsetAndMetadata de cette façon:

val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1) 

Est-ce le comportement normal ou je fais quelque chose de mal?

Merci :)

Répondre

1

Votre code est correct. Les offsets que vous validez sont les décalages des messages que vous voulez lire ensuite (pas les décalages des messages que vous avez lus en dernier).

Comparez: https://github.com/apache/kafka/blob/41e4e93b5ae8a7d221fce1733e050cb98ac9713c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L346

+1

Merci Matthias :) il semble que vous avez raison! Cependant, si c'est le comportement prévu de 'sendOffsetsToTransaction', je pense que la documentation est un peu trompeuse. –

+0

J'ai juste vérifié le JavaDoc et aussi comparé avec les docs de 'KafkaConsumer # commit' - je suis d'accord et je vais augmenter un PR pour le corriger pour la prochaine version. THX! –

+0

@ MatthiasJ.Sax: Vouliez-vous dire "les messages que vous voulez * écrire * suivant"? –