2017-08-02 1 views
2

J'écris une application dans Scala et j'utilise des flux Akka.Flux Akka - filtrage par le nombre d'éléments dans le flux

À un moment donné, j'ai besoin de filtrer les flux qui ont moins de N éléments, avec N donné. Ainsi, par exemple, avec N=5:

Source(List(1,2,3)).via(myFilter)  // => List() 
Source(List(1,2,3,4)).via(myFilter)  // => List() 

deviendront flux vides, et

Source(List(1,2,3,4,5)).via(myFilter) // => List(1,2,3,4,5) 
Source(List(1,2,3,4,5,6)).via(myFilter) // => List(1,2,3,4,5,6) 

ne changera pas.

Bien sûr, nous ne pouvons pas connaître le nombre d'éléments dans le flux jusqu'à ce que ce soit fini, et attendre la fin avant de le pousser peut ne pas être la meilleure idée.

Ainsi, au lieu, j'ai pensé à l'algorithme suivant:

  1. pour la première N-1 éléments, il suffit de les tampons, sans passer plus loin;
  2. si le flux d'entrée se termine avant d'atteindre le Nième élément, sortir un flux vide;
  3. Si le flux d'entrée atteint le Nième élément, sortir les éléments N-1 tamponnés, puis sortir le Nième élément, puis passer tous les éléments suivants qui viennent.

Cependant, je ne sais pas comment construire un élément Flow l'implémentant. Y a-t-il des éléments Akka intégrés que je pourrais utiliser?

Edit:

Bon, alors j'ai joué avec hier et je suis venu avec quelque chose comme ça:

Flow[Int]. 
    prefixAndTail(N). 
    flatMapConcat { 
    case (prefix, tail) if prefix.length == N => 
     Source(prefix).concat(tail) 
    case _ => 
     Source.empty[Int] 
    } 

aura-t-il faire ce que je veux?

Répondre

1

Cela peut être l'un de ces cas où un petit "état" peut aller loin. Même si la solution n'est pas "purement fonctionnelle", l'état de mise à jour sera isolé et inaccessible par le reste du système. Je pense que c'est l'une des beautés de scala: quand une solution FP n'est pas évidente, vous pouvez toujours revenir à l'impératif d'une manière isolée ...

Le Flow complété sera une combinaison de plusieurs sous-parties.Le premier flux sera tout simplement regrouper vos éléments en séquences de taille N:

Maintenant, pour la partie non fonctionnelle, un filtre qui ne permettra groupés Seq valeurs par si la première séquence était la bonne taille:

val minSizeRequirement : Int => Seq[Int] => Boolean = 
    (minSize) => { 
    var isFirst : Boolean = True 

    var passedMinSize : Boolean = False 

    (testSeq) => { 
     if(isFirst) { 
     isFirst = False 
     passedMinSize = testSeq.size >= minSize 
     passedMinSize 
     } 
     else 
     passedMinSize 
     } 
    } 
    } 

val minSizeFilter : Int => Flow[Seq[Int], Seq[Int], _] = 
    (minSize) => Flow[Seq[Int]].filter(minSizeRequirement(minSize)) 

La dernière étape consiste à convertir les valeurs Seq[Int] de nouveau dans Int valeurs:

val flatten = Flow[Seq[Int]].flatMapConcat(l => Source(l)) 

Enfin, les combiner tous ensemble:

val combinedFlow : Int => Flow[Int, Int, _] = 
    (minSize) => 
    group(minSize) 
     .via(minSizeFilter(minSize)) 
     .via(flatten) 
0

Peut-être statefulMapConcat pourrait vous aider:

import akka.actor.ActorSystem 
import akka.stream.scaladsl.{Sink, Source} 
import akka.stream.{ActorMaterializer, Materializer} 

import scala.collection.mutable.ListBuffer 
import scala.concurrent.ExecutionContext 

object StatefulMapConcatExample extends App { 

    implicit val system: ActorSystem = ActorSystem() 
    implicit val materializer: Materializer = ActorMaterializer() 
    implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global 

    def filterLessThen(threshold: Int): (Int) => List[Int] = { 
    var buffering = true 
    val buffer: ListBuffer[Int] = ListBuffer() 
    (elem: Int) => 
     if (buffering) { 
     buffer += elem 
     if (buffer.size < threshold) { 
      Nil 
     } else { 
      buffering = false 
      buffer.toList 
     } 
     } else { 
     List(elem) 
     } 
    } 

    //Nil 
    Source(List(1, 2, 3)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 

    //Nil 
    Source(List(1, 2, 3, 4)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 

    //Vector(1,2,3,4,5) 
    Source(List(1, 2, 3, 4, 5)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 

    //Vector(1,2,3,4,5,6) 
    Source(List(1, 2, 3, 4, 5, 6)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 
}