2

Bonjour.Apache Airflow - déclencher/programmer DAG relancer à la fin (File Sensor)

Je suis en train d'installer un DAG trop

  1. Watch/sens pour un fichier de frapper un dossier réseau
  2. Process le fichier
  3. Archive le fichier

Utilisation de la tutoriels en ligne et stackoverflow J'ai été capable de trouver le DAG et l'Opérateur suivants qui atteignent avec succès les objectifs, mais j'aimerais que le DAG soit reporté ou relancé à la fin pour qu'il commence Hing/détection pour un autre fichier.

J'ai tenté de définir une variable max_active_runs:1 puis un schedule_interval: timedelta(seconds=5) ce oui replie le DAG mais démarre la tâche de mise en file d'attente et verrouille le fichier.

Des idées sont les bienvenus sur la façon dont je pourrais réexécuter le DAG après la archive_task?

Merci

DAG CODE

from airflow import DAG 
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator 
from datetime import datetime, timedelta 
from airflow.models import Variable 

default_args = { 
    'owner': 'glsam', 
    'depends_on_past': False, 
    'start_date': datetime.now(), 
    'provide_context': True, 
    'retries': 100, 
    'retry_delay': timedelta(seconds=30), 
    'max_active_runs': 1, 
    'schedule_interval': timedelta(seconds=5), 
} 

dag = DAG('test_sensing_for_a_file', default_args=default_args) 

filepath = Variable.get("soucePath_Test") 
filepattern = Variable.get("filePattern_Test") 
archivepath = Variable.get("archivePath_Test") 

sensor_task = OmegaFileSensor(
    task_id='file_sensor_task', 
    filepath=filepath, 
    filepattern=filepattern, 
    poke_interval=3, 
    dag=dag) 


def process_file(**context): 
    file_to_process = context['task_instance'].xcom_pull(
     key='file_name', task_ids='file_sensor_task') 
    file = open(filepath + file_to_process, 'w') 
    file.write('This is a test\n') 
    file.write('of processing the file') 
    file.close() 


proccess_task = PythonOperator(
    task_id='process_the_file', python_callable=process_file, dag=dag) 

archive_task = ArchiveFileOperator(
    task_id='archive_file', 
    filepath=filepath, 
    archivepath=archivepath, 
    dag=dag) 

sensor_task >> proccess_task >> archive_task 

FICHIER SENSOR OPÉRATEUR

import os 
    import re 

    from datetime import datetime 
    from airflow.models import BaseOperator 
    from airflow.plugins_manager import AirflowPlugin 
    from airflow.utils.decorators import apply_defaults 
    from airflow.operators.sensors import BaseSensorOperator 


    class ArchiveFileOperator(BaseOperator): 
     @apply_defaults 
     def __init__(self, filepath, archivepath, *args, **kwargs): 
      super(ArchiveFileOperator, self).__init__(*args, **kwargs) 
      self.filepath = filepath 
      self.archivepath = archivepath 

     def execute(self, context): 
      file_name = context['task_instance'].xcom_pull(
       'file_sensor_task', key='file_name') 
      os.rename(self.filepath + file_name, self.archivepath + file_name) 


    class OmegaFileSensor(BaseSensorOperator): 
     @apply_defaults 
     def __init__(self, filepath, filepattern, *args, **kwargs): 
      super(OmegaFileSensor, self).__init__(*args, **kwargs) 
      self.filepath = filepath 
      self.filepattern = filepattern 

     def poke(self, context): 
      full_path = self.filepath 
      file_pattern = re.compile(self.filepattern) 

      directory = os.listdir(full_path) 

      for files in directory: 
       if not re.match(file_pattern, files): 
        return False 
       else: 
        context['task_instance'].xcom_push('file_name', files) 
        return True 


    class OmegaPlugin(AirflowPlugin): 
     name = "omega_plugin" 
     operators = [OmegaFileSensor, ArchiveFileOperator] 

Répondre

1

Set schedule_interval=None et utiliser la commande airflow trigger_dag de BashOperator pour lancer l'exécution suivante à la fin de la précédente.

trigger_next = BashOperator(task_id="trigger_next", 
      bash_command="airflow trigger_dag 'your_dag_id'", dag=dag) 

sensor_task >> proccess_task >> archive_task >> trigger_next 

Vous pouvez commencer votre première course manuellement avec la même commande airflow trigger_dag et trigger_next tâche déclenchera automatiquement la suivante. Nous utilisons cela en production depuis plusieurs mois maintenant et cela fonctionne parfaitement.

+0

Un grand merci méthode. –

4

La méthode Dmitris a parfaitement fonctionné.

J'ai aussi trouvé dans ma lecture mise en schedule_interval=None puis en utilisant la TriggerDagRunOperator a travaillé tout aussi bien

trigger = TriggerDagRunOperator(
    task_id='trigger_dag_RBCPV99_rerun', 
    trigger_dag_id="RBCPV99_v2", 
    dag=dag) 

sensor_task >> proccess_task >> archive_task >> trigger