2016-09-09 2 views
0

J'essaie d'ajouter un Interceptor pour faire la validation des messages publiés par le producteur sur le sujet Kafka. J'ai besoin de faire quelques validations en plus de la validation du schéma qui est effectuée par le sujet Kafka. Les étapes que j'ai suivies sont les suivantes. J'ai écrit une classe Java prolongeant l'interface ProducerInterceptor. Kafka producteur Interceptor

  • Compilé la classe et créé un fichier jar qui est placé dans un dossier inclus dans le chemin de classe.
  • Ajouté intercetors.classes = classname à producer.properties dans l'installation de Kafka. Mais lorsque je publie un message sur le sujet, la classe d'intercepteur personnalisée que j'ai écrite n'est pas invoquée. (Je ne reçois pas d'erreurs aussi, les messages sont parfaitement publiés).

    I Haver appelé https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

    conseils S'il vous plaît à ce sujet.

  • Répondre

    0

    Le nom de la propriété est interceptor.classes, pas intercetors.classes

    +0

    merci pour la réponse Chris. Oui, il est correctement mentionné en tant que interceptor.classes dans producer.properties. Désolé pour la faute de frappe. –

    0

    Cette question est assez vieux, donc je suppose que vous avez trouvé une solution dans l'intervalle. Cependant, juste au cas où cela aiderait quelqu'un d'autre, j'ai trouvé que ma classe ProducerInterceptor, qui distribue des messages à différents sujets basés sur le contenu du message, n'a pas été invoquée à moins que mon flux ait déjà une sortie spécifiée.

    Ma première tentative ressemblait à ceci parce que je pensais que je n'avais pas besoin de spécifier un sujet de sortie. Cela ne fonctionne pas:

    val builder: KStreamBuilder = new KStreamBuilder 
    val input = builder.stream("input-topic") 
    
    val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
    stream.start() 
    

    Mais cela ne:

    val builder: KStreamBuilder = new KStreamBuilder 
    val input = builder.stream("input-topic").through("dummy-output-topic") 
    
    val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
    stream.start() 
    

    Il convient de noter que rien ne soit publié que dummy-output-topic dans le second exemple, et que l'utilisation to au lieu de through semble également travailler la de la même façon.

    Dans mon cas, j'invoquait map de modifier les enregistrements avant d'utiliser l'intercepteur pour les envoyer à différents sujets, donc mon code ressemble en fait plus comme ceci:

    val builder: KStreamBuilder = new KStreamBuilder 
    val input = builder.stream("input-topic") 
        .map(new CustomKeyValueMapper) 
        .through("dummy-output-topic") 
    
    val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
    stream.start() 
    

    J'espère que ces exemples aident tous ceux qui travaillent avec ProducerInterceptor s qui ont fait la même erreur que moi.