2015-09-06 1 views
7

Je dois créer un akka.stream.scaladsl.Source[T, Unit] à partir d'une collection de Future[T].Dans akka-stream comment créer une source non ordonnée à partir d'une collection de futures

ayant par exemple une collection de futurs entiers de retour,

val f1: Future[Int] = ??? 
val f2: Future[Int] = ??? 
val fN: Future[Int] = ??? 
val futures = List(f1, f2, fN) 

comment créer un

val source: Source[Int, Unit] = ??? 

de lui.

Je ne peux pas utiliser le combinateur Future.sequence, car j'attendrais que chaque futur se termine avant d'obtenir quoi que ce soit de la source. Je veux obtenir des résultats dans toute commande dès que tout futur se termine.

Je comprends que Source est une API purement fonctionnelle et qu'elle ne devrait pas fonctionner avant de la matérialiser. Donc, mon idée est d'utiliser un Iterator (qui est paresseux) pour créer une source:

Source {() => 
    new Iterator[Future[Int]] { 
    override def hasNext: Boolean = ??? 
    override def next(): Future[Int] = ??? 
    } 
} 

Mais ce serait une source de contrats à terme, et non des valeurs réelles. Je pourrais également bloquer sur next en utilisant Await.result(future) mais je ne suis pas sûr quel fil de discussion sera bloqué. Aussi, cela appellera les futurs séquentiellement, alors que j'ai besoin d'une exécution parallèle.

MISE À JOUR 2: il est avéré qu'il y avait un moyen de faire beaucoup plus facile il (grâce à Viktor Klang):

Source(futures).mapAsync(1)(identity) 

MISE À JOUR: voici ce que j'ai basé sur @sschaef réponse:

def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = { 
    def run(actor: ActorRef): Unit = { 
    futures.foreach { future => 
     future.onComplete { 
     case Success(value) => 
      actor ! value 
     case Failure(NonFatal(t)) => 
      actor ! Status.Failure(t) // to signal error 
     } 
    } 

    Future.sequence(futures).onSuccess { case _ => 
     actor ! Status.Success(()) // to signal stream's end 
    } 
    } 

    Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run) 
} 

// ScalaTest tests follow 

import scala.concurrent.ExecutionContext.Implicits.global 

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

"futuresToSource" should "convert futures collection to akka-stream source" in { 
    val f1 = Future(1) 
    val f2 = Future(2) 
    val f3 = Future(3) 

    whenReady { 
    futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _) 
    } { results => 
    results should contain theSameElementsAs Seq(1, 2, 3) 
    } 
} 

it should "fail on future failure" in { 
    val f1 = Future(1) 
    val f2 = Future(2) 
    val f3 = Future.failed(new RuntimeException("future failed")) 

    whenReady { 
    futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed 
    } { t => 
    t shouldBe a [RuntimeException] 
    t should have message "future failed" 
    } 
} 

Répondre

6

Création d'une source de contrats à terme, puis "aplatir" via mapAsync:

scala> Source(List(f1,f2,fN)).mapAsync(1)(identity) 
res0: akka.stream.scaladsl.Source[Int,Unit] = [email protected] 
+0

Et si mes contrats à terme sont de type 'Future [Source [T, Unité]]' - puis-je faire quelque chose de mieux que 'Source (futures) .mapAsyncUnordered (1) (identité) .flatten (FlattenStrategy.concat)'? Je voudrais que cet aplatisse soit non ordonné et supporte aussi le niveau de parallélisme. – Tvaroh

+0

Je suis actuellement (chaque fois que je trouve des heures ou deux) travaillant sur 'flatten (FlattenStrategy.merge)' qui ferait ce que vous voulez. Dans le même temps, vous pouvez utiliser 'mapAsyncUnordered (par) (identity)' + une implémentation FlexiMerge? –

+0

Viktor, je n'ai pas regardé FlexiMerge, je vais essayer. Je vous remercie. – Tvaroh

5

l'une des façons les plus faciles à nourrir une source est par un acteur:

import scala.concurrent.Future 
import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 

implicit val system = ActorSystem("MySystem") 

def run(actor: ActorRef): Unit = { 
    import system.dispatcher 
    Future { Thread.sleep(100); actor ! 1 } 
    Future { Thread.sleep(200); actor ! 2 } 
    Future { Thread.sleep(300); actor ! 3 } 
} 

val source = Source 
    .actorRef[Int](0, OverflowStrategy.fail) 
    .mapMaterializedValue(ref ⇒ run(ref)) 
implicit val m = ActorMaterializer() 

source runForeach { int ⇒ 
    println(s"received: $int") 
} 

L'acteur est créé via la méthode Source.actorRef et rendu disponible via la méthode mapMaterializedValue. run prend simplement l'acteur et lui envoie toutes les valeurs terminées, qui peuvent ensuite être accessibles par source. Dans l'exemple ci-dessus, les valeurs sont envoyées directement dans le futur, mais cela peut bien sûr être fait partout (par exemple dans l'appel onComplete sur le Futur).

+0

BTW, pourquoi le premier argument 'actorRef' est' 0'? Est-ce que ça importe? – Tvaroh

+0

Si le consommateur peut prendre tous les éléments hors de la source, vous n'avez sûrement pas besoin d'un cache, par conséquent, il est 0. – sschaef

+0

J'ai essayé, mais zéro ne fonctionnait pas (était lancer exception). La taille égale à la taille de la collection de contrats à terme a bien fonctionné. – Tvaroh