J'ai essayé de faire un peu de travail POC pour Spring Kafka. Plus précisément, je voulais expérimenter quelles sont les meilleures pratiques en termes de gestion des erreurs tout en consommant des messages au sein de Kafka.Kafka l'exception de la consommation et de compensation committe
Je me demande si quelqu'un est en mesure d'aider:
- Partage des meilleures pratiques entourant ce que les consommateurs Kafka devraient faire quand il y a un échec
- Aidez-moi à comprendre comment fonctionne Enregistrez AckMode, et comment empêche la validation de la file d'attente de décalage Kafka lorsqu'une exception est levée dans la méthode de l'écouteur.
L'exemple de code pour deux est donnée ci-dessous:
Étant donné que AckMode est réglé pour enregistrer, qui, selon la documentation:
engager le décalage lorsque l'auditeur revient après le traitement de la record.
J'aurais pensé que le décalage ne serait pas incrémenté si la méthode d'écoute émettait une exception. Cependant, ce n'était pas le cas lorsque je l'ai testé en utilisant la combinaison code/config/commande ci-dessous. Le décalage est toujours mis à jour et le message suivant continue d'être traité.
Ma config:
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
return factory;
}
Mon code:
@Component
public class KafkaMessageListener{
@KafkaListener(topicPartitions = {@TopicPartition(topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))})
public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
throw new RuntimeException("Oops!");
}
commande pour vérifier offset:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
J'utilise kafka_2.12-0.10.2.0 et org. springframework.kafka: spring-kafka: 1.1.3.RELEASE
Merci pour le conseil @Gary. Savez-vous s'il existe des bonnes pratiques concernant la gestion des erreurs chez les consommateurs de Kafka? Il semble que, dès la sortie de la boîte, une erreur de lecture d'un message sera simplement enregistrée puis avalée. – yfl
De plus, j'ai remarqué que les commentaires pour le code semblent être contradictoires: "n'est efficace que lorsque l'accusé de réception automatique est faux, il ne s'applique pas aux accusés de réception manuels.". Je devine que la première partie devrait être vraie, plutôt que fausse? – yfl
Nous devrions régler cela; ce sont des choses différentes; Dans ce contexte, auto ack fait référence à une propriété kafka 'enable.auto.commit' (où le client consommateur de la librairie kafka fait ses propres accusations en interne). 'AckMode' est utilisé pour dire au conteneur quand/si faire commet quand' enable.auto.commit' est 'false'. Si ce paramètre est défini pour les acks manuels, le conteneur ne reçoit jamais d'acks. –