exemple factice exemples:
j'ai une trame de données df
:Comment traiter les lignes d'une trame de données de pandas en parallèle dans le python
> df
para0 para1 para2
0 17.439020 True high
1 19.757758 True high
2 12.434424 True medium
3 14.789654 True low
4 14.131464 False high
5 9.900233 True high
6 10.977869 False low
7 8.004251 True medium
8 11.468420 False low
9 12.764453 False high
dans lequel chaque rangée se compose d'un ensemble de paramètres pour une fonction foobar
:
def foobar(r):
""" r is a row of df, does something, and it takes a long time"""
if r.para1:
x = r.para2
else:
x = 'low'
return int(r.para0), (r.Index+13)%3 == 0, x
Je voudrais demander foobar
à chaque rangée de df
, recueillir ses résultats , et les stocker avec leurs paramètres respectés dans un, bien, DataFrame.
Ma solution (actuelle):
df['count'] = 0
df['valid'] = False
df['outpt'] = ''
def wrapper(r, df):
c, v, o = foobar(r)
df.ix[r.Index,'count'] = c
df.ix[r.Index,'valid'] = v
df.ix[r.Index,'outpt'] = o
for r in df.itertuples():
wrapper(r, df)
Cela donne:
> df
para0 para1 para2 count valid outpt
0 17.439020 True high 17.0 False high
1 19.757758 True high 19.0 False high
2 12.434424 True medium 12.0 True medium
3 14.789654 True low 14.0 False low
4 14.131464 False high 14.0 False low
5 9.900233 True high 9.0 True high
6 10.977869 False low 10.0 False low
7 8.004251 True medium 8.0 False medium
8 11.468420 False low 11.0 True low
9 12.764453 False high 12.0 False low
Voici ma question:
Dans la vraie vie, la fonction foobar
est de calcul coûteux et prend environ 20-30 min à courir, df
a typiquement entre 100-2000 lignes. J'ai accès à une machine avec huit cœurs, et comme foobar
ne dépend que de la ligne traitée en cours et ni sur rien d'autre, il devrait être trivial d'exécuter ces calculs en parallèle.
Il serait bien aussi que, quand quelque chose va mal (par exemple, si quelqu'un s'éteint accidentellement la machine), il ne serait pas nécessaire de commencer tout depuis le début, à savoir, pour ignorer les lignes qui ont déjà été traité.
Comment est-ce que je peux faire ceci?
Mon essai sur multiprocessing
malheureusement échoué:
from multiprocessing import Pool
pool = Pool(3)
results = []
for r in df.itertuples():
results += [pool.apply_async(wrapper, r, df)]
Avec:
> results[0].get()
…
/usr/lib/python3.5/multiprocessing/reduction.py in dumps(cls, obj, protocol)
48 def dumps(cls, obj, protocol=None):
49 buf = io.BytesIO()
---> 50 cls(buf, protocol).dump(obj)
51 return buf.getbuffer()
52
PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed
Voici comment je créé le dataframe jouet:
import pandas as pd
import numpy as np
df = pd.DataFrame({
'para0' : pd.Series(
np.random.gamma(12,size=10),
dtype=np.float),
'para1' : pd.Series(
[(True,False)[i] for i in np.random.randint(0,2,10)],
dtype=np.bool),
'para2' : pd.Categorical(
[('low','medium','high')[i] for i in np.random.randint(0,3,10)],
ordered=True),
})