2017-09-12 2 views
1

J'essaye d'exécuter le pot de flux de données par le biais du script de flux d'air. Pour cela, j'utilise DataFlowJavaOperator. Dans le pot param, je passe le chemin du fichier jar exécutable présent dans le system.But local lorsque je tente d'exécuter ce travail que je reçois erreurException de flux d'air: DataFlow a échoué avec le code retour 1

{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete. 
[2017-09-12 16:59:38,225] {models.py:1417} ERROR - DataFlow failed with return code 1 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1374, in run 
    result = task_copy.execute(context=context) 
    File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/dataflow_operator.py", line 116, in execute 
    hook.start_java_dataflow(self.task_id, dataflow_options, self.jar) 
    File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in start_java_dataflow 
    task_id, variables, dataflow, name, ["java", "-jar"]) 
    File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow 
    _Dataflow(cmd).wait_for_done() 
    File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done 
    self._proc.returncode)) 
Exception: DataFlow failed with return code 1` 

Mon script d'air est:

from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator 
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook 
from airflow.models import BaseOperator 
from airflow.utils.decorators import apply_defaults 
from datetime import datetime, timedelta 


default_args = { 
'owner': 'airflow', 
'start_date': datetime(2017, 03, 16), 
'email': [<EmailID>], 

'dataflow_default_options': { 
     'project': '<ProjectId>', 
     # 'zone': 'europe-west1-d', (i am not sure what should i pass here) 
     'stagingLocation': 'gs://spark_3/staging/' 
    } 
} 

dag = DAG('Dataflow',schedule_interval=timedelta(minutes=2), 
default_args=default_args) 

dataflow1 = DataFlowJavaOperator(
task_id='dataflow_example', 
jar ='/root/airflow_scripts/csvwriter.jar', 
gcp_conn_id = 'GCP_smoke', 
dag=dag) 

Je ne sais pas ce que je fais erreur, Quelqu'un peut-il me s'il vous plaît aider à sortir de cette

Note :I am creating this jar while selecting option as Runnable JAR file by packaging all the external dependencies.

Répondre

1

le problème était avec le pot que j'utilisais. Avant d'utiliser le pot, assurez-vous que le pot est en cours d'exécution.

Exemple: Si votre pot était dataflow_job1.jar, Execute le pot en utilisant

java -jar dataflow_job_1.jar --parameters_if_any 

Une fois que votre pot fonctionne avec succès, passez à l'aide du pot dans Airflow pot DataflowJavaOperator.

En outre, Si vous rencontrez des erreurs liées à Coders, vous devrez peut-être créer votre propre codeur pour exécuter le code. Par exemple, j'ai eu un problème avec la classe TableRow comme il didnot un codeur par défaut et donc je devais faire cette place:

TableRowCoder:

public class TableRowCoder extends Coder<TableRow> { 
private static final long serialVersionUID = 1L; 
private static final Coder<TableRow> tableRow = TableRowJsonCoder.of(); 
@Override 
public void encode(TableRow value, OutputStream outStream) throws CoderException, IOException { 
    tableRow.encode(value, outStream); 

} 
@Override 
public TableRow decode(InputStream inStream) throws CoderException, IOException { 
    return new TableRow().set("F1", tableRow.decode(inStream)); 
} 
@Override 
public List<? extends Coder<?>> getCoderArguments() { 
    // TODO Auto-generated method stub 
    return null; 
} 
@Override 
public void verifyDeterministic() throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { 


} 
} 

Alors inscrivez-vous ce codeur dans votre code à l'aide

pipeline.getCoderRegistry().registerCoderForClass(TableRow.class, new TableRowCoder()) 

S'il y a encore des erreurs (qui ne sont pas liés à des codeurs) Accédez à:

*.jar\META-INF\services\FileSystemRegistrar 

et ajoutez les dépendances qui peuvent se produire.

Par exemple, il pourrait y avoir une erreur de mise en scène comme:

Unable to find registrar for gs 

je devais ajouter la ligne suivante pour le faire fonctionner.

org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar