2010-07-09 2 views
36

Je veux un processus de longue durée pour retourner sa progression sur une file d'attente (ou quelque chose de similaire) que je vais alimenter dans une boîte de dialogue de progression. J'ai aussi besoin du résultat lorsque le processus est terminé. Un exemple de test échoue ici avec un RuntimeError: Queue objects should only be shared between processes through inheritance.Comment passez-vous une référence de file d'attente à une fonction gérée par pool.map_async()?

import multiprocessing, time 

def task(args): 
    count = args[0] 
    queue = args[1] 
    for i in xrange(count): 
     queue.put("%d mississippi" % i) 
    return "Done" 

def main(): 
    q = multiprocessing.Queue() 
    pool = multiprocessing.Pool() 
    result = pool.map_async(task, [(x, q) for x in range(10)]) 
    time.sleep(1) 
    while not q.empty(): 
     print q.get() 
    print result.get() 

if __name__ == "__main__": 
    main() 

Je suis en mesure d'obtenir ce travail en utilisant des objets individuels de processus (où je suis alowed pour passer une référence file d'attente), mais je n'ai pas une piscine pour gérer les nombreux processus que je veux lancer. Des conseils sur un meilleur modèle pour cela?

+0

Ce n'est pas une réponse à votre question, mais essayez la bibliothèque 'execnet' pour les mappages multi-processus. Le 'multiprocessing 'intégré a encore quelques problèmes à résoudre (voir le tracker Python). De plus, son code source est assez grand et compliqué. La bibliothèque 'execnet' me semble beaucoup mieux que' multiprocessing'. –

Répondre

43

Le code suivant semble fonctionner:

import multiprocessing, time 

def task(args): 
    count = args[0] 
    queue = args[1] 
    for i in xrange(count): 
     queue.put("%d mississippi" % i) 
    return "Done" 


def main(): 
    manager = multiprocessing.Manager() 
    q = manager.Queue() 
    pool = multiprocessing.Pool() 
    result = pool.map_async(task, [(x, q) for x in range(10)]) 
    time.sleep(1) 
    while not q.empty(): 
     print q.get() 
    print result.get() 

if __name__ == "__main__": 
    main() 

Notez que la file d'attente est obtenu à partir d'un manager.Queue() plutôt que multiprocessing.Queue(). Merci Alex de m'avoir indiqué dans cette direction.

+0

+1 et juste une note rapide que votre question m'a aidé dans un problème que j'ai eu aujourd'hui. J'avais trouvé la version Manager de la file d'attente, mais mon code ne fonctionnait pas parce que je comptais sur un global. Il doit être passé en paramètre, comme vous le faites. – winwaed

+0

Aussi +1 pour 'manager.Queue' - très utile. – fantabolous

8

Faire q mondiaux travaux ...:

import multiprocessing, time 

q = multiprocessing.Queue() 

def task(count): 
    for i in xrange(count): 
     q.put("%d mississippi" % i) 
    return "Done" 

def main(): 
    pool = multiprocessing.Pool() 
    result = pool.map_async(task, range(10)) 
    time.sleep(1) 
    while not q.empty(): 
     print q.get() 
    print result.get() 

if __name__ == "__main__": 
    main() 

Si vous avez besoin de plusieurs files d'attente, par exemple pour éviter de mélanger la progression des différents processus de pool, une liste globale de files d'attente devrait fonctionner (bien sûr, chaque processus devra alors savoir quel index dans la liste à utiliser, mais c'est OK pour passer en argument; -).

+0

Cela fonctionnera-t-il si la "tâche" est définie dans un module ou un paquet différent? L'exemple de code est très simplifié. Le programme réel a une architecture MVC où un pipeline producteur-consommateur est configuré sur plusieurs cœurs (le modèle) et il doit envoyer des mises à jour de progression à l'interface graphique wxPython (la vue). – David

+2

@David, vous pouvez essayer; Si votre vrai code ne fonctionne pas de cette façon simple, vous devrez augmenter d'un cran dans la complexité et opter pour un gestionnaire (qui peut vous donner des proxies aux files d'attente, etc). –

+0

Cela ne semble pas fonctionner du tout. q ne retourne jamais rien q.empty() est toujours vrai sur ma machine. Même si j'augmente l'appel de sommeil à 10 secondes, ce qui devrait être un temps excessif pour que la tâche mette quelques messages dans la file d'attente, q.empty renvoie toujours True. – David

Questions connexes