2017-10-04 13 views
0

J'ai un opérateur Airflow qui donne le coup d'un emploi sur un service 3ème partie surveille alors les progrès de ce travail. Dans le code, l'exécution ressembleétat récupération dans un redémarré Airflow TaskInstance

def execute(self, context): 
    external_id = start_external_job() 
    wait_until_external_job_completes(external_id) 

Si le travailleur Airflow est redémarré (généralement en raison d'un deploy code) lorsqu'une instance de cette tâche est en cours d'exécution, je voudrais que l'instance redémarrée de cette tâche à capable de reprendre où le précédent s'est arrêté (surveillance du travail sur le service de tiers). Existe-t-il un moyen de partager cet ID de travail tiers lors d'exécutions ultérieures de la même instance de tâche?

Un exemple de renforcer la méthode execute ressemblerait à ceci:

def execute(self, context): 
    external_id = load_external_id_for_task_instance() 
    if external_id is None: 
     external_id = start_external_job(args) 
     persist_external_id_for_task_instance(external_id) 

    wait_until_external_job_completes(external_id) 

Et je dois mettre en œuvre load_external_id_for_task_instance et persist_external_id_for_task_instance.

Répondre

0

Je suggère de diviser cela en deux tâches avec l'utilisation de XComs et Sensors.

Vous pouvez avoir un opérateur qui soumet le travail et enregistre l'identifiant à un XCom:

class SubmitJobOperator(BaseOperator): 

    def execute(self, context): 
     external_id = start_external_job() 
     return external_id # return value will be stored in XCom 

Ensuite, un capteur qui va chercher l'id de XCom et les sondages jusqu'à la fin:

class JobCompleteSensor(BaseSensor): 

    @apply_defaults 
    def __init__(self, submit_task_id, *args, **kwargs): 
     self.submit_task_id = submit_task_id # so we know where to fetch XCom value from 
     super(JobCompleteSensor, self).__init__(*args, **kwargs) 

    def poke(self, context): 
     external_id = context['task_instance'].xcom_pull(task_ids=self.submit_task_id) 
     return check_if_external_job_is_complete(external_id): 

Alors votre DAG ressemblerait à ceci:

submit_job = SubmitJobOperator(
    dag=dag, 
    task_id='submit_job', 
) 

wait_for_job_to_complete = JobCompleteSensor(
    dag=dag, 
    task_id='wait_for_job_to_complete', 
    submit_task_id=submit_job.task_id, 
) 

submit_job >> wait_for_job_to_complete 

Les XComs sont conservés dans la base de ensor sera toujours en mesure de trouver le external_id précédemment soumis.

+0

Merci! Je viens de mettre en œuvre et cela fonctionne comme un charme! –