Dans mon application KafkaConsumer, je souhaite lire un lot de messages avec poll() et les traiter. Mais le traitement peut échouer. Dans ce cas, je veux réessayer jusqu'à ce que je réussisse mais seulement réessayer si le client possède toujours des partitions. Je ne veux pas appeler constamment poll() parce que je ne veux pas lire plus de données.Comment vérifier que KafkaConsumer a toujours attribué des partitions sans lire plus de données avec poll()
C'est un extrait de code:
consumer = new KafkaConsumer<>(consumerConfig);
try {
consumer.subscribe(config.topics() /** Callback does not work as I do not call poll in between */);
while (true) {
ConsumerRecords<byte[], Value> values = consumer.poll(10000);
while (/* I am still owner of partitions */) {
try {
process(values);
} catch (Exception e) {
log.error("I dont care, just retry while I own the partitions", e)
}
}
}
} catch (WakeupException e) {
// shutting down
} finally {
consumer.close();
}
Mais comment cette méthode de rappel va être appelé? De quel fil? –
On pense qu'il est appelé par le thread principal client Kafka Consumer. Essaye le. Il est recommandé d'implémenter ce rappel de toute façon afin que vous ne perdiez pas votre décalage et finissiez les messages de retraitement lors d'un rééquilibrage. –
Le thread principal du client KafkaConsumer essaie de traiter les ressources exécutant 'process (values);' dans une boucle, n'est-ce pas? –