2017-08-03 2 views
0

J'utilise le service Druid Kafka Indexing pour charger mes propres flux de Kafka. J'utilise Load from Kafka tutorial pour l'implémenter.Druide Kafka ingestion (impliquer-2.2.3): erreur kafka NoReplicaOnlineException

Kafka a tous les paramètres par défaut (juste extraits de tgz).

Quand je commence à implique-2.2.3 (Druide) avec des données vides (après var de dossier supprimer) tout fonctionne correctement.

Mais quand je me arrête Kafka 2.11-0.10.2.0 et commence il se produit à nouveau et erreur Druide ingestion Kafka ne travaille pas plus jusqu'à ce que j'arrête Implique (Druide) et supprimer toutes les données (à savoir supprimer dossier var).

Parfois, Druid n'injecte pas de données de Kafka, même pas d'erreurs dans Kafka. Lorsque je supprime le dossier var dans Druide tout est réparé jusqu'à la prochaine erreur.

Erreur:

kafka.common.NoReplicaOnlineException: No replica for partition [__consumer_offsets,19] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)] 
    at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:73) ~[kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:339) ~[kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:200) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:115) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:112) [kafka_2.11-0.10.2.0.jar:?] 
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) [scala-library-2.11.8.jar:?] 
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) [scala-library-2.11.8.jar:?] 
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) [scala-library-2.11.8.jar:?] 
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) [scala-library-2.11.8.jar:?] 
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) [scala-library-2.11.8.jar:?] 
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) [scala-library-2.11.8.jar:?] 
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) [scala-library-2.11.8.jar:?] 
    at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:112) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:67) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:342) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:160) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:85) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:51) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:49) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:49) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:49) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController$$anonfun$startup$1.apply$mcV$sp(KafkaController.scala:681) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:677) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:677) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.controller.KafkaController.startup(KafkaController.scala:677) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.KafkaServer.startup(KafkaServer.scala:224) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.Kafka$.main(Kafka.scala:67) [kafka_2.11-0.10.2.0.jar:?] 
    at kafka.Kafka.main(Kafka.scala) [kafka_2.11-0.10.2.0.jar:?] 

Les étapes que je faisais:

1. Démarrez Implique:

bin/supervise -c conf/supervise/quickstart.conf 

2. Démarrer Kafka:

./bin/kafka-server-start.sh config/server.properties 

3. Créer un sujet:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikiticker 

4. Activer Druide ingestion Kafka:

curl -XPOST -H'Content-Type: application/json' -d @quickstart/wikiticker-kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor 

5. événements post au sujet Kafka qui ont ensuite été ingérée en Druide par le service d'indexation Kafka

Dans tous les .properties fichiers (common.runtime.properties, courtier, coordonnateur, historique, middlemanager, Overlord) a ajouté la propriété:

druid.extensions.loadList=["druid-caffeine-cache", "druid-histogram", "druid-datasketches", "druid-kafka-indexing-service"] 

qui comprend "Druid-kafka-indexation service" pour fournir ingérer le service. Je crois que de tels problèmes ne devraient pas se produire avec Druid Kafka Indexing.

Existe-t-il des moyens de résoudre ce problème?

Répondre

0

Le message indique que le courtier avec l'ID 0 est hors service et parce qu'il est le seul courtier hébergeant cette partition, vous ne pouvez pas utiliser cette partition pour le moment. Vous devez vous assurer que le courtier 0 est opérationnel.

0

On dirait que vous avez un cluster Kafka à un seul nœud et que le seul nœud de courtier est en panne. Ce n'est pas une configuration très tolérante aux fautes. Vous devriez avoir 3 courtiers Kafka et configurer tous les sujets avec un facteur de réplication de 3 afin que le système fonctionne même si un ou deux courtiers Kafka sont en panne. Les clusters à nœud unique sont généralement utilisés uniquement pour le développement.

0

Je l'ai corrigé en ajoutant 3 courtiers Kafka et j'ai configuré tous les sujets avec un facteur de réplication de 3 pour la stabilité de Kafka.

Dans Druide j'ai résolu le problème en augmentant druid.worker.capacity dans middleManager et en diminuant taskDuration en ioConfig de la spécification du superviseur.

Détails dans another question.