2017-06-15 1 views
1

J'essaie de faire intentionnellement échouer une tâche Airflow et de la renvoyer par erreur en passant une ligne Bash (thisshouldnotrun) qui ne fonctionne pas. AirFlow émettre en sortie les éléments suivants:Comment configurer la configuration de messagerie d'Airflow pour envoyer un e-mail sur les erreurs?

[2017-06-15 17:44:17,869] {bash_operator.py:94} INFO - /tmp/airflowtmpLFTMX7/run_bashm2MEsS: line 7: thisshouldnotrun: command not found 
[2017-06-15 17:44:17,869] {bash_operator.py:97} INFO - Command exited with return code 127 
[2017-06-15 17:44:17,869] {models.py:1417} ERROR - Bash command failed 
Traceback (most recent call last): 
    File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run 
    result = task_copy.execute(context=context) 
    File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute 
    raise AirflowException("Bash command failed") 
AirflowException: Bash command failed 
[2017-06-15 17:44:17,871] {models.py:1433} INFO - Marking task as UP_FOR_RETRY 
[2017-06-15 17:44:17,878] {models.py:1462} ERROR - Bash command failed 
Traceback (most recent call last): 
    File "/home/ubuntu/.local/bin/airflow", line 28, in <module> 
    args.func(args) 
    File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/bin/cli.py", line 585, in test 
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True) 
    File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper 
    result = func(*args, **kwargs) 
    File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run 
    result = task_copy.execute(context=context) 
    File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute 
    raise AirflowException("Bash command failed") 
airflow.exceptions.AirflowException: Bash command failed 

Est-ce Débit d'air envoyer un email pour ce genre d'erreurs? Si non, quelle serait la meilleure façon d'envoyer un courriel pour ces erreurs? Je ne suis même pas sûr si airflow.cfg est configuré correctement ... Puisque le but ultime est de tester la notification d'alerte par email, je veux m'assurer que airflow.cfg est configuré correctement. Voici la configuration:

[email] 
email_backend = airflow.utils.email.send_email_smtp 


[smtp] 
# If you want airflow to send emails on retries, failure, and you want to use 
# the airflow.utils.email.send_email_smtp function, you have to configure an 
# smtp server here 
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True 
smtp_ssl = False 
# Uncomment and set the user/pass settings if you want to use SMTP AUTH 
# smtp_user = airflow_data_user 
# smtp_password = password 
smtp_port = 587 
smtp_mail_from = [email protected] 

Qu'est-ce que smtp_starttls? Je ne trouve aucune information dans la documentation ou en ligne. Si nous avons besoin d'une authentification à deux facteurs pour voir les courriels, cela sera-t-il un problème pour Airflow?

Voici mon commande Bash:

task1_bash_command = """ 
export PATH=/home/ubuntu/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin 
export rundate=`TZ='America/Los_Angeles' date +%F -d "yesterday"` 
export AWS_CONFIG_FILE="/home/ubuntu/.aws/config" 

/home/ubuntu/bin/snowsql -f //home/ubuntu/sql/script.sql 1> /home/ubuntu/logs/"$rundate"_dev.log 2> /home/ubuntu/logs/"$rundate"_error_dev.log 

if [ -e /home/ubuntu/logs/"$rundate"_error_dev.log ] 
then 
    exit 64 
fi 

Et ma tâche:

task1 = BashOperator(
    task_id = 'run_bash', 
    bash_command = task1_bash_command, 
    dag = dag, 
    retries = 2, 
    email_on_failure = True, 
    email = '[email protected]') 

Répondre

1

smtp_starttls signifie essentiellement utilisation TLS

Réglez ce paramètre False et définissez smtp_ssl-True si vous souhaitez utiliser SSL à la place. Vous avez probablement besoin de smtp_user et smtp_password pour l'un ou l'autre.

Airflow ne gère pas l'authentification en 2 étapes. Cependant, si vous utilisez AWS, vous n'en avez probablement pas besoin car vos informations d'identification SMTP (SES) sont différentes de vos informations d'identification AWS.

Voir here.

EDIT: Pour flux d'air d'envoyer un e-mail en cas d'échec, il y a deux choses qui doivent être mis sur votre tâche, email_on_failure et email.

Voir ici par exemple:

def throw_error(**context): 
    raise ValueError('Intentionally throwing an error to send an email.') 



t1 = PythonOperator(task_id='throw_error_and_email', 
        python_callable=throw_error, 
        provide_context=True, 
        email_on_failure=True, 
        email='[email protected]', 
        dag=dag) 
+0

Merci pour la clarification. J'essaie toujours de comprendre quels types d'erreurs Airflow va ou ne va pas saisir - mon exemple est-il hors de portée d'Airflow? – simplycoding

+0

Il doit détecter tout échec de tâche, mais vous devez définir votre tâche d'une certaine manière. S'il vous plaît voir ma modification à ma réponse pour un exemple. – jhnclvr

+0

Ouais je pense que mon problème est d'essayer de faire Bash jeter une erreur, donc rien Airflow liés – simplycoding