2

Vous avez une erreur en utilisant le composant kafka pour Apache Camel (version 2.19.1), j'essaie juste d'imprimer les messages entrants dans le sujet, mon pipeline est ainsi composé:Composant Camel-Kafka ne fonctionne pas pour l'erreur: "en raison de courtiers doivent être configurés"

... 
context.addRoutes(new RouteBuilder() { 
      public void configure() { 
       from("kafka://localhost:9092?topic=test&groupId=testing") 
       .to("stream:out"); 
    context.start(); 
    } 
} 

essayé avec et sans "//" dans l'extrémité.

Ce que je suis arrivé est:

Exception in thread "main" org.apache.camel.FailedToCreateRouteException: Failed to create route route1: Route(route1)[[From[kafka://localhost:9092?topic=test&groupI... because of Brokers must be configured 
at org.apache.camel.impl.RouteService.warmUp(RouteService.java:147) 
at org.apache.camel.impl.DefaultCamelContext.doWarmUpRoutes(DefaultCamelContext.java:3762) 
at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3669) 
at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3455) 
at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3309) 
at org.apache.camel.impl.DefaultCamelContext.access$000(DefaultCamelContext.java:202) 
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3093) 
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3089) 
at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:3112) 
at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:3089) 
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61) 
at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:3026) 
at org.apache.camel.MainApp.main(MainApp.java:60) 
Caused by: java.lang.IllegalArgumentException: Brokers must be configured 
at org.apache.camel.component.kafka.KafkaConsumer.<init>(KafkaConsumer.java:62) 
at org.apache.camel.component.kafka.KafkaEndpoint.createConsumer(KafkaEndpoint.java:76) 
at org.apache.camel.impl.EventDrivenConsumerRoute.addServices(EventDrivenConsumerRoute.java:69) 
at org.apache.camel.impl.DefaultRoute.onStartingServices(DefaultRoute.java:103) 
at org.apache.camel.impl.RouteService.doWarmUp(RouteService.java:172) 
at org.apache.camel.impl.RouteService.warmUp(RouteService.java:145) 
... 12 more 

Process finished with exit code 1 

Je suis en train de le comprendre, mais je ne comprends vraiment pas ce qui est le problème, mon groupe de kafka est un seul courtier et tout est en cours d'exécution (zookeeper et serveur), ty pour l'aide

Répondre

1

Ajouter brokers=localhost:9092 au consommateur uri.

+0

Maintenant, je ne reçois aucune erreur, mais le consommateur ne reste pas, il quitte avec le code 0, mais à la place, il devrait rester jusqu'à ce qu'un contexte contextuel.stop() en attente de messages entrants, pourquoi? – Giuseppe

+1

Non, par défaut (si vous avez une application Camel autonome), il ne devrait pas. Il existe des solutions pour garder le CamelContext en cours d'exécution, vérifiez ici: http://camel.apache.org/running-camel-standalone-and-have-it-keep-running.html – mgyongyosi

+0

Utilisez la classe Camel Main comme indiqué par @mgyongyosi – Gautam

0

En regardant this example la première partie de l'URL est le topic, puis en tant que paramètre, vous pouvez passer le brokers. Donc, la documentation officielle semble être un peu trompeuse pour moi.

from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" 
        + "&maxPollRecords={{consumer.maxPollRecords}}" 
        + "&consumersCount={{consumer.consumersCount}}" 
        + "&seekTo={{consumer.seekTo}}" 
        + "&groupId={{consumer.group}}") 
        .routeId("FromKafka") 
       .log("${body}"); 

Mais comme des conseils généraux: Camel est open source, donc vous pouvez toujours jeter un oeil sur le code et les exemples là GitHub. Vous pouvez également trouver les lignes de la trace de la pile que vous avez posté là, puis traquer ce qui manque.

+0

avec cette structure je ne reçois aucune erreur, mais le consommateur ne reste pas en attente de messages, mais quitte immédiatement avec le code 0 – Giuseppe

+0

Vous n'avez pas besoin d'un consommateur d'interrogation? http://camel.apache.org/polling-consumer.html –

+0

J'ai utilisé le composant RabbitMQ et j'avais juste besoin d'informations sur le point de terminaison pour aller, rester debout et écouter le flux entrant, et d'autres composants que j'avais déjà utilisés dans le passé faisaient de même, il a un intervalle de temps d'interrogation par défaut dans le composant lui-même, donc je pense que ce n'est pas nécessaire. – Giuseppe