2017-07-25 3 views
4

Dans mon fichier DAG, j'ai défini une fonction on_failure_callback() pour publier un Slack en cas d'échec.Airflow par défaut on_failure_callback

Il fonctionne bien si je précise pour chaque opérateur dans mon DAG: l'envoi à on_failure_callback = on_failure_callback()

Y at-il un moyen d'automatiser (via default_args par exemple, ou via mon objet DAG) tous mes les opérateurs?

+0

question intéressante, le on_failure_callback a été défini à BaseOperator, la seule façon que je peux penser est de créer votre propre opérateur et hériter de BaseOperator, puis passer votre on_failure_callback() Là. Je voudrais voir comment les autres pensent – Chengzhi

+0

Merci pour votre avis, mais je n'étais pas confiant de changer quelque chose d'aussi élémentaire que le BaseOperator. Je préfère l'ajouter manuellement à chaque opérateur mais ne pas manquer une mise à jour de BaseOperator (moins de maintenance) –

Répondre

5

J'ai finalement trouvé un moyen de le faire.

Vous pouvez passer votre on_failure_callback comme default_args

class Foo: 
    @staticmethod 
    def get_default_args(): 
     """ 
     Return default args 
     :return: default_args 
     """ 

     default_args = { 
      'on_failure_callback': Foo.on_failure_callback 
     } 

     return default_args 

    @staticmethod 
    def on_failure_callback(context): 
    """ 
    Define the callback to post on Slack if a failure is detected in the Workflow 
    :return: operator.execute 
    """ 

    operator = SlackAPIPostOperator(
     task_id='failure', 
     text=str(context['task_instance']), 
     token=Variable.get("slack_access_token"), 
     channel=Variable.get("slack_channel") 
    ) 

    return operator.execute(context=context)