Je voudrais savoir comment fonctionne la validation lorsque le AckMode
est réglé sur MANUAL
au printemps kafka. Voici la propriété que j'ai définie dans KafkaConfig
containerProperties.setAckMode (AbstractMessageListenerContainer.AckMode.MANUAL);Comment savoir si mon enregistrement a été manuellement engagé en utilisant Spring Kafka
Le code listener
@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
countDownLatch.countDown();
acknowledgment.acknowledge();
}
Je suis en train de faire la acknowledgement
selon la documentation du ressort de kafka, mais cela ne signifie pas que mon message est marqué comme envoyé mais pas consumé (Ceci est ma compréhension) .
Dans ce cas, dois-je appeler la méthode
commitSync()
. Si oui, d'où puis-je invoquer cela, car j'ai besoin d'obtenir une référence àKafkaConsumer
. Si non, comment cela fonctionne-t-il en interne? Puis-je le suivre?Y at-il un
commitId
ou une valeur qui est retournée? Mon idée est de savoir si un enregistrement consommateur particulier est consommé ou non. Je souhaite stocker la valeur à des fins de suivi interne.- -t-kafka maintenir en interne tout Etat sur le dossier des consommateurs comme ( Prend acte, ENGAGE, pas commis) qui peut être utile de classer.
Cela m'aiderait vraiment à distinguer combien d'enregistrements sont consommés et combien sont en attente et leurs états.
Merci Artem pour la réponse détaillée. Donc, ce que je comprends de la réponse ci-dessus est que le 'consumer.commitSync()' se passe sur un thread différent lorsque nous invoquons 'acknowledgment.acknowledge()'. Par conséquent, cela garantit que la validation est en cours. Est-ce le même comportement pour 'commitAsync() 'aussi? J'espère que c'est la même chose pour 'AckMode.MANUAL', car je n'utilise pas' MANUAL_IMMEDIATE' – user1564626
Oui, c'est vrai. Toutes les opérations 'Consumer' se produisent sur le même thread. 'MANUAL_IMMEDIATE' ne diffère de' MANUAL' que par l'intermédiaire immédiat de 'consumer.wakeup()' qui rompt le sondage en cours. –