2016-12-02 1 views
1

J'essaie d'obtenir la liste des événements persistants et de l'envoyer en réponse (Action.async). Mais je ne suis pas en mesure de convertir PersistenceQuery résultats à Future objet. Voici le codeCadre de lecture, Akka Persistence, PersistenceQuery, impossible de convertir la source en objet futur pour Action.async

val queries = PersistenceQuery(actorSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
val source: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("MYID", 0, Long.MaxValue) 
val mappedSource: Source[JsValue, NotUsed] = source.mapAsync(1) { e => 
     e.event match { 
     case l: String => 
      Future(Json.parse(l)) 
     } 
    } 
val finalResult: Future[List[JsValue]] = mappedSource.take(10).runFold(List[JsValue]())((a, b) => { 
     println(a) 
     a :+ b 
    }) 
finalResult 

Je suis en mesure de voir dans les impressions runFold, mais finalResult n'a jamais été retourné. J'ai même essayé de Await, même après avoir attendu quelques minutes, il n'est jamais revenu. Cette finalResult montre que toute l'activité de l'utilisateur voulait l'envoyer comme réponse Action.async. S'il vous plaît laissez-moi savoir quel est le moyen de convertir Source-Future objet

Répondre

1

correctif pour ce problème est préférable d'utiliser currentEventsByPersistenceId, cela renvoie tous les événements actuels. eventsByPersistenceId avec take termine le flux. Mais dans mon cas take fonction nécessite 10 événements à collecter, dans mon cas, j'ai eu moins de 10, de sorte que le flux ne se termine pas jusqu'à ce qu'il accumule au moins 10 événements. Donc, le résultat n'a jamais été retourné.

+0

Merci! Cela devrait être mis en évidence dans les docs. – Ikrom