2017-02-23 1 views
1

Je voudrais savoir comment fonctionne la validation lorsque le AckMode est réglé sur MANUAL au printemps kafka. Voici la propriété que j'ai définie dans KafkaConfig containerProperties.setAckMode (AbstractMessageListenerContainer.AckMode.MANUAL);Comment savoir si mon enregistrement a été manuellement engagé en utilisant Spring Kafka

Le code listener

@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup") 
    public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) { 
     countDownLatch.countDown();  
     acknowledgment.acknowledge(); 
} 

Je suis en train de faire la acknowledgement selon la documentation du ressort de kafka, mais cela ne signifie pas que mon message est marqué comme envoyé mais pas consumé (Ceci est ma compréhension) .

  1. Dans ce cas, dois-je appeler la méthode commitSync(). Si oui, d'où puis-je invoquer cela, car j'ai besoin d'obtenir une référence à KafkaConsumer. Si non, comment cela fonctionne-t-il en interne? Puis-je le suivre?

  2. Y at-il un commitId ou une valeur qui est retournée? Mon idée est de savoir si un enregistrement consommateur particulier est consommé ou non. Je souhaite stocker la valeur à des fins de suivi interne.

  3. -t-kafka maintenir en interne tout Etat sur le dossier des consommateurs comme ( Prend acte, ENGAGE, pas commis) qui peut être utile de classer.

Cela m'aiderait vraiment à distinguer combien d'enregistrements sont consommés et combien sont en attente et leurs états.

Répondre

2

Je peux répondre à la première question. Tout reste ressemble à une histoire pour Apache Kafka directement.

Puisque nous ne pouvons pas effectuer commit d'où nous voudrions, mais seulement à partir du même fil qui effectue consumer.poll(), nous stockons toutes COMMIT requêtes dans la file d'attente KafkaMessageListenerContainer interne et jeter un oeil dans ce dans la boucle du principal consommateur avant effectuer this.consumer.poll().

Même si vous utilisez MANUAL_IMMEDIATE, le consumer.commitSync() réel est effectué sur un thread différent de votre acknowledgment.acknowledge().

OTOH main l'API là dans le Consumer ressemble:

public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets); 

Donc, il n'y a pas tout commitId crochets pour attaquer.

Je pense qu'il n'y a pas une telle notion dans Apache Kafka comme Not Committed ou toute autre chose. Les données sont toujours présentes dans le journal des rubriques et elles ne sont pas supprimées avant l'opération d'administration ou la configuration de compactage.

Je pense que la fonction commit offset est entièrement liée à l'objet consumer group et selon JavaDocs nous avons:

* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every 
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API 
* should not be used. The committed offset should be the next message your application will consume, 
* i.e. lastProcessedMessageOffset + 1. 

Ainsi, lorsque votre client est mort, il redémarre à partir du dernier offset pour son commises groupe. Un groupe différent peut lire les mêmes données mais à partir d'un autre décalage. Je pense que c'est absolument pourquoi leur API ne fournit aucun crochet à l'état actuel. Il n'y en a tout simplement pas!

+0

Merci Artem pour la réponse détaillée. Donc, ce que je comprends de la réponse ci-dessus est que le 'consumer.commitSync()' se passe sur un thread différent lorsque nous invoquons 'acknowledgment.acknowledge()'. Par conséquent, cela garantit que la validation est en cours. Est-ce le même comportement pour 'commitAsync() 'aussi? J'espère que c'est la même chose pour 'AckMode.MANUAL', car je n'utilise pas' MANUAL_IMMEDIATE' – user1564626

+0

Oui, c'est vrai. Toutes les opérations 'Consumer' se produisent sur le même thread. 'MANUAL_IMMEDIATE' ne diffère de' MANUAL' que par l'intermédiaire immédiat de 'consumer.wakeup()' qui rompt le sondage en cours. –