2017-06-16 3 views
0

Salut J'ai essayé d'apprendre KAFKA et d'avoir des problèmes avec mon sondeur/consommateur à distance.KAFKA REMOTE AWS consumer.poll

J'ai mis en place KAFKA dans l'instance AWS EC2 avec ip privée et publique. mon server.properties ressemble à ceci.

auditeurs = texte brut: //172.31.31.58: 9092 #AWS Private IP

advertised.listeners = texte brut: // 35 ?? ?? ??:... 9092 #AWS IP publique Masked

Mon groupe de sécurité AWS EC2 est configuré pour autoriser le trafic sur n'importe quelle adresse IP sur n'importe quel port à des fins de test.

Quand je produis/consommer localement les messages dans mon instance EC2 à l'aide des scripts suivants, il fonctionne parfaitement

bin/kafka-console-producer.sh --broker liste localhost: 9092 Test --topic

bin/kafka-console-consumer.sh --bootstrap-server localhost: 9092 --topic test --de-début

Mais quand j'essaie de me connecter à la même instance de kafka depuis mon ordinateur portable distant Code Eclipse exécutant mon Java API, mon code se bloque pour toujours dans consumer.poll (100). Est-ce que je fais quelque chose de mal ici?

 Properties props = new Properties(); 
    props.put("bootstrap.servers", "35.??.??.??:9092");//my aws public ip configured in advertised.listeners 
    props.put("group.id", "test123"); 
    props.put("enable.auto.commit", "false"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("test")); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) 
      System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 

    } } 
+0

Pouvez-vous poster le fichier journal pour votre consommation? Il serait utile de voir les messages de débogage. – PragmaticProgrammer

Répondre

1

Etes-vous sûr qu'il se bloque dans poll()? ou est poll() vient de retourner un vide ConsumerRecords et il boucle dans le while(true)?

Par défaut, si vous n'avez pas validé d'offsets pour le groupe, le consommateur commence à la fin du sujet, il ne recevra donc que les nouveaux messages. Dans ce cas, si vous voulez consommer des messages déjà dans le sujet, vous devez définir auto.offset.reset à earliest (comme vous l'avez fait dans la console consommateur --from-beginning)

Edit:

Si elle est effectivement coincé dans poll() , cela pourrait être un problème de connexion. Pour le savoir, le meilleur moyen est d'exécuter votre client avec la journalisation activée. Créez un fichier contenant:

log4j.rootLogger=DEBUG, stdout 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n 

et commencez votre client avec -Dlog4j.configuration=file:PATH_TO_FILE

+0

Oui Mickael, Il se bloque dans le sondage, fondamentalement, il ne passe pas à la ligne suivante, je l'ai vérifié. –

+0

Salut Mickael, ceci est définitivement un problème de réseau. J'ai essayé cela plus tôt dans mon réseau de bureau et cela n'a pas fonctionné et fonctionne parfaitement dans un réseau différent. En fait, j'ai exécuté mon programme avec le journal de débogage activé selon votre suggestion et rien n'a été enregistré qui suggérait un problème de réseau. Mais cela fonctionne maintenant dans un réseau différent, et votre suggestion sur le problème de réseau m'a conduit à penser à courir ceci à travers un réseau différent. Merci beaucoup. –