2017-06-16 1 views
0

Comment configurer Airflow de sorte que toute défaillance du DAG entraîne (immédiatement) un message de ralentissement?Message relatif au débit insuffisant du débit d'air

En ce moment, je gère en créant une slack_failed_task:

slack_failed_task = SlackAPIPostOperator(
    task_id='slack_failed', 
    channel="#datalabs", 
    trigger_rule='one_failed', 
    token="...", 
    text = ':red_circle: DAG Failed', 
    icon_url = 'http://airbnb.io/img/projects/airflow3.png', 
    dag=dag) 

Et définir cette tâche (one_failed) en amont de l'autre tâche dans le DAG:

slack_failed_task << download_task_a 
slack_failed_task << download_task_b 
slack_failed_task << process_task_c 
slack_failed_task << process_task_d 
slack_failed_task << other_task_e 

Il fonctionne, mais il est erreur sujette depuis oublier d'ajouter la tâche va passer les notifications de relâchement et semble beaucoup de travail.

Existe-t-il un moyen de développer la propriété email_on_failure dans le groupe de disponibilité de base de données?

Bonus ;-) pour inclure un moyen de transmettre le nom de la tâche ayant échoué au message.

Répondre

5

peut-être il utile:

def slack_failed_task(contextDictionary, **kwargs): 
     failed_alert = SlackAPIPostOperator(
     task_id='slack_failed', 
     channel="#datalabs", 
     token="...", 
     text = ':red_circle: DAG Failed', 
     owner = '_owner',) 
     return failed_alert.execute 


task_with_failed_slack_alerts = PythonOperator(
task_id='task0', 
python_callable=<file to execute>, 
on_failure_callback=slack_failed_task, 
provide_context=True, 
dag=dag) 
2

Le paramètre supports BaseOperator de on_failure_callback ':

on_failure_callback (appelable) - une fonction à appeler quand une instance de tâche de cette tâche échoue. un dictionnaire de contexte est transmis en tant que paramètre unique à cette fonction. Contexte contient des références à des objets liés à l'instance de la tâche et est documentée dans la section macros de l'API.

Je n'ai pas testé cela, mais vous devriez pouvoir définir une fonction qui affiche un relâchement en cas de défaillance et la transmettre à chaque définition de tâche. Pour obtenir le nom de la tâche en cours, vous pouvez utiliser le modèle {{task_id}}.

+0

je ne pouvais pas obtenir le '{{task_id}}' travailler encore, mais votre aide a été apprécié –