2015-04-30 1 views
3

J'essaye d'écrire un calcul dans Flink qui nécessite deux phases.Réutiliser les résultats du premier calcul en second calcul

Dans la première phase, je commence à partir d'un fichier texte, et effectue une estimation des paramètres, obtenant ainsi un objet Java représentant un modèle statistique des données.

Dans la deuxième phase, j'aimerais utiliser cet objet pour générer des données pour une simulation.

Je ne suis pas sûr comment faire ceci. J'ai essayé avec un LocalCollectionOutputFormat, et cela fonctionne localement, mais quand je déploie le travail sur un cluster, j'obtiens un NullPointerException - ce qui n'est pas vraiment surprenant.

Quelle est la manière de faire de Flink?

Voici mon code:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
GlobalConfiguration.includeConfiguration(configuration); 

// Phase 1: read file and estimate model 
DataSource<Tuple4<String, String, String, String>> source = env 
     .readCsvFile(args[0]) 
     .types(String.class, String.class, String.class, String.class); 

List<Tuple4<Bayes, Bayes, Bayes, Bayes>> bayesResult = new ArrayList<>(); 
// Processing here... 
....output(new LocalCollectionOutputFormat<>(bayesResult)); 

env.execute("Bayes"); 

DataSet<BTP> btp = env 
     .createInput(new BayesInputFormat(bayesResult.get(0))) 
// Phase 2: BayesInputFormat generates data for further calculations 
// .... 

Ceci est l'exception que je reçois:

Error: The program execution failed: java.lang.NullPointerException 
    at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86) 
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176) 
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257) 
    at java.lang.Thread.run(Thread.java:745) 

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.NullPointerException 
    at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86) 
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176) 
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257) 
    at java.lang.Thread.run(Thread.java:745) 

    at org.apache.flink.client.program.Client.run(Client.java:328) 
    at org.apache.flink.client.program.Client.run(Client.java:294) 
    at org.apache.flink.client.program.Client.run(Client.java:288) 
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:55) 
    at it.list.flink.test.Test01.main(Test01.java:62) 
    ... 

Répondre

3

Avec la dernière version (0.9-étape-1) une méthode collect() a été ajouté à Flink

public List<T> collect() 

qui récupère un DataSet<T> en tant que List<T> dans le programme pilote. collect() déclenchera également une exécution immédiate du programme (pas besoin d'appeler ExecutionEnvironment.execute()). À l'heure actuelle, il existe une limite de taille pour les ensembles de données d'environ 10 Mo. Si vous n'évaluez pas les modèles dans le programme pilote, vous pouvez également chaîner les deux programmes ensemble et émettre le modèle sur le côté en attachant un collecteur de données. Ce sera plus efficace, car les données ne feront pas l'aller-retour sur la machine client.

+0

Merci C'est très utile. J'ai essayé de chaîner, mais le résultat de la première exécution est essentiellement un ensemble de paramètres, c'est-à-dire une seule ligne. Distribuant cette ligne aux clients, je peux générer des données en parallèle sur chaque client avec InputFormat, alors qu'avec le chaînage je ne pouvais pas trouver un moyen de faire la même chose. – Flavio

+0

Je ne sais pas si j'ai bien compris, mais vous pourriez peut-être aussi utiliser le truc suivant: –

+0

Placez les paramètres dans une variable de diffusion et distribuez-la à un ensemble d'opérateurs 'flatMap'. Chacun de ces opérateurs 'flatMap' aura accès à tous les paramètres et peut émettre autant d'enregistrements que vous le souhaitez. Cependant, vous devez déclencher l'exécution de 'flatMaps' avec un enregistrement d'entrée factice, qui pourrait être servi à partir d'un format d'entrée de collection contenant un' Integer' pour chaque instance 'flatMap' parallèle. Les données de la collection IF doivent être «rééquilibrées» avant d'entrer dans les opérateurs 'flatMap'. –

0

Si vous utilisez Flink avant 0,9 vous pouvez utiliser l'extrait ci-dessous pour recueillir votre ensemble de données à une collection locale:

val dataJavaList = new ArrayList[K] 
val outputFormat = new LocalCollectionOutputFormat[K](dataJavaList) 
dataset.output(outputFormat) 
env.execute("collect()") 

K est le type d'objet que vous souhaitez collecter

+0

Je ne connais pas Scala, mais il me semble que ce que vous suggérez est égal à ce que j'ai rapporté dans ma question, et avec cette approche, je reçois un NPE. – Flavio