2017-09-12 2 views
0

Je suis nouveau à Airflow et j'ai créé mon premier DAG. Voici mon code DAG. Je veux que le DAG démarre maintenant et qu'il s'exécute ensuite une fois par jour.Airflow DAG n'est pas programmé

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime, timedelta 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime.now(), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
} 

dag = DAG(
    'alamode', default_args=default_args, schedule_interval=timedelta(1)) 

create_command = "/home/ubuntu/scripts/makedir.sh " 

# t1 is the task which will invoke the directory creation shell script 
t1 = BashOperator(
    task_id='create_directory', 
    bash_command=create_command, 
    dag=dag) 

run_spiders = "/home/ubuntu/scripts/crawl_spiders.sh " 
# t2 is the task which will invoke the spiders 
t2 = BashOperator(
    task_id='web_scrawl', 
    bash_command=run_spiders, 
    dag=dag) 

# To set dependency between tasks. 't1' should run before t2 
t2.set_upstream(t1) 

Le DAG n'est pas sélectionné par Airflow. J'ai vérifié le journal et voici ce qu'il dit.

[2017-09-12 18:08:20,220] {jobs.py:343} DagFileProcessor398 INFO - Started process (PID=7001) to work on /home/ubuntu/airflow/dags/alamode.py 
[2017-09-12 18:08:20,223] {jobs.py:1521} DagFileProcessor398 INFO - Processing file /home/ubuntu/airflow/dags/alamode.py for tasks to queue 
[2017-09-12 18:08:20,223] {models.py:167} DagFileProcessor398 INFO - Filling up the DagBag from /home/ubuntu/airflow/dags/alamode.py 
[2017-09-12 18:08:20,262] {jobs.py:1535} DagFileProcessor398 INFO - DAG(s) ['alamode'] retrieved from /home/ubuntu/airflow/dags/alamode.py 
[2017-09-12 18:08:20,291] {jobs.py:1169} DagFileProcessor398 INFO - Processing alamode 
/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate. Consider alternative strategies for improved performance. 
    'strategies for improved performance.' % expr) 
[2017-09-12 18:08:20,317] {models.py:322} DagFileProcessor398 INFO - Finding 'running' jobs without a recent heartbeat 
[2017-09-12 18:08:20,318] {models.py:328} DagFileProcessor398 INFO - Failing jobs without heartbeat after 2017-09-12 18:03:20.318105 
[2017-09-12 18:08:20,320] {jobs.py:351} DagFileProcessor398 INFO - Processing /home/ubuntu/airflow/dags/alamode.py took 0.100 seconds 

Que fais-je tort? J'ai essayé de changer la schedule_interval à schedule_interval = timedelta (minutes = 1) pour voir si elle commence immédiatement, mais toujours aucune utilisation. Je peux voir les tâches sous le DAG comme prévu dans Airflow UI mais avec le statut de l'état comme «aucun statut». S'il vous plaît aidez-moi ici.

+0

Avez-vous activé le DAG passer à l'interface utilisateur sur? – Chengzhi

+0

Oui, le bouton est activé. Pourtant, il ne se fait pas ramasser. – Anju

+0

Avez-vous appelé le gestionnaire de flux d'air et le planificateur de flux d'air? –

Répondre

0

Ce problème a été résolu en suivant les étapes ci-dessous:

1) J'utilisé une date beaucoup plus ancienne pour start_date et schedule_interval = timedelta (minutes = 10). En outre, utilisé une date réelle au lieu de datetime.now().
2) Ajout de catchup = True dans les arguments DAG.
3) Configurez la variable d'environnement comme exportation AIRFLOW_HOME = pwd/airflow_home.
4) Supprimé airflow.db
5) a déplacé le nouveau code de dossier DAGS
6) Ran 'initdb d'écoulement d'air de la commande pour créer la base de données à nouveau.
7) Tourné l 'ON' interrupteur de mon DAG par l'interface utilisateur
8) est le code qui fonctionne exécuté la commande 'ordonnanceur de flux d'air'

Voici maintenant:

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime, timedelta 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 9, 12), 
    'email': ['[email protected]'], 
    'retries': 0, 
    'retry_delay': timedelta(minutes=15) 
} 

dag = DAG(
    'alamode', catchup=False, default_args=default_args, schedule_interval="@daily") 

# t1 is the task which will invoke the directory creation shell script 
t1 = BashOperator(
    task_id='create_directory', 
    bash_command='/home/ubuntu/scripts/makedir.sh ', 
    dag=dag) 


# t2 is the task which will invoke the spiders 
t2 = BashOperator(
    task_id= 'web_crawl', 
    bash_command='/home/ubuntu/scripts/crawl_spiders.sh ', 
    dag=dag) 

# To set dependency between tasks. 't1' should run before t2 
t2.set_upstream(t1)