2010-01-17 6 views
45

Je vais avoir des problèmes avec le module de multitraitement. J'utilise un pool de travailleurs avec sa méthode map pour charger des données à partir de beaucoup de fichiers et pour chacun d'eux j'analyse des données avec une fonction personnalisée. Chaque fois qu'un fichier a été traité, j'aimerais qu'un compteur soit mis à jour afin que je puisse garder une trace du nombre de fichiers à traiter. est un exemple de code ici:Python multitraitement et un compteur partagé

def analyze_data(args): 
    # do something 
    counter += 1 
    print counter 


if __name__ == '__main__': 

    list_of_files = os.listdir(some_directory) 

    global counter 
    counter = 0 

    p = Pool() 
    p.map(analyze_data, list_of_files) 

Je ne peux pas trouver une solution pour cela.

Répondre

48

Le problème est que la variable counter n'est pas partagée entre vos processus: chaque processus distinct crée sa propre instance locale et l'incrémente.

Voir this section de la documentation pour certaines techniques que vous pouvez utiliser pour partager l'état entre vos processus. Dans votre cas, vous pouvez partager une instance Value entre vos employés

Voici une version de travail de votre exemple (avec des données d'entrée fictives). Notez qu'il utilise des valeurs globales que je voudrais vraiment essayer d'éviter dans la pratique:

from multiprocessing import Pool, Value 
from time import sleep 

counter = None 

def init(args): 
    ''' store the counter for later use ''' 
    global counter 
    counter = args 

def analyze_data(args): 
    ''' increment the global counter, do something with the input ''' 
    global counter 
    # += operation is not atomic, so we need to get a lock: 
    with counter.get_lock(): 
     counter.value += 1 
    print counter.value 
    return args * 10 

if __name__ == '__main__': 
    #inputs = os.listdir(some_directory) 

    # 
    # initialize a cross-process counter and the input lists 
    # 
    counter = Value('i', 0) 
    inputs = [1, 2, 3, 4] 

    # 
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    # 
    p = Pool(initializer = init, initargs = (counter,)) 
    i = p.map_async(analyze_data, inputs, chunksize = 1) 
    i.wait() 
    print i.get() 
+0

Bonne réponse! J'ai eu le même problème dans IronPython, et tandis que multiprocessing.Value n'est pas disponible, vous pouvez faire quelque chose de similaire avec clr.Reference et System.Threading.Interlocked: http://stackoverflow.com/questions/2255461/how-to-atomically- increment-a-static-member-in-ironpython/2314858 # 2314858 –

+3

@jkp, comment le feriez-vous sans la variable globale? - J'essaie d'utiliser une classe, mais ce n'est pas aussi facile que ça en a l'air. Voir http://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-pythons-multiprocessing-pool-ma – Anna

+18

Malheureusement, cet exemple semble être imparfaite, puisque 'counter.value + = 1 'n'est pas atomique entre les processus, donc la valeur sera erronée si elle est exécutée assez longtemps avec quelques processus –

24

classe compteur sans bug condition de course:

class Counter(object): 
    def __init__(self): 
     self.val = multiprocessing.Value('i', 0) 

    def increment(self, n=1): 
     with self.val.get_lock(): 
      self.val.value += n 

    @property 
    def value(self): 
     return self.val.value 
+0

Pour un code similaire qui fonctionne avec '' joblib's Parallel' (le code dans cette réponse ne fonctionne pas avec 'joblib'), voir https://github.com/davidheryanto/etc/blob/master/python-recipes/parallel-joblib-counter.py –

0

plus rapide classe compteur sans utiliser le verrou intégré de la valeur deux fois

class Counter(object): 
    def __init__(self, initval=0): 
     self.val = multiprocessing.RawValue('i', initval) 
     self.lock = multiprocessing.Lock() 

    def increment(self): 
     with self.lock: 
      self.val.value += 1 

    @property 
    def value(self): 
     return self.val.value 

https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue

Questions connexes