2017-09-11 6 views
0

Est-il possible dans Apache Flink, d'ajouter dynamiquement un nouveau flux de données pendant l'exécution sans redémarrer le Job?Apache Flink ajoute dynamiquement un nouveau flux

Pour autant que je compris, un programme Flink d'habitude ressemble à ceci:

val env = StreamExecutionEnvironment.getExecutionEnvironment() 
val text = env.socketTextStream(hostname, port, "\n") 
val windowCounts = text.map... 

env.execute("Socket Window WordCount") 

Dans mon cas, il est possible, que par exemple un nouveau périphérique est démarré et, par conséquent, un autre flux doit être traité. Mais comment ajouter ce nouveau flux à la volée?

Répondre

1

Il n'est pas possible d'ajouter de nouveaux flux lors de l'exécution à un programme Flink.

La manière de résoudre ce problème consiste à avoir un flux qui contient tous les événements entrants (par exemple un sujet Kafka dans lequel vous ingérer tous les flux individuels). Les événements doivent avoir une clé identifiant de quel flux ils proviennent. Cette clé peut ensuite être utilisée pour keyBy le flux et pour appliquer une logique de traitement par clé.

Si vous voulez lire à partir de plusieurs sockets, alors vous pouvez écrire votre propre SourceFunction qui lit à partir de certaines entrées (par exemple à partir d'une prise fixe) les ports pour ouvrir une socket pour. Ensuite, en interne, vous pouvez maintenir toutes ces prises ouvertes et lire d'une manière circulaire à partir d'eux.