2017-09-29 5 views
0

exemple factice exemples:

j'ai une trame de données df:Comment traiter les lignes d'une trame de données de pandas en parallèle dans le python

> df 

     para0 para1 para2 
0 17.439020 True high 
1 19.757758 True high 
2 12.434424 True medium 
3 14.789654 True  low 
4 14.131464 False high 
5 9.900233 True high 
6 10.977869 False  low 
7 8.004251 True medium 
8 11.468420 False  low 
9 12.764453 False high 

dans lequel chaque rangée se compose d'un ensemble de paramètres pour une fonction foobar:

def foobar(r): 
    """ r is a row of df, does something, and it takes a long time""" 
    if r.para1: 
     x = r.para2 
    else: 
     x = 'low' 
    return int(r.para0), (r.Index+13)%3 == 0, x 

Je voudrais demander foobar à chaque rangée de df, recueillir ses résultats , et les stocker avec leurs paramètres respectés dans un, bien, DataFrame.

Ma solution (actuelle):

df['count'] = 0 
df['valid'] = False 
df['outpt'] = '' 

def wrapper(r, df): 
    c, v, o = foobar(r) 
    df.ix[r.Index,'count'] = c 
    df.ix[r.Index,'valid'] = v 
    df.ix[r.Index,'outpt'] = o 

for r in df.itertuples(): 
    wrapper(r, df) 

Cela donne:

> df 
     para0 para1 para2 count valid outpt 
0 17.439020 True high 17.0 False high 
1 19.757758 True high 19.0 False high 
2 12.434424 True medium 12.0 True medium 
3 14.789654 True  low 14.0 False  low 
4 14.131464 False high 14.0 False  low 
5 9.900233 True high 9.0 True high 
6 10.977869 False  low 10.0 False  low 
7 8.004251 True medium 8.0 False medium 
8 11.468420 False  low 11.0 True  low 
9 12.764453 False high 12.0 False  low 

Voici ma question:

Dans la vraie vie, la fonction foobar est de calcul coûteux et prend environ 20-30 min à courir, df a typiquement entre 100-2000 lignes. J'ai accès à une machine avec huit cœurs, et comme foobar ne dépend que de la ligne traitée en cours et ni sur rien d'autre, il devrait être trivial d'exécuter ces calculs en parallèle.

Il serait bien aussi que, quand quelque chose va mal (par exemple, si quelqu'un s'éteint accidentellement la machine), il ne serait pas nécessaire de commencer tout depuis le début, à savoir, pour ignorer les lignes qui ont déjà été traité.

Comment est-ce que je peux faire ceci?


Mon essai sur multiprocessing malheureusement échoué:

from multiprocessing import Pool 

pool = Pool(3) 
results = [] 

for r in df.itertuples(): 
    results += [pool.apply_async(wrapper, r, df)] 

Avec:

> results[0].get() 
… 
/usr/lib/python3.5/multiprocessing/reduction.py in dumps(cls, obj, protocol) 
    48  def dumps(cls, obj, protocol=None): 
    49   buf = io.BytesIO() 
---> 50   cls(buf, protocol).dump(obj) 
    51   return buf.getbuffer() 
    52 

PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed 

Voici comment je créé le dataframe jouet:

import pandas as pd 
import numpy as np 

df = pd.DataFrame({ 
    'para0' : pd.Series(
     np.random.gamma(12,size=10), 
     dtype=np.float), 
    'para1' : pd.Series(
     [(True,False)[i] for i in np.random.randint(0,2,10)], 
     dtype=np.bool), 
    'para2' : pd.Categorical(
     [('low','medium','high')[i] for i in np.random.randint(0,3,10)], 
     ordered=True), 
    }) 

Répondre

0

Je ne savoir si cela aide, mais essayez de utilisez list au lieu de itertuples.

je veux dire quelque chose comme ceci:

df_list = [[x[0], x[1],x[2]] for x in df.itertuples()] 
for r in df_list: 
    results += [pool.apply_async(wrapper, r, df)]