2016-06-15 2 views
1

Je travaille sur le projet poc en java en utilisant kafka -> flink -> recherche élastique.Comment compter le nombre de "clients" dans un état avec flink de manière distribuée en fonction des événements de changement d'état? J'aurais besoin d'objets avec état

Sur kafka sera produit un nombre imprévisible d'événements de 0 jusqu'à des milliers d'événements/sec comme sur un sujet spécifique.

{"gid":"abcd-8910-2ca4227527f9", "state":"stateA", "timestamp:1465566255, "other unusefull info":"..."} 

Flink consommera cette événement et devrait couler chaque seconde dans élastique recherche le nombre d'événements dans chaque ex Etat:

{"stateA":54, "stateB":100, ... "stateJ":34} 

J'ai 10 états: [Created, ... , Deleted] avec un cycle de vie moyenne de 15 minutes . L'état peut changer deux fois par seconde. Théoriquement, de nouveaux états pourraient être ajoutés.

Pour couler cours d'eau chaque seconde je pense à utiliser les fenêtres de temps de Flink https://flink.apache.org/news/2015/12/04/Introducing-windows.html

Le problème est que j'ai besoin des objets stateful avec des informations sur guid->previous-state et stateX->count afin de pouvoir augmenter/diminuer le nombre lorsqu'un nouvel événement se produit

Je trouve un projet de document sur le traitement à la vapeur stateful https://cwiki.apache.org/confluence/display/FLINK/Stateful+Stream+Processing

Je suis nouveau Flink et le traitement des flux, je n'ai pas creuser dans le traitement des flux de Flink stateful encore. Pour la première phase, je pense utiliser des objets statiques pour cela, mais cette approche ne fonctionnera pas lorsque plusieurs instances de flink seront lancées.

Je veux vous demander:

  1. Que pensez-vous de cette approche?
  2. Le flink est-il adapté à ce type de traitement de flux?
  3. Quelle sera votre approche pour résoudre ce problème?

De même, j'apprécierais certains extraits de code pour la solution de flux avec état fenêtré (ou d'autres solutions).

Merci,

Répondre

1

Que diriez-vous quelque chose comme ce qui suit?

Il utilise des fenêtres de 15 minutes, après quoi l'état de la fenêtre sera nettoyé. Il utilise également un déclencheur personnalisé qui évalue la fenêtre toutes les secondes. En ce qui concerne l'opération de fenêtrage, il existe une fonction ReduceFunction qui conserve simplement le dernier état pour chaque guid et une WindowFunction qui émet un tuple (state, 1). Nous clavons ensuite par cet état et somme cela. Je pense que cela devrait vous donner le résultat que vous recherchez.

val env = StreamExecutionEnvironment.getExecutionEnvironment() 
val stream = env.addSource(new FlinkKafkaProducer(...)) 

val results = stream 
    .keyBy(_.guid) 
    .timeWindow(Time.minutes(15)) 
    .trigger(ProcessingTimeTriggerWithPeriodicFirings(1000)) 
    .apply(
    (e1, e2) => e2, 
    (k, w, i, c: Collector[(String, Long)]) => { 
     if (i.head != null) c.collect((i.head.state, 1)) 
    } 
) 
    .keyBy(0) 
    .timeWindow(Time.seconds(1)) 
    .sum(1) 
    .addSink(new ElasticsearchSink<>(...)) 

env.execute("Count States") 

ProcessingTimeTriggerWithPeriodicFirings est définie comme suit:

object ProcessingTimeTriggerWithPeriodicFirings { 
    def apply(intervalMs: Long) = { 
    new ProcessingTimeTriggerWithPeriodicFirings(intervalMs) 
    } 
} 

class ProcessingTimeTriggerWithPeriodicFirings(intervalMs: Long) 
    extends Trigger[Event, TimeWindow] { 

    private val startTimeDesc = 
    new ValueStateDescriptor[Long]("start-time", classOf[Long], 0L) 

    override def onElement(element: Event, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = { 
    val startTime = ctx.getPartitionedState(startTimeDesc) 
    if (startTime.value == 0) { 
     startTime.update(window.getStart) 
     ctx.registerProcessingTimeTimer(window.getEnd) 
     ctx.registerProcessingTimeTimer(System.currentTimeMillis() + intervalMs) 
    } 
    TriggerResult.CONTINUE 
    } 

    override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = { 
    if (time == window.getEnd) { 
     TriggerResult.PURGE 
    } 
    else { 
     ctx.registerProcessingTimeTimer(time + intervalMs) 
     TriggerResult.FIRE 
    } 
    } 

    override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = { 
    TriggerResult.CONTINUE 
    } 
}