2017-03-29 2 views
1

J'ai essayé de faire un peu de travail POC pour Spring Kafka. Plus précisément, je voulais expérimenter quelles sont les meilleures pratiques en termes de gestion des erreurs tout en consommant des messages au sein de Kafka.Kafka l'exception de la consommation et de compensation committe

Je me demande si quelqu'un est en mesure d'aider:

  1. Partage des meilleures pratiques entourant ce que les consommateurs Kafka devraient faire quand il y a un échec
  2. Aidez-moi à comprendre comment fonctionne Enregistrez AckMode, et comment empêche la validation de la file d'attente de décalage Kafka lorsqu'une exception est levée dans la méthode de l'écouteur.

L'exemple de code pour deux est donnée ci-dessous:

Étant donné que AckMode est réglé pour enregistrer, qui, selon la documentation:

engager le décalage lorsque l'auditeur revient après le traitement de la record.

J'aurais pensé que le décalage ne serait pas incrémenté si la méthode d'écoute émettait une exception. Cependant, ce n'était pas le cas lorsque je l'ai testé en utilisant la combinaison code/config/commande ci-dessous. Le décalage est toujours mis à jour et le message suivant continue d'être traité.

Ma config:

private Map<String, Object> producerConfigs() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092"); 
    props.put(ProducerConfig.RETRIES_CONFIG, 0); 
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); 
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1); 
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    return props; 
} 

    @Bean 
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = 
      new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); 
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD); 
    return factory; 
} 

Mon code:

@Component 
public class KafkaMessageListener{ 
    @KafkaListener(topicPartitions = {@TopicPartition(topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))}) 
    public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException { 
      throw new RuntimeException("Oops!"); 
    } 

commande pour vérifier offset:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group 

J'utilise kafka_2.12-0.10.2.0 et org. springframework.kafka: spring-kafka: 1.1.3.RELEASE

Répondre

2

Le conteneur (via ContainerProperties) possède une propriété, ackOnError qui est vrai par défaut ...

/** 
* Set whether or not the container should commit offsets (ack messages) where the 
* listener throws exceptions. This works in conjunction with {@link #ackMode} and is 
* effective only when the kafka property {@code enable.auto.commit} is {@code false}; 
* it is not applicable to manual ack modes. When this property is set to {@code true} 
* (the default), all messages handled will have their offset committed. When set to 
* {@code false}, offsets will be committed only for successfully handled messages. 
* Manual acks will be always be applied. Bear in mind that, if the next message is 
* successfully handled, its offset will be committed, effectively committing the 
* offset of the failed message anyway, so this option has limited applicability. 
* Perhaps useful for a component that starts throwing exceptions consistently; 
* allowing it to resume when restarted from the last successfully processed message. 
* @param ackOnError whether the container should acknowledge messages that throw 
* exceptions. 
*/ 
public void setAckOnError(boolean ackOnError) { 
    this.ackOnError = ackOnError; 
} 

Gardez à l'esprit, cependant, que si le message suivant est un succès, son décalage sera engagée de toute façon, qui a effectivement valide le décalage échoué aussi.

+0

Merci pour le conseil @Gary. Savez-vous s'il existe des bonnes pratiques concernant la gestion des erreurs chez les consommateurs de Kafka? Il semble que, dès la sortie de la boîte, une erreur de lecture d'un message sera simplement enregistrée puis avalée. – yfl

+0

De plus, j'ai remarqué que les commentaires pour le code semblent être contradictoires: "n'est efficace que lorsque l'accusé de réception automatique est faux, il ne s'applique pas aux accusés de réception manuels.". Je devine que la première partie devrait être vraie, plutôt que fausse? – yfl

+0

Nous devrions régler cela; ce sont des choses différentes; Dans ce contexte, auto ack fait référence à une propriété kafka 'enable.auto.commit' (où le client consommateur de la librairie kafka fait ses propres accusations en interne). 'AckMode' est utilisé pour dire au conteneur quand/si faire commet quand' enable.auto.commit' est 'false'. Si ce paramètre est défini pour les acks manuels, le conteneur ne reçoit jamais d'acks. –