J'ai un dag que nous déploierons à plusieurs instances de flux d'air différentes et dans notre airflow.cfg nous avons dags_are_paused_at_creation = True
mais pour ce dag spécifique, nous voulons qu'il soit activé sans avoir à le faire manuellement en cliquant sur l'interface utilisateur. Y a-t-il un moyen de le faire par programme?Airflow unpause dag par programmation?
Répondre
plug-in flux d'air repos-api-plugin peut également être utilisé pour des tâches de pause problématiquement.
effectue une pause DAG
Disponible en version Airflow: 1.7.0 ou plus
GET - http: // {HOST}: {PORT}/admin/rest_api/api api = pause
Arguments de la requête:
dag_id - string - l'identifiant du DAG
subdir (en option) - string - emplacement du fichier ou répertoire à partir duquel look pour le DAG
Exemples:
http: // {HOST}: {PORT}/admin/rest_api/api api = pause & dag_id = test_id
Voir pour plus de détails: https://github.com/teamclairvoyant/airflow-rest-api-plugin
J'ai créé la fonction suivante pour le faire si quelqu'un d'autre va dans ce numéro: import airflow.settings from airflow.models import DagModel def unpause_dag(dag): """ A way to programatically unpause a DAG. :param dag: DAG object :return: dag.is_paused is now False """ session = airflow.settings.Session() try: qry = session.query(DagModel).filter(DagModel.dag_id == dag.dag_id) d = qry.first() d.is_paused = False session.commit() except: session.rollback() finally: session.close()
offre votre dag_id et exécuter cette commande sur votre ligne de commande.
airflow pause dag_id.
Pour plus d'informations sur l'interface de ligne de commande d'air: https://airflow.incubator.apache.org/cli.html
Impressionnant, ne peut pas croire que je manqué cela. Pas exactement par programmation mais néanmoins peut simplement utiliser un BashOperator pour l'exécuter ou utiliser le module de sous-commande pour l'exécuter. Je vois aussi maintenant ici https://github.com/apache/incubator-airflow/blob/master/airflow/bin/cli.py#L307 que j'ai le même code alors ouais je vais l'utiliser. –
vous pouvez aussi utiliser Réactiver (args = '', dag = DAG) méthode airflow.bin.cli –