2016-05-20 1 views
1

J'essaie d'envoyer des données à kafka, mais quand je lance mon code i aKafkaProducer Connection refused

13:20:17.688 [kafka-producer-network-thread | producer-1] 
DEBUG org.apache.kafka.clients.NetworkClient - Node -1 disconnected. 
13:20:17.784 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request 
13:20:17.784 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at kafkaAdress:2181. 
13:20:18.781 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Connection with kafkaAdress/addressId disconnected 
java.net.ConnectException: Connection refused: no further information 
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) 
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54) 
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72) 
at org.apache.kafka.common.network.Selector.poll(Selector.java:274) 
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) 
at java.lang.Thread.run(Unknown Source) 

code:

String topic = "TST"; 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "kafkaAdress:2181"); 
    props.put("key.serializer",  "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    KafkaProducer<String, String> producer = new KafkaProducer<>(props); 
    for(int i = 0; i < 100; i++) 
     producer.send(new ProducerRecord<String, String>(topic, "TestMessage"));  
    producer.close(); 

Est-ce que quelqu'un sait comment résoudre ce problème?

J'utilise kafka 0.9.1

Répondre

5

Depuis Kafka 0.9 l'API producteur n'utilise Zookeeper.

La propriété bootstrap.servers doit contenir une liste de courtiers pour établir la connexion initiale au cluster Kafka.

2181 est le port de zookeeper. Le port par défaut pour un courtier est 9092.

+0

Ok, j'ai changé mes propriétés bootsrap.servers à ma liste de courtiers. L'erreur est apparue mais je n'envoie aucune donnée – Lukaszaq

+0

Les messages sont envoyés de manière asynchrone. vous devez essayer d'ajouter un rappel à la méthode d'envoi pour consigner l'exception renvoyée ou le décalage attaché à chaque message. – fhussonnois

1

Vous avez une erreur sur votre configuration, bootstrap.server est l'adresse du courtier Kafka, pas zookeeper. Le producteur utilise toujours l'adresse du courtier pour publier les messages.