2017-09-19 2 views
2

Utilisation de la dernière version d'apache airflow. Commencé avec LocalExecutor, dans ce mode, tout fonctionnait correctement, à l'exception de certaines interactions, l'interface Web indique que le CeleryExecutor était nécessaire pour les utiliser. Installé et configuré l'exécuteur Céleri avec Redis, configuré Redis comme l'URL du courtier et le backend de résultat.Apache Airflow Céleri Redis DecodeError

Il semble fonctionner au début, jusqu'à ce qu'une tâche est planifiée à quel point il donne l'erreur suivante:

File "/bin/airflow", line 28, in <module> 
    args.func(args) 
    File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 882, in scheduler 
    job.run() 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 201, in run 
    self._execute() 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1311, in _execute 
    self._execute_helper(processor_manager) 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1444, in _execute_helper 
    self.executor.heartbeat() 
    File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 132, in heartbeat 
    self.sync() 
    File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 91, in sync 
    state = async.state 
    File "/usr/lib/python2.7/site-packages/celery/result.py", line 436, in state 
    return self._get_task_meta()['status'] 
    File "/usr/lib/python2.7/site-packages/celery/result.py", line 375, in _get_task_meta 
    return self._maybe_set_cache(self.backend.get_task_meta(self.id)) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 352, in get_task_meta 
    meta = self._get_task_meta_for(task_id) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 668, in _get_task_meta_for 
    return self.decode_result(meta) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 271, in decode_result 
    return self.meta_from_decoded(self.decode(payload)) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 278, in decode 
    accept=self.accept) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads 
    return decode(data) 
    File "/usr/lib64/python2.7/contextlib.py", line 35, in __exit__ 
    self.gen.throw(type, value, traceback) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors 
    reraise(wrapper, wrapper(exc), sys.exc_info()[2]) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors 
    yield 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads 
    return decode(data) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads 
    return load(BytesIO(s)) 
kombu.exceptions.DecodeError: invalid load key, '{'. 

semble être une erreur de sérialisation de conserves au vinaigre, mais je ne suis pas sûr de savoir comment tracer la cause. Aucune suggestion?

Ce problème concerne systématiquement un flux de travail dans lequel j'utilise la fonctionnalité de sous-repérage, le problème est peut-être lié à cela.

NOTE: J'ai également testé en utilisant RabbitMQ, y avait un problème différent; Le client affiche «connexion réinitialisée par un pair» et se bloque. Le journal RabbitMQ affiche "connexion TCP inattendue fermée du client".

Répondre

0

je suis tombé sur ce après avoir vu la même trace exacte dans nos journaux de planificateur:

File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads 
    return load(BytesIO(s)) 
kombu.exceptions.DecodeError: invalid load key, '{'. 

Le fait que le céleri a essayé de unpickle quelque chose qui commence par « { » semblait suspect, donc je pris un tcpdump de le trafic et déclenché une tâche via l'interface Web. Le inclus cet échange capture résultant presque exactement le même instant que le backtrace ci-dessus est apparu dans les journaux du planificateur:

05:03:49.145849 IP <scheduler-ip-addr>.ec2.internal.45597 > <redis-ip-addr>.ec2.internal.6379: Flags [P.], seq 658:731, ack 46, win 211, options [nop,nop,TS val 654768546 ecr 4219564282], length 73: RESP "GET" "celery-task-meta-b0d3a29e-ac08-4e77-871e-b4d553502cc2" 
05:03:49.146086 IP <redis-ip-addr>.ec2.internal.6379 > <scheduler-ip-addr>.ec2.internal.45597: Flags [P.], seq 46:177, ack 731, win 210, options [nop,nop,TS val 4219564282 ecr 654768546], length 131: RESP "{"status": "SUCCESS", "traceback": null, "result": null, "task_id": "b0d3a29e-ac08-4e77-871e-b4d553502cc2", "children": []}" 

La charge utile de la réponse de Redis est clairement JSON, alors pourquoi le céleri tente de unpickle il? Nous sommes en train de migrer d'Airflow 1.7 à 1.8, et pendant notre déploiement, nous avons une flotte de travailleurs Airflow en cours d'exécution v1.7 et une autre version v1.8. Les travailleurs étaient censés tirer des files d'attente avec des charges de travail disjointes, mais en raison d'un bug dans l'un de nos DAG, nous avions une TaskInstance programmée par Airflow 1.8 qui était ensuite exécutée par un ouvrier céleri lancé via Airflow 1.7.

AIRFLOW-1038 a modifié le sérialiseur pour les statuts de tâche de céleri de JSON (par défaut) à pickle, afin que les travailleurs exécutant une version du code avant cette modification sérialiseront les résultats dans JSON, et les planificateurs exécutant une version du code qui inclut cette Le changement va essayer de désérialiser les résultats en décochant, ce qui provoque l'erreur ci-dessus.

0

Veuillez vérifier quel type de celery_result_backend vous avez configuré dans airflow.cfg. Essayez de le basculer vers un backend de base de données (mysql etc) si ce n'est pas le cas.

Nous voyons qu'avec un backend ampq (seulement disponible sur Céleri 3.1 et ci-dessous), redis et rpc backend il y a parfois des problèmes.