2017-09-28 1 views
1

J'essaie d'écrire un simple consommateur de messages de kafka en utilisant des flux akka.Le consommateur a été interrompu avec WakeupException après expiration du délai. Message: null. La valeur actuelle de akka.kafka.consumer.wakeup-timeout est 3000 millisecondes

build.sbt

"com.typesafe.akka" %% "akka-stream-kafka" % "0.17" 

Mon code

object AkkaStreamskafka extends App { 

    // producer settings 
    implicit val system = ActorSystem() 
    implicit val actorMaterializer = ActorMaterializer() 
    val consumerSettings = ConsumerSettings(system, Some(new ByteArrayDeserializer), Some(new StringDeserializer)) 
     .withBootstrapServers("foo:9092") 
     .withGroupId("abhi") 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") 

    val source = Consumer 
     .committableSource(consumerSettings, Subscriptions.topics("my-topic)) 
    val flow = Flow[ConsumerMessage.CommittableMessage[Array[Byte], String]].mapAsync(1){msg => 
     msg.committableOffset.commitScaladsl().map(_ => msg.record.value); 
     } 
    val sink = Sink.foreach[String](println) 
    val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder => 
     s => 
     import GraphDSL.Implicits._ 
     source ~> flow ~> s.in 
     ClosedShape 
    }) 
    val future = graph.run() 
    Await.result(future, Duration.Inf) 
} 

Mais je reçois une erreur

[WARN] [09/28/2017 13:12:52.333] [default-akka.kafka.default-dispatcher-7] 
[akka://default/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. 
Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds 

Edit:

Je peux faire un ssh foo puis tapez la co suivant mmand sur le terminal du serveur ./kafka-console-consumer --zookeeper localhost:2181 --topic my-topic et je peux voir des données. Donc, je suppose que mon nom de serveur foo est correct et kafka est opérationnel sur cette machine.

Edit2:

Sur le serveur Kafka je suis en cours d'exécution Cloudera 5.7.1. La version de Kafka est pots/kafka_2.10-0.9.0-kafka-2.0.0.jar

Répondre

0

J'ai été capable de résoudre le problème moi-même.

La bibliothèque "com.typesafe.akka" %% "akka-stream-kafka" ne fonctionne que pour Kafka 0.10 et au-delà. cela ne fonctionne pas pour les versions antérieures de Kafka. Quand j'ai énuméré les pots de kafka sur mon serveur de kafka j'ai trouvé que j'utilise Cloudera 5.7.1 qui vient avec Kafka 0.9.

Afin de créer une source Akka Streams pour cette version. Je devais utiliser

"com.softwaremill.reactivekafka" % "reactive-kafka-core_2.11" % "0.10.0" 

Ils ont aussi un exemple, il https://github.com/kciesielski/reactive-kafka

Ce code a parfaitement fonctionné pour moi

implicit val actorSystem = ActorSystem() 
implicit val actorMaterializer = ActorMaterializer() 
val kafka = new ReactiveKafka() 
val consumerProperties = ConsumerProperties(
    bootstrapServers = "foo:9092", 
    topic = "my-topic", 
    groupId = "abhi", 
    valueDeserializer = new StringDeserializer() 
) 

val source = Source.fromPublisher(kafka.consume(consumerProperties)) 
val flow = Flow[ConsumerRecord[Array[Byte], String]].map(r => r.value()) 
val sink = Sink.foreach[String](println) 
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) {implicit builder => 
s => 
    import GraphDSL.Implicits._ 
    source ~> flow ~> s.in 
    ClosedShape 
}) 
val future = graph.run() 
future.onComplete{_ => 
    actorSystem.terminate() 
} 
Await.result(actorSystem.whenTerminated, Duration.Inf)