2017-09-08 3 views
1

Je le DAG suivant, qui exécute les différentes méthodes avec une classe dédiée à une routine de pré-traitement des données:Utiliser XCom pour échanger des données entre classes?

from datetime import datetime 
import os 
import sys 

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 

import ds_dependencies 

SCRIPT_PATH = os.getenv('MARKETING_PREPROC_PATH') 

if SCRIPT_PATH: 
    sys.path.insert(0, SCRIPT_PATH) 
    from table_builder import OnlineOfflinePreprocess 
else: 
    print('Define MARKETING_PREPROC_PATH value in environmental variables') 
    sys.exit(1) 

default_args = { 
    'start_date': datetime.now(), 
    'max_active_runs': 1, 
    'concurrency': 4 
} 

worker = OnlineOfflinePreprocess() 

DAG = DAG(
    dag_id='marketing_data_preproc', 
    default_args=default_args, 
    start_date=datetime.today() 
) 

import_online_data = PythonOperator(
    task_id='import_online_data', 
    python_callable=worker.import_online_data, 
    dag=DAG) 

import_offline_data = PythonOperator(
    task_id='import_offline_data', 
    python_callable=worker.import_offline_data, 
    dag=DAG) 

merge_aurum_to_sherlock = PythonOperator(
    task_id='merge_aurum_to_sherlock', 
    python_callable=worker.merge_aurum_to_sherlock, 
    dag=DAG) 

merge_sherlock_to_aurum = PythonOperator(
    task_id='merge_sherlock_to_aurum', 
    python_callable=worker.merge_sherlock_to_aurum, 
    dag=DAG) 

upload_au_to_sh = PythonOperator(
    task_id='upload_au_to_sh', 
    python_callable=worker.upload_table, 
    op_args='aurum_to_sherlock', 
    dag=DAG) 

upload_sh_to_au = PythonOperator(
    task_id='upload_sh_to_au', 
    python_callable=worker.upload_table, 
    op_args='sherlock_to_aurum', 
    dag=DAG) 

import_online_data >> merge_aurum_to_sherlock 
import_offline_data >> merge_aurum_to_sherlock 

merge_aurum_to_sherlock >> merge_sherlock_to_aurum 
merge_aurum_to_sherlock >> upload_au_to_sh 
merge_sherlock_to_aurum >> upload_sh_to_au 

Cela produit l'erreur suivante:

[2017-09-07 19:32:09,587] {base_task_runner.py:97} INFO - Subtask: AttributeError: 'OnlineOfflinePreprocess' object has no attribute 'online_info' 

Ce qui est en fait assez évident étant donné la façon dont Le flux d'air fonctionne: les sorties des différentes méthodes de classe appelées ne sont pas stockées dans l'objet de classe global initialisé en haut du graphique. Puis-je résoudre ce problème avec XCom? Dans l'ensemble, quelle est la réflexion sur la façon de mélanger la cohérence de la POO avec Airflow?

Répondre

3

C'est moins un problème de POO avec flux d'air et plus sur l'état avec le flux d'air.

Tout état qui doit être transmis entre les tâches doit être stocké de manière persistante. C'est parce que chaque tâche de flux d'air est un processus indépendant (qui pourrait même être exécuté sur une machine différente!) Et donc la communication en mémoire n'est pas possible.

Vous avez raison, vous pouvez utiliser XCOM pour passer cet état (s'il est petit, puisqu'il est stocké dans la base de données de flux d'air). Si c'est grand, vous voudrez probablement le stocker ailleurs, peut-être un système de fichiers ou S3 ou HDFS ou une base de données spécialisée.

+0

Super, c'est logique. J'ai affaire à de très grandes tables, alors ce qui va faire sens est l'une des options de stockage non locales que vous mentionnez. XCom ne semble pas être fait pour passer des GB autour. Je voulais juste m'assurer que je ne manquais rien d'évident. À votre santé! – Aaron