2017-09-11 7 views
0

Je veux obtenir le nombre de mots avec la fonction fenêtré pour chaque mot:Comment obtenir compteur de chaque mot dans le flux fenêtré sur Flink

Si j'utilise ce code:

object WindowWordCount { 
    def main(args: Array[String]) { 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val text = env.socketTextStream("localhost", 9999) 

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } 
     .map { (_, 1) } 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .sum(1) 

    counts.print 

    env.execute("Window Stream WordCount") 
    } 
} 

I get sortie après 5 seconde (temps fenêtré) comme ceci:

entrée

:

first input : hello 
seconde input : hello 
third input : word 
fifth input : hello 
sixth input : word 

sortie

first output : hello : 3 | word : 2 

mais je veux avoir la sortie avec le compte pour chaque mot.

comme ça: entrée:

first input: hello 
seconde input:hello 
third input:word 
fifth input:hello 
sixth input:word 

eteint:

first output: hello : 1 
seconde output:hello : 2 
third output:word : 1 
fifth output:hello : 3 
sixth output:word : 2 

comment puis-je faire cela?

Répondre

0

Le programme d'exemple de Kafka Streaming API ne serait-il pas exactement ce que vous cherchez? https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#example-program

object WindowWordCount { 
    def main(args: Array[String]) { 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val text = env.socketTextStream("localhost", 9999) 

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } 
     .map { (_, 1) } 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .sum(1) 

    counts.print 

    env.execute("Window Stream WordCount") 
    } 
} 
+0

L'exemple d'imprimer tous les 5 deuxième et je veux imprimer pour chaque événement. – FlinkNoob

+0

Pour clarifier un peu, demandez-vous des fenêtres de temps de traitement (par opposition à des fenêtres d'événements), et les fenêtres sont-elles alignées (par exemple, voulez-vous que les compteurs de toutes les clés soient remis à zéro en même temps? cinq secondes)? –

+0

Oui, je demande des fenêtres de temps de traitement, je ne veux pas nettoyer les données à zéro. Je veux juste compter le nombre des mêmes mots des 5 dernières secondes pour chaque mot. – FlinkNoob