0

J'étudie Kafka Streams et j'ai un problème avec le premier exemple de WordCount en Java 8, tiré de la documentation. Utilisation des dernières versions disponibles des flux kafka, des exemples d'expressions lambda Kafka Connect et WordCount.Kafka streams - Premier exemple WordCount ne compte pas correctement le premier tour

Je suis les étapes suivantes: Je crée un sujet d'entrée dans Kafka, et un en sortie. Commencez la diffusion en continu de l'application, puis en insérant des mots à partir d'un fichier .txt

Sur le premier compte, dans le sujet de sortie, je vois les mots correctement regroupés, mais les chiffres sont incorrects. Si j'essaie de réinsérer les mêmes mots, les comptes successifs des comptes incorrects précédents sont tous corrects.

Si je regarde le vidage de rubrique d'entrée avec une console grand public, il est chargé correctement et il n'y a pas de données incorrectes.

Comment se fait-il que la première fois compte?

Exemple [FIRST DATA]: (entrée Topic à Kafka) salut salut micro micro test

(Streaming App est en cours d'exécution)

(sortie Topic) salut 12 micro 4 Test 3 (chiffres occasionnels)

[données successifs - affichage de la rubrique d'entrée les mêmes mots]

(sortie sujet) hi 14 micro 6 Test 4

[nouvelle tentative]

(sortie Topic) salut 16 micro 8 essai 5

et ainsi de suite ....

+0

Ça a l'air bizarre. Pouvez-vous reproduire le problème? Cela ne devrait pas arriver. –

Répondre

3

La démonstration de WordCount dans Apache Kafka a the following lines:

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data 
// Note: To re-run the demo, you need to use the offset reset tool: 
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

Cela signifie que, lorsque vous redémarrez l'application, il va lire son sujet d'entrée dès le début ("plus tôt") si il n'y a pas existi des compensations de consommateurs pour l'application WordCount stockée dans Kafka. Les décalages de consommation d'une application expirent dans Kafka après une certaine période d'inactivité, la valeur par défaut est de 24 heures (voir le offsets.retention.minutesbroker configuration).

je pourrais imaginer que ce qui suit est arrivé:

  • Vous expérimenté avec Kafka quelque temps auparavant et entré des données de test au sujet d'entrée.
  • Ensuite, vous avez pris une pause> 24 heures avant de reprendre vos expériences.
  • Maintenant, l'application, lorsqu'elle a redémarré, est revenue à la relecture du sujet d'entrée depuis le début, ramassant ainsi des données d'entrée de test plus anciennes et menant ainsi à des comptages "gonflés".

Si je recherche la décharge du sujet d'entrée avec une console de consommation, il est chargé correctement et il n'y a pas de données sales.

Vous pouvez vérifier mon hypothèse ci-dessus en regardant le sujet d'entrée à nouveau avec le consommateur de la console tout en ajoutant l'option CLI --from-beginning (voir https://kafka.apache.org/documentation/#quickstart_consume).

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourInputTopic --from-beginning 

Cela vous montrera toutes les données disponibles dans la rubrique « yourInputTopic » - moins les données qui auraient pu être purgés des sujets Kafka dans l'intervalle (la configuration du courtier par défaut purger les données qui est plus ancienne que 7 jours, voir).

+0

Merci pour votre réponse. En fait, quand je testais après 24 heures (alors de nouveaux décalages), je supprimais un ancien sujet (j'ai activé l'annulation) et je les recréais de zéro pour une nouvelle exécution propre. Le problème a refait surface. Mais maintenant j'ai ajouté la ligne streamsConfiguration.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earlyliest") dans le code d'exemple (pas là avant) et semble bien fonctionner. Peut-être que je n'ai pas résolu exactement cela, mais ça fonctionne. –

+0

Super, heureux d'entendre ça fonctionne maintenant! –

+0

J'ai eu un problème similaire il y a quelques semaines, mais parfois les chiffres étaient négatifs. Cela pourrait-il être causé par quelque chose de similaire? – foxygen