2017-06-23 2 views
0

Où les fonctions auxiliaires doivent-elles être placées dans le flux d'air? Devrait-il être ajouté dans le répertoire du plugin? Je vais avoir un code pour répéter mes travaux ETL comme ceci:Fonctions auxiliaires dans le flux d'air

#create a sub dag containing DROP -> CREATE -> INSERT 
sub_create_dag = DAG('%s.%s' % (parent_dag_name, child_dag_name), default_args=args) 
pg_drop = DropPostgresOperator(task_id='drop_{}'.format(table), sql="", params={'schema': schema, 'table': table}, postgres_conn_id=args['connection_id'], autocommit=True, dag=sub_create_dag) 
pg_create = PostgresOperator(task_id='create_{}'.format(table), sql='{sql_path}/create_{name}.sql'.format(sql_path=sql_path, name=table), postgres_conn_id=args['connection_id'], autocommit=True, dag=sub_create_dag) 
pg_insert = PostgresOperator(task_id='insert_{}'.format(table), sql='{sql_path}/insert_{name}.sql'.format(sql_path=sql_path, name=table), postgres_conn_id=args['connection_id'], autocommit=True, dag=sub_create_dag) 
pg_drop >> pg_create >> pg_insert 
return dag 

Le problème est que je vais avoir des erreurs comme:

File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source 
raise TemplateNotFound(template) 

Répondre

1

Nous avons utilisé une combinaison des deux . Nous avons implémenté les tâches indépendantes en tant qu'opérateurs personnalisés dans le dossier plugin, tandis que les petites fonctions comme python fonctionnent simplement dans le dossier DAG lui-même. Concernant l'erreur, c'est parce que le jinja n'a pas pu trouver le dossier des modèles. Cette erreur ne doit pas apparaître si vous implémentez les fonctions d'assistance dans le dossier DAG ou plugins.