je suis venu avec une solution un peu gnarly mais je pense qu'il fait le travail.
L'idée essentielle est d'utiliser la méthode keepAlive
de Source comme temporisateur qui déclenchera l'achèvement. Mais pour ce faire, nous devons d'abord abstraire un peu les données. La minuterie devra envoyer le déclencheur ou d'une autre valeur de la source tuple d'origine, donc:
sealed trait Data
object TimerTrigger extends Data
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data
convertir ensuite notre source de tuples à une source de valeurs. Nous allons encore utiliser groupBy
pour faire des regroupements similaires à votre cas de flux fini:
val originalSource : Source[(Long, String, Int), Unit] = ???
type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid
val groupedDataSource : Source[IDGroup, Unit] =
originalSource.map(t => Value(t._1, t._2, t._3))
.groupBy(_.session_uid)
La partie la plus délicate est la manipulation des groupes qui sont juste TUPLES: (String, Source[Value,Unit])
. Nous avons besoin de la minuterie pour nous informer si le temps est écoulé pour que nous besoin d'une autre abstraction pour savoir si nous sommes encore ou calcul, nous avons terminé le calcul en raison d'un délai d'attente:
sealed trait Sum {
val sum : Int
}
case class StillComputing(val sum : Int) extends Sum
case class ComputedSum(val sum : Int) extends Sum
val zeroSum : Sum = StillComputing(0)
Maintenant, nous pouvons drainer la source de chaque groupe. Le keepAlive
enverra un TimerTrigger
si la source de valeurs ne produit pas quelque chose après le timeOut
. Le Data
du keepAlive est alors modèle en correspondance avec soit un TimerTrigger ou une nouvelle valeur de la source originale:
val evaluateSum : ((Sum , Data)) => Sum = {
case (runningSum, data) => {
data match {
case TimerTrigger => ComputedSum(runningSum.sum)
case v : Value => StillComputing(runningSum.sum + v.traffic)
}
}
}//end val evaluateSum
type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid
def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult =
idGroup._1 -> idGroup._2.keepAlive(timeOut,() => TimerTrigger)
.scan(zeroSum)(evaluateSum)
.collect {case c : ComputedSum => c.sum}
.runWith(Sink.head)
La collection est appliquée à une fonction partielle qui correspond seulement une somme finie, donc l'évier est atteint seulement après que la minuterie a tiré.
Nous appliquons ensuite ce gestionnaire à chaque groupe qui sort:
val timeOut = FiniteDuration(5, MINUTES)
val sumSource : Source[SumResult, Unit] =
groupedDataSource map handleGroup(timeOut)
Nous avons maintenant une source de (String,Future[Int])
qui est le session_uid et un avenir de la somme du trafic pour cet identifiant.
Comme je l'ai dit, alambiqué mais répond aux exigences. En outre, je ne suis pas tout à fait sûr de ce qui se passe si un uid qui était déjà groupé et a été expiré, mais ensuite une nouvelle valeur avec le même uid vient.
J'utilise 'akka-streams' (comme mentionné dans les balises). –
Au lieu d'une limite de temps, y a-t-il un certain nombre de mises à jour? Par exemple. produire un regroupement toutes les 10 000 mises à jour. –
Si vous avez une vapeur infinie, comment le groupe pourrait-il "traiter tous les flux entrants et seulement après cela sera prêt à retourner le résultat"? Si le flux est vraiment infini, cela n'arrivera jamais. –