J'écris une application dans Scala et j'utilise des flux Akka.Flux Akka - filtrage par le nombre d'éléments dans le flux
À un moment donné, j'ai besoin de filtrer les flux qui ont moins de N éléments, avec N donné. Ainsi, par exemple, avec N=5
:
Source(List(1,2,3)).via(myFilter) // => List()
Source(List(1,2,3,4)).via(myFilter) // => List()
deviendront flux vides, et
Source(List(1,2,3,4,5)).via(myFilter) // => List(1,2,3,4,5)
Source(List(1,2,3,4,5,6)).via(myFilter) // => List(1,2,3,4,5,6)
ne changera pas.
Bien sûr, nous ne pouvons pas connaître le nombre d'éléments dans le flux jusqu'à ce que ce soit fini, et attendre la fin avant de le pousser peut ne pas être la meilleure idée.
Ainsi, au lieu, j'ai pensé à l'algorithme suivant:
- pour la première N-1 éléments, il suffit de les tampons, sans passer plus loin;
- si le flux d'entrée se termine avant d'atteindre le Nième élément, sortir un flux vide;
- Si le flux d'entrée atteint le Nième élément, sortir les éléments N-1 tamponnés, puis sortir le Nième élément, puis passer tous les éléments suivants qui viennent.
Cependant, je ne sais pas comment construire un élément Flow
l'implémentant. Y a-t-il des éléments Akka intégrés que je pourrais utiliser?
Edit:
Bon, alors j'ai joué avec hier et je suis venu avec quelque chose comme ça:
Flow[Int].
prefixAndTail(N).
flatMapConcat {
case (prefix, tail) if prefix.length == N =>
Source(prefix).concat(tail)
case _ =>
Source.empty[Int]
}
aura-t-il faire ce que je veux?