J'essaie d'écrire un simple consommateur de messages de kafka en utilisant des flux akka.Le consommateur a été interrompu avec WakeupException après expiration du délai. Message: null. La valeur actuelle de akka.kafka.consumer.wakeup-timeout est 3000 millisecondes
build.sbt
"com.typesafe.akka" %% "akka-stream-kafka" % "0.17"
Mon code
object AkkaStreamskafka extends App {
// producer settings
implicit val system = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, Some(new ByteArrayDeserializer), Some(new StringDeserializer))
.withBootstrapServers("foo:9092")
.withGroupId("abhi")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
val source = Consumer
.committableSource(consumerSettings, Subscriptions.topics("my-topic))
val flow = Flow[ConsumerMessage.CommittableMessage[Array[Byte], String]].mapAsync(1){msg =>
msg.committableOffset.commitScaladsl().map(_ => msg.record.value);
}
val sink = Sink.foreach[String](println)
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder =>
s =>
import GraphDSL.Implicits._
source ~> flow ~> s.in
ClosedShape
})
val future = graph.run()
Await.result(future, Duration.Inf)
}
Mais je reçois une erreur
[WARN] [09/28/2017 13:12:52.333] [default-akka.kafka.default-dispatcher-7]
[akka://default/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout.
Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
Edit:
Je peux faire un ssh foo
puis tapez la co suivant mmand sur le terminal du serveur ./kafka-console-consumer --zookeeper localhost:2181 --topic my-topic
et je peux voir des données. Donc, je suppose que mon nom de serveur foo
est correct et kafka est opérationnel sur cette machine.
Edit2:
Sur le serveur Kafka je suis en cours d'exécution Cloudera 5.7.1. La version de Kafka est pots/kafka_2.10-0.9.0-kafka-2.0.0.jar