2016-10-06 1 views
1

I ont un flux defiltre Streams & Akka groupe par une collection de clés

case class Msg(keys: Seq[Char], value: String) 

Maintenant, je veux filtrer un sous-ensemble de clés par exemple Puis, divisez-les pour que certaines clés soient traitées par des flux différents, puis fusionnées à la fin;

        /-key=k-> f1 --\ 
Source[Msg] ~> Filter ~> router |--key=f-> f2 ----> Merge --> f4 
           \-key=c-> f3 --/ 

Comment dois-je procéder? À l'ancienne façon semblait être un bon moyen d'aller mais dans la nouvelle API, je devine que je veux soit faire un GraphStage personnalisé ou créer mon propre graphique de la DSL comme je ne vois pas comment faire cela à travers les étapes intégrées ..?

Répondre

4

Petit Set Key Solution

Si votre clé est petite, et immuable, puis une combinaison de diffusion et le filtre serait probablement la mise en œuvre plus facile à comprendre. Vous devez d'abord définir le filtre que vous avez décrit:

def goodKeys(keySet : Set[Char]) = Flow[Msg] filter (_.keys exists keySet.contains) 

Cela peut ensuite alimenter un diffuseur comme décrit in the documentation. Toutes les Msg valeurs avec les bonnes clés seront retransmises à chacun des trois filtres, et chaque filtre ne permettront une clé particulière:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 

    val source : Source[Msg] = ??? 

    val goodKeyFilter = goodKeys(Set('k','f','c')) 

    val bcast = builder.add(BroadCast[Msg](3)) 
    val merge = builder.add(Merge[Msg](3)) 

    val kKey = goodKeys(Set('k')) 
    val fKey = goodKeys(Set('f')) 
    val cKey = goodKeys(Set('c')) 

    //as described in the question 
    val f1 : Flow[Msg, Msg, _] = ??? 
    val f2 : Flow[Msg, Msg, _] = ??? 
    val f3 : Flow[Msg, Msg, _] = ??? 

    val f4 : Sink[Msg,_] = ??? 

    source ~> goodKeyFilter ~> bcast ~> kKey ~> f1 ~> merge ~> f4 
          bcast ~> fKey ~> f2 ~> merge 
          bcast ~> cKey ~> f3 ~> merge 

Large Set Key Solution

Si vous définissez la clé est grande, alors groupBy est meilleur. Supposons que vous avez un Map de touches aux fonctions:

//e.g. 'k' -> f1 
val keyFuncs : Map[Set[Char], (Msg) => Msg] 

Cette carte peut être utilisée avec la fonction groupBy:

source 
    .via(goodKeys(Set('k','f','c')) 
    .groupBy(keyFuncs.size, _.keys) 
    .map(keyFuncs(_.keys)) //apply one of f1,f2,f3 to the Msg 
    .mergeSubstreams