2015-12-07 1 views
0

J'ai une intuition que je peux (devrait?) Utiliser scalaz-streams pour résoudre mon problème qui est comme ça.structure de flux scalaz pour les listes croissantes

I ont un point de départ A. J'ai une fonction qui prend un A et renvoie une liste de A.

def doSomething(a : A) : List[A] 

j'ai une file d'attente de travail qui commence par 1 point (le point de départ). Lorsque nous traitons (doSomething) chaque élément, il peut ajouter de nombreux éléments à la fin de la même file d'attente. À un certain moment cependant (après plusieurs millions d'articles), chaque article suivant que nous commencerons à ajouter des éléments de moins en moins à la file d'attente de travail et finalement aucun nouvel élément ne sera ajouté (doSomething retournera Nil pour ces éléments). C'est ainsi que nous savons que le calcul finira par se terminer.

En supposant que scalaz-streams est approprié pour cela, est-ce que quelque chose pourrait me donner quelques conseils quant à la structure générale ou les types que je devrais examiner pour implémenter ceci? Une fois qu'une implémentation simple avec un seul "worker" est effectuée, j'aimerais également utiliser plusieurs travailleurs pour traiter des éléments de file d'attente en parallèle, par ex. Avoir un pool de 5 travailleurs (et chaque travailleur cultivait sa tâche à un agent pour calculer doSomething) donc je devrais gérer les effets (tels que les échecs de travail) aussi bien dans cet algorithme.

+0

file d'attente de travail (QE) est une liste [A]. Comme je traite chaque élément de WQ avec doSomething (qui renvoie une liste [A] par élément) cela va ajouter à la fin de WQ d'une manière purement fonctionnelle? – blotto

+0

merci @ dk14. Je lisais un peu sur akka-streams et le graphique dsl aussi et peut-être que c'est un concurrent. Pour récapituler, nous commençons avec un seul item de type A, cet item passe par une fonction "doSomething" qui renvoie une liste d'items (Nil ou plus de items A) et chacun de ces items retourne ensuite dans la même fonction "doSomething" l'un après l'autre. Peut-être existe-t-il un moyen d'exprimer cela dans les flux akka-joliment avec doSomething agissant comme une fonction de flux? – blotto

Répondre

2

Donc la réponse au "comment?" est:

import scalaz.stream._ 
import scalaz.stream.async._ 
import Process._ 

def doSomething(i: Int) = if (i == 0) Nil else List(i - 1) 

val q = unboundedQueue[Int] 
val out = unboundedQueue[Int] 

q.dequeue 
.flatMap(e => emitAll(doSomething(e))) 
.observe(out.enqueue) 
.to(q.enqueue).run.runAsync(_ =>()) //runAsync can process failures, there is `.onFailure` as well 

q.enqueueAll(List(3,5,7)).run 
q.size.continuous 
.filter(0==) 
.map(_ => -1) 
.to(out.enqueue).once.run.runAsync(_ =>()) //call it only after enqueueAll 

import scalaz._, Scalaz._ 
val result = out 
    .dequeue 
    .takeWhile(_ != -1) 
    .map(_.point[List]) 
    .foldMonoid.runLast.run.get //run synchronously 

Résultat:

result: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0) 

Cependant, vous remarquerez peut-être que:

1) Je dû résoudre le problème de terminaison. Même problème pour akka-stream et beaucoup plus difficile à résoudre car vous n'avez pas accès à la file d'attente et pas de back-pression naturelle pour garantir que la file d'attente ne sera pas vide juste à cause des lecteurs rapides.

2) J'ai dû introduire une autre file d'attente pour la sortie (et la convertir en List) car le travail devient vide à la fin du calcul. Donc, les deux bibliothèques ne sont pas très adaptées à de telles exigences (flux fini), cependant scalaz-stream (qui deviendra "fs2" après avoir supprimé la dépendance scalaz) est assez flexible pour implémenter votre idée. Le gros "mais" à propos de ça va être exécuté séquentiellement par défaut. 1) divisez votre doSomething en étapes, comme .flatMap(doSomething1).flatMap(doSomething2).map(doSomething3) puis mettez une autre file d'attente entre elles (environ 3 fois plus vite si les étapes prennent le même temps).

2) paralléliser le traitement de la file d'attente. Akka a mapAsync pour cela - il peut faire map s en parallèle automatiquement. Scalaz-stream a des blocs - vous pouvez grouper votre q en plusieurs morceaux, disons 5, puis traiter chaque élément à l'intérieur du bloc en parallèle. Quoi qu'il en soit, les deux solutions (akka vs scalaz) ne sont pas très adaptées pour utiliser une file d'attente à la fois en entrée et en sortie.

Mais, encore une fois, il est trop complexe et inutile car il est un moyen simple classique:

@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] = 
    if (l.isEmpty) acc else { 
    val processed = l.flatMap(doSomething) 
    calculate(processed, acC++ processed) 
    } 

scala> calculate(List(3,5,7), Nil) 
res5: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0) 

Et voici le parallélisés un:

@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] = 
    if (l.isEmpty) acc else { 
    val processed = l.par.flatMap(doSomething).toList 
    calculate(processed, acC++ processed) 
    } 

scala> calculate(List(3,5,7), Nil) 
res6: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0) 

Alors, oui Je dirais que ni les flux scalaz-stream ni les akka-streams ne correspondent à vos exigences; Cependant, les collections parallèles classiques de scala s'adaptent parfaitement.

Si vous avez besoin de calculs distribués sur plusieurs JVM - jetez un oeil à Apache Spark, son scala-dsl utilise le même style map/flatMap/fold. Il vous permet de travailler avec de grandes collections (en les mettant à l'échelle des JVM), qui ne rentrent pas dans la mémoire de la JVM, donc vous pouvez améliorer @tailrec def calculate en utilisant RDD au lieu de List. Il vous donnera également des intruments pour traiter les échecs à l'intérieur doSomething.

P.S. Alors voici pourquoi je n'aime pas l'idée d'utiliser des bibliothèques de streaming pour de telles tâches. Le streaming est plus proche de flux infinis provenant de systèmes externes (comme HttpRequests) que de calculs de données prédéfinies (même grandes).

P.S.2 Si vous avez besoin comme réactif (sans blocage) vous pouvez utiliser Future (ou scalaz.concurrent.Task) + Future.sequence

+0

merci @ dk14 pour avoir pris le temps de raisonner sur les mérites des différentes solutions auxquelles vous avez pensé. – blotto