2015-11-21 1 views
5

J'ai un flux infini d'événements:Comment regrouper des événements entrants provenant d'un flux infini?

(timestamp, session_uid, traffic) 

à savoir

... 
(1448089943, session-1, 10) 
(1448089944, session-1, 20) 
(1448089945, session-2, 50) 
(1448089946, session-1, 30) 
(1448089947, session-2, 10) 
(1448089948, session-3, 10) 
... 

Ces événements que je veux groupe par session_uid et calculer la somme du trafic pour chaque session.

J'ai écrit un flux akka-streams qui fonctionne bien avec l'utilisation de flux fini groupBy (mon code de base sur this exemple de livre de cuisine). Mais avec un flux infini cela ne fonctionnera pas car la fonction devrait traiter tous les flux entrants et seulement après cela sera prêt à retourner le résultat.

Je pense que je devrais mettre en œuvre le groupement avec timeout, c'est-à-dire si je ne reçois pas d'événement avec stream_uid spécifié à plus de 5 minutes de la dernière, je devrais retourner les événements groupés pour ce session_uid. Mais comment l'implémenter utiliser akka-streams seulement?

+0

J'utilise 'akka-streams' (comme mentionné dans les balises). –

+0

Au lieu d'une limite de temps, y a-t-il un certain nombre de mises à jour? Par exemple. produire un regroupement toutes les 10 000 mises à jour. –

+0

Si vous avez une vapeur infinie, comment le groupe pourrait-il "traiter tous les flux entrants et seulement après cela sera prêt à retourner le résultat"? Si le flux est vraiment infini, cela n'arrivera jamais. –

Répondre

3

je suis venu avec une solution un peu gnarly mais je pense qu'il fait le travail.

L'idée essentielle est d'utiliser la méthode keepAlive de Source comme temporisateur qui déclenchera l'achèvement. Mais pour ce faire, nous devons d'abord abstraire un peu les données. La minuterie devra envoyer le déclencheur ou d'une autre valeur de la source tuple d'origine, donc:

sealed trait Data 

object TimerTrigger extends Data 
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data 

convertir ensuite notre source de tuples à une source de valeurs. Nous allons encore utiliser groupBy pour faire des regroupements similaires à votre cas de flux fini:

val originalSource : Source[(Long, String, Int), Unit] = ??? 

type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid 

val groupedDataSource : Source[IDGroup, Unit] = 
    originalSource.map(t => Value(t._1, t._2, t._3)) 
       .groupBy(_.session_uid) 

La partie la plus délicate est la manipulation des groupes qui sont juste TUPLES: (String, Source[Value,Unit]). Nous avons besoin de la minuterie pour nous informer si le temps est écoulé pour que nous besoin d'une autre abstraction pour savoir si nous sommes encore ou calcul, nous avons terminé le calcul en raison d'un délai d'attente:

sealed trait Sum { 
    val sum : Int 
} 
case class StillComputing(val sum : Int) extends Sum 
case class ComputedSum(val sum : Int) extends Sum 

val zeroSum : Sum = StillComputing(0) 

Maintenant, nous pouvons drainer la source de chaque groupe. Le keepAlive enverra un TimerTrigger si la source de valeurs ne produit pas quelque chose après le timeOut. Le Data du keepAlive est alors modèle en correspondance avec soit un TimerTrigger ou une nouvelle valeur de la source originale:

val evaluateSum : ((Sum , Data)) => Sum = { 
    case (runningSum, data) => { 
    data match { 
     case TimerTrigger => ComputedSum(runningSum.sum) 
     case v : Value => StillComputing(runningSum.sum + v.traffic) 
    } 
    } 
}//end val evaluateSum 

type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid 

def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult = 
    idGroup._1 -> idGroup._2.keepAlive(timeOut,() => TimerTrigger) 
          .scan(zeroSum)(evaluateSum) 
          .collect {case c : ComputedSum => c.sum} 
          .runWith(Sink.head) 

La collection est appliquée à une fonction partielle qui correspond seulement une somme finie, donc l'évier est atteint seulement après que la minuterie a tiré.

Nous appliquons ensuite ce gestionnaire à chaque groupe qui sort:

val timeOut = FiniteDuration(5, MINUTES) 

val sumSource : Source[SumResult, Unit] = 
    groupedDataSource map handleGroup(timeOut) 

Nous avons maintenant une source de (String,Future[Int]) qui est le session_uid et un avenir de la somme du trafic pour cet identifiant.

Comme je l'ai dit, alambiqué mais répond aux exigences. En outre, je ne suis pas tout à fait sûr de ce qui se passe si un uid qui était déjà groupé et a été expiré, mais ensuite une nouvelle valeur avec le même uid vient.

+0

Merci beaucoup! Votre réponse me donne plusieurs idées utiles mais elle pose plusieurs problèmes dans le contexte de ma tâche. Par exemple, il a un [problème de mémoire de consommation] (http://stackoverflow.com/questions/33865423/is-groupby-leaking-in-akka-stream) dans la fonction 'groupBy'. –

+0

@maxd Toute solution aura un problème potentiel de mémoire car le nombre d'uids actifs peut augmenter et donc le nombre de sommes en cours pourrait augmenter. Mais vous êtes les bienvenus, joyeux piratage. –

-1

peut-être vous pouvez simplement mettre en œuvre par l'acteur

case class SessionCount(name: String) 

class Hello private() extends Actor { 
    var sessionMap = Map[String, Int]() 

    override def receive: Receive = { 
    case (_, session: String, _) => 
     sessionMap = sessionMap + (session -> (sessionMap.getOrElse(session, 0) + 1)) 

    case SessionCount(name: String) => sender() ! sessionMap.get(name).getOrElse(0) 
    } 
} 


object Hello { 
    private val actor = ActorSystem.apply().actorOf(Props(new Hello)) 
    private implicit val timeOver = Timeout(10, TimeUnit.SECONDS) 
    type Value = (String, String, String) 

    def add(value: Value) = actor ! value 

    def count(name:String) = (actor ? SessionCount(name)).mapTo[Int] 
} 
+0

Je savais que possible de résoudre ma tâche utiliser seulement les acteurs Akka, mais je pense que akka-streams devrait avoir une solution pour ma tâche aussi. Et je veux le trouver. FYI: Votre exemple de code calcule le nombre de sessions mais pas le trafic total pour chaque session. De plus, il n'a pas géré correctement le flux infini. –

+0

comment est-il utiliser 'words.forearch (e => Hello.add (e))' –

1

Cela semble être le cas d'utilisation pour Source.groupedWithin:

def groupedWithin(n: Int, d: FiniteDuration): Source[List[Out], Mat] 

« Chunk jusqu'à ce flux en groupes d'éléments reçus dans une fenêtre de temps, ou limité par le nombre donné d'éléments, quoi qu'il arrive en premier. »

Here's the link to the docs

+0

Ceci n'est pas acceptable car la session peut être en plusieurs parties. –