Mon idée est d'avoir une tâche foo
qui génère une liste d'entrées (utilisateurs, rapports, fichiers journaux, etc.), et une tâche est lancée pour chaque élément de la liste d'entrée . L'objectif est d'utiliser la nouvelle tentative d'Airflow et d'autres logiques, au lieu de la réimplémenter.Les tâches ajoutées au DAG pendant l'exécution ne sont pas planifiées
Donc, idéalement, mon DAG devrait ressembler à ceci:
La seule variable est le nombre de tâches générées ici. Je souhaite effectuer d'autres tâches une fois toutes ces tâches terminées. Il ne semble donc pas judicieux de créer un nouveau groupe de disponibilité de base de données pour chaque tâche.
Ceci est mon code:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1)
}
dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)
foo_operator = BashOperator(
task_id='foo',
bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
xcom_push=True,
dag=dag)
def gen_nodes(**kwargs):
ti = kwargs['ti']
workers = json.loads(ti.xcom_pull(task_ids='foo'))
for wid in workers:
print("Iterating worker %s" % wid)
op = PythonOperator(
task_id='test_op_%s' % wid,
python_callable=lambda: print("Dynamic task!"),
dag=dag
)
op.set_downstream(bar_operator)
op.set_upstream(dummy_op)
gen_subdag_node_op = PythonOperator(
task_id='gen_subdag_nodes',
python_callable=gen_nodes,
provide_context=True,
dag=dag
)
gen_subdag_node_op.set_upstream(foo_operator)
dummy_op = DummyOperator(
task_id='dummy',
dag=dag
)
dummy_op.set_upstream(gen_subdag_node_op)
bar_operator = DummyOperator(
task_id='bar',
dag=dag)
bar_operator.set_upstream(dummy_op)
Dans les journaux, je peux voir que gen_nodes
est exécuté correctement (à savoir Iterating worker 5
, etc.). Cependant, les nouvelles tâches ne sont pas planifiées et il n'y a aucune preuve qu'elles ont été exécutées.
J'ai trouvé des exemples de code connexes en ligne, such as this, mais je n'ai pas réussi à le faire fonctionner. Est-ce que je manque quelque chose?
Sinon, y a-t-il une approche plus appropriée à ce problème (isoler les unités de travail)?
Merci pour votre réponse, @jhnclvr. Donc, essentiellement, si vous voulez parcourir it sur N éléments, en tirant la liste de 'xcom' et l'itérer dans une tâche est le seul moyen de faire cela? – Gediminas
C'est une façon de le faire qui fonctionnerait à coup sûr. Vous pouvez également avoir X branches différentes que vous pouvez descendre en fonction de la valeur 'xcom' si cela peut correspondre à votre scénario. Ou, de la même manière, vous pourriez déclencher d'autres dags en utilisant un 'TriggerDagRunOperator' basé sur le' xcom'. – jhnclvr
J'ai des tâches (décompresser un fichier) qui 1: peut prendre plusieurs minutes, et 2: devra être exécuté un nombre arbitraire de fois. Donc, si je vais créer une tâche qui itère sur cette liste de fichiers xcom à développer, elle ne peut pas avoir de délai, n'est-ce pas? –