J'essaye d'écrire un calcul dans Flink qui nécessite deux phases.Réutiliser les résultats du premier calcul en second calcul
Dans la première phase, je commence à partir d'un fichier texte, et effectue une estimation des paramètres, obtenant ainsi un objet Java représentant un modèle statistique des données.
Dans la deuxième phase, j'aimerais utiliser cet objet pour générer des données pour une simulation.
Je ne suis pas sûr comment faire ceci. J'ai essayé avec un LocalCollectionOutputFormat
, et cela fonctionne localement, mais quand je déploie le travail sur un cluster, j'obtiens un NullPointerException
- ce qui n'est pas vraiment surprenant.
Quelle est la manière de faire de Flink?
Voici mon code:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
GlobalConfiguration.includeConfiguration(configuration);
// Phase 1: read file and estimate model
DataSource<Tuple4<String, String, String, String>> source = env
.readCsvFile(args[0])
.types(String.class, String.class, String.class, String.class);
List<Tuple4<Bayes, Bayes, Bayes, Bayes>> bayesResult = new ArrayList<>();
// Processing here...
....output(new LocalCollectionOutputFormat<>(bayesResult));
env.execute("Bayes");
DataSet<BTP> btp = env
.createInput(new BayesInputFormat(bayesResult.get(0)))
// Phase 2: BayesInputFormat generates data for further calculations
// ....
Ceci est l'exception que je reçois:
Error: The program execution failed: java.lang.NullPointerException
at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
at java.lang.Thread.run(Thread.java:745)
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.NullPointerException
at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
at java.lang.Thread.run(Thread.java:745)
at org.apache.flink.client.program.Client.run(Client.java:328)
at org.apache.flink.client.program.Client.run(Client.java:294)
at org.apache.flink.client.program.Client.run(Client.java:288)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:55)
at it.list.flink.test.Test01.main(Test01.java:62)
...
Merci C'est très utile. J'ai essayé de chaîner, mais le résultat de la première exécution est essentiellement un ensemble de paramètres, c'est-à-dire une seule ligne. Distribuant cette ligne aux clients, je peux générer des données en parallèle sur chaque client avec InputFormat, alors qu'avec le chaînage je ne pouvais pas trouver un moyen de faire la même chose. – Flavio
Je ne sais pas si j'ai bien compris, mais vous pourriez peut-être aussi utiliser le truc suivant: –
Placez les paramètres dans une variable de diffusion et distribuez-la à un ensemble d'opérateurs 'flatMap'. Chacun de ces opérateurs 'flatMap' aura accès à tous les paramètres et peut émettre autant d'enregistrements que vous le souhaitez. Cependant, vous devez déclencher l'exécution de 'flatMaps' avec un enregistrement d'entrée factice, qui pourrait être servi à partir d'un format d'entrée de collection contenant un' Integer' pour chaque instance 'flatMap' parallèle. Les données de la collection IF doivent être «rééquilibrées» avant d'entrer dans les opérateurs 'flatMap'. –