2017-06-13 1 views
1

Puis-je utiliser des macros avec PythonOperator? J'ai essayé de suivre, mais j'étais incapable d'obtenir les macros rendues!Macros dans l'opérateur Python Airflow

dag = DAG(
    'temp', 
    default_args=default_args, 
    description='temp dag', 
    schedule_interval=timedelta(days=1)) 

def temp_def(a, b, **kwargs): 
    print '{{ds}}' 
    print '{{execution_date}}' 
    print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs)) 

ds = '{{ ds }}' 
mm = '{{ execution_date }}' 

t1 = PythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    op_args=[mm , ds], 
    provide_context=False, 
    dag=dag) 

Répondre

7

Les macros ne sont traitées que pour les champs modélisés. Pour que Jinja traite ce champ, étendez le PythonOperator avec le vôtre.

class MyPythonOperator(PythonOperator): 
    template_fields = ('templates_dict','op_args') 

J'ajouté 'templates_dict' au template_fields parce que le PythonOperator a lui-même ce domaine basé sur un modèle: PythonOperator

Maintenant, vous devriez être en mesure d'utiliser une macro dans ce domaine:

ds = '{{ ds }}' 
mm = '{{ execution_date }}' 

t1 = MyPythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    op_args=[mm , ds], 
    provide_context=False, 
    dag=dag) 
+1

pouvons-nous marquer cela comme la bonne réponse? Parce que c'est la bonne réponse –

+1

Pour la rétrocompatibilité, vous pouvez surcharger 'template_fields' comme ceci:' template_fields = PythonOperator.template_fields + ('op_args',) '. BTW, j'ai ouvert un [JIRA pour ajouter 'op_args' et' op_kwargs' champs '' PythonOperator' modèle]] (https://issues.apache.org/jira/browse/AIRFLOW-1814) –

1

Dans mon opinion d'une façon plus naturelle Airflow d'approcher ce serait d'utiliser le PythonOperator inclus et d'utiliser le paramètre provide_context=True en tant que tel.

t1 = MyPythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    provide_context=True, 
    dag=dag) 

Maintenant, vous avez accès à toutes les macros, les métadonnées de flux d'air et paramètres tâche dans le kwargs de votre appelable

def temp_def(**kwargs): 
    print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date'])) 

Si vous aviez des params défini sur mesure associée à la tâche que vous pouvez accéder à ces aussi bien par kwargs['params']

+0

Ceci est probablement le meilleur moyen de je le fais. Ma réponse était principalement ciblée sur la question spécifique de savoir pourquoi les macros n'étaient pas traitées. – jhnclvr