2017-09-12 1 views
1

J'utilise le flux d'air sur une instance EC2 en utilisant l'option LocalScheduler. J'ai invoqué airflow scheduler et airflow webserver et tout semble fonctionner correctement. Cela dit, après avoir fourni la chaîne cron à schedule_interval pour "faire ceci toutes les 10 minutes", '*/10 * * * *', le travail continue à s'exécuter toutes les 24 heures par défaut. Voici l'en-tête du code:Exécution d'un DAG Airflow toutes les X minutes

from datetime import datetime 
import os 
import sys 

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 

import ds_dependencies 

SCRIPT_PATH = os.getenv('PREPROC_PATH') 

if SCRIPT_PATH: 
    sys.path.insert(0, SCRIPT_PATH) 
    import workers 
else: 
    print('Define PREPROC_PATH value in environmental variables') 
    sys.exit(1) 

default_args = { 
    'start_date': datetime(2017, 9, 9, 10, 0, 0, 0), #..EC2 time. Equal to 11pm hora México 
    'max_active_runs': 1, 
    'concurrency': 4, 
    'schedule_interval': '*/10 * * * *' #..every 10 minutes 
} 

DAG = DAG(
    dag_id='dash_update', 
    default_args=default_args 
) 

... 

Répondre

4

default_args ne vise qu'à remplir params passé aux opérateurs dans un DAG. max_active_runs, concurrency et schedule_interval sont tous les paramètres pour initialiser votre DAG, pas les opérateurs. Voici ce que vous voulez:

DAG = DAG(
    dag_id='dash_update', 
    start_date=datetime(2017, 9, 9, 10, 0, 0, 0), #..EC2 time. Equal to 11pm hora México 
    max_active_runs=1, 
    concurrency=4, 
    schedule_interval='*/10 * * * *', #..every 10 minutes 
    default_args=default_args, 
) 

Je les ai mélangé avant aussi, donc pour référence (note il y a des chevauchements):

paramètres DAG: https://airflow.incubator.apache.org/code.html?highlight=dag#airflow.models.DAG paramètres de l'opérateur: https://airflow.incubator.apache.org/code.html#baseoperator

+0

makes bon sens, totalement manqué cela. merci @Daniel – Aaron