2017-06-14 2 views
0

En vertu d'un exécuteur séquentiel, j'ai un fichier DAG où je précise trois tâches qui sont nécessaires pour être exécuté de manière séquentielle (t1 -> t2 -> t3):assurer le fonctionnement séquentiel des tâches (Apache Airflow)

default_args = { 
    'owner': 'airflow', 
    'start_date': datetime(2017, 6, 14, 23 , 20), 
    'email_on_failure': False, 
    'email_on_retry': False, 
    } 

dag = DAG('test_dag', default_args=default_args, schedule_interval="*/5 * * * *") 

t1 = BashOperator(
    task_id='form_dataset', 
    bash_command='python 1.py', 
    dag=dag) 

t2 = BashOperator(
    task_id='form_features', 
    bash_command='python 2.py', 
    depends_on_past=True, 
    dag=dag) 

t3 = BashOperator(
    task_id='train', 
    bash_command='python 3.py', 
    depends_on_past=True, 
    dag=dag) 

t2.set_upstream(t1) 
t3.set_upstream(t2) 
t4.set_upstream(t3) 

Je suppose que le comportement séquentiel t1 -> t2 -> t3 est un comportement par défaut, je pensais que ce n'était pas le cas dans ma situation (l'ordre est plutôt aléatoire, par exemple t1 -> t2 -> t2- -> t1 -> t3). Quel genre d'argument me manque pour corriger le comportement?

Répondre

1

Vous devez ajouter à la fin du fichier la déclaration

t1 >> t2 >> t3 

. Plus de détails pour cela sont sur le lien suivant: https://airflow.incubator.apache.org/concepts.html#bitshift-composition

Pour être complet, vous pouvez également le faire en utilisant les méthodes set_upstream() ou set_downstream() pour les tâches.

+0

Merci pour votre réponse, je l'ai en fait (désolé je ne l'ai pas mis dans le code). Je suppose que le problème est que la quantité totale de temps d'exécution pour les trois tâches est supérieure à l'intervalle de planification. Je voudrais définir la priorité sur l'exécution de toutes les tâches dans un DAG plutôt que sur l'exécution d'un nouveau DAG, mais je n'ai pas trouvé d'attribut associé jusqu'à présent. –

+0

L'exécution de chaque exécution DAG dépend-elle de la réussite de l'exécution précédente? Dans ce cas, vous pouvez examiner le comportement du paramètre depends_on_true. Il y a une autre option pour assigner chaque tâche à un pool et limiter la taille de ce pool à 1. Ou peut-être que je ne comprends pas votre cas d'utilisation? En général, les tâches dag doivent être aussi indépendantes des autres runs que possible. – Him

+0

Dans airflow.cfg, il existe un paramètre max_active_runs_per_dag. Mettre ceci à 1 devrait également empêcher le même dag de recommencer avant que le précédent ne finisse. – Him