1

Compte tenu: J'ai deux sujets dans Kafka disons le sujet A et le sujet B. Le flux Kafka lit un enregistrement de la rubrique A, le traite et produit plusieurs enregistrements (disons enregistrementA et enregistrementB) correspondant à l'enregistrement consommé. Maintenant, la question est comment puis-je y parvenir en utilisant Kafka Streams.Kafka Streams: un enregistrement à plusieurs enregistrements

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() { 
     @Override 
     public List<Message> apply(final Message message) { 
      return consumerRecordHandler.process(message); 
     } 
    }).*someFunction*() 

Ici, l'enregistrement lu est Message; Après le traitement, il renvoie une liste de messages. Comment puis-je diviser cette liste en deux groupes de producteurs? Toute aide serait appréciée.

Répondre

5

Je ne suis pas Bien sûr, si je comprends bien la question, et je ne comprends pas la réponse de @Abhishek :(

Si vous avez un flux d'entrée, et que vous voulez obtenir zéro, un ou plusieurs enregistrements de sortie par enregistrements d'entrée, vous devez appliquer un flatMap() ou flatMapValues() (en fonction de si vous souhaitez modifier la clé ou non)

Vous vous demandez également "Comment puis-je diviser cette liste en deux flux de production?" Si vous voulez diviser un flux en plusieurs, vous pouvez utiliser branch().

Pour plus de détails, je me réfère à la documentation: http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations

+0

@ user2538255 N'hésitez pas à faire un suivi si ma réponse n'est pas claire. –

+0

C'est exactement ce que je fais. Après quelques recherches sur la réponse d'Abhishek, j'ai atterri sur cet exemple https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/ confluent/examples/streams/WordCountLambdaIntegrationTest.java – user2538255

+0

Avoir accepté la bonne réponse :) Merci :) – user2538255

2

Quelle est votre clé (type)? Je devine que ce n'est pas String. Après avoir exécuté le mapValues, vous l'aurez - KStream<K,List<Message>>. Si K n'est pas String alors someFunction() peut être un map qui convertira K en String (si son est, vous avez déjà le résultat) et laisser la List<Message> (la valeur) intacte puisque c'est votre résultat final destiné

+0

Ouais ce works..thanks une tonne! – user2538255