Put étape 1 (le téléchargement csv) et l'étape 2 (le téléchargement csv) dans un subdag, puis déclencher via le SubDagOperator avec l'option executor
définie sur une SequentialExecutor
- cela fera en sorte que les étapes 1 et 2 courir sur le même travailleur.
Voici un fichier DAG de travail illustrant ce concept (avec les opérations réelles écrasa comme DummyOperators), les étapes download/upload dans le cadre de certains processus plus large:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors.sequential_executor import SequentialExecutor
PARENT_DAG_NAME='subdaggy'
CHILD_DAG_NAME='subby'
def make_sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date
)
task_download = DummyOperator(
task_id = 'task_download_csv',
dag=dag
)
task_upload = DummyOperator(
task_id = 'task_upload_csv',
dag=dag
)
task_download >> task_upload
return dag
main_dag = DAG(
PARENT_DAG_NAME,
schedule_interval=None,
start_date=datetime(2017,1,1)
)
main_task_1 = DummyOperator(
task_id = 'main_1',
dag = main_dag
)
main_task_2 = SubDagOperator(
task_id = CHILD_DAG_NAME,
subdag=make_sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date, main_dag.schedule_interval),
executor=SequentialExecutor(),
dag=main_dag
)
main_task_3 = DummyOperator(
task_id = 'main_3',
dag = main_dag
)
main_task_1 >> main_task_2 >> main_task_3
Ces deux sont des solutions de contournement viables mais ils ont leurs inconvénients. En utilisant le stockage partagé exige d'abord que ce stockage partagé est disponible pour les travailleurs, qui peuvent ou peuvent ne pas être une contrainte importante; et même quand il est disponible, le stockage local peut avoir de meilleures performances en fonction de l'application. La deuxième approche - lier les deux tâches à une file d'attente particulière - semble renoncer à une certaine flexibilité dans les tâches où peuvent être programmées, en ajoutant des travailleurs supplémentaires, etc. – gcbenison