2015-10-12 2 views
2

J'essayais d'intégrer kafka-storm. Je viens de commencer avec quelques exemples.Ecrire un sujet de kafka quand le consommateur est en panne

J'ai été capable d'exécuter des exemples à partir de GitHub. Ensuite, j'essaie d'écrire une classe Producer dans eclipse pour publier des messages sur un sujet kafka en utilisant l'API KAFKA PRODUCER.

Scénario1:

Quand ma coquille du consommateur est en cours d'exécution à l'aide dire test sujet, et le test i exécuter ma classe de producteur. Je suis capable de voir mon shell consommateur avec tous les messages publiés.

Scénario 2

Je n'ai pas commencé ma coquille à la consommation (dit consommateur est en baisse). Et je cours ma classe de producteurs. Les messages sont publiés dans la kafka.

Maintenant, si les messages sont publiés, et maintenant après l'arrêt si je démarre mon shell du consommateur, Il ne lit pas les messages déjà publiés rubrique.

Pourquoi? Je suppose qu'il tient un journal pour la consommation du sujet. Ne devrait-il pas lire les messages?

Y at-il un paramètre de configuration dont j'ai besoin de mentionner?

Properties props = new Properties(); 
props.put("metadata.broker.list", "localhost:9092"); 
props.put("zk.connect", "localhost:2181"); 
props.put("serializer.class", "kafka.serializer.StringEncoder"); 
props.put("request.required.acks", "1"); 

ProducerConfig config = new ProducerConfig(props); 
Producer<String, String> producer = new Producer<String, String>(config); 

     for (int nEvents=0; nEvents<events;nEvents++) 
     { 
      String ip="192.168.2."+rnd.nextInt(255); 
      String msg=getNextTradeData(); // Class to generate data 
      KeyedMessage<String,String> data=new KeyedMessage<String, String>("TradeFrequency",ip,msg); 
      Thread.sleep(100); 
      System.out.println(msg); 
      producer.send(data); 

     } 
producer.close(); 

} 

Ou y at-il quelque chose que je dois faire pour changer le consommateur. J'utilise le shell consommateur fourni dans le paquet, et à partir à l'aide

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first-topic 

Répondre

1

Lorsque vous démarrez le kafka-console-consume il lit à partir du courant de décalage. C'est, le décalage NOW() et pas du passé.

Pour voir si les messages que vous avez publiés deux options:

  1. Utilisez l'option --from-beginning, lire depuis le début du sujet

    bin/kafka-console-consumer.sh - -zookeeper localhost: 2181 --topic premier sujet --from-commençant

  2. persist l'état de la console consommateur dans le Zookeeper/kafka en utilisant l'option --consumer.config

    bin/kafka-console-consumer.sh --zookeeper localhost: 2181 --topic premier sujet --consumer.config /home/sql-injection/consumer-config.txt

selon ce nice page les paramètres dont vous avez besoin sur la configuration du consommateur sont: consumer.id, client.id.

+0

Merci, ça marche pour moi dans ce scénario. Maintenant, dans ce cas, chaque fois qu'il commencera à partir du bignning du journal je suppose (bien que j'ai besoin de vérifier encore). Mais si je souhaite lire depuis où il est parti ou peut-être dire le (dernier décalage lire + 1). Quelle serait mon approche? – NishantM

+0

@NishantM utilise l'option de configuration du consommateur, qui devrait vous permettre de reprendre à partir du dernier décalage –