Bonjour.Apache Airflow - déclencher/programmer DAG relancer à la fin (File Sensor)
Je suis en train d'installer un DAG trop
- Watch/sens pour un fichier de frapper un dossier réseau
- Process le fichier
- Archive le fichier
Utilisation de la tutoriels en ligne et stackoverflow J'ai été capable de trouver le DAG et l'Opérateur suivants qui atteignent avec succès les objectifs, mais j'aimerais que le DAG soit reporté ou relancé à la fin pour qu'il commence Hing/détection pour un autre fichier.
J'ai tenté de définir une variable max_active_runs:1
puis un schedule_interval: timedelta(seconds=5)
ce oui replie le DAG mais démarre la tâche de mise en file d'attente et verrouille le fichier.
Des idées sont les bienvenus sur la façon dont je pourrais réexécuter le DAG après la archive_task?
Merci
DAG CODE
from airflow import DAG
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator
from datetime import datetime, timedelta
from airflow.models import Variable
default_args = {
'owner': 'glsam',
'depends_on_past': False,
'start_date': datetime.now(),
'provide_context': True,
'retries': 100,
'retry_delay': timedelta(seconds=30),
'max_active_runs': 1,
'schedule_interval': timedelta(seconds=5),
}
dag = DAG('test_sensing_for_a_file', default_args=default_args)
filepath = Variable.get("soucePath_Test")
filepattern = Variable.get("filePattern_Test")
archivepath = Variable.get("archivePath_Test")
sensor_task = OmegaFileSensor(
task_id='file_sensor_task',
filepath=filepath,
filepattern=filepattern,
poke_interval=3,
dag=dag)
def process_file(**context):
file_to_process = context['task_instance'].xcom_pull(
key='file_name', task_ids='file_sensor_task')
file = open(filepath + file_to_process, 'w')
file.write('This is a test\n')
file.write('of processing the file')
file.close()
proccess_task = PythonOperator(
task_id='process_the_file', python_callable=process_file, dag=dag)
archive_task = ArchiveFileOperator(
task_id='archive_file',
filepath=filepath,
archivepath=archivepath,
dag=dag)
sensor_task >> proccess_task >> archive_task
FICHIER SENSOR OPÉRATEUR
import os
import re
from datetime import datetime
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators.sensors import BaseSensorOperator
class ArchiveFileOperator(BaseOperator):
@apply_defaults
def __init__(self, filepath, archivepath, *args, **kwargs):
super(ArchiveFileOperator, self).__init__(*args, **kwargs)
self.filepath = filepath
self.archivepath = archivepath
def execute(self, context):
file_name = context['task_instance'].xcom_pull(
'file_sensor_task', key='file_name')
os.rename(self.filepath + file_name, self.archivepath + file_name)
class OmegaFileSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, filepath, filepattern, *args, **kwargs):
super(OmegaFileSensor, self).__init__(*args, **kwargs)
self.filepath = filepath
self.filepattern = filepattern
def poke(self, context):
full_path = self.filepath
file_pattern = re.compile(self.filepattern)
directory = os.listdir(full_path)
for files in directory:
if not re.match(file_pattern, files):
return False
else:
context['task_instance'].xcom_push('file_name', files)
return True
class OmegaPlugin(AirflowPlugin):
name = "omega_plugin"
operators = [OmegaFileSensor, ArchiveFileOperator]
Un grand merci méthode. –