Je dois créer un akka.stream.scaladsl.Source[T, Unit]
à partir d'une collection de Future[T]
.Dans akka-stream comment créer une source non ordonnée à partir d'une collection de futures
ayant par exemple une collection de futurs entiers de retour,
val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)
comment créer un
val source: Source[Int, Unit] = ???
de lui.
Je ne peux pas utiliser le combinateur Future.sequence
, car j'attendrais que chaque futur se termine avant d'obtenir quoi que ce soit de la source. Je veux obtenir des résultats dans toute commande dès que tout futur se termine.
Je comprends que Source
est une API purement fonctionnelle et qu'elle ne devrait pas fonctionner avant de la matérialiser. Donc, mon idée est d'utiliser un Iterator
(qui est paresseux) pour créer une source:
Source {() =>
new Iterator[Future[Int]] {
override def hasNext: Boolean = ???
override def next(): Future[Int] = ???
}
}
Mais ce serait une source de contrats à terme, et non des valeurs réelles. Je pourrais également bloquer sur next
en utilisant Await.result(future)
mais je ne suis pas sûr quel fil de discussion sera bloqué. Aussi, cela appellera les futurs séquentiellement, alors que j'ai besoin d'une exécution parallèle.
MISE À JOUR 2: il est avéré qu'il y avait un moyen de faire beaucoup plus facile il (grâce à Viktor Klang):
Source(futures).mapAsync(1)(identity)
MISE À JOUR: voici ce que j'ai basé sur @sschaef réponse:
def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = {
def run(actor: ActorRef): Unit = {
futures.foreach { future =>
future.onComplete {
case Success(value) =>
actor ! value
case Failure(NonFatal(t)) =>
actor ! Status.Failure(t) // to signal error
}
}
Future.sequence(futures).onSuccess { case _ =>
actor ! Status.Success(()) // to signal stream's end
}
}
Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run)
}
// ScalaTest tests follow
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
"futuresToSource" should "convert futures collection to akka-stream source" in {
val f1 = Future(1)
val f2 = Future(2)
val f3 = Future(3)
whenReady {
futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _)
} { results =>
results should contain theSameElementsAs Seq(1, 2, 3)
}
}
it should "fail on future failure" in {
val f1 = Future(1)
val f2 = Future(2)
val f3 = Future.failed(new RuntimeException("future failed"))
whenReady {
futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed
} { t =>
t shouldBe a [RuntimeException]
t should have message "future failed"
}
}
Et si mes contrats à terme sont de type 'Future [Source [T, Unité]]' - puis-je faire quelque chose de mieux que 'Source (futures) .mapAsyncUnordered (1) (identité) .flatten (FlattenStrategy.concat)'? Je voudrais que cet aplatisse soit non ordonné et supporte aussi le niveau de parallélisme. – Tvaroh
Je suis actuellement (chaque fois que je trouve des heures ou deux) travaillant sur 'flatten (FlattenStrategy.merge)' qui ferait ce que vous voulez. Dans le même temps, vous pouvez utiliser 'mapAsyncUnordered (par) (identity)' + une implémentation FlexiMerge? –
Viktor, je n'ai pas regardé FlexiMerge, je vais essayer. Je vous remercie. – Tvaroh