J'essaye d'exécuter le pot de flux de données par le biais du script de flux d'air. Pour cela, j'utilise DataFlowJavaOperator. Dans le pot param, je passe le chemin du fichier jar exécutable présent dans le system.But local lorsque je tente d'exécuter ce travail que je reçois erreurException de flux d'air: DataFlow a échoué avec le code retour 1
{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
[2017-09-12 16:59:38,225] {models.py:1417} ERROR - DataFlow failed with return code 1
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
result = task_copy.execute(context=context)
File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/dataflow_operator.py", line 116, in execute
hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in start_java_dataflow
task_id, variables, dataflow, name, ["java", "-jar"])
File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow
_Dataflow(cmd).wait_for_done()
File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done
self._proc.returncode))
Exception: DataFlow failed with return code 1`
Mon script d'air est:
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'start_date': datetime(2017, 03, 16),
'email': [<EmailID>],
'dataflow_default_options': {
'project': '<ProjectId>',
# 'zone': 'europe-west1-d', (i am not sure what should i pass here)
'stagingLocation': 'gs://spark_3/staging/'
}
}
dag = DAG('Dataflow',schedule_interval=timedelta(minutes=2),
default_args=default_args)
dataflow1 = DataFlowJavaOperator(
task_id='dataflow_example',
jar ='/root/airflow_scripts/csvwriter.jar',
gcp_conn_id = 'GCP_smoke',
dag=dag)
Je ne sais pas ce que je fais erreur, Quelqu'un peut-il me s'il vous plaît aider à sortir de cette
Note :I am creating this jar while selecting option as Runnable JAR file by packaging all the external dependencies.