2017-05-19 1 views
1

Peut-on créer un nom de fichier unique chaque fois que le flux d'air est exécuté et accéder à ce fichier depuis toutes les tâches? J'ai essayé de créer une variable globale (nom_fichier_sortie) et un horodatage ajouté. Mais quand j'accède à ce nom de fichier dans les tâches, chaque tâche génère un nom de fichier différent car elle calcule l'horodatage dans chaque tâche. Ci-dessous un exemple de code:Créer un nom de fichier unique et accéder à ce fichier dans toutes les tâches de flux d'air

table_name = 'Test_ABC' 
start_date = datetime.now() 
cur_tmpstp = start_date.strftime('%Y_%m_%d') 

output_filename = table_name + "_" + cur_tmpstp + ".csv" 
S3_landing_path = "s3://abc/" 

def clean_up(): 
    if os.path.exists(output_filename): 
     os.remove(output_filename) 


task_1 = BashOperator(
    task_id='task_1', 
    bash_command="aws s3 cp %s %s/ " %(output_filename, S3_landing_path,), 
    dag=dag) 

task_2_cleanup = PythonOperator(
    task_id='task_2_cleanup', 
    python_callable=clean_up, 
    dag=dag) 

Nous avons plus de tâches où nous devons accéder output_filename. Comment peut-on accéder à la variable globale output_filename dans toutes les tâches?

Répondre

1

Si vous avez uniquement besoin de l'horodatage avec la granularité du jour, vous pouvez utiliser les variables par défaut avec la mise en forme. Quelques exemples de telles variables (tirées de http://airflow.readthedocs.io/en/latest/code.html#default-variables) sont

{{ ds }} the execution date as YYYY-MM-DD 
{{ ds_nodash }}  the execution date as YYYYMMDD 
{{ execution_date }} the execution_date, (datetime.datetime)