2017-01-24 3 views
2

J'essaye de consommer des messages de Kafka en utilisant la bibliothèque kafka réactive d'Akka. Je reçois un message imprimé et après que je suis arrivéRéactif-Kafka Stream Consommateur: lettres mortes ont eu lieu

[INFO] [01/24/2017 10:36:52.934] [CommittableSourceConsumerMain-akka.actor.default-dispatcher-5] [akka://CommittableSourceConsumerMain/system/kafka-consumer-1] Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] from Actor[akka://CommittableSourceConsumerMain/deadLetters] to Actor[akka://CommittableSourceConsumerMain/system/kafka-consumer-1#-1726905274] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 

Ce code J'exécute

import akka.actor.ActorSystem 
import akka.kafka.scaladsl.Consumer 
import akka.kafka.{ConsumerSettings, Subscriptions} 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Sink 
import org.apache.kafka.clients.consumer.ConsumerConfig 
import play.api.libs.json._ 
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer} 

object CommittableSourceConsumerMain extends App { 

    implicit val system = ActorSystem("CommittableSourceConsumerMain") 
    implicit val materializer = ActorMaterializer() 
    val consumerSettings =ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer).withBootstrapServers("localhost:9092").withGroupId("CommittableSourceConsumer").withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 

    val done = 
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) 
     .mapAsync(1) { msg => 
     val record=(msg.record.value()) 
     val data=Json.parse(record) 

     val recordType=data \ "data" \"event" \"type" 

     val actualData=data \ "data" \ "row" 

     if(recordType.as[String]=="created"){ 
      "Some saving logic" 
     } 

     else{ 

     "Some logic" 

     } 
     msg.committableOffset.commitScaladsl() 
     } 
     .runWith(Sink.ignore) 
} 

Répondre

1

J'ai finalement compris la solution. En raison d'une exception d'exécution dans le flux, une erreur Future est renvoyée qui met immédiatement fin au flux. Akka-stream ne fournit pas ou affiche l'exception d'exécution. De manière à connaître l'exception

done.onFailure{ 
     case NonFatal(e)=>println(e) 
     } 

L'exception était dans le bloc if-else. En outre, on peut utiliser la stratégie d'acteur pour reprendre le flux si une exception se produit.

+0

Je suis également confronté au même problème. pourriez-vous me dire comment utiliser la stratégie d'acteur pour reprendre le flot. Merci. – Rajesh