2017-04-11 1 views
2

Lorsque nous faisons un dagrun, sur l'interface utilisateur de flux d'air, dans la «vue graphique», nous obtenons des détails sur chaque travail.Comment obtenir le JobID pour les courses dag de flux d'air?

JobID est quelque chose comme "scheduled__2017-04-11T10: 47: 00".

J'ai besoin de ce JobID pour le suivi et la création de journal dans lequel je maintiens le temps que chaque tâche/dagrun a pris.

Donc, ma question est comment puis-je obtenir le JobID dans le même dag qui est en cours d'exécution.

Merci, Chetan

Répondre

2

Cette valeur est en fait appelée run_id et est accessible via le contexte ou les macros.

Dans l'opérateur python, on y accède via le contexte, et dans l'opérateur bash, on y accède via jinja templating sur le champ bash_command.

Plus d'informations sur ce qui est disponible dans les macros:

https://airflow.incubator.apache.org/code.html#macros

Plus d'info sur Jinja:

https://airflow.incubator.apache.org/concepts.html#jinja-templating

from airflow.models import DAG 
from datetime import datetime 
from airflow.operators.bash_operator import BashOperator 
from airflow.operators.python_operator import PythonOperator 


dag = DAG(
    dag_id='run_id', 
    schedule_interval=None, 
    start_date=datetime(2017, 2, 26) 
) 

def my_func(**kwargs): 
    context = kwargs 
    print(context['dag_run'].run_id) 

t1 = PythonOperator(
    task_id='python_run_id', 
    python_callable=my_func, 
    provide_context=True, 
    dag=dag 
    ) 

t2 = BashOperator(
    task_id='bash_run_id', 
    bash_command='echo {{run_id}}', 
    dag=dag) 

t1.set_downstream(t2) 

Utilisez cette dag comme exemple, et vérifiez le journal chaque opérateur, vous devriez voir le run_id imprimé dans le journal.