2017-01-19 1 views
1

J'utilise Kafka v0.10.0.0 et créé le producteur & Code Java grand public. Mais le code est bloqué sur producer.send sans aucune exception dans les journaux.Kafka Producer Consumer API Numéro

Quelqu'un peut-il s'il vous plaît aider. Merci d'avance. J'utilise/modifie le "programme exemple mapr - kakfa". Vous pouvez regarder le code complet ici. ** Important: J'ai changé la version kafka-client en 0.10.0.0 dans les dépendances maven et en exécutant Kafka 0.10.0.0 dans mon local.

public class Producer { 
public static void main(String[] args) throws IOException { 
    // set up the producer 
    KafkaProducer<String, String> producer; 
    System.out.println("Starting Producers...."); 
    try (InputStream props = Resources.getResource("producer.props").openStream()) { 
     Properties properties = new Properties(); 
     properties.load(props); 
     producer = new KafkaProducer<>(properties); 
     System.out.println("Property loaded successfully ...."); 
    } 

    try { 
     for (int i = 0; i < 20; i++) { 
      // send lots of messages 
      System.out.println("Sending record one by one...."); 
      producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message.")); 

      System.out.println(i+" message sent...."); 
      // every so often send to a different topic 
      if (i % 2 == 0) { 
       producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message.")); 
       producer.send(new ProducerRecord<String, String>("summary-markers","sending message - "+i+" to summary-markers.")); 
       producer.flush(); 
       System.out.println("Sent msg number " + i); 
      } 
     } 
    } catch (Throwable throwable) { 
     System.out.printf("%s", throwable.getStackTrace()); 
     throwable.printStackTrace(); 
    } finally { 
     producer.close(); 
    } 

    } 
} 

public class Consumer { 
public static void main(String[] args) throws IOException { 

    // and the consumer 
    KafkaConsumer<String, String> consumer; 
    try (InputStream props = Resources.getResource("consumer.props").openStream()) { 
     Properties properties = new Properties(); 
     properties.load(props); 
     if (properties.getProperty("group.id") == null) { 
      properties.setProperty("group.id", "group-" + new Random().nextInt(100000)); 
     } 
     consumer = new KafkaConsumer<>(properties); 
    } 
    consumer.subscribe(Arrays.asList("fast-messages", "summary-markers")); 
    int timeouts = 0; 
    //noinspection InfiniteLoopStatement 
    while (true) { 
     // read records with a short timeout. If we time out, we don't really care. 
     ConsumerRecords<String, String> records = consumer.poll(200); 
     if (records.count() == 0) { 
      timeouts++; 
     } else { 
      System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts); 
      timeouts = 0; 
     } 
     for (ConsumerRecord<String, String> record : records) { 
      switch (record.topic()) { 
       case "fast-messages": 
        System.out.println("Record value for fast-messages is :"+ record.value());    
         break; 
     case "summary-markers": 
      System.out.println("Record value for summary-markers is :"+ record.value()); 
         break; 
       default: 
        throw new IllegalStateException("Shouldn't be possible to get message on topic "); 
      } 
     } 
    } 
    } 
} 
+0

Il y a beaucoup de choses là-bas - configuration de chargement, une boucle qui envoie plusieurs messages sur des sujets multiples, un appel de chasse, etc., vous pouvez réduire à quelque chose de plus petit qui produit la bug et/ou donner plus de détails - où exactement "se coince-t-il"? Le premier envoi réussit-il? Le 2ème? Comment savez-vous qu'il est coincé? Avez-vous ajouté la journalisation pour voir qui envoie le travail et qui ne fonctionne pas. –

+0

Vous avez mentionné que le producteur s'est retrouvé coincé mais a collé le code pour le consommateur? – amethystic

+0

J'ai ajouté Producteur et Consommateur. – PanwarS87

Répondre

0

Le code que vous utilisez est pour une démo de mapR qui n'est pas Kafka. MapR revendique la compatibilité API avec Kafka 0.9, mais même alors mapR traite différemment les décalages de messages que Kafka (les offsets sont des décalages d'octets de messages plutôt que des décalages incrémentaux), etc. L'implémentation de mapR est également très, très différente. Cela signifie que si vous avez de la chance, une application 0.9 de Kafka pourrait bien fonctionner sur mapR et inversement. Il n'y a pas de garantie pour les autres versions.