je veux déclencher une simplehttpoperator, comme ceci: flux d'air trigger_dag test_trigger --conf '{ "name": "quelque chose"}'comment passer des paramètres de la tâche pythonoperator à la tâche simplehttpoperator dans airflow dag?
puis-je utiliser un python_callable de pythonoperator pour accepter les paramètres à l'aide kwargs [ 'dag_run' ] .conf, et je veux passer le ['dag_run']. conf à simplehttpoperator, comment puis-je le faire? Quelqu'un peut-il aider?
cc_ = {}
def run_this_func(ds, **kwargs):
cc_ = kwargs['dag_run'].conf
logging.info(cc_)
return cc_
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag)
http_task = SimpleHttpOperator(
task_id='http_task',
http_conn_id='test_http',
method='POST',
endpoint='/api/v1/function',
data=cc_,
headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"},
response_check=lambda response: True if "10000" in response.content else False,
dag=dag)
http_task.set_upstream(run_this)
mais comment utiliser XCOM dans simplehttpoperator? pouvez-vous donner un code de cas? – pyfroggogogo
@pyfroggogogo, je mets à jour avec un exemple de code, essayez si cela fonctionne – Chengzhi
Les modèles doivent être passés en tant que chaîne. Vous pouvez utiliser 'data = json.loads ('{{... | tojson}}')' pour le retrouver dans un type dict après le rendu. –