2016-01-14 2 views
0

J'ai un PersistentActor que persist s événements et je voudrais les lire dans l'ordre inverse. Plus précisément, je voudrais obtenir un Source pour les événements qui émettent les événements dans l'ordre inverse. Ceci est utile pour répondre aux questions sur les N derniers événements qui se sont produits pour ce PersistentActor. Quelle est la bonne approche pour ce problème? Cette information devrait être gardée en cache dans la vue ou il y a un moyen que je peux paresseusement interroger sur les événements dans l'ordre inverse?Utiliser eventsByPersistenceId pour obtenir la source des événements dans l'ordre inverse

Merci!

Répondre

1

Si votre but ultime est « répondre aux questions sur les derniers événements N » alors vous pouvez écrire un scaning débit à l'aide d'un circular buffer:

import scala.collection.immutable 

type CircularBuffer[T] = immutable.Vector[T] 

def emptyCircularBuffer[T] : CircularBuffer[T] = immutable.Vector.empty[T] 

def addToCircularBuffer[T](maxSize : Int)(buffer : CircularBuffer[T], item : T) : CircularBuffer[T] = 
    if(maxSize > 0) 
    buffer.drop(buffer.size - maxSize + 1) :+ item 
    else 
    buffer 

import akka.stream.scaladsl.Flow 

def circularBufferFlow[T](N : Int) = 
    Flow[T].scan(emptyCircleBuffer[T])(addToCircleBuffer[T](N)) 

Ce flux comportera entre 0 et N éléments, et émet un nouveau tampon avec chaque mise à jour:

Source(1 to 10).via(circularBufferFlow[Int](3)) 
       .runWith(Sink.foreach(println)) 

//Vector() 
//Vector(1) 
//Vector(1, 2) 
//Vector(1, 2, 3) 
//Vector(2, 3, 4) 
//Vector(3, 4, 5) 
//... 

pour obtenir le dernier tampon, vous pouvez utiliser exclusivement Sink.last pour matérialiser l'ensemble du flux dans un Future.

Si, cependant, votre objectif est vraiment d'inverser un flux alors je considérerais cette sage car un flux pourrait durer infini qui entraînerait votre cache consommer éventuellement l'ensemble de la mémoire de la machine virtuelle Java et écraser l'application .