2011-11-26 3 views
4

Je souhaite ajouter une liste de dicts avec le module de multiprocesseur python.Problème de verrouillage multiprocesseur python

Voici une version simplifiée de mon code:

#!/usr/bin/python2.7 
# -*- coding: utf-8 -*- 

import multiprocessing 
import functools 
import time 

def merge(lock, d1, d2): 
    time.sleep(5) # some time consuming stuffs 
    with lock: 
     for key in d2.keys(): 
      if d1.has_key(key): 
       d1[key] += d2[key] 
      else: 
       d1[key] = d2[key] 

l = [{ x % 10 : x } for x in range(10000)] 
lock = multiprocessing.Lock() 
d = multiprocessing.Manager().dict() 

partial_merge = functools.partial(merge, d1 = d, lock = lock) 

pool_size = multiprocessing.cpu_count() 
pool = multiprocessing.Pool(processes = pool_size) 
pool.map(partial_merge, l) 
pool.close() 
pool.join() 

print d 
  1. Je reçois cette erreur lors de l'exécution de ce script. Comment dois-je résoudre cela?

    RuntimeError: Lock objects should only be shared between processes through inheritance

  2. est le lock en fonction merge nécessaire dans cet état? ou python s'en occupera?

  3. Je pense que map supposé faire est de mapper quelque chose d'une liste à une autre liste, ne pas vider toutes les choses dans une liste à un seul objet. Alors, y a-t-il une façon plus élégante de faire de telles choses?

Répondre

11

Ce qui suit devrait fonctionner multi-plateforme (à savoir sur Windows aussi) dans les deux Python 2 et 3. Il utilise un initialiseur pool de processus pour définir le gestionnaire dict en tant que global dans chaque processus enfant.

Pour votre information:

  • L'utilisation d'un verrou est inutile avec un gestionnaire dict.
  • Le nombre de processus dans un Pool est par défaut le nombre de CPU.
  • Si le résultat ne vous intéresse pas, vous pouvez utiliser apply_async au lieu de map.
import multiprocessing 
import time 

def merge(d2): 
    time.sleep(1) # some time consuming stuffs 
    for key in d2.keys(): 
     if key in d1: 
      d1[key] += d2[key] 
     else: 
      d1[key] = d2[key] 

def init(d): 
    global d1 
    d1 = d 

if __name__ == '__main__': 

    d1 = multiprocessing.Manager().dict() 
    pool = multiprocessing.Pool(initializer=init, initargs=(d1,)) 

    l = [{ x % 5 : x } for x in range(10)] 

    for item in l: 
     pool.apply_async(merge, (item,)) 

    pool.close() 
    pool.join() 

    print(l) 
    print(d1) 
Questions connexes