2017-04-13 1 views
0

Je travaille avec apache airflow 1.8.0.Planificateur de flux d'air apache ne planifiant pas les travaux

Voici sortie quand je backfill le travail.

[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00  [scheduled]> 
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]> 
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]> 
[2017-04-13 09:42:55,864] {models.py:1120} INFO - Dependencies not met for <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 3 non-success(es). upstream_tasks_state={'skipped': Decimal('0'), 'successes': Decimal('0'), 'done': 0, 'upstream_failed': Decimal('0'), 'failed': Decimal('0')}, upstream_task_ids=['runme_0', 'runme_1', 'runme_2'] 

lorsque j'essaie de planifier DAG il émet une erreur.

Traceback (most recent call last): 
    File "/anaconda3/bin/airflow", line 28, in <module> 
    args.func(args) 
    File "/anaconda3/lib/python3.5/site-packages/airflow/bin/cli.py", line 167, in backfill 
    pool=args.pool) 
    File "/anaconda3/lib/python3.5/site-packages/airflow/models.py", line 3330, in run 
    job.run() 
    File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 200, in run 
    self._execute() 
    File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 2021, in _execute 
    raise AirflowException(err) 
airflow.exceptions.AirflowException: --------------------------------------------------- 

Voici une sortie sur les tâches.

BackfillJob is deadlocked. These tasks have succeeded: 
set() 
These tasks have started: 
{} 
These tasks have failed: 
set() 
These tasks are skipped: 
set() 
These tasks are deadlocked: 
{<TaskInstance: example_bash_operator.runme_0 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:44:00 [scheduled]>} 

testé avec python 2.7 et python 3,5

utilisé SequentialExecutor et LocalExecutor

PS. Si je remplis le DAG à l'heure actuelle, il s'exécute pour une fois, puis renvoie l'erreur ci-dessus pour toutes les tâches planifiées.

Répondre

0

Votre instance de flux d'air est dans un état de blocage. La tâche qui a échoué n'autorise pas les prochaines étapes de la tâche.

Airflow lance chaque tâche dans chaque dag exécuter comme un nouveau processus et lorsque la tâche vacille et ce ne sont pas éxécutés situation de blocage se pose

Pour résoudre cette situation, vous pouvez faire une des opérations suivantes:

  1. use **airflow clear** <<dag_id>> Cela permettra de résoudre l'impasse et permettre aux futurs parcours de la DAG/tâche
  2. Si ne parvenez pas à résoudre le problème, vous devez use airflow resetdb Cela dégagerait la base de données de flux d'air et donc résoudre le problème

À l'avenir,

  • essayer et utiliser execution_timeout=timedelta(minutes=2) définir un certain délai afin que vous avez le contrôle explicite de l'opérateur
  • En outre, fournissent un on_failure_callback=handle_failure qui existerait proprement l'opérateur en cas d'échec

Espérons que cela aide,

Cheers!