2017-06-19 2 views
2

Mon idée est d'avoir une tâche foo qui génère une liste d'entrées (utilisateurs, rapports, fichiers journaux, etc.), et une tâche est lancée pour chaque élément de la liste d'entrée . L'objectif est d'utiliser la nouvelle tentative d'Airflow et d'autres logiques, au lieu de la réimplémenter.Les tâches ajoutées au DAG pendant l'exécution ne sont pas planifiées

Donc, idéalement, mon DAG devrait ressembler à ceci: enter image description here

La seule variable est le nombre de tâches générées ici. Je souhaite effectuer d'autres tâches une fois toutes ces tâches terminées. Il ne semble donc pas judicieux de créer un nouveau groupe de disponibilité de base de données pour chaque tâche.

Ceci est mon code:

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2015, 6, 1) 
} 

dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args) 

foo_operator = BashOperator(
    task_id='foo', 
    bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))), 
    xcom_push=True, 
    dag=dag) 

def gen_nodes(**kwargs): 
    ti = kwargs['ti'] 
    workers = json.loads(ti.xcom_pull(task_ids='foo')) 

    for wid in workers: 
     print("Iterating worker %s" % wid) 
     op = PythonOperator(
      task_id='test_op_%s' % wid, 
      python_callable=lambda: print("Dynamic task!"), 
      dag=dag 
     ) 

     op.set_downstream(bar_operator) 
     op.set_upstream(dummy_op) 

gen_subdag_node_op = PythonOperator(
    task_id='gen_subdag_nodes', 
    python_callable=gen_nodes, 
    provide_context=True, 
    dag=dag 
) 

gen_subdag_node_op.set_upstream(foo_operator) 

dummy_op = DummyOperator(
    task_id='dummy', 
    dag=dag 
) 

dummy_op.set_upstream(gen_subdag_node_op) 

bar_operator = DummyOperator(
    task_id='bar', 
    dag=dag) 

bar_operator.set_upstream(dummy_op) 

Dans les journaux, je peux voir que gen_nodes est exécuté correctement (à savoir Iterating worker 5, etc.). Cependant, les nouvelles tâches ne sont pas planifiées et il n'y a aucune preuve qu'elles ont été exécutées.

J'ai trouvé des exemples de code connexes en ligne, such as this, mais je n'ai pas réussi à le faire fonctionner. Est-ce que je manque quelque chose?

Sinon, y a-t-il une approche plus appropriée à ce problème (isoler les unités de travail)?

Répondre

2

A ce stade, le flux d'air ne prend pas en charge l'ajout/la suppression d'une tâche pendant l'exécution du dag.

L'ordre du flux de travail sera celui qui est évalué au début de l'exécution de dag.

See the second paragraph here.

Cela signifie que vous ne pouvez pas ajouter/supprimer des tâches en fonction de ce qui se passe dans la course. Vous pouvez ajouter des tâches X dans une boucle for en fonction de quelque chose qui n'est pas lié à l'exécution, mais une fois l'exécution commencée, il est impossible de modifier la forme/l'ordre du flux de travail.

Plusieurs fois, vous pouvez plutôt utiliser un BranchPythonOperator pour prendre une décision au cours d'une course dag, (et ces décisions peuvent être basées sur vos xcom valeurs), mais ils doivent être une décision de descendre une branche qui existe déjà dans le flux de travail .

Dag court, et les définitions Dag sont séparées dans le flux d'air d'une manière qui ne sont pas entièrement intuitive, mais plus ou moins tout ce qui est créé/généré à l'intérieur d'un cycle de dag (xcom, dag_run.conf, etc.) ne sont pas utilisables pour définir le dag lui-même.

+0

Merci pour votre réponse, @jhnclvr. Donc, essentiellement, si vous voulez parcourir it sur N éléments, en tirant la liste de 'xcom' et l'itérer dans une tâche est le seul moyen de faire cela? – Gediminas

+0

C'est une façon de le faire qui fonctionnerait à coup sûr. Vous pouvez également avoir X branches différentes que vous pouvez descendre en fonction de la valeur 'xcom' si cela peut correspondre à votre scénario. Ou, de la même manière, vous pourriez déclencher d'autres dags en utilisant un 'TriggerDagRunOperator' basé sur le' xcom'. – jhnclvr

+0

J'ai des tâches (décompresser un fichier) qui 1: peut prendre plusieurs minutes, et 2: devra être exécuté un nombre arbitraire de fois. Donc, si je vais créer une tâche qui itère sur cette liste de fichiers xcom à développer, elle ne peut pas avoir de délai, n'est-ce pas? –