2017-05-15 1 views
0

J'ai le fichier dag comme suit. Ici, je n'ai pas de nouvelles tentatives. Cependant, je veux m'assurer que des fichiers particuliers (bash1, bash2) devraient avoir des tentatives 1. mais pas d'autres fichiers.comment s'assurer qu'un seul bashoperator est réessayé mais pas d'autre flux d'air

Voici les arguments par défaut.

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 3, 24, 19, 00), 
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'email_on_retry': True, 
    'retries': 0, 
    'retry_delay': timedelta(minutes=5), 
    'backfill': False, 
} 

I définie dag comme suit: dag = DAG ('x', default_args = default_args, schedule_interval = « 15 0,1,2,3,13,14,15,16,17,18, 19,20,21,22,23 * * * ")

Mon premier opérateur est de définir comme suit:

bash1 = BashOperator(
     task_id= 'bash1', 
     bash_command="cd /home/ubuntu/run_scripts/Test/inst/scripts && /usr/bin/Rscript bash1.R ", 
     dag=dag 
    ) 

Mon deuxième opérateur est de définir comme suit:

bash2 = BashOperator(
     task_id= 'bash2', 
     bash_command="cd /home/ubuntu/run_scripts/Test/inst/scripts && /usr/bin/Rscript bash2.R ", 
     dag=dag 
    ) 

Mon opérateur final est de définir comme suit:

Test_join = BashOperator(
     task_id= 'Test_join', 
     bash_command="cd /home/ubuntu/run_scripts/Test/inst/scripts && /usr/bin/Rscript Test_join.R ", 
     dag=dag 
    ) 

Test_join dépend de bash1 et bash 2.

Test_join.set_upstream(bash1) 
Test_join.set_upstream(bash2) 

Que dois-je faire pour faire bash1 sûr et bash2 à la retraite 2 mais pas Test_join.

Répondre

0

Vous pouvez utiliser le paramètre 'retries' lors de la déclaration d'une tâche.