2015-09-10 1 views
2

J'essaie d'envelopper certains appels de blocage dans Future. Le type de retour est Seq[User]User est un case class. Ce qui suit ne serait tout simplement pas compiler avec des plaintes de diverses versions surchargées étant présents. Aucune suggestion? J'ai essayé presque toutes les variations est Source.apply sans aucune chance.Scala Akka Stream: Comment passer à travers un Seq

// All I want is Seq[User] => Future[Seq[User]] 

def findByFirstName(firstName: String) = { 
    val users: Seq[User] = userRepository.findByFirstName(firstName) 

    val sink = Sink.fold[User, User](null)((_, elem) => elem) 

    val src = Source(users) // doesn't compile 

    src.runWith(sink) 
} 

Répondre

5

Tout d'abord, je suppose que vous utilisez la version 1.0 de akka-http-experimental depuis l'API peut changé de la version précédente.

La raison pour laquelle votre code ne compile pas est que le akka.stream.scaladsl.Source$.apply() exige scala.collection.immutable.Seq au lieu de scala.collection.mutable.Seq.

Par conséquent, vous devez convertir une séquence mutable en une séquence immuable en utilisant la méthode to[T].

Document: akka.stream.scaladsl.Source

De plus, comme vous voyez le document, Source$.apply() accepte ()=>Iterator[T] que vous pouvez également passer ()=>users.iterator comme argument.

Puisque Sink.fold(...) renvoie la dernière expression évaluée, vous pouvez donner un Seq() vide comme premier argument, parcourir sur le users en ajoutant l'élément à la séquence, et enfin obtenir le résultat.

Cependant, il pourrait y avoir une meilleure solution qui peut créer un Sink qui place chaque expression évaluée dans Seq, mais je ne l'ai pas trouvé.

Le code suivant fonctionne.

import akka.actor._ 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Source,Sink} 
import scala.concurrent.ExecutionContext.Implicits.global 

case class User(name:String) 

object Main extends App{ 
    implicit val system = ActorSystem("MyActorSystem") 
    implicit val materializer = ActorMaterializer() 
    val users = Seq(User("alice"),User("bob"),User("charlie")) 

    val sink = Sink.fold[Seq[User], User](Seq())(
     (seq, elem) => 
     {println(s"elem => ${elem} \t| seq => ${seq}");seq:+elem}) 

    val src = Source(users.to[scala.collection.immutable.Seq]) 
    // val src = Source(()=>users.iterator) // this also works 

    val fut = src.runWith(sink) // Future[Seq[User]] 
    fut.onSuccess({ 
     case x=>{ 
     println(s"result => ${x}") 
     } 
    }) 
} 

La sortie du code ci-dessus est

elem => User(alice)  | seq => List() 
elem => User(bob)  | seq => List(User(alice)) 
elem => User(charlie) | seq => List(User(alice), User(bob)) 
result => List(User(alice), User(bob), User(charlie)) 
+0

Oui, j'utilise la version 1.0. Et ta réponse est exactement ce que je cherchais. Il continue de m'amuser que certaines personnes ne répondent tout simplement pas à la question posée avant de faire d'autres suggestions. Si c'est une consolation, apparemment, je ne suis pas le premier à avoir trébuché sur le cas que 'Seq' tel que défini dans l'objet package scala n'est * pas * un alias pour la version immuable. [Here's] (https://groups.google.com/forum/#!topic/scala-internals/g_-gIWgB8Os) une grande discussion à ce sujet impliquant Martin Odersky. –

2

Si vous avez juste besoin d'avenir [Seq [utilisateurs]] DonT utiliser AKKA flux mais futures

import scala.concurrent._ 
import ExecutionContext.Implicits.global 
val session = socialNetwork.createSessionFor("user", credentials) 
val f: Future[List[Friend]] = Future { 
    session.getFriends() 
} 
+0

Où est le contrôle d'accès concurrentiel, la gestion des exceptions, l'exploitation forestière, AKKA dispose ainsi magnifiquement? De plus j'utilise 'akka-http' pour ma ressource, j'ai déjà Akka Streams, je pourrais aussi bien l'utiliser. –

+0

Utiliser 'Future's semble être plus approprié pour' userRepository' ou tout autre référentiel. La raison en est que votre référentiel représente une sorte de mécanisme d'interrogation et qu'il renvoie vos données dans leur ensemble. Attendu que les cours d'eau devraient être utilisés dans des endroits où les données arrivent dans des mandrins ou des portions plus petites et peuvent être fournies à l'infini. –

+0

@VanyaStanislavciuc Rien dans la spécification Reactive Streams ou Akka Streams ne dit qu'un flux doit être infini ou fragmenté. Un flux segmenté est un cas d'utilisation, pas une nécessité. D'ailleurs, même si je ne fais que passer le 'Seq' maintenant, je pourrais très bien ajouter une logique de traitement dans le futur. Il est toujours préférable de concevoir pour l'avenir, surtout lorsque les outils sont déjà disponibles. –