2017-06-28 1 views
2

Dans l'opération ci-dessous (adapté des docs API Dash DataFrame), si je ne suis pas attaché à un planificateur (laissez la ligne assignant la variable client commentée), l'opération se termine correctement comme prévu.Dask Distributed semble ne pas réussir l'opération demandée pendant le calcul()

from dask.distributed import Client 
import dask.dataframe as dd 
import pandas as pd 

connection_loc = 'foobar.net:8786' 
# client = Client(connection_loc) 

df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [1., 2., 3., 4., 5.]}) 
ddf = dd.from_pandas(df, npartitions=2) 
foo = ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute() 

Le moment de cette même ligne est uncommented et une connexion de client est affecté, l'erreur suivante se produit: TypeError: unorderable types: list() >= int() (voir pour plus complet traceback).

En examinant la traceback, je peux voir que le bytestring qu'il essaie de désérialiser n'est pas ce que j'attendrais qu'il devrait essayer de désérialiser (voir la première ligne en trace complète distributed.protocol.pickle - INFO - Failed to deserialize).

J'ai complètement arrêté et redémarré les conteneurs distants exécutant à la fois le travailleur et le planificateur en vain. J'ai également utilisé client.restart() sans aucune chance. Une idée de pourquoi cette autre tâche est passée au travailleur et de lancer cette erreur? Une solution pour que Dask arrête de faire ça?

retraçage complète:

dask_worker_1  | distributed.protocol.pickle - INFO - Failed to deserialize b"\x80\x04\x95+\x01\x00\x00\x00\x00\x00\x00(\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_fill_function\x94\x93\x94(h\x00\x8c\x0f_make_skel_func\x94\x93\x94h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x02KCC\x0e|\x00j\x00d\x01\x83\x01j\x01\x83\x00S\x00\x94NK\x02\x86\x94\x8c\x07rolling\x94\x8c\x03sum\x94\x86\x94\x8c\x02df\x94\x85\x94\x8c\x1fdask_method/dask_dist_matrix.py\x94\x8c\x08<lambda>\x94K\rC\x00\x94))t\x94R\x94]\x94}\x94\x87\x94R\x94}\x94N}\x94tRN\x8c3('from_pandas-ddc065084280667dd51853b144bdd4e8', 0)\x94NK\x02K\x00)}\x94t\x94." 
dask_worker_1  | Traceback (most recent call last): 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads 
dask_worker_1  |  return pickle.loads(x) 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func 
dask_worker_1  |  if cell_count >= 0 else 
dask_worker_1  | TypeError: unorderable types: list() >= int() 
dask_worker_1  | distributed.worker - WARNING - Could not deserialize task 
dask_worker_1  | Traceback (most recent call last): 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 1113, in add_task 
dask_worker_1  |  self.tasks[key] = _deserialize(function, args, kwargs, task) 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 573, in _deserialize 
dask_worker_1  |  args = pickle.loads(args) 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads 
dask_worker_1  |  return pickle.loads(x) 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func 
dask_worker_1  |  if cell_count >= 0 else 
dask_worker_1  | TypeError: unorderable types: list() >= int() 

Dask: 0.15.0 Distribué: 1.17.1 OS: Ubuntu 16.04.2 LTS

Répondre

2

Je soupçonne que vous avez un décalage dans vos versions de cloudpickle entre les travailleurs et les clients. Vous devrez vous assurer que tous vos employés et clients ont la même configuration logicielle. Vous pouvez essayer la commande suivante pour vous aider:

client.get_versions(check=True) 

Je ne pense pas que cela inclut cloudpickle dans la version 1.17.1 dask.distributed mais devrait dans toutes les versions ultérieures. (il fonctionne maintenant en master)

+0

Merci, 'client.get_versions (check = True)' a créé la même erreur. – kuanb

+1

Ah, idéalement, il retournera une erreur informative vous indiquant quelles versions sont incompatibles où. Je suppose que cette méthode dépend de cloudpickle pour fonctionner. Les environnements logiciels distribués sont difficiles. La plupart des déploiements de grande envergure que je connais utilisent quelque chose pour les aider à les synchroniser. – MRocklin

+0

Merci. J'ai deux autres questions si possible. Je leur demande ici car ils peuvent être liés. Après avoir résolu les dépendances incohérentes, je rencontre une erreur Tornado sur get_versions ('distributed.utils - ERROR - Stream est fermé: en essayant d'appeler la méthode distante 'broadcast''). Si j'ignore l'exécution de get_versions, l'erreur suivante s'affichera: 'distributed.utils - ERROR - (" ('apply-29db5629d323ed627f7f91b2363edb30', 0) ", 'tcp: //10.0.0.248: 39689')'. – kuanb

0

Comme l'indique l'autre réponse, il s'agit certainement d'une discordance dans les versions logicielles. J'ai eu le même problème.

J'ai fait plusieurs choses pour que tout fonctionne à nouveau. J'utilisais dask_ec2, donc je vais inclure ces changements ici, mais je ne sais pas comment vous configurez votre cluster. Tout d'abord, comme j'utilisais ubuntu 16.04 localement, j'ai imaginé qu'il était plus probable d'avoir les mêmes bibliothèques, etc. si les serveurs distribués avaient la même version, mais cela avait un problème (voir https://github.com/dask/dask-ec2/issues/98). Résumé: J'ai modifié dask_ec2/salt.py, en le configurant pour télécharger cherrypy==3.2.3 dans la méthode __install_salt_rest_api (voir le problème lié pour plus de détails). Deuxièmement, j'ai défini dask_ec2 pour utiliser les versions les plus récentes d'Anaconda. En dask_ec2/formulas/salt/conda/settings.sls, changer les lignes de DOWNLOAD_URL à:

{% set download_url = 'https://repo.continuum.io/archive/Anaconda2-5.0.1-Linux-x86_64.sh' %} 

{% set download_url = 'https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh' %} 

Troisièmement, je courais une mise à jour sur mon ordinateur, pour assurer mes propres bibliothèques étaient à jour:

Par ex de: Upgrading all packages with pip

pip freeze --local | grep -v '^\-e' | cut -d = -f 1 | xargs -n1 pip install -U 

et

conda update --all 

J'ai finalement remis en marche le tout, et il a bien fonctionné.