2017-06-23 3 views
0

J'essaie de faire un peu de graphique avec Akka Stream et je rencontre des cas difficiles. Fondamentalement, je dois acheminer les messages vers et depuis le serveur Kafka et le serveur TCP. La partie difficile se produit parce que je dois parfois répondre au serveur TCP.Akka flux passerelle entre Kafka et TCP

  • Cas 1: TCP -> Kafka (ok)

  • Cas n ° 2: Kafka -> TCP (ok)

  • Cas n ° 3: TCP -> TCP

  • Case 4: rien -> TCP

Ca 3 se produit lorsque je reçois un message et que je devais demander des précisions au serveur avant de le publier sur Kafka

Le cas 4 se produit pour ouvrir la connexion TCP en envoyant la première prise de contact.

je pense à boucle de rétroaction, Source.actoRef, fan-out mais ne peut pas construire.

C'est surtout le graphique que je pense:

          +------------------+ 
              |     | 
              | TCP msg   | 
              | (Source.actorRef)| 
              |     | 
              +----+-------------+ 
               | 
               | 
+------------+  +-----------------+ +----v----+  +--------------+ 
|   +------->     +---->   +------>Kafka (Sink) | 
| TCP  |  | TLS   | | Router |  +--------------+ 
| (flow) |  | (bi-dir flow) | | (???) |  +--------------+ 
|   <-------+     <----+   <------+Kafka (Source)| 
+------------+  +-----------------+ +---------+  +--------------+ 

Je pense que je dois construire un graphe personnalisé avec GraphStage, mais manquer quelques referencies. Spécialement pour le Router, qui doit pouvoir accepter trois entrées et transmettre la réponse en 2 sorties différentes.

Si vous avez des indices, je l'adorerais.

Merci d'avance!

Répondre

0

Je l'ai finalement obtenu ... Voici un échantillon de mon code, si elle peut aider toute personne dans le

class Msg 
    case class Msg1(string: String) extends Msg 
    case class Msg2(string: String) extends Msg 
    case class Msg3(string: String) extends Msg 

def optionFilter[T]: Flow[Option[T], T, NotUsed] = { 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 

val graph = GraphDSL.create() { implicit builder => 
    { 
    def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1) 
    val partition: UniformFanOutShape[Option[T], Option[T]] = 
     builder.add(Partition[Option[T]](2, partitioner)) 

    val flow = builder.add(Merge[T](1)) 

    partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> flow 
    partition.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> Sink.ignore 

    FlowShape(partition.in, flow.out) 
    } 
    } 
    Flow.fromGraph(graph) 
    } 

val msg1Filter = optionFilter[Msg1] 

def router(
    source1: Source[Msg, ActorRef], 
    source2: Source[Msg, _], 
    sink1: Sink[Msg1, _], 
    sink2: Sink[Msg2, _], 
    flow: Flow[Msg3, Msg, _] 
): ActorRef = { 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 

    val graph: Graph[SinkShape[Msg], NotUsed] = GraphDSL.create() { implicit builder => 
    val unzipper: UnzipWith3[Msg, Option[Msg1], Option[Msg2], Option[Msg3]] = 
    UnzipWith { msg: Msg => 
     msg match { 
     case msg1: Msg1 => (Some(msg1), None, None) 
     case msg2: Msg2 => (None, Some(msg2), None) 
     case msg3: Msg3 => (None, None, Some(msg3)) 
     } 
    } 

    val merge = builder.add(Merge[Msg](3)) 
    val forward = builder.add(Merge[Msg](1)) 

    val unzip = builder.add(unzipper) 

    source2 ~> merge ~> unzip.in 
    forward.out ~> merge 

    unzip.out0 ~> optionFilter[Msg1] ~> sink1 
    unzip.out1 ~> optionFilter[Msg2] ~> sink2 
    unzip.out2 ~> optionFilter[Msg3] ~> flow ~> merge 

    SinkShape(forward.in(0)) 
} 

val sink: Sink[Msg, NotUsed] = Sink.fromGraph(graph) 

sink.runWith(source1) 
} 

val source1 = Source.actorRef(4096, OverflowStrategy.fail) 
val source2 = Source(List(Msg2("from source2"))) 

val sink1: Sink[Msg1, Future[Done]] = Sink.foreach((msg: Msg1) => println(s"sink1: $msg")) 
val sink2: Sink[Msg2, Future[Done]] = Sink.foreach((msg: Msg2) => println(s"sink2: $msg")) 

val flow = Flow.fromFunction((msg: Msg3) => { 
    val msg2 = Msg2("from the flow") 
    println(s"flow: forward msg $msg to $msg2") 
    msg2 
}) 

val actor = router(source1, source2, sink1, sink2, flow) 

actor ! Msg1("from source1 (actor)") 
actor ! Msg2("from source1 (actor)") 
actor ! Msg3("from source1 (actor)") 

Ce qui vous donne le résultat suivant

sink2: Msg2(from source2) 
sink1: Msg1(from source1 (actor)) 
sink2: Msg2(from source1 (actor)) 
flow: forward msg Msg3(from source1 (actor)) to Msg2(from the flow) 
sink2: Msg2(from the flow) 

aide quelqu'un espère que ce A l'avenir !