2017-06-23 1 views
0

J'essaie d'utiliser Apache Airflow pour créer un flux de travail. Donc, fondamentalement, j'ai installé Airflow manuellement dans mon propre noyau anaconda dans le serveur.Airflow ne peut pas exécuter DAG car les tâches en amont ont échoué

Voici la façon dont je lance simple DAG

export AIRFLOW_HOME=~/airflow/airflow_home # my airflow home 
export AIRFLOW=~/.conda/.../lib/python2.7/site-packages/airflow/bin 
export PATH=~/.conda/.../bin:$AIRFLOW:$PATH # my kernel 

Quand je fais la même chose en utilisant le test d'air, il a travaillé pour tâche particulière indépendamment. Par exemple, dans dag1: task1 >> task2

airflow test dag1 task2 2017-06-22 

Je suppose qu'il exécutera d'abord tâche1 puis exécutera tâche2. Mais il suffit de lancer task2 indépendamment.

Avez-vous une idée à ce sujet? Merci beaucoup d'avance!

Voici mon code:

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from airflow.operators.python_operator import PythonOperator 
from datetime import datetime, timedelta 


default_args = { 
    'owner': 'txuantu', 
    'depends_on_past': False, 
    'start_date': datetime(2015, 6, 1), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
    # 'queue': 'bash_queue', 
    # 'pool': 'backfill', 
    # 'priority_weight': 10, 
    # 'end_date': datetime(2016, 1, 1), 
} 

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1)) 


def python_op1(ds, **kwargs): 
    print(ds) 
    return 0 


def python_op2(ds, **kwargs): 
    print(str(kwargs)) 
    return 0 

# t1, t2 and t3 are examples of tasks created by instantiating operators 
# t1 = BashOperator(
#  task_id='bash_operator', 
#  bash_command='echo {{ ds }}', 
#  dag=dag) 
t1 = PythonOperator(
    task_id='python_operator1', 
    python_callable=python_op1, 
    # provide_context=True, 
    dag=dag) 


t2 = PythonOperator(
    task_id='python_operator2', 
    python_callable=python_op2, 
    # provide_context=True, 
    dag=dag) 

t2.set_upstream(t1) 

Airflow: v1.8.0 Utilisation de l'exécuteur SequentialExecutor avec SqlLite

airflow run tutorial python_operator2 2015-06-01 

Voici un message d'erreur:

[2017-06-28 22:49:15,336] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags 
[2017-06-28 22:49:16,069] {base_executor.py:50} INFO - Adding to queue: airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --local -sd DAGS_FOLDER/tutorial.py 
[2017-06-28 22:49:16,072] {sequential_executor.py:40} INFO - Executing command: airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --local -sd DAGS_FOLDER/tutorial.py 
[2017-06-28 22:49:16,765] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags/tutorial.py 
[2017-06-28 22:49:16,986] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --job_id 1 --raw -sd DAGS_FOLDER/tutorial.py'] 
[2017-06-28 22:49:17,373] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,373] {__init__.py:57} INFO - Using executor SequentialExecutor 
[2017-06-28 22:49:17,694] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,693] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags/tutorial.py 
[2017-06-28 22:49:17,899] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,899] {models.py:1120} INFO - Dependencies not met for <TaskInstance: tutorial.python_operator2 2015-06-01 00:00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'failed': 0, 'upstream_failed': 0, 'skipped': 0, 'done': 0}, upstream_task_ids=['python_operator1'] 
[2017-06-28 22:49:22,011] {jobs.py:2083} INFO - Task exited with return code 0 
+0

Description Ajout (ou extrait de code) de task2 pourrait vous aider à obtenir plus de réponses. Cela semble être un problème avec path, si votre task2 est un BashOperator. – Him

+0

Merci pour votre réponse @Him. Task1 est BashOperator, task2 est PythonOperator. Je vais ajouter mon code pour plus de détails. Merci –

Répondre

0

Si vous voulez seulement Pour exécuter python_operator2, vous devez exécuter:

airflow run tutorial python_operator2 2015-06-01 --ignore_dependencies=False 

Si vous souhaitez exécuter l'ensemble dag et d'exécuter les tâches, utilisez trigger_dag:

airflow trigger_dag tutorial 

Pour référence, airflow test va « exécuter une tâche sans vérifier les dépendances. »

Documentation pour les trois commandes sont disponibles à https://airflow.incubator.apache.org/cli.html

+0

Merci Anthony pour la réponse! Juste une question rapide, comment puis-je vérifier le journal du flux de travail après (tutoriel airflow trigger_dag)? Je ne vois aucun journal dans les journaux/dossier. –

+0

J'ai aussi vérifié l'état de trigger_dag: airflow dag_state tutorial 2017-06-28 => résultat: en cours d'exécution. Mais l'état de la tâche: airflow task_state tutorial python_operator2 2017-06-28 => résultat: Aucun –

+0

Est-ce que la première tâche python se termine réellement? Un scénario possible est que python_opeartor1 est toujours en cours d'exécution au moment de la vérification et que python_opérateur2 n'a pas démarré. En outre, je ne suis pas sûr des journaux. :( –

0

Enfin, j'ai trouvé au sujet d'une réponse à mon problème. Fondamentalement, je pensais que le flux d'air est une charge paresseuse, mais il semble que non. Donc, la réponse est au lieu de:

t2.set_upstream(t1) 

Il devrait être:

t1.set_downstream(t2)