Cette question est assez vieux, donc je suppose que vous avez trouvé une solution dans l'intervalle. Cependant, juste au cas où cela aiderait quelqu'un d'autre, j'ai trouvé que ma classe ProducerInterceptor
, qui distribue des messages à différents sujets basés sur le contenu du message, n'a pas été invoquée à moins que mon flux ait déjà une sortie spécifiée.
Ma première tentative ressemblait à ceci parce que je pensais que je n'avais pas besoin de spécifier un sujet de sortie. Cela ne fonctionne pas:
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
Mais cela ne:
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic").through("dummy-output-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
Il convient de noter que rien ne soit publié que dummy-output-topic
dans le second exemple, et que l'utilisation to
au lieu de through
semble également travailler la de la même façon.
Dans mon cas, j'invoquait map
de modifier les enregistrements avant d'utiliser l'intercepteur pour les envoyer à différents sujets, donc mon code ressemble en fait plus comme ceci:
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")
.map(new CustomKeyValueMapper)
.through("dummy-output-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
J'espère que ces exemples aident tous ceux qui travaillent avec ProducerInterceptor
s qui ont fait la même erreur que moi.
merci pour la réponse Chris. Oui, il est correctement mentionné en tant que interceptor.classes dans producer.properties. Désolé pour la faute de frappe. –