2017-08-30 1 views
0

J'essaie d'écrire un graphique Akka Stream. Le code que je l'ai écrit estAlimentation de la sortie d'un flux vers une diffusion dans Akka Streams Graphique

val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder => 
    (sink1, sink2) => 
     import GraphDSL.Implicits._ 
     val bcast = builder.add(Broadcast[Row](2)) 
     val flow = source ~> flow1 ~> flow2 
     flow.out ~> bcast.in 
     bcast.out(0) ~> sink1 
     bcast.out(1) ~> flow3 ~> flow4 ~> sink2 
     ClosedShape 
}) 

val (f1, f2) = graph.run() 
val consolidated = Future.sequence(List(f1, f2)) 
Await.result(consolidated, Duration.Inf) 

Ce code ne compile pas parce que je ne peux pas connecter le flux de la dans de BCAST.

Je peux connecter le hors de la source à l'in de la bcast, mais je ne peux pas faire cela parce qu'une partie est commune entre les deux branches. Donc je dois créer la branche dans le graphique seulement après flow2

Aussi ... Je ne suis pas sûr si j'écris le graphique correctement parce qu'il retourne deux futurs de fait et je dois les combiner manuellement dans un futur simple en utilisant la séquence.

Répondre

1

Vous ne pouvez pas câbler votre graphique en 2 étapes, car le combinateur ~> ne vous permet pas de revenir en arrière. C'est en fait une opération déclarative et déclarative.

Une meilleure approche ici serait de câbler votre graphique en une fois, par exemple.

source ~> flow1 ~> flow2 ~> bcast 
           bcast   ~>   sink1 
           bcast ~> flow3 ~> flow4 ~> sink2 

ou, en variante, on peut diviser les déclarations en ajoutant une étape au constructeur (et récupération de sa forme), par exemple

val flow2s = builder.add(flow2) 

    source ~> flow1 ~> flow2s.in 
    flow2s.out ~> bcast 
       bcast   ~>   sink1 
       bcast ~> flow3 ~> flow4 ~> sink2 

En ce qui concerne les matérialisées Future s, vous devez choisir ce qui est significatif en tant que valeur matérialisée de votre graphique dans son ensemble. Si vous avez seulement besoin de l'un des 2 Sink s matérialisés Future s, vous devez passer uniquement celui-là à la méthode GraphDSL.create. Si, par ailleurs, vous êtes intéressé à la fois Future, il est parfaitement logique de sequence ou zip ensemble.