Si votre but ultime est « répondre aux questions sur les derniers événements N » alors vous pouvez écrire un scaning débit à l'aide d'un circular buffer:
import scala.collection.immutable
type CircularBuffer[T] = immutable.Vector[T]
def emptyCircularBuffer[T] : CircularBuffer[T] = immutable.Vector.empty[T]
def addToCircularBuffer[T](maxSize : Int)(buffer : CircularBuffer[T], item : T) : CircularBuffer[T] =
if(maxSize > 0)
buffer.drop(buffer.size - maxSize + 1) :+ item
else
buffer
import akka.stream.scaladsl.Flow
def circularBufferFlow[T](N : Int) =
Flow[T].scan(emptyCircleBuffer[T])(addToCircleBuffer[T](N))
Ce flux comportera entre 0 et N éléments, et émet un nouveau tampon avec chaque mise à jour:
Source(1 to 10).via(circularBufferFlow[Int](3))
.runWith(Sink.foreach(println))
//Vector()
//Vector(1)
//Vector(1, 2)
//Vector(1, 2, 3)
//Vector(2, 3, 4)
//Vector(3, 4, 5)
//...
pour obtenir le dernier tampon, vous pouvez utiliser exclusivement Sink.last
pour matérialiser l'ensemble du flux dans un Future
.
Si, cependant, votre objectif est vraiment d'inverser un flux alors je considérerais cette sage car un flux pourrait durer infini qui entraînerait votre cache consommer éventuellement l'ensemble de la mémoire de la machine virtuelle Java et écraser l'application .