2017-10-13 28 views
1

J'ai un fichier parquet au format ruche et une compression rapide. Il s'inscrit dans la mémoire et un pandas.info fournit les données suivantes.temps de traitement incohérent dans dask distribué fastparquet

Nombre de lignes par groupe dans le fichier parquet est juste 100K

>>> df.info() 
<class 'pandas.core.frame.DataFrame'> 
Index: 21547746 entries, YyO+tlZtAXYXoZhNr3Vg3+dfVQvrBVGO8j1mfqe4ZHc= to oE4y2wK5E7OR8zyrCHeW02uTeI6wTwT4QTApEVBNEdM= 
Data columns (total 8 columns): 
payment_method_id   int16 
payment_plan_days   int16 
plan_list_price   int16 
actual_amount_paid  int16 
is_auto_renew    bool 
transaction_date   datetime64[ns] 
membership_expire_date datetime64[ns] 
is_cancel     bool 
dtypes: bool(2), datetime64[ns](2), int16(4) 
memory usage: 698.7+ MB 

Maintenant, faire un calcul simple avec dask je reçois les horaires suivants

Utilisation de filetage

>>>time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime() 
'Fri Oct 13 23:44:50 2017' 
141.98732048354384 
'Fri Oct 13 23:44:59 2017' 

Utilisation distribués (groupe local)

>>> c=Client() 
>>> time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime() 
'Fri Oct 13 23:47:04 2017' 
141.98732048354384 
'Fri Oct 13 23:47:15 2017' 
>>> 

C'était OK, environ 9 secondes chacun.

Maintenant, en utilisant multitraitement, voici la surprise ...

>>> time.asctime();ddf.actual_amount_paid.mean().compute(get=dask.multiprocessing.get);time.asctime() 
'Fri Oct 13 23:50:43 2017' 
141.98732048354384 
'Fri Oct 13 23:57:49 2017' 
>>> 

j'attendre multitraitement et distribué/cluster local à être du même ordre de grandeur avec peut-être quelques différences avec filetage (pour bon ou mauvais

Cependant, le multiprocessing prend 47 fois plus de temps pour faire un moyen simple à travers une colonne in16?

Mon env est juste une nouvelle installation de conda avec les modules requis. Pas de cueillette de rien.

pourquoi existe-t-il ces différences? Je ne peux pas gérer dask/distributed pour avoir un comportement prévisible afin de pouvoir choisir judicieusement entre les différents scheduler en fonction de la nature de mon problème.

Ceci est juste un exemple de jouet mais j'ai été incapable d'obtenir un exemple aligné sur mes attentes (comme ma compréhension de la lecture des documents au moins).

Y a-t-il quelque chose que je devrais garder à l'esprit? ou suis-je complètement manquer le point?

Merci

JC

Répondre

1

Avec l'ordonnanceur fileté, chaque tâche a accès à toute la mémoire du processus - toutes les données dans ce cas - et peut donc faire ses calculs sans aucune copie de mémoire. Avec le planificateur distribué, le planificateur sait quel thread et quel opérateur produit les données requises par une tâche ultérieure, ou a déjà ces données en mémoire. L'intelligence du planificateur est spécifiquement orientée vers le déplacement du calcul vers le bon opérateur, pour éviter la communication et la copie de données. Inversement, le planificateur multiprocessus a tendance à envoyer des résultats de tâche vers et depuis le processus principal, ce qui peut impliquer beaucoup de sérialisation et de copie. Certaines tâches peuvent être fusionnées (en combinant des tâches en appelant plusieurs fonctions python dans une chaîne), mais certaines ne le peuvent pas. Toute sérialisation et copie nécessite un effort CPU et, probablement plus important pour vous, de l'espace mémoire. Si vos données d'origine représentent une fraction significative du total du système, vous remplissez probablement la mémoire physique, ce qui entraîne un ralentissement important du facteur.

+0

oui, après quelques luttes et beaucoup de relecture du manuel, je suis arrivé à la même conclusion.Beaucoup d'apprentissage ;-) –