2011-03-30 8 views
3

J'ai essayé à la fois le multitraitement inclus dans le paquet python2.6 Ubuntu (__version__ dit 0.70a1) et le dernier de PyPI (2.6.2.1). Dans les deux cas, je ne sais pas comment utiliser correctement imap - cela fait que l'interpréteur cesse de répondre à ctrl-C (la carte fonctionne correctement). pdb montre next() est suspendu à la variable de condition wait() appeler IMapIterator, donc personne ne nous réveille. Des indices? Merci à l'avance.multitraitement Pool.imap cassé?

$ cat /tmp/go3.py 
import multiprocessing as mp 
print mp.Pool(1).map(abs, range(3)) 
print list(mp.Pool(1).imap(abs, range(3))) 

$ python /tmp/go3.py 
[0, 1, 2] 
^C^C^C^C^C^\Quit 

Répondre

9

avis d'abord que cela fonctionne:

import multiprocessing as mp 
import multiprocessing.util as util 
pool=mp.Pool(1) 
print list(pool.imap(abs, range(3))) 

La différence est que pool ne soit pas parachevées lorsque l'appel à pool.imap() extrémités.

En revanche,

print(list(mp.Pool(1).imap(abs, range(3)))) 

provoque l'instance Pool à finaliser rapidement après la fin de l'appel imap. L'absence d'une référence entraîne l'appel du Finalizer (appelé self._terminate dans la classe Pool). Cela déclenche une séquence de commandes qui détruit le thread du gestionnaire de tâches, le thread du gestionnaire de résultat, les sous-processus de travail, etc.

Tout cela se produit si rapidement qu'au moins sur la majorité des analyses, la tâche envoyée à la tâche gestionnaire ne complète pas.

Voici les bits correspondants de code:

De /usr/lib/python2.6/multiprocessing/pool.py:

class Pool(object): 
    def __init__(self, processes=None, initializer=None, initargs=()): 
     ... 
     self._terminate = Finalize(
      self, self._terminate_pool, 
      args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, 
        self._task_handler, self._result_handler, self._cache), 
      exitpriority=15 
      ) 

/usr/lib/python2.6/multiprocessing/ util.py:

class Finalize(object): 
    ''' 
    Class which supports object finalization using weakrefs 
    ''' 
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): 
     ... 
     if obj is not None: 
      self._weakref = weakref.ref(obj, self) 

Les weakref.ref(obj,self) les causes self() à appeler quand obj est sur le point d'être finalisé. J'ai utilisé la commande de débogage util.log_to_stderr(util.SUBDEBUG) pour apprendre la séquence d'événements. Par exemple:

import multiprocessing as mp 
import multiprocessing.util as util 
util.log_to_stderr(util.SUBDEBUG) 

print(list(mp.Pool(1).imap(abs, range(3)))) 

cède

[DEBUG/MainProcess] created semlock with handle 3077013504 
[DEBUG/MainProcess] created semlock with handle 3077009408 
[DEBUG/MainProcess] created semlock with handle 3077005312 
[DEBUG/MainProcess] created semlock with handle 3077001216 
[INFO/PoolWorker-1] child process calling self.run() 
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x9d6e62c>, <multiprocessing.queues.SimpleQueue object at 0x9cf04cc>, <multiprocessing.queues.SimpleQueue object at 0x9d6e40c>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1217967248)>, <Thread(Thread-2, started daemon -1226359952)>, {0: <multiprocessing.pool.IMapIterator object at 0x9d6eaec>}) and kwargs {} 
[DEBUG/MainProcess] finalizing pool 
... 

et comparer avec

import multiprocessing as mp 
import multiprocessing.util as util 
util.log_to_stderr(util.SUBDEBUG) 
pool=mp.Pool(1) 
print list(pool.imap(abs, range(3))) 

qui donne

[DEBUG/MainProcess] created semlock with handle 3078684672 
[DEBUG/MainProcess] created semlock with handle 3078680576 
[DEBUG/MainProcess] created semlock with handle 3078676480 
[DEBUG/MainProcess] created semlock with handle 3078672384 
[INFO/PoolWorker-1] child process calling self.run() 
[DEBUG/MainProcess] doing set_length() 
[0, 1, 2] 
[INFO/MainProcess] process shutting down 
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0 
[SUBDEBUG/MainProcess] calling <Finalize object, callback=_terminate_pool, args=(<Queue.Queue instance at 0xb763e60c>, <multiprocessing.queues.SimpleQueue object at 0xb76c94ac>, <multiprocessing.queues.SimpleQueue object at 0xb763e3ec>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1218274448)>, <Thread(Thread-2, started daemon -1226667152)>, {}), exitprority=15> 
... 
[DEBUG/MainProcess] finalizing pool