2017-06-13 1 views
2

je voudrais utiliser la date d'exécution en tant que paramètre à mon fichier sql:Airflow: pass {{ds}} comme param pour PostgresOperator

i essayé

dt = '{{ ds }}' 

s3_to_redshift = PostgresOperator(
    task_id='s3_to_redshift', 
    postgres_conn_id='redshift', 
    sql='s3_to_redshift.sql', 
    params={'file': dt}, 
    dag=dag 
) 

mais il ne fonctionne pas.

Répondre

5

dt = '{{ ds }}'

ne fonctionne pas parce que Jinja (le moteur de templating utilisé dans le flux d'air) ne traite pas l'intégralité du fichier de définition Dag.

Pour chaque Operator il y a des champs que Jinja va traiter, qui font partie de la définition de l'opérateur lui-même.

Dans ce cas, vous pouvez rendre le champ params (qui est en fait appelé parameters, assurez-vous de changer cela) basé sur un modèle si vous prolongez le PostgresOperator comme ceci:

class MyPostgresOperator(PostgresOperator): 
    template_fields = ('sql','parameters') 

Maintenant, vous devriez être en mesure de le faire :

s3_to_redshift = MyPostgresOperator(
    task_id='s3_to_redshift', 
    postgres_conn_id='redshift', 
    sql='s3_to_redshift.sql', 
    parameters={'file': '{{ ds }}'}, 
    dag=dag 
)