2017-07-12 1 views
1

Existe-t-il un moyen d'utiliser la macro Airflow en dehors de tout opérateur?Flux d'air: utilisation d'une macro en dehors des opérateurs

Par exemple, dans le DAG je une action:

datestamp = '{{ ds }}' 

print(datestamp) # prints string not the date when I run it for any date 

scanner = S3KeySensor(
     task_id='scanner', 
     poke_interval=60, 
     timeout=24 * 60 * 60, 
     soft_fail=False, 
     wildcard_match=True, 
     bucket_key=getPath() + datestamp, #datestamp correctly replaced with execution date 
     bucket_name=bucketName, 
     dag=dag) 

Donc, lorsque le scanner appelant, la valeur « ds » sera remplacé par date d'exécution qui est prévu, mais je veux utiliser la valeur « ds » dans quelques autres endroits. Mais dans ce cas, il ne remplace pas la valeur, à la place il reçoit la chaîne entière comme "{{ds}}". Dans l'exemple ci-dessus. L'instruction print imprime "{{ds}}" et non la date d'exécution.

Répondre

-2

Utilisez des guillemets doubles.

datestamp = "{{ ds }}" 
print datestamp 
1

Heureusement pour vous bucket_key quels Templated, il suffit de mettre le modèle de Jinja à l'intérieur.

… 
bucket_key=getPath() + '{{ ds }}', 
… 

Complètement en dehors d'un opérateur, vous ne pouvez pas utiliser ces macros. Parce que le fichier est interprété régulièrement par le planificateur, et pas seulement lors d'une exécution dag. Alors, quelle serait la valeur de ds quand le dag ne fonctionne pas?

Cependant, comme il est peu probable que vous vouliez faire quoi que ce soit en dehors des tâches, vous pouvez le mettre dans un champ de modèle. Vous pouvez également étendre un autre champ à modéliser.

class MySensor(S3KeySensor): 
    template_fields = ('bucket_key', 'bucket_name', 'my_thing') 

    def __init__(self, my_thing=None, *args, **kwargs): 
     super(MySensor, self).__init__(*args, **kwargs) 
     self.my_tyhing = my_thing 

    def post_execute(self, context): 
     logging.info(
      "I probably wanted to over-ride poke to use {}".format(my_thing) 

scanner = MySensor(
    my_thing='{{ ds }}', 
    task_id='scanner', 
    poke_interval=60, 
    timeout=24 * 60 * 60, 
    soft_fail=False, 
    wildcard_match=True, 
    bucket_key=getPath() + '{{ ds }}', 
    bucket_name=bucketName, 
    dag=dag)