2013-04-12 3 views
2

J'ai une grande liste contenant des chaînes codées binaires que j'ai utilisé pour traiter en une seule fonction avant, comme ceci:Python multitraitement

""" just included this to demonstrate the 'data' structure """ 
data=np.zeros(250,dtype='float32, (250000,2)float32') 

def func numpy_array(data, peaks): 
rt_counter=0 
    for x in peaks: 
     if rt_counter %(len(peaks)/20) == 0: 
      update_progress() 
     peak_counter=0 
     data_buff=base64.b64decode(x) 
     buff_size=len(data_buff)/4 
     unpack_format=">%dL" % buff_size 
     index=0 
     for y in struct.unpack(unpack_format,data_buff): 
      buff1=struct.pack("I",y) 
      buff2=struct.unpack("f",buff1)[0] 
      if (index % 2 == 0): 
       data[rt_counter][1][peak_counter][0]=float(buff2) 
      else: 
       data[rt_counter][1][peak_counter][1]=float(buff2) 
       peak_counter+=1 
      index+=1 
     rt_counter+=1 

J'ai lu sur multitraitement et pensé que je voulais essayez de voir que si je pouvais obtenir une forte augmentation de la performance, je réécris ma fonction dans 2 (aide et « appelant ») comme ceci:

def numpy_array(data, peaks): 
    processors=mp.cpu_count #Might as well throw this directly in the mp.Pool (just for clarity for now) 
    pool = mp.Pool(processes=processors) 
    chunk_size=len(peaks)/processors 
    for i in range(processors): 
     counter = i*chunk_size 
     chunk=peaks[i*chunk_size:(i+1)*chunk_size-1] 
     pool.map(decode(data,chunk,counter)) 

def decode(data,chunk,counter): 
    for x in chunk: 
     peak_counter=0 
     data_buff=base64.b64decode(x) 
     buff_size=len(data_buff)/4 
     unpack_format=">%dL" % buff_size 
     index=0 
     for y in struct.unpack(unpack_format,data_buff): 
      buff1=struct.pack("I",y) 
      buff2=struct.unpack("f",buff1)[0] 
      if (index % 2 == 0): 
       data[counter][1][peak_counter][0]=float(buff2) 
      else: 
       data[counter][1][peak_counter][1]=float(buff2) 
       peak_counter+=1 
      index+=1 
     print data[counter][1][10][0] 
     counter+=1  

Le programme fonctionne, mais seulement utilise 100-110% du CPU (selon le dessus) et une fois qu'il devrait être fini il jette TypeError: map() takes at least 3 arguments (2 given) à moi, pourrait anyo avec un peu plus d'expérience avec le multiprocess me donner un indice quant à ce que les choses à surveiller (cela pourrait causer le TypeError)? Qu'est-ce qui pourrait causer mon utilisation faible de cpu?

- code après des réponses incorporant -

def decode((data,chunk,counter)): 
    print len(chunk), counter 
    for x in chunk: 
     peak_counter=0 
     data_buff=base64.b64decode(x) 
     buff_size=len(data_buff)/4 
     unpack_format=">%dL" % buff_size 
     index=0 
     for y in struct.unpack(unpack_format,data_buff): 
      buff1=struct.pack("I",y) 
      buff2=struct.unpack("f",buff1)[0] 
      if (index % 2 == 0): 
       data[counter][1][peak_counter][0]=float(buff2) 
      else: 
       data[counter][1][peak_counter][1]=float(buff2) 
       peak_counter+=1 
      index+=1 
     counter+=1 

def numpy_array(data, peaks): 
    """Fills the NumPy array 'data' with m/z-intensity values acquired 
    from b64 decoding and unpacking the binary string read from the 
    mzXML file, which is stored in the list 'peaks'. 

    The m/z values are assumed to be ordered without validating this 
    assumption. 

    Note: This function uses multi-processing 
    """ 
    processors=mp.cpu_count() 
    pool = mp.Pool(processes=processors) 
    chunk_size=int(len(peaks)/processors) 
    map_parameters=[] 
    for i in range(processors): 
     counter = i*chunk_size 
     chunk=peaks[i*chunk_size:(i+1)*chunk_size-1] 
     map_parameters.append((data,chunk,counter)) 
    pool.map(decode,map_parameters) 

Cette dernière version « fonctionne » pour autant qu'il remplit le tableau dans les processus (où le tableau contient des valeurs) mais une fois que tous les processus sont effectués l'accès au tableau ne donne que des valeurs nulles parce que chaque processus obtient une copie locale du tableau.

+0

Le premier argument à défaut multiprocessing.Pool au nombre de processeurs. – cdarke

Répondre

2

Quelque chose comme cela devrait fonctionner

Notez que pool.map prend une fonction et une liste de paramètres pour cette fonction pour chaque appel. Dans votre exemple original, vous l'appelez simplement dans la fonction numpy_array.

La fonction ne doit avoir qu'un argument, d'où l'empilement des arguments dans un tuple et les doubles parenthèses plutôt étranges dans decode (qui est appelé tuple unpacking).

def numpy_array(data, peaks): 
    processors=4 
    pool = mp.Pool(processes=processors) 
    chunk_size=len(data)/processors 
    print range(processors) 
    map_parameters = [] # new 
    for i in range(processors): 
     counter = i*chunk_size 
     chunk=peaks[i*chunk_size:(i+1)*chunk_size-1] 
     map_parameters.append((data,chunk,counter)) # new 
    pool.map(decode, map_parameters) # new 

def decode((data,chunk,counter)): # changed 
    for x in chunk: 
     peak_counter=0 
     data_buff=base64.b64decode(x) 
     buff_size=len(data_buff)/4 
     unpack_format=">%dL" % buff_size 
     index=0 
     for y in struct.unpack(unpack_format,data_buff): 
      buff1=struct.pack("I",y) 
      buff2=struct.unpack("f",buff1)[0] 
      if (index % 2 == 0): 
       data[counter][1][peak_counter][0]=float(buff2) 
      else: 
       data[counter][1][peak_counter][1]=float(buff2) 
       peak_counter+=1 
      index+=1 
     print data[counter][1][10][0] 
     counter+=1 
1

Le bug est dans votre fonction numpy_array:

for i in range(processors): 
    counter = i*chunk_size 
    chunk=peaks[i*chunk_size:(i+1)*chunk_size-1] 
    pool.map(decode(data,chunk,counter)) 

Le problème est que vous appelez map séquentiellement de sorte que vous ne diffusez un processus à la fois. En outre, je ne pense pas que vous appelez map correctement que vous faites pool.map(f(*args)) lorsque la signature est map(f, ['list', 'of', 'data']).

Je voudrais utiliser un partiel afin que vous ne créez pas de copies de data car je suppose que le tableau est assez grand ou pourrait être plus grand dans le futur.

Cela devrait être:

import functools 
decode_with_data = functools.partial(decode, data) 
args = [] 
for i in range(processors): 
    counter = i * chunk_size 
    chunk = peaks[1*chunk_size:(i+1)*chunk_size-1] 
    args.append(chunk, counter) 
pool.map(decode_with_data, args) 
+0

Je vous donne un +1 juste pour me pointer vers functools.partial. Je n'ai jamais entendu parler de ça et ça ressemble à une bibliothèque vraiment soignée –