2017-06-18 5 views
1

Je veux mettre en forme ce flux: 1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5,. .. à ces séances:Comment faire un flux de session avec Apache Flink?

1,1,1 
2,2,2,2,2 
3,3,3,3,3,3,3 
0 
3,3,3 
5 

J'ai écrit CustomTrigger pour détecter lorsque les éléments de flux changent de 1 à 2 (2 à 3, 3 à 0 et ainsi de suite), puis tirer sur la gâchette. Mais ce n'est pas la solution, car quand je traite le premier élément de 2, et déclenche le trigger, la fenêtre sera [1,1,1,2] mais je dois déclencher le trigger sur le dernier élément de 1.

Voici le pesudo de ma fonction onElement dans ma classe de déclenchement personnalisée:

override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = { 
    if (prevState == element.value) { 
     prevState = element.value 
     TriggerResult.CONTINUE 
    } else { 
     prevState = element.value 
     TriggerResult.FIRE 
    } 
} 

Comment puis-je résoudre ce problème?

Répondre

2

Je pense qu'un FlatMapFunction avec un ListState est le moyen le plus simple d'implémenter ce cas d'utilisation.

Lorsqu'un nouvel élément arrive (c'est-à-dire que la méthode flatMap() est appelée), vous vérifiez si la valeur a changé. Si la valeur n'a pas changé, vous ajoutez l'élément à l'état. Si la valeur a changé, vous émettez l'état actuel de la liste en tant que session, effacez la liste et insérez le nouvel élément en tant que premier état de la liste. Cependant, vous devez garder à l'esprit que cela suppose que l'ordre des éléments est préservé. Le flink assure dans une partition, c'est-à-dire, tant que les éléments ne sont pas mélangés et que tous les opérateurs fonctionnent avec le même parallélisme.