J'essaie de mettre en œuvre un flux de filtrage de flux akka qui recueille des statistiques sur les données traitées et matérialise les statistiques qui en résultent.État dynamique matérialisant l'état final
class SFilter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("SFilter.in")
val out = Outlet[A]("SFilter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
var positive: Long = 0
var negative: Long = 0
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) {
push(out, elem)
positive += 1
} else {
pull(in)
negative += 1
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
Jusqu'à présent, si bon, mais je me SFilter[A]
être de type Flow[A,A,(Long,Long)]
. Comment puis-je matérialiser (positive,negative)
à la fin de la comnputation?
Peut-être que vous n'êtes pas à la recherche de cette réponse, mais ce que vous semblez faire est essentiellement un 'fold'. Donc, sans écrire votre propre scène, vous pourriez simplement réutiliser le combinateur 'fold' existant. – jrudolph
@jrudolph oui j'y pense, mais je préfère utiliser fold avec un accumulateur immuable. Je compare cette approche avec une version mutable (mais sûre). – paradigmatic