J'utilise le service Druid Kafka Indexing pour charger mes propres flux de Kafka. J'utilise Load from Kafka tutorial pour l'implémenter.Druide Kafka ingestion (impliquer-2.2.3): erreur kafka NoReplicaOnlineException
Kafka a tous les paramètres par défaut (juste extraits de tgz).
Quand je commence à implique-2.2.3 (Druide) avec des données vides (après var de dossier supprimer) tout fonctionne correctement.
Mais quand je me arrête Kafka 2.11-0.10.2.0 et commence il se produit à nouveau et erreur Druide ingestion Kafka ne travaille pas plus jusqu'à ce que j'arrête Implique (Druide) et supprimer toutes les données (à savoir supprimer dossier var).
Parfois, Druid n'injecte pas de données de Kafka, même pas d'erreurs dans Kafka. Lorsque je supprime le dossier var dans Druide tout est réparé jusqu'à la prochaine erreur.
Erreur:
kafka.common.NoReplicaOnlineException: No replica for partition [__consumer_offsets,19] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:73) ~[kafka_2.11-0.10.2.0.jar:?]
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:339) ~[kafka_2.11-0.10.2.0.jar:?]
at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:200) [kafka_2.11-0.10.2.0.jar:?]
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:115) [kafka_2.11-0.10.2.0.jar:?]
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:112) [kafka_2.11-0.10.2.0.jar:?]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) [scala-library-2.11.8.jar:?]
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) [scala-library-2.11.8.jar:?]
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) [scala-library-2.11.8.jar:?]
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) [scala-library-2.11.8.jar:?]
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) [scala-library-2.11.8.jar:?]
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) [scala-library-2.11.8.jar:?]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) [scala-library-2.11.8.jar:?]
at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:112) [kafka_2.11-0.10.2.0.jar:?]
at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:67) [kafka_2.11-0.10.2.0.jar:?]
at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:342) [kafka_2.11-0.10.2.0.jar:?]
at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:160) [kafka_2.11-0.10.2.0.jar:?]
at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:85) [kafka_2.11-0.10.2.0.jar:?]
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:51) [kafka_2.11-0.10.2.0.jar:?]
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:49) [kafka_2.11-0.10.2.0.jar:?]
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:49) [kafka_2.11-0.10.2.0.jar:?]
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) [kafka_2.11-0.10.2.0.jar:?]
at kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:49) [kafka_2.11-0.10.2.0.jar:?]
at kafka.controller.KafkaController$$anonfun$startup$1.apply$mcV$sp(KafkaController.scala:681) [kafka_2.11-0.10.2.0.jar:?]
at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:677) [kafka_2.11-0.10.2.0.jar:?]
at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:677) [kafka_2.11-0.10.2.0.jar:?]
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) [kafka_2.11-0.10.2.0.jar:?]
at kafka.controller.KafkaController.startup(KafkaController.scala:677) [kafka_2.11-0.10.2.0.jar:?]
at kafka.server.KafkaServer.startup(KafkaServer.scala:224) [kafka_2.11-0.10.2.0.jar:?]
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39) [kafka_2.11-0.10.2.0.jar:?]
at kafka.Kafka$.main(Kafka.scala:67) [kafka_2.11-0.10.2.0.jar:?]
at kafka.Kafka.main(Kafka.scala) [kafka_2.11-0.10.2.0.jar:?]
Les étapes que je faisais:
1. Démarrez Implique:
bin/supervise -c conf/supervise/quickstart.conf
2. Démarrer Kafka:
./bin/kafka-server-start.sh config/server.properties
3. Créer un sujet:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikiticker
4. Activer Druide ingestion Kafka:
curl -XPOST -H'Content-Type: application/json' -d @quickstart/wikiticker-kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor
5. événements post au sujet Kafka qui ont ensuite été ingérée en Druide par le service d'indexation Kafka
Dans tous les .properties fichiers (common.runtime.properties, courtier, coordonnateur, historique, middlemanager, Overlord) a ajouté la propriété:
druid.extensions.loadList=["druid-caffeine-cache", "druid-histogram", "druid-datasketches", "druid-kafka-indexing-service"]
qui comprend "Druid-kafka-indexation service" pour fournir ingérer le service. Je crois que de tels problèmes ne devraient pas se produire avec Druid Kafka Indexing.
Existe-t-il des moyens de résoudre ce problème?