J'ai une grande liste contenant des chaînes codées binaires que j'ai utilisé pour traiter en une seule fonction avant, comme ceci:Python multitraitement
""" just included this to demonstrate the 'data' structure """
data=np.zeros(250,dtype='float32, (250000,2)float32')
def func numpy_array(data, peaks):
rt_counter=0
for x in peaks:
if rt_counter %(len(peaks)/20) == 0:
update_progress()
peak_counter=0
data_buff=base64.b64decode(x)
buff_size=len(data_buff)/4
unpack_format=">%dL" % buff_size
index=0
for y in struct.unpack(unpack_format,data_buff):
buff1=struct.pack("I",y)
buff2=struct.unpack("f",buff1)[0]
if (index % 2 == 0):
data[rt_counter][1][peak_counter][0]=float(buff2)
else:
data[rt_counter][1][peak_counter][1]=float(buff2)
peak_counter+=1
index+=1
rt_counter+=1
J'ai lu sur multitraitement et pensé que je voulais essayez de voir que si je pouvais obtenir une forte augmentation de la performance, je réécris ma fonction dans 2 (aide et « appelant ») comme ceci:
def numpy_array(data, peaks):
processors=mp.cpu_count #Might as well throw this directly in the mp.Pool (just for clarity for now)
pool = mp.Pool(processes=processors)
chunk_size=len(peaks)/processors
for i in range(processors):
counter = i*chunk_size
chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
pool.map(decode(data,chunk,counter))
def decode(data,chunk,counter):
for x in chunk:
peak_counter=0
data_buff=base64.b64decode(x)
buff_size=len(data_buff)/4
unpack_format=">%dL" % buff_size
index=0
for y in struct.unpack(unpack_format,data_buff):
buff1=struct.pack("I",y)
buff2=struct.unpack("f",buff1)[0]
if (index % 2 == 0):
data[counter][1][peak_counter][0]=float(buff2)
else:
data[counter][1][peak_counter][1]=float(buff2)
peak_counter+=1
index+=1
print data[counter][1][10][0]
counter+=1
Le programme fonctionne, mais seulement utilise 100-110% du CPU (selon le dessus) et une fois qu'il devrait être fini il jette TypeError: map() takes at least 3 arguments (2 given)
à moi, pourrait anyo avec un peu plus d'expérience avec le multiprocess me donner un indice quant à ce que les choses à surveiller (cela pourrait causer le TypeError)? Qu'est-ce qui pourrait causer mon utilisation faible de cpu?
- code après des réponses incorporant -
def decode((data,chunk,counter)):
print len(chunk), counter
for x in chunk:
peak_counter=0
data_buff=base64.b64decode(x)
buff_size=len(data_buff)/4
unpack_format=">%dL" % buff_size
index=0
for y in struct.unpack(unpack_format,data_buff):
buff1=struct.pack("I",y)
buff2=struct.unpack("f",buff1)[0]
if (index % 2 == 0):
data[counter][1][peak_counter][0]=float(buff2)
else:
data[counter][1][peak_counter][1]=float(buff2)
peak_counter+=1
index+=1
counter+=1
def numpy_array(data, peaks):
"""Fills the NumPy array 'data' with m/z-intensity values acquired
from b64 decoding and unpacking the binary string read from the
mzXML file, which is stored in the list 'peaks'.
The m/z values are assumed to be ordered without validating this
assumption.
Note: This function uses multi-processing
"""
processors=mp.cpu_count()
pool = mp.Pool(processes=processors)
chunk_size=int(len(peaks)/processors)
map_parameters=[]
for i in range(processors):
counter = i*chunk_size
chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
map_parameters.append((data,chunk,counter))
pool.map(decode,map_parameters)
Cette dernière version « fonctionne » pour autant qu'il remplit le tableau dans les processus (où le tableau contient des valeurs) mais une fois que tous les processus sont effectués l'accès au tableau ne donne que des valeurs nulles parce que chaque processus obtient une copie locale du tableau.
Le premier argument à défaut multiprocessing.Pool au nombre de processeurs. – cdarke