Nous avons récemment essayé d'adopter Airflow comme moteur de «flux de données», et même si je comprends la plupart des choses, je suis toujours dans la zone grise sur la façon dont le planificateur calcule quand déclencher les DAG.Déclenchement DAG Airflow
Jetez un oeil à ce simple dag:
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
dag_options = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime.now()
}
with DAG('test_dag1', schedule_interval="5 * * * *", default_args=dag_options) as dag:
task1 = BashOperator(
task_id='task1',
bash_command='date',
dag=dag)
Le calendrier reprendra ce, mais ne sera pas exécuter. Maintenant, si je change le "date_début" à:
datetime(year=xxxx,month=yyyy=day=zzzz)
où xxxx, yyyy, zzzz sont la date d'aujourd'hui, il va commencer l'exécution. Est-ce la cause de ce que le planificateur continue de relire cette dags du dossier source dag, en exécutant datetime.now() chaque fois, notant la date de début est différente de la file d'attente, en ajoutant ce dag et donc en reprogrammant/pousser la date d'exécution en avant (mon dag_dir_list_interval est 300)?
En outre, dans le flux d'air, si je comprends bien, quand un dag est non mis en pause (ou ajouté avec dags_are_paused_at_creation = False), le planificateur programmera l'exécution comme suit:
- 1ère exécution dag: instant après (start_date + intervalle)
- 2ème exécution dag: instant après (start_date + (intervalle * 2))
- 3ème exécution dag: instant après (start_date + (intervalle * 3))
Est-ce une supposition correcte?
MISE À JOUR (7/30/2017)
Sur la base de l'hypothèse ci-dessus, je créé aujourd'hui dag (30/07/2017):
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
dag_options = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date':
datetime(year=2017,month=7,day=30,hour=20,minute=10)
}
with DAG('test_dag_100', schedule_interval="*/10 * * * *",
default_args=dag_options) as dag:
task1 = BashOperator(
task_id='task_100',
bash_command='date',
dag=dag)
qui devrait commencer à (UTC):
- 7/30/2017 20:20:00
- 7/30/2017 20:30:00
- 7/30/2017 20:40:00
Malheureusement, cela ne se produit pas. Voici quelques captures d'écran de mon tableau de bord:
Quelqu'un peut-il expliquer pourquoi 20:21:00 DAG n'a pas été exécuté? après 20:31:00 il n'a toujours pas exécuté ... Qu'est-ce que je manque ici? Au fait, j'ai aussi remarqué que, pour une raison ou une autre, à chaque fois que je pars et que lance un dag manuellement à travers le tableau de bord, il se trouve juste à l'étape "running". Pourquoi est-ce? Est-ce que le lancer manuellement a quelque chose à voir avec n'importe laquelle des options de timing de démarrage (start_date/interval/etc) ??
Merci pour toutes les précisions que vous pouvez fournir
intervalle d'horaire est crontab, vous pouvez essayer https://crontab.guru/ pour tester ce que l'intervalle signifie pour vous. Si vous utilisez 1.8 +, datetime.now() n'est pas considéré comme une bonne pratique, vous pouvez trouver plus de détails ici https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#less-forgiving- scheduler-on-dynamic-start_date – Chengzhi