2016-11-05 1 views
6

je la hiérarchie de classe simple cas suivant:Akka flux partiels flux par type

sealed trait Message 
case class Foo(bar: Int) extends Message 
case class Baz(qux: String) extends Message 

Et j'ai un Flow[Message, Message, NotUsed] (à partir d'un protocole basé Websocket avec codec déjà en place).

Je souhaite démultiplexer ce Flow[Message] en flux séparés pour les types Foo et Baz, car ceux-ci sont traités par des chemins complètement différents.

Quelle est la manière la plus simple de le faire? Devrait être évident, mais il me manque quelque chose ...

Répondre

5

Une façon est d'utiliser créer un RunnableGraph qui inclut les flux pour chaque type de message.

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 

    val in = Source(...) // Some message source 
    val out = Sink.ignore 

    val foo = builder.add(Flow[Message].map (x => x match { case [email protected](_) => f })) 
    val baz = builder.add(Flow[Message].map (x => x match { case [email protected](_) => b })) 
    val partition = builder.add(Partition[Message](2, { 
    case Foo(_) => 0 
    case Baz(_) => 1 
    })) 

    partition ~> foo ~> // other Flow[Foo] here ~> out 
    partition ~> baz ~> // other Flow[Baz] here ~> out 

    ClosedShape 
} 

g.run() 
+0

Droite, Partition. OK, je pourrais le faire. Il serait probablement avantageux d'avoir un combinateur intégré pour cela; peut-être, je ferai une demande de traction. –

+0

@AlexanderTemerev Ceci peut être d'intérêt: http://doc.akka.io/api/akka/2.4/?_ga=1.34091558.643806930.1478315511#akka.stream.scaladsl.Partition – Brian