2017-08-11 4 views
1

J'utilise Airflow 1.8.1 et je veux pousser le résultat d'une requête sql de PostgreOperator.Airflow: Comment faire pour pousser la valeur xcom de PostgreOperator?

Voilà mes tâches:

check_task = PostgresOperator(
    task_id='check_task', 
    postgres_conn_id='conx', 
    sql="check_task.sql", 
    xcom_push=True, 
    dag=dag) 

def py_is_first_execution(**kwargs): 
    value = kwargs['ti'].xcom_pull(task_ids='check_task') 
    print 'count ----> ', value 
    if value == 0: 
     return 'next_task' 
    else: 
     return 'end-flow' 

check_branch = BranchPythonOperator(
    task_id='is-first-execution', 
    python_callable=py_is_first_execution, 
    provide_context=True, 
    dag=dag) 

et voici mon script sql:

select count(1) from table 

quand je vérifie la valeur de XCOM de check_task il récupère la valeur none.

Répondre

0

Enfin, j'ai créé un nouveau capteur ExecuteSqlOperator dans le gestionnaire de plugins sous $AIRFLOW_HOME/plugins. A titre d'exemple, j'ai utilisé CheckOperator et j'ai modifié la valeur retournée: le fonctionnement de base de cet opérateur était exactement l'inverse de ce dont j'avais besoin.

Voici le défaut du ExecuteSqlOperator: CheckOperator

et voici mon personnalisé SqlSensor: ReverseSqlSensor

class SqlExecuteOperator(BaseOperator): 
""" 
Performs checks against a db. The ``CheckOperator`` expects 
a sql query that will return a single row. 

Note that this is an abstract class and get_db_hook 
needs to be defined. Whereas a get_db_hook is hook that gets a 
single record from an external source. 
:param sql: the sql to be executed 
:type sql: string 
""" 

template_fields = ('sql',) 
template_ext = ('.hql', '.sql',) 
ui_color = '#fff7e6' 

@apply_defaults 
def __init__(
     self, sql, 
     conn_id=None, 
     *args, **kwargs): 
    super(SqlExecuteOperator, self).__init__(*args, **kwargs) 
    self.conn_id = conn_id 
    self.sql = sql 

def execute(self, context=None): 
    logging.info('Executing SQL statement: ' + self.sql) 
    records = self.get_db_hook().get_first(self.sql) 
    logging.info("Record: " + str(records)) 
    records_int = int(records[0]) 
    print (records_int) 
    return records_int 

def get_db_hook(self): 
    return BaseHook.get_hook(conn_id=self.conn_id) 
1

Si je me trompe, le flux d'air pousse automatiquement vers xcom lorsqu'une requête renvoie une valeur. Cependant, quand vous regardez le code du postgresoperator vous voyez qu'il a une méthode d'exécution qui appelle la méthode d'exécution du PostgresHook (extension de dbapi_hook). Les deux méthodes ne renvoient rien, en tant que tel, elle ne pousse rien à xcom. Ce que nous avons fait pour corriger ceci est de créer un CustomPostgresSelectOperator, une copie de PostgresOperator, mais au lieu de 'hook.run (..)', 'return hook.get_records (..)'.

Espérons que cela vous aide.