2017-08-14 4 views
0

Je suis nouveau en utilisant les flux kakka akka (et les flux akka en général). J'essaie de construire un graphique afin de publier un message sur différents sujets. Comment puis-je connecter le producteur en tant que flux afin de valider les messages traités? J'ai essayé d'utiliser Producer.flow mais je ne peux pas obtenir le commitScaladslconnecter le flux du producteur au graphique

object TestFoo { 
    import akka.kafka.ProducerMessage.Message 
    implicit val system = ActorSystem("test-kafka") 
    implicit val materializer = ActorMaterializer() 
    val evenNumbersTopic = "even_numbers" 
    val allNumbersTopic = "all_numbers" 
    lazy val consumerSettings = ConsumerSettings(system, new StringDeserializer(), new JsonDeserializer[Int]) 
    .withBootstrapServers("localhost:9092") 
    .withGroupId("group1") 
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
    lazy val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(evenNumbersTopic, allNumbersTopic))) 
    val producerSettings = ProducerSettings(system, new StringSerializer(), new StringSerializer()) 
    .withBootstrapServers("localhost:9092") 
    val flow: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 
    type TypedMessage = Message[String, Int,CommittableOffset] 
    val bcast = b.add(Broadcast[TypedMessage](2)) 
    val merge = b.add(Merge[TypedMessage](2)) 

    val evenFilter = Flow[TypedMessage].filter ( c => c.record.value() % 2 == 0) 
    val justEven = Flow[TypedMessage].map{ 
     case Message(pr, offset) => 
     val r = new ProducerRecord[String, Int]("general", pr.value()) 
     Message(r, offset) 
    } 
    val allNumbers = Flow[TypedMessage].map{ 
     case Message(pr, offset) => 
     val r = new ProducerRecord[String, Int](allNumbersTopic, pr.value()) 
     Message(r, offset) 
    } 

    val toMsg = Flow[ConsumerMessage.CommittableMessage[String, Int]].map{ msg => 
     val r = new ProducerRecord[String, Int]("general", msg.record.value()) 
     Message(r, msg.committableOffset) 
    } 
    source ~> toMsg ~> bcast 

    bcast ~> evenFilter ~> justEven ~> merge 
    bcast ~> allNumbers ~> merge 
    merge ~> Producer.flow(producerSettings).mapAsync(producerSettings.parallelism) { result => 
     result.message.passThrough.commitScaladsl() //this doesn't compile, cannot get the .commitScaladsl() 
    } 
    ClosedShape 
    })} 
+0

Pour le moment cet exemple est truqué avec beaucoup d'autres erreurs de compilation. Pourriez-vous l'amender pour rendre votre erreur de compilation facilement reproductible? –

+0

@StefanoBonetti oui, j'ai mis à jour le code avec moins d'erreurs de compilation, merci – igx

Répondre

0

Parce que vous utilisez le GraphDSL, le compilateur ne peut pas déduire le type PassThrough de l'étape précédente. Essayez de transmettre explicitement les paramètres de type à la fonction Producer.flow, par ex.

merge ~> Producer.flow[K, V, CommittableOffset](producerSettings).mapAsync(producerSettings.parallelism) { result => 
    result.message.passThrough.commitScaladsl() 
} 

J'ai laissé K et V comme non liée param, s'il vous plaît s'y ajuster quelle que soit la clé/types de valeur votre producteur est tenu de produire. Si vous voulez que le code ci-dessus soit correctement câblé, vous devrez faire correspondre les types producerSettings avec ce qui vient de l'étape de fusion. Vous aurez besoin de quelque chose comme:

val producerSettings = ProducerSettings(system, new StringSerializer(), new JsonSerializer[Int]) 
    .withBootstrapServers("localhost:9092") 
+0

Merci, mais il donne en fait les mêmes résultats: '' 'val sink = Producer.flow [Chaîne, Chaîne, CommittableOffset] (producerSettings) .mapAsync (producerSettings.parallelism) {resultat => result.message.passThrough.commitScaladsl() // ne compile toujours pas, ne peut pas obtenir le .commitScaladsl() } '' ' – igx

+0

Pour moi, il semble que les types' producerSettings' sont '[String, String]', mais vous lui fournissez un enregistrement de type '[String, Int]'. J'ai modifié la réponse pour suggérer d'autres changements –