2017-09-28 4 views
0

Une partie de mon code déclenche l'exception EOF lorsque le courtier Kafka écoute sur SSL avec le port 9093, dans l'écouteur de texte clair, l'extrait de code fonctionne correctement.Obtention de l'exception EOF lors de la lecture de BlockingChannel sur Kafka s'exécutant sur SSL/TLS Listener

Des idées ce qui pourrait être mauvais ici ??

 public KafkaMetadataHelper(String kafkaConnect) throws Exception { 
    // use lowlevel kafka.api to query consumer group metadata (ie max committed offset) 
    String[] hostAndPort = kafkaConnect.split(":"); 
    String host = hostAndPort[0]; 
    int port = Integer.parseInt(hostAndPort[1]); 
    channel = new BlockingChannel(host, port, 
            BlockingChannel.UseDefaultBufferSize(), 
            BlockingChannel.UseDefaultBufferSize(), 
            10000); 
    channel.connect(); 
    GroupCoordinatorRequest request = new GroupCoordinatorRequest(MY_GROUP, 
                    GroupCoordinatorRequest.CurrentVersion(), 
                    correlationId++, 
                    MY_CLIENTID); 
    channel.send(request); 
    GroupCoordinatorResponse metadataResponse = null; 
    try { 
     metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive().payload());// This is where the exception is thrown 

    } catch (Exception e) { 
     e.printStackTrace(); 
    } 

}

Le message d'erreur que je reçois est cela.

java.io.EOFException 
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:103) 
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131) 
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122) 

Répondre

0

Pour vous connecter via TLS, votre client a besoin de certains paramètres! BlockingChannel ne permet pas à l'appelant de spécifier des paramètres.

Je vous suggère de regarder ConsumerGroupCommand.scala [1] et de voir comment il utilise AdminClient [2] pour récupérer des détails sur les groupes de consommateurs.

  1. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L496
  2. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala#L197
+0

Merci, je vais étudier cela et partager mes entrées ... –