Je le DAG suivant, qui exécute les différentes méthodes avec une classe dédiée à une routine de pré-traitement des données:Utiliser XCom pour échanger des données entre classes?
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('MARKETING_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
from table_builder import OnlineOfflinePreprocess
else:
print('Define MARKETING_PREPROC_PATH value in environmental variables')
sys.exit(1)
default_args = {
'start_date': datetime.now(),
'max_active_runs': 1,
'concurrency': 4
}
worker = OnlineOfflinePreprocess()
DAG = DAG(
dag_id='marketing_data_preproc',
default_args=default_args,
start_date=datetime.today()
)
import_online_data = PythonOperator(
task_id='import_online_data',
python_callable=worker.import_online_data,
dag=DAG)
import_offline_data = PythonOperator(
task_id='import_offline_data',
python_callable=worker.import_offline_data,
dag=DAG)
merge_aurum_to_sherlock = PythonOperator(
task_id='merge_aurum_to_sherlock',
python_callable=worker.merge_aurum_to_sherlock,
dag=DAG)
merge_sherlock_to_aurum = PythonOperator(
task_id='merge_sherlock_to_aurum',
python_callable=worker.merge_sherlock_to_aurum,
dag=DAG)
upload_au_to_sh = PythonOperator(
task_id='upload_au_to_sh',
python_callable=worker.upload_table,
op_args='aurum_to_sherlock',
dag=DAG)
upload_sh_to_au = PythonOperator(
task_id='upload_sh_to_au',
python_callable=worker.upload_table,
op_args='sherlock_to_aurum',
dag=DAG)
import_online_data >> merge_aurum_to_sherlock
import_offline_data >> merge_aurum_to_sherlock
merge_aurum_to_sherlock >> merge_sherlock_to_aurum
merge_aurum_to_sherlock >> upload_au_to_sh
merge_sherlock_to_aurum >> upload_sh_to_au
Cela produit l'erreur suivante:
[2017-09-07 19:32:09,587] {base_task_runner.py:97} INFO - Subtask: AttributeError: 'OnlineOfflinePreprocess' object has no attribute 'online_info'
Ce qui est en fait assez évident étant donné la façon dont Le flux d'air fonctionne: les sorties des différentes méthodes de classe appelées ne sont pas stockées dans l'objet de classe global initialisé en haut du graphique. Puis-je résoudre ce problème avec XCom
? Dans l'ensemble, quelle est la réflexion sur la façon de mélanger la cohérence de la POO avec Airflow?
Super, c'est logique. J'ai affaire à de très grandes tables, alors ce qui va faire sens est l'une des options de stockage non locales que vous mentionnez. XCom ne semble pas être fait pour passer des GB autour. Je voulais juste m'assurer que je ne manquais rien d'évident. À votre santé! – Aaron