Je veux mettre en forme ce flux: 1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5,. .. à ces séances:Comment faire un flux de session avec Apache Flink?
1,1,1
2,2,2,2,2
3,3,3,3,3,3,3
0
3,3,3
5
J'ai écrit CustomTrigger pour détecter lorsque les éléments de flux changent de 1 à 2 (2 à 3, 3 à 0 et ainsi de suite), puis tirer sur la gâchette. Mais ce n'est pas la solution, car quand je traite le premier élément de 2, et déclenche le trigger, la fenêtre sera [1,1,1,2] mais je dois déclencher le trigger sur le dernier élément de 1.
Voici le pesudo de ma fonction onElement dans ma classe de déclenchement personnalisée:
override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
if (prevState == element.value) {
prevState = element.value
TriggerResult.CONTINUE
} else {
prevState = element.value
TriggerResult.FIRE
}
}
Comment puis-je résoudre ce problème?