2016-12-23 1 views
0

J'utilise le plugin d'entrée Logstash Kafka pour lire les messages d'un sujet. J'étais auparavant capable de démarrer de nouveaux consommateurs appartenant à de nouveaux groupes de consommateurs et en définissant auto_offset_reset = le plus tôt était capable de consommer des messages depuis le début du sujet.Logstash kafka plugin d'entrée incapable de lire les messages avec un nouveau consommateur et en réglant au plus tôt auto_offset_reset

Configuration du plugin:

input {  
    kafka {   
    bootstrap_servers => "localhost:9092" 
     topics => ["test_topic"] 
     group_id => "new_consumer" 
     client_id => "new_consumer" 
     consumer_threads => 1 
     auto_offset_reset => "earliest" 
    } 
} 

Mais maintenant, je remarque un comportement étrange. Même s'il s'agit d'un nouveau consommateur appartenant à un nouveau groupe de consommateurs et que auto_offset_reset est défini sur "plus tôt", je ne peux pas utiliser de messages.

journaux de débogage Activés suivent est le comportement: Il montre clairement que le consommateur n'a pas précédent décalage et tout à coup la partition offset est récupérée et le consommateur utilise cela et fixe ses nouveaux offset (S'il vous plaît noter: 36387 messages ont été lu plus tôt du sujet et par conséquent le nombre dans les journaux ci-dessous)

[2016-12-22T16: 45: 13454] [INFO ] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Groupe nouveau_consommateur ayant rejoint avec succès la génération 1

[2016-12-22T16: 45: 13455] [INFO ] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] partitions de réglage nouvellement affectés [test_topic-0] pour le groupe new_consumer

[2016-12-22T16: 45: 13456] [DEBUG] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Groupe new_consumer aller chercher des compensations pour les partitions commises: [test_topic-0]

[2016 -12-22T16: 45: 13,544] [DEBUG] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Le groupe new_consumer n'a pas de décalage validé pour partition test_topic-0

[2016-12-22T16: 45: 13544] [Debug] [org.apache.kafka.clients.consumer.internals.Fetcher] Réinitialisation de décalage pour la partition test_topic-0 au plus ancien décalage.

[2016-12-22T16: 45: 13,546] [DEBUG] [org.apache.kafka.clients.NetworkClient] Initialisation de la connexion au noeud 0 à l'hôte local: 9092.

[2016-12-22T16: 45: 13657] [DEBUG] [logstash.instrument.collector] Collector: envoi instantané aux observateurs {: created_at => 22/12/2016 16:45:13 -0800 }

[2016-12-22T16: 45: 13741] [Debug] [org.apache.kafka.common.metrics.Metrics] capteur Ajouté avec le nom du nœud-0.bytes-envoyé

[2016 -12-22T16: 45: 13,741] [DEBUG] [org.apache.kafka.common.metrics.Metrics] Capteur ajouté avec le nom node-0.bytes-received

[201 6-12-22T16: 45: 13741] [Debug] [org.apache.kafka.common.metrics.Metrics] capteur Ajouté avec un nom de noeud 0.latency

[2016-12-22T16: 45: 13742 ] [DEBUG] [org.apache.kafka.clients.NetworkClient] connexion terminées au noeud 0

[2016-12-22T16: 45: 13901] [Debug] [org.apache.kafka.clients.consumer.internals.Fetcher] extraits se compenser 36387 pour la partition test_topic-0

[2016-12-22T16: 45: 18050] [Debug] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] groupe newconsumer engagé décalage 36387 pour la partition test_topic- 0

[2016-12-22T16: 45: 18,563] [DEBUG] [org.apache.kafka.clients. consumer.internals.ConsumerCoordinator] Groupe newconsumer engagé pour compenser 36387 partition test_topic-0

Quelqu'un peut-il me dire pourquoi nous voyons ce comportement?

+0

Quelles versions de Logstash et Kafka utilisez-vous? – Val

+0

Désolé de ne pas l'avoir ajouté plus tôt. Logstash 5.0.2 et Kafka 2.0.10 – minion

+1

2.10 est la version Scala. Actuellement, Kafka est à la version 0.10.1.0, est-ce celle que vous utilisez? – Val

Répondre

1

Les anciens messages ont-ils pu être supprimés en fonction de la période de rétention configurée? Il se peut que le décalage 36387 soit le décalage le plus précoce et que tous les messages antérieurs aient expiré. La période de rétention par défaut est de 7 jours.

+0

Merci pour la réponse. Laissez-moi vérifier la période de rétention des messages. – minion

+0

Merci @hans. C'était en effet le problème – minion