Donc la réponse au "comment?" est:
import scalaz.stream._
import scalaz.stream.async._
import Process._
def doSomething(i: Int) = if (i == 0) Nil else List(i - 1)
val q = unboundedQueue[Int]
val out = unboundedQueue[Int]
q.dequeue
.flatMap(e => emitAll(doSomething(e)))
.observe(out.enqueue)
.to(q.enqueue).run.runAsync(_ =>()) //runAsync can process failures, there is `.onFailure` as well
q.enqueueAll(List(3,5,7)).run
q.size.continuous
.filter(0==)
.map(_ => -1)
.to(out.enqueue).once.run.runAsync(_ =>()) //call it only after enqueueAll
import scalaz._, Scalaz._
val result = out
.dequeue
.takeWhile(_ != -1)
.map(_.point[List])
.foldMonoid.runLast.run.get //run synchronously
Résultat:
result: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
Cependant, vous remarquerez peut-être que:
1) Je dû résoudre le problème de terminaison. Même problème pour akka-stream et beaucoup plus difficile à résoudre car vous n'avez pas accès à la file d'attente et pas de back-pression naturelle pour garantir que la file d'attente ne sera pas vide juste à cause des lecteurs rapides.
2) J'ai dû introduire une autre file d'attente pour la sortie (et la convertir en List
) car le travail devient vide à la fin du calcul. Donc, les deux bibliothèques ne sont pas très adaptées à de telles exigences (flux fini), cependant scalaz-stream (qui deviendra "fs2" après avoir supprimé la dépendance scalaz) est assez flexible pour implémenter votre idée. Le gros "mais" à propos de ça va être exécuté séquentiellement par défaut. 1) divisez votre doSomething en étapes, comme .flatMap(doSomething1).flatMap(doSomething2).map(doSomething3)
puis mettez une autre file d'attente entre elles (environ 3 fois plus vite si les étapes prennent le même temps).
2) paralléliser le traitement de la file d'attente. Akka a mapAsync
pour cela - il peut faire map
s en parallèle automatiquement. Scalaz-stream a des blocs - vous pouvez grouper votre q en plusieurs morceaux, disons 5, puis traiter chaque élément à l'intérieur du bloc en parallèle. Quoi qu'il en soit, les deux solutions (akka vs scalaz) ne sont pas très adaptées pour utiliser une file d'attente à la fois en entrée et en sortie.
Mais, encore une fois, il est trop complexe et inutile car il est un moyen simple classique:
@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
if (l.isEmpty) acc else {
val processed = l.flatMap(doSomething)
calculate(processed, acC++ processed)
}
scala> calculate(List(3,5,7), Nil)
res5: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
Et voici le parallélisés un:
@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
if (l.isEmpty) acc else {
val processed = l.par.flatMap(doSomething).toList
calculate(processed, acC++ processed)
}
scala> calculate(List(3,5,7), Nil)
res6: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
Alors, oui Je dirais que ni les flux scalaz-stream ni les akka-streams ne correspondent à vos exigences; Cependant, les collections parallèles classiques de scala s'adaptent parfaitement.
Si vous avez besoin de calculs distribués sur plusieurs JVM - jetez un oeil à Apache Spark, son scala-dsl utilise le même style map/flatMap/fold. Il vous permet de travailler avec de grandes collections (en les mettant à l'échelle des JVM), qui ne rentrent pas dans la mémoire de la JVM, donc vous pouvez améliorer @tailrec def calculate
en utilisant RDD au lieu de List
. Il vous donnera également des intruments pour traiter les échecs à l'intérieur doSomething
.
P.S. Alors voici pourquoi je n'aime pas l'idée d'utiliser des bibliothèques de streaming pour de telles tâches. Le streaming est plus proche de flux infinis provenant de systèmes externes (comme HttpRequests) que de calculs de données prédéfinies (même grandes).
P.S.2 Si vous avez besoin comme réactif (sans blocage) vous pouvez utiliser Future
(ou scalaz.concurrent.Task
) + Future.sequence
file d'attente de travail (QE) est une liste [A]. Comme je traite chaque élément de WQ avec doSomething (qui renvoie une liste [A] par élément) cela va ajouter à la fin de WQ d'une manière purement fonctionnelle? – blotto
merci @ dk14. Je lisais un peu sur akka-streams et le graphique dsl aussi et peut-être que c'est un concurrent. Pour récapituler, nous commençons avec un seul item de type A, cet item passe par une fonction "doSomething" qui renvoie une liste d'items (Nil ou plus de items A) et chacun de ces items retourne ensuite dans la même fonction "doSomething" l'un après l'autre. Peut-être existe-t-il un moyen d'exprimer cela dans les flux akka-joliment avec doSomething agissant comme une fonction de flux? – blotto