2017-07-26 5 views
0

Nous avons récemment essayé d'adopter Airflow comme moteur de «flux de données», et même si je comprends la plupart des choses, je suis toujours dans la zone grise sur la façon dont le planificateur calcule quand déclencher les DAG.Déclenchement DAG Airflow

Jetez un oeil à ce simple dag:

from airflow import DAG 
from datetime import datetime 
from airflow.operators.bash_operator import BashOperator 

dag_options = {     
      'owner':    'Airflow', 
      'depends_on_past':  False,  
      'start_date':   datetime.now() 
} 

with DAG('test_dag1', schedule_interval="5 * * * *", default_args=dag_options) as dag: 
       task1 = BashOperator(  
       task_id='task1', 
       bash_command='date',     
       dag=dag)  

Le calendrier reprendra ce, mais ne sera pas exécuter. Maintenant, si je change le "date_début" à:

datetime(year=xxxx,month=yyyy=day=zzzz) 

où xxxx, yyyy, zzzz sont la date d'aujourd'hui, il va commencer l'exécution. Est-ce la cause de ce que le planificateur continue de relire cette dags du dossier source dag, en exécutant datetime.now() chaque fois, notant la date de début est différente de la file d'attente, en ajoutant ce dag et donc en reprogrammant/pousser la date d'exécution en avant (mon dag_dir_list_interval est 300)?

En outre, dans le flux d'air, si je comprends bien, quand un dag est non mis en pause (ou ajouté avec dags_are_paused_at_creation = False), le planificateur programmera l'exécution comme suit:

  • 1ère exécution dag: instant après (start_date + intervalle)
  • 2ème exécution dag: instant après (start_date + (intervalle * 2))
  • 3ème exécution dag: instant après (start_date + (intervalle * 3))

Est-ce une supposition correcte?

MISE À JOUR (7/30/2017)

Sur la base de l'hypothèse ci-dessus, je créé aujourd'hui dag (30/07/2017):

from airflow import DAG 
from datetime import datetime 
from airflow.operators.bash_operator import BashOperator 

dag_options = {     
      'owner':    'Airflow', 
      'depends_on_past': False,  
      'start_date': 
datetime(year=2017,month=7,day=30,hour=20,minute=10) 
} 

with DAG('test_dag_100', schedule_interval="*/10 * * * *", 
default_args=dag_options) as dag: 
       task1 = BashOperator(  
       task_id='task_100', 
       bash_command='date',     
       dag=dag)  

qui devrait commencer à (UTC):

  • 7/30/2017 20:20:00
  • 7/30/2017 20:30:00
  • 7/30/2017 20:40:00

Malheureusement, cela ne se produit pas. Voici quelques captures d'écran de mon tableau de bord:

Quelqu'un peut-il expliquer pourquoi 20:21:00 DAG n'a pas été exécuté? après 20:31:00 il n'a toujours pas exécuté ... Qu'est-ce que je manque ici? Au fait, j'ai aussi remarqué que, pour une raison ou une autre, à chaque fois que je pars et que lance un dag manuellement à travers le tableau de bord, il se trouve juste à l'étape "running". Pourquoi est-ce? Est-ce que le lancer manuellement a quelque chose à voir avec n'importe laquelle des options de timing de démarrage (start_date/interval/etc) ??

Merci pour toutes les précisions que vous pouvez fournir

+0

intervalle d'horaire est crontab, vous pouvez essayer https://crontab.guru/ pour tester ce que l'intervalle signifie pour vous. Si vous utilisez 1.8 +, datetime.now() n'est pas considéré comme une bonne pratique, vous pouvez trouver plus de détails ici https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#less-forgiving- scheduler-on-dynamic-start_date – Chengzhi

Répondre

1

Vos hypothèses sont correctes. Airflow planifie la première exécution du DAG après que l'intervalle de planification spécifié s'est écoulé depuis la date de début. L'utilisation de datetime.now() comme date de début entraînera rarement, voire jamais, le déclenchement d'un DAG par Airflow. Il est mentionné dans les documents de planification.

Si vous deviez spécifier une date de début spécifique, telle que datetime (2017,7,27,1,0) avec un intervalle de planification de "5 * * * *", puis à 1:05 am le 7/27 le DAG sera déclenché pour s'exécuter la première fois. Il continuera à courir toutes les cinq minutes après cela.