Je suis nouveau en utilisant les flux kakka akka (et les flux akka en général). J'essaie de construire un graphique afin de publier un message sur différents sujets. Comment puis-je connecter le producteur en tant que flux afin de valider les messages traités? J'ai essayé d'utiliser Producer.flow mais je ne peux pas obtenir le commitScaladsl
connecter le flux du producteur au graphique
object TestFoo {
import akka.kafka.ProducerMessage.Message
implicit val system = ActorSystem("test-kafka")
implicit val materializer = ActorMaterializer()
val evenNumbersTopic = "even_numbers"
val allNumbersTopic = "all_numbers"
lazy val consumerSettings = ConsumerSettings(system, new StringDeserializer(), new JsonDeserializer[Int])
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
lazy val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(evenNumbersTopic, allNumbersTopic)))
val producerSettings = ProducerSettings(system, new StringSerializer(), new StringSerializer())
.withBootstrapServers("localhost:9092")
val flow: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import akka.stream.scaladsl.GraphDSL.Implicits._
type TypedMessage = Message[String, Int,CommittableOffset]
val bcast = b.add(Broadcast[TypedMessage](2))
val merge = b.add(Merge[TypedMessage](2))
val evenFilter = Flow[TypedMessage].filter ( c => c.record.value() % 2 == 0)
val justEven = Flow[TypedMessage].map{
case Message(pr, offset) =>
val r = new ProducerRecord[String, Int]("general", pr.value())
Message(r, offset)
}
val allNumbers = Flow[TypedMessage].map{
case Message(pr, offset) =>
val r = new ProducerRecord[String, Int](allNumbersTopic, pr.value())
Message(r, offset)
}
val toMsg = Flow[ConsumerMessage.CommittableMessage[String, Int]].map{ msg =>
val r = new ProducerRecord[String, Int]("general", msg.record.value())
Message(r, msg.committableOffset)
}
source ~> toMsg ~> bcast
bcast ~> evenFilter ~> justEven ~> merge
bcast ~> allNumbers ~> merge
merge ~> Producer.flow(producerSettings).mapAsync(producerSettings.parallelism) { result =>
result.message.passThrough.commitScaladsl() //this doesn't compile, cannot get the .commitScaladsl()
}
ClosedShape
})}
Pour le moment cet exemple est truqué avec beaucoup d'autres erreurs de compilation. Pourriez-vous l'amender pour rendre votre erreur de compilation facilement reproductible? –
@StefanoBonetti oui, j'ai mis à jour le code avec moins d'erreurs de compilation, merci – igx