Vous pouvez définir le paramètre max_active_runs
de votre DAG sur 1, ce qui garantira qu'une seule exécution DAG pour ce DAG sera programmée en même temps. https://pythonhosted.org/airflow/code.html?highlight=concurrency#models
Si vous avez besoin de votre entière dag être complète avant d'aller de l'avant, vous pouvez ajouter un ExternalTaskSensor
au début de votre DAG et une tâche de collecte DummyOperator
à la fin. Définissez ensuite le ExternalTaskSensor à déclencher sur DummyOperator à la fin de l'exécution précédente.
dag = DAG(dag_id='dag')
wait_for_previous_operator = ExternalTaskSensor(\
task_id='wait_for_previous',
external_dag_id='dag',
external_task_id='collection',
execution_delta=schedule_interval,
dag=dag)
collection_operator = DummyOperator(\
task_id='collection',
dag=dag)
wait_for_previous_operator.set_downstream(your_other_tasks_list)
collection_operator.set_upstream(your_other_tasks_list)
Hi. max_active_runs est déjà défini sur 1. En fait, un seul dag s'exécute à la fois. Mais il essaie de terminer toutes les tâches du même type (à toutes les dates) avant de passer à la tâche suivante dans la file d'attente. –
J'ai mis à jour ma réponse pour refléter votre commentaire –
Les deux dans le même DAG? Laisse-moi essayer ça. Je pense que les ExternalTaskSensors sont censés être utilisés pour différents dags. Voyons voir! –