2017-08-23 1 views
1

J'ai un DAG quigarantie que certains opérateurs seront exécutés sur le même travailleur d'air

  1. télécharge un fichier csv de stockage en nuage
  2. télécharge le fichier csv à une 3ème partie via https

le cluster d'air J'exécute sur utilise CeleryExecutor par défaut, donc je suis inquiet que, à un moment donné quand j'ÉCHELLE le nombre de travailleurs, ces tâches peuvent être exécutées sur différents travailleurs. par exemple. Travailleur A fait le téléchargement, le travailleur B essaie de télécharger, mais ne trouve pas le fichier (parce qu'il est sur le travailleur A)

Est-il possible de garantir en quelque sorte que les opérateurs de téléchargement et de téléchargement seront exécutés sur le même flux d'air ouvrier?

Répondre

1

Pour ce genre de cas d'utilisation, nous avons deux solutions:

  1. Utilisez un lecteur monté réseau qui est partagé entre les deux travailleurs de sorte que le tâches de téléchargement et le téléchargement ont accès au même système de fichiers
  2. Utilisez Airflow queue qui est spécifique travailleur. S'il n'y a qu'un seul travailleur écoutant cette file d'attente vous garantir que les deux auront accès au même système de fichiers. Notez que chaque travailleur peut écouter sur plusieurs files d'attente afin que vous puissiez avoir l'écoute sur la file d'attente « par défaut », ainsi que l'usage d'une destinée à cette tâche.
+0

Ces deux sont des solutions de contournement viables mais ils ont leurs inconvénients. En utilisant le stockage partagé exige d'abord que ce stockage partagé est disponible pour les travailleurs, qui peuvent ou peuvent ne pas être une contrainte importante; et même quand il est disponible, le stockage local peut avoir de meilleures performances en fonction de l'application. La deuxième approche - lier les deux tâches à une file d'attente particulière - semble renoncer à une certaine flexibilité dans les tâches où peuvent être programmées, en ajoutant des travailleurs supplémentaires, etc. – gcbenison

1

Put étape 1 (le téléchargement csv) et l'étape 2 (le téléchargement csv) dans un subdag, puis déclencher via le SubDagOperator avec l'option executor définie sur une SequentialExecutor - cela fera en sorte que les étapes 1 et 2 courir sur le même travailleur.

Voici un fichier DAG de travail illustrant ce concept (avec les opérations réelles écrasa comme DummyOperators), les étapes download/upload dans le cadre de certains processus plus large:

from datetime import datetime, timedelta 
from airflow.models import DAG 
from airflow.operators.dummy_operator import DummyOperator 
from airflow.operators.subdag_operator import SubDagOperator 
from airflow.executors.sequential_executor import SequentialExecutor 

PARENT_DAG_NAME='subdaggy' 
CHILD_DAG_NAME='subby' 

def make_sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval): 
    dag = DAG(
     '%s.%s' % (parent_dag_name, child_dag_name), 
     schedule_interval=schedule_interval, 
     start_date=start_date 
     ) 

    task_download = DummyOperator(
     task_id = 'task_download_csv', 
     dag=dag 
     ) 

    task_upload = DummyOperator(
     task_id = 'task_upload_csv', 
     dag=dag 
     ) 

    task_download >> task_upload 

    return dag 
main_dag = DAG(
    PARENT_DAG_NAME, 
    schedule_interval=None, 
    start_date=datetime(2017,1,1) 
) 

main_task_1 = DummyOperator(
    task_id = 'main_1', 
    dag = main_dag 
) 

main_task_2 = SubDagOperator(
    task_id = CHILD_DAG_NAME, 
    subdag=make_sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date, main_dag.schedule_interval), 
    executor=SequentialExecutor(), 
    dag=main_dag 
) 

main_task_3 = DummyOperator(
    task_id = 'main_3', 
    dag = main_dag 
) 

main_task_1 >> main_task_2 >> main_task_3