2017-08-17 2 views
2

J'ai un cas d'utilisation où j'ai une liste de clients. Le client peut être ajouté ou supprimé de la liste, et ils peuvent avoir des dates de début différentes et des paramètres initiaux différents. Je veux utiliser le flux d'air pour remplir toutes les données pour chaque client en fonction de leur date de début initiale + réexécuter si quelque chose échoue. Je pense à créer un sous-tag pour chaque client. Cela répondra-t-il à mon problème?Flux d'air: Création d'un sous-tag dynamique

Comment créer dynamiquement des sous-étiquettes en fonction du client_id?

Répondre

0

Vous pouvez certainement créer des objets DAG dynamiquement:

def make_client_dag(parent_dag, client): 
    return DAG(
    '%s.client_%s' % (parent_dag.dag_id, client.name), 
    start_date = client.start_date 
) 

Vous pouvez ensuite utiliser cette méthode dans un SubDagOperator de votre principale dag:

for client in clients: 
    SubDagOperator(
    task_id='client_%s' % client.name, 
    dag=main_dag, 
    subdag = make_client_dag(main_dag, client) 
) 

Cela va créer un subdag spécifique à chaque membre de la collection clients, et chacun fonctionnera pour l'invocation suivante du dag principal. Je ne suis pas sûr si vous obtiendrez le comportement de remplissage que vous voulez.