2017-09-11 5 views
1

je veux déclencher une simplehttpoperator, comme ceci: flux d'air trigger_dag test_trigger --conf '{ "name": "quelque chose"}'comment passer des paramètres de la tâche pythonoperator à la tâche simplehttpoperator dans airflow dag?

puis-je utiliser un python_callable de pythonoperator pour accepter les paramètres à l'aide kwargs [ 'dag_run' ] .conf, et je veux passer le ['dag_run']. conf à simplehttpoperator, comment puis-je le faire? Quelqu'un peut-il aider?

cc_ = {} 


def run_this_func(ds, **kwargs): 
    cc_ = kwargs['dag_run'].conf 
    logging.info(cc_) 
    return cc_ 

run_this = PythonOperator(
    task_id='run_this', 
    provide_context=True, 
    python_callable=run_this_func, 
    dag=dag) 

http_task = SimpleHttpOperator(
    task_id='http_task', 
    http_conn_id='test_http', 
    method='POST', 
    endpoint='/api/v1/function', 
    data=cc_, 
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"}, 
    response_check=lambda response: True if "10000" in response.content else False, 
    dag=dag) 

http_task.set_upstream(run_this) 

Répondre

0

Pour la communication entre les tâches, vous pouvez consulter le XCOM, https://airflow.incubator.apache.org/concepts.html#xcoms

***** ***** MISE À JOUR
(merci Daniel pour plus de détails) Ci-dessous quelques codes vous pouvez essayer, dans votre SimpleHttpOperator vous obtenez la valeur de retour via XCOM:

http_task = SimpleHttpOperator(
    task_id='http_task', 
    http_conn_id='test_http', 
    method='POST', 
    endpoint='/api/v1/function', 
    data=json.loads("{{ task_instance.xcom_pull(task_ids='run_this', key='return_value') }}"), 
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"}, 
    response_check=lambda response: True if "10000" in response.content else False, 
    dag=dag) 
+0

mais comment utiliser XCOM dans simplehttpoperator? pouvez-vous donner un code de cas? – pyfroggogogo

+0

@pyfroggogogo, je mets à jour avec un exemple de code, essayez si cela fonctionne – Chengzhi

+1

Les modèles doivent être passés en tant que chaîne. Vous pouvez utiliser 'data = json.loads ('{{... | tojson}}')' pour le retrouver dans un type dict après le rendu. –

0

Merci à @Chengzhi et @ Daniel. Enfin, j'ai écrit un filtre personnalisé 'tojson' dans Jinja2/filter.py, car dans le flux d'air, la version par défaut de Jinja2 est 2.8.1 et Jinja2 ne contient pas le filtre intégré nommé 'tojson' jusqu'à la version 2.9.

def do_tojson(value): 
    value = json.JSONEncoder().encode(value) 
    return value 

Dans le fichier dag, le code est le suivant. Ça marche.

def run_this_func(ds, **kwargs): 
    cc_ = kwargs['dag_run'].conf 
    return cc_ 

run_this = PythonOperator(
    task_id='run_this', 
    provide_context=True, 
    python_callable=run_this_func, 
    dag=dag) 

http_task = SimpleHttpOperator(
    task_id='http_task', 
    http_conn_id='test_http', 
    method='POST', 
    endpoint='/api/v1/task', 
    data="{{ task_instance.xcom_pull(task_ids='run_this') |tojson}}", 
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*", 
      "Content-Type": "application/json"}, 
    response_check=lambda response: True if "10000" in response.content else False, 
    dag=dag) 

http_task.set_upstream(run_this)