2017-08-15 3 views
0

Comme mentionné dans la documentation Flink, j'ai pu lire la saisie de texte à partir du serveur texte en utilisant une socket locale en utilisantenvoyer DATASTREAM de la prise VM/prise à distance au programme Flink en cours d'exécution sur l'hôte OS

[email protected]:~$ nc -l 12345 

et recevoir sur le programme Flink utilisant

DataStream<String> text = env.socketTextStream("localhost", 12345); 

text.print(); 

env.execute(); 

Cependant, comme je suis simulant un scénario, je veux obtenir le flux de données à partir d'une machine virtuelle (puis différents éventuellement VM) et l'envoyer au programme CEP en cours d'exécution sur le système d'exploitation hôte.

Alors, je l'ai installé VM, en utilisant Vagrant et SSH dans l'aide vagrant ssh

  1. le nom d'hôte OS invité precise64

  2. adresse IP à l'aide ifconfig = 10.0 .2.15

Maintenant, ce que je veux faire, pour l'instant, est de voir si je peux envoyer des données à partir de VM et les recevoir dans le programme Flink de la même manière que j'ai pu le faire dans l'environnement local.

J'ai ouvert la prise Netcat sur os invité à l'aide

[email protected]:~$ nc -l 12345 

et moi avons essayé de le recevoir sur le programme d'accueil à l'aide, mais nous avons eu erreur

DataStream<String> text = env.socketTextStream("precise64", 12345); 

text.print(); 

env.execute(); 

J'ai aussi essayé [email protected] ci-dessus, mais je pense que je le fais mal.

des idées, comment dois-je aborder envoyer DataStream de VM à l'hôte du programme Flink

Les suggestions sont les bienvenues, merci d'avance!

Répondre

1

Vous pouvez essayer ceci:

1.Programme:

import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.api.windowing.time.Time 

object WindowWordCount { 
    def main(args: Array[String]) { 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val text = env.socketTextStream("localhost", 9999) 

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } 
     .map { (_, 1) } 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .sum(1) 

    counts.print 

    env.execute("Window Stream WordCount") 
    } 
} 

2.Après couru ci-dessus program.You pourrait commencer à ce sujet.

nc -lk 9999 

Cela fonctionnera.