2017-07-18 1 views
0

J'utilise Kafka producteur 10.2.1 pour créer un sujet et d'écrire au sujet, quand je crée le sujet je reçois ce qui suit erreur, mais le sujet est créé:Obtient une erreur d'erreur dans Kafka producteur lors de la création de sujet mais le sujet est créé sur le serveur Kafka

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:774) 
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:494) 
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440) 
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360) 
    at kafka.AvroProducer.produce(AvroProducer.java:47) 
    at samples.TestMqttSource.messageReceived(TestMqttSource.java:89) 
    at mqtt.JsonConsumer.messageArrived(JsonConsumer.java:132) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:477) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:380) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:184) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
msg org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
loc org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
cause org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
excep java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 

Toutes les suggestions sont très appréciés.

+0

Un producteur ne peut pas créer de sujet. L'API Admin Client peut le faire. Le sujet est créé car il existe la rubrique Création automatique (propriété auto.create.topics.enable) activée sur le courtier (par défaut). Pouvez-vous montrer le code? – ppatierno

+0

merci pour votre commentaire. J'ai trouvé une solution, et j'ai essayé d'expliquer le "problème" dans un commentaire à la réponse ci-dessous. – Margit

Répondre

0

Vous ne pouvez pas utiliser KafkaProducer pour créer un sujet (donc je ne suis pas tout à fait sûr de savoir comment vous avez réussi à créer le sujet, à moins que vous l'avez fait précédemment par une autre méthode, comme les scripts shell d'administration de kafka). Au lieu de cela, vous utilisez les AdminUtils fournis par la bibliothèque Kafka.

J'ai récemment atteint les deux exigences que vous recherchez, et vous seriez surpris de voir à quel point il est facile à atteindre. Voici un exemple de code simple qui vous montre comment créer un sujet via AdminUtils, et comment y écrire.

class Foo { 

    private String TOPIC = "testingTopic"; 
    private int NUM_OF_PARTITIONS = 10; 
    private int REPLICATION_FACTOR = 1; 

    public Foo() { 


     ZkClient zkClient = new ZkClient("localhost:2181", 15000, 10000, ZKStringSerializer$.MODULE$); 
     ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection("localhost:2181"), false); 

     if (!AdminUtils.topicExists(zkUtils, TOPIC)) { 
      try { 
       AdminUtils.createTopic(zkUtils, TOPIC, NUM_OF_PARTITIONS, REPLICATION_FACTOR, new Properties(), Enforced$.MODULE$); 

       Properties producerConfig = new Properties(); 

       producerConfig.put(ProducerConfig.BOOTSTRAP_SERVER_CONFIG, "localhost:9092"); 
       producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 
       producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 

       KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig); 

       // This is just to show you how to write but you could be more elaborate 
       int i = 0; 

       while (i < 11) { 
        ProducerRecord<String, String> rec = new ProducerRecord<>(TOPIC, ("This is line number " + i)); 
        producer.send(rec); 
        i++; 
       } 

       producer.closer(); 
      } catch (AdminOperationException aoe) { 
       aoe.printStackTrace(); 
      } 
     } 

    } 

} 

Rappelez-vous que si vous souhaitez supprimer des sujets, cette option est désactivée par défaut dans les paramètres. Le fichier de configuration que vous utilisez au démarrage de Kafka (par défaut c'est $ {kafka_home} /config/server.properties), ajoutez la ligne suivante si elle n'existe pas et est définie sur false ou commentée:

delete.topic.enabled=true 

Vous devrez ensuite redémarrer le serveur et supprimer les sujets via Java ou les outils de ligne de commande fournis.

NB

Il est toujours une bonne idée de fermer les producteurs/consommateurs lorsque vous avez terminé avec eux, comme le montre l'exemple de code.

+0

Merci pour la réponse et les commentaires. Notre Kafka est configuré pour générer automatiquement des sujets, à partir des messages. J'ai découvert quel était le problème, je cours le client sur mon PC Windows et j'utilisais l'adresse IP du serveur Kafka lors de la connexion au Kafka, mais je pense que d'une façon ou d'une autre dans les méta-données de la création du sujet, le Le nom kafkaserver est renvoyé, j'ai donc ajouté l'adresse IP et le nom d'hôte du serveur Kafka à mon fichier hôte, et cela fonctionne. – Margit

+0

Ahh d'accord, c'est un peu inhabituel que ça soit arrivé! Mais content que tout cela fonctionne –