J'ai un opérateur Airflow qui donne le coup d'un emploi sur un service 3ème partie surveille alors les progrès de ce travail. Dans le code, l'exécution ressembleétat récupération dans un redémarré Airflow TaskInstance
def execute(self, context):
external_id = start_external_job()
wait_until_external_job_completes(external_id)
Si le travailleur Airflow est redémarré (généralement en raison d'un deploy code) lorsqu'une instance de cette tâche est en cours d'exécution, je voudrais que l'instance redémarrée de cette tâche à capable de reprendre où le précédent s'est arrêté (surveillance du travail sur le service de tiers). Existe-t-il un moyen de partager cet ID de travail tiers lors d'exécutions ultérieures de la même instance de tâche?
Un exemple de renforcer la méthode execute ressemblerait à ceci:
def execute(self, context):
external_id = load_external_id_for_task_instance()
if external_id is None:
external_id = start_external_job(args)
persist_external_id_for_task_instance(external_id)
wait_until_external_job_completes(external_id)
Et je dois mettre en œuvre load_external_id_for_task_instance
et persist_external_id_for_task_instance
.
Merci! Je viens de mettre en œuvre et cela fonctionne comme un charme! –