2016-12-07 11 views

Répondre

1

Vous pouvez vérifier si le serveur est en cours d'exécution en utilisant ceci:

ZkClient zkClient = new ZkClient("your_zookeeper_server", 5000 /* ZOOKEEPER_SESSION_TIMEOUT */, 5000 /* ZOOKEEPER_CONNECTION_TIMEOUT */, ZKStringSerializer$.MODULE$); 
List<Broker> brokers = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()); 
if (brokers.isEmpty()) { 
    // No brokers available 
} else { 
    // There are brokers available 
} 
+0

Ceci est un moyen de vérifier la connexion Zookeeper, pas le serveur Kafka. – dbustosp

3

J'ai eu la même question et je ne veux pas laisser cette question sans réponse. Je lis beaucoup sur la façon dont je peux vérifier la connexion et la plupart des réponses que j'ai trouvées vérifiait la connexion avec Zk, mais je veux vraiment vérifier la connexion directement avec le serveur Kafka.

Ce que je fait est de créer un simple, KafkaConsumer et la liste de tous les sujets avec listTopics(). Si la connexion est réussie, alors vous obtiendrez quelque chose comme un retour. Sinon, vous obtiendrez un TimeoutException.

def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = { 
    val props = new Properties() 
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString) 
    props.put("group.id", kafkaParams.get("group.id").get.toString) 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    val simpleConsumer = new KafkaConsumer[String, String](props) 
    simpleConsumer.listTopics() 
    } 

alors vous pouvez envelopper cette méthode dans une phrase try-catch pour attraper l'exception.