2017-06-13 1 views
6

J'ai du mal à comprendre comment fonctionne BranchPythonOperator dans Airflow. Je sais que c'est principalement utilisé pour la branche, mais je suis confus par la documentation quant à ce qu'il faut passer dans une tâche et ce que je dois passer/attendre de la tâche en amont. Etant donné l'exemple simple dans la documentation on this page, à quoi ressemblerait le code source pour la tâche amont appelée run_this_first et les 2 en aval qui sont branchés? Comment Airflow sait-il exactement exécuter branch_a au lieu de branch_b? Où la sortie de la tâche en amont est-elle remarquée/lue?Comment fonctionne BranchPythonOperator d'Airflow?

Répondre

7

Votre BranchPythonOperator est créé avec un python_callable, qui sera une fonction. Cette fonction doit renvoyer, en fonction de votre logique métier, le nom de la tâche des tâches immédiatement en aval que vous avez connectées. Cela pourrait être de 1 à N tâches immédiatement en aval. Il n'y a rien que les tâches en aval ont à lire, mais vous pouvez leur transmettre des métadonnées en utilisant xcom.

def decide_which_path(): 
    if something is True: 
     return "branch_a" 
    else: 
     return "branch_b" 


branch_task = BranchPythonOperator(
    task_id='run_this_first', 
    python_callable=decide_which_path, 
    trigger_rule="all_done", 
    dag=dag) 

branch_task.set_downstream(branch_a) 
branch_task.set_downstream(branch_b) 

Il est important de définir le trigger_rule ou tout le reste sera sautée, comme la valeur par défaut est all_success.

+0

Est-ce encore vrai en ce qui concerne le trigger_rule? Les docs ne suggèrent pas que vous en avez besoin, mais juste une tâche fictive, car les autres tâches (à part celle retournée par la fonction) immédiatement en aval seront ignorées https://airflow.incubator.apache.org/concepts.html #branching – Davos

+0

Oui, c'est correct, cela dépend donc de la façon dont les tâches en aval sont connectées. Je pense que j'ai supposé que toutes les branches ont fusionné dans une ligne principale de tâches, mais ce n'est probablement même pas le cas d'utilisation normal (mais c'est mon normal). – Nick

+0

quel est votre usecase par intérêt? Mon actuel est 'ce fichier existe-t-il' et si ce n'est pas le cas, continuez à le créer, sinon tâche factice puis dag se termine avec succès. C'est spécifiquement pour charger des données statiques (ne changeront jamais) d'une base de données SQL à hadoop. Je le veux idempotent, avec un non-op très rapide, évitant l'impact de la requête à la base de données source complètement si pas nécessaire. – Davos