3

Je construis un client Apache Kafka pour m'abonner à un autre Kafka déjà en cours d'exécution. Maintenant, mon problème est que lorsque mon producteur envoie un message à un serveur ... mon client ne les reçoit pas. Ici, je donne le code producteur,Le consommateur ne reçoit pas de message dans Apache Kafka

 Properties properties = new Properties(); 
     properties.put("metadata.broker.list","Running kafka ip addr:9092"); 
     properties.put("serializer.class","kafka.serializer.StringEncoder"); 
     ProducerConfig producerConfig = new ProducerConfig(properties); 
     kafka.javaapi.producer.Producer<String,String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig); 
     String filePath="filepath"; 
     File rootFile= new File(filePath); 
     Collection<File> allFiles = FileUtils.listFiles(rootFile, CanReadFileFilter.CAN_READ, TrueFileFilter.INSTANCE); 
     for(File file : allFiles) { 
      StringBuilder sb = new StringBuilder(); 
      sb.append(file); 
      KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sb.toString()); 
      System.out.println("sending msg from producer.."+sb.toString()); 
      producer.send(message); 
     } 
      producer.close(); 

Ici Code de la consommation,

  properties.put("bootstrap.servers","Running zookeaper ip addr:2181"); 
     properties.put("group.id","test-group"); 
     properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     properties.put("enable.auto.commit", "false"); 

     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 
     consumer.subscribe(Collections.singletonList(topicName)); 
     while (true) { 
       ConsumerRecords<String, String> records = consumer.poll(100); 
       for (ConsumerRecord<String, String> record : records) 
       { 
        System.out.println("topic = "+record.topic()); 
        System.out.println("topic = "+record.partition()); 
        System.out.println("topic = "+record.offset()); 
       } 
       try { 
        consumer.commitSync(); 
       } catch (CommitFailedException e) { 
        System.out.printf("commit failed", e) ; 
       } 
      } 

J'utilise cette dépendance:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.10.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.1.0</version> 
    </dependency> 

Je reçois toutes les informations de ce lien:
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Lorsque nous avons lancé Consumer, nous n'avons reçu aucune notification du côté consommateur. S'il vous plaît donnez-moi une idée.

+0

essayez de comprendre où est le problème: sur la taille du consommateur ou du producteur. Pour cela: vérifiez les décalages dans le sujet. Cela peut être fait à partir de la ligne de commande – Natalia

+0

Vous l'exécutez en tant que fichier jar sur un cluster? .. vérifiez votre port zookeeper. –

+0

@Natalia: Je suis capable de poster les messages via le producteur. Je peux voir le nombre de message augmentant avec la taille de notation ... mais le décalage n'augmente pas ... –

Répondre

0

Pour producteur:

properties.put("metadata.broker.list","Running kafka ip addr:9092"); 

Je suppose, cela devrait être "bootstrap.servers".

Pour le consommateur:

properties.put("bootstrap.servers","Running zookeaper ip addr:2181"); 

bootstrap.servers doit pointer vers un courtier, pas ZK. Le "problème" est que le consommateur attendra simplement un courtier mais ne manquera pas s'il n'y a pas de courtier à l'hôte/au port spécifié.

+0

Nous courons à la fois courtier et zookeeper dans la même adresse IP. C'est une installation à un seul noeud. D'où le même ip.Do je dois exécuter zookeeper et courtier dans différents vms ?? –

+0

Vous n'avez pas besoin de fonctionner sur des serveurs différents - mais c'est recommandé. Quoi qu'il en soit, ZK et courtier utilisent des ports différents, et '2181' est le port par défaut de ZK - donc je suppose que vous avez besoin d'un point de courtier (par défaut:' 9092') –

+0

@ Matthias J. Sax - fait pareil mais toujours pas n'importe quel message du côté du consommateur –

0

Je suis un newb à Kafka et Java, mais je vais vous suggérer l'approche suivante

  • Vérifiez que le producteur est en train d'écrire sur le sujet en utilisant la commande suivante /usr/bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic KumarTopic --from-beginning.
  • Si c'est le cas, vous devrez probablement vous concentrer sur votre code de consommateur. Les guides de Confluent sont très utiles.
+0

@ Zigmaphi-Thnaks pour le commentaire, j'ai déjà vérifié. Le producteur écrit parfaitement dans le sujet et le consommateur court aussi mais ne reçoit toujours aucun message –