0

Je cours Kafka 0.10.0 sur CDH 5.9, le cluster est kerborisé. Ce que j'essaie de faire est d'écrire des messages d'une machine distante à mon courtier Kafka. Le cluster (où est installé Kafka) possède des adresses IP internes et externes. Les noms d'hôte des machines du cluster sont résolus en adresses IP privées, la machine distante les mêmes noms d'hôte que les adresses IP publiques. J'ai ouvert le port 9092 nécessaire (j'utilise le protocole SASL_PLAINTEXT) de la machine distante à Kafka Broker, vérifié à l'aide de telnet.Kafka Remote Producer - advertised.listeners

Première étape - en plus des propriétés standard pour le courtier Kafka, je configurer les paramètres suivants:

listeners=SASL_PLAINTEXT://0.0.0.0:9092 

advertised.listeners=SASL_PLAINTEXT://<hostname>:9092 

Je suis en mesure de commencer le consommateur avec la console

kafka-console-consumer --new consumer --topic <topicname> --from-beginning --bootstrap-server <hostname>:9092 --consumer.config consumer.properties 

Je suis capable d'utiliser mon producteur personnalisé à partir d'une autre machine dans le cluster. extrait pertinent des propriétés de production:

security.protocol=SASL_PLAINTEXT 

bootstrap.servers=<hostname>:9092 

Je ne suis pas en mesure d'utiliser mon producteur personnalisé de la machine distante:

Exception org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for <topicname>-<partition> 

en utilisant les mêmes propriétés de producteurs. Je suis capable de telnet le courtier de Kafka de la machine et/etc/hosts inclut des noms d'hôte et des adresses IP publiques.

Deuxième étape - Je modifié server.properties:

listeners=SASL_PLAINTEXT://0.0.0.0:9092 
advertised.listeners=SASL_PLAINTEXT://<kafkaBrokerInternalIP>:9092 
  • consommateur & producteur dans le même cluster fonctionnent toujours très bien (bootstrap serveurs sont maintenant l'adresse IP interne avec le port 9092)
  • comme attendu le producteur distant échoue (mais cela est évident étant donné qu'il ne connaît pas les adresses IP internes)

Troisième étape - où il obtient poilue :(

listeners=SASL_PLAINTEXT://0.0.0.0:9092 
advertised.listeners=SASL_PLAINTEXT://<kafkaBrokerPublicIP>:9092 

commencer ma consommation avec

kafka-console-consumer --new-consumer --topic <topicname> --from-beginning --bootstrap-server <hostname>:9092 --consumer.config consumer.properties 

me donne un avertissement, mais je ne pense pas que ce soit vrai ...

WARN clients.NetworkClient: Error while fetching metadata with correlation id 1 : {<topicname>=LEADER_NOT_AVAILABLE} 

commencer ma consommation avec

kafka-console-consumer --new-consumer --topic <topicname> --from-beginning --bootstrap-server <KafkaBrokerPublicIP>:9092 --consumer.config consumer.properties 

pend juste après les messages du journal:

INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0 
INFO utils.AppInfoParser: Kafka commitId : unknown 

Semble comme il ne peut pas trouver un coordonnateur comme dans le flux normal ce serait le prochain journal :

INFO internals.AbstractCoordinator: Discovered coordinator <hostname>:9092 (id: <someNumber> rack: null) for group console-consumer-<someNumber>. 

démarrage du producteur sur un nœud de cluster avec bootstrap.servers =: 9092 J'observe le même que celui avec le producteur:

WARN NetworkClient:600 - Error while fetching metadata with correlation id 0 : {<topicname>=LEADER_NOT_AVAILABLE} 

à partir du producteur sur un nœud de cluster avec bootstrap.servers =: 9092 je reçois

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 

à partir du producteur sur ma machine distante avec soit bootstrap.servers =: 9092 ou bootstrap.servers =: 9092 je reçois

NetworkClient:600 - Error while fetching metadata with correlation id 0 : {<topicname>=LEADER_NOT_AVAILABLE} 

Je suis st Ruging au cours des trois derniers jours pour obtenir ce travail, mais je suis à court d'idées:/Ma compréhension est que advertised.hostnames sert exactement dans ce but, mais soit je fais quelque chose de mal, ou il y a quelque chose de mal dans la configuration de la machine .

Tous les conseils sont très appréciés!

Répondre

0

J'ai récemment rencontré ce problème. Dans mon cas, j'ai activé Kafka ACL, et après l'avoir désactivé en commentant cette configuration, le problème a fonctionné.

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 
super.users=User:kafka 

Et un fil peut vous aider, je pense: https://gist.github.com/jorisdevrede/a7933a99251452bb1867

Qu'est-ce qu'il mentionne à la fin:

Si vous utilisez uniquement un écouteur de SASL_PLAINTEXT sur le courtier Kafka, vous Assurez-vous que vous avez défini le security.inter.broker.protocol = SASL_PLAINTEXT aussi, sinon obtiendrez une erreur LEADER_NOT_AVAILABLE dans le client.