Utilisation de la dernière version d'apache airflow. Commencé avec LocalExecutor, dans ce mode, tout fonctionnait correctement, à l'exception de certaines interactions, l'interface Web indique que le CeleryExecutor était nécessaire pour les utiliser. Installé et configuré l'exécuteur Céleri avec Redis, configuré Redis comme l'URL du courtier et le backend de résultat.Apache Airflow Céleri Redis DecodeError
Il semble fonctionner au début, jusqu'à ce qu'une tâche est planifiée à quel point il donne l'erreur suivante:
File "/bin/airflow", line 28, in <module>
args.func(args)
File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 882, in scheduler
job.run()
File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 201, in run
self._execute()
File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1311, in _execute
self._execute_helper(processor_manager)
File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1444, in _execute_helper
self.executor.heartbeat()
File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 132, in heartbeat
self.sync()
File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 91, in sync
state = async.state
File "/usr/lib/python2.7/site-packages/celery/result.py", line 436, in state
return self._get_task_meta()['status']
File "/usr/lib/python2.7/site-packages/celery/result.py", line 375, in _get_task_meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 352, in get_task_meta
meta = self._get_task_meta_for(task_id)
File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 668, in _get_task_meta_for
return self.decode_result(meta)
File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 271, in decode_result
return self.meta_from_decoded(self.decode(payload))
File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 278, in decode
accept=self.accept)
File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
return decode(data)
File "/usr/lib64/python2.7/contextlib.py", line 35, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors
yield
File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
return decode(data)
File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads
return load(BytesIO(s))
kombu.exceptions.DecodeError: invalid load key, '{'.
semble être une erreur de sérialisation de conserves au vinaigre, mais je ne suis pas sûr de savoir comment tracer la cause. Aucune suggestion?
Ce problème concerne systématiquement un flux de travail dans lequel j'utilise la fonctionnalité de sous-repérage, le problème est peut-être lié à cela.
NOTE: J'ai également testé en utilisant RabbitMQ, y avait un problème différent; Le client affiche «connexion réinitialisée par un pair» et se bloque. Le journal RabbitMQ affiche "connexion TCP inattendue fermée du client".