2011-01-25 4 views
4

J'ai écrit une fonction python qui scrute un système de fichiers en utilisant un modèle de répertoire fourni, avec des actions facultatives à prendre fournies à chaque niveau. J'ai ensuite essayé le multi-threading car certains des volumes sont sur des partages réseau et je veux minimiser le blocage des E/S. J'ai commencé par utiliser la classe Pool multitraitement, puisque c'était plus pratique ... (sérieusement, pas de classe Pool pour le threading) Ma fonction démêle le plus possible le pattern FS fourni et soumet les chemins retournés au pool jusqu'à ce que pas de nouveaux chemins sont retournés. J'ai obtenu ceci pour fonctionner bien en utilisant la fonction et la classe directement, mais maintenant j'essaye d'employer cette fonction d'une autre classe et mon programme semble se bloquer. Pour simplifier j'ai réécrit la fonction en utilisant Threads au lieu de processus, et même écrit une simple classe ThreadPool ... même problème. Voici une version très simplifiée du code qui présente toujours les mêmes problèmes:Multithreading Python FS Crawler

file test1.py: 
------------------------------------------------ 

import os 
import glob 
from multiprocessing import Pool 

def mapGlob(pool,paths,pattern): 
    results = [] 
    paths = [os.path.join(p,pattern) for p in paths] 
    for result in pool.map(glob.glob,paths): 
     results += result 
    return results 

def findAllMyPaths(): 
    pool = Pool(10) 
    paths = ['/Volumes'] 
    follow = ['**','ptid_*','expid_*','slkid_*'] 
    for pattern in follow: 
     paths = mapGlob(pool,paths,pattern) 
    return paths 


file test2.py: 
---------------------------------------------------------------------------- 

from test1 import findAllMyPaths 

allmypaths = findAllMyPaths() 

Maintenant, si je l'appelle

>>>from test1 import findAllMyPaths 
>>>findAllMyPaths() 
>>>...long list of all the paths 

cela fonctionne très bien, mais si essayer:

>>>from test2 import allmypaths 

se bloque python pour toujours. Les fonctions d'action sont appelées (dans cet exemple glob), mais elles ne semblent jamais revenir ... J'ai besoin d'aide s'il vous plaît ... la version parallélisée fonctionne beaucoup plus vite quand elle fonctionne correctement (6-20X plus vite en fonction des 'actions' sont en train d'être mappés à chaque point de l'arbre FS), donc j'aimerais pouvoir l'utiliser.

aussi si je change la fonction de mappage à une version non-parallèle:

def mapGlob(pool,paths,pattern): 
    results = [] 
    paths = [os.path.join(p,pattern) for p in paths] 
    for path in paths: 
     results += glob.glob(path) 
    return results 

tout fonctionne bien.

Edit:

je me suis tourné le débogage à multitraitement pour voir si cela pouvait me aider davantage. Dans le cas où cela fonctionne, je reçois:

[DEBUG/MainProcess] created semlock with handle 5 
[DEBUG/MainProcess] created semlock with handle 6 
[DEBUG/MainProcess] created semlock with handle 9 
[DEBUG/MainProcess] created semlock with handle 10 
[INFO/PoolWorker-1] child process calling self.run() 
[INFO/PoolWorker-2] child process calling self.run() 
[INFO/PoolWorker-3] child process calling self.run() 
[INFO/PoolWorker-5] child process calling self.run() 
[INFO/PoolWorker-4] child process calling self.run() 
[INFO/PoolWorker-6] child process calling self.run() 
[INFO/PoolWorker-7] child process calling self.run() 
[INFO/PoolWorker-9] child process calling self.run() 
[INFO/PoolWorker-8] child process calling self.run() 
[INFO/PoolWorker-10] child process calling self.run() 
[DEBUG/MainProcess] closing pool 
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>, <multiprocessing.queues.SimpleQueue object at 0x3494950>, <multiprocessing.queues.SimpleQueue object at 0x34a61b0>, [<Process(PoolWorker-1, started daemon)>, <Process(PoolWorker-2, started daemon)>, <Process(PoolWorker-3, started daemon)>, <Process(PoolWorker-4, started daemon)>, <Process(PoolWorker-5, started daemon)>, <Process(PoolWorker-6, started daemon)>, <Process(PoolWorker-7, started daemon)>, <Process(PoolWorker-8, started daemon)>, <Process(PoolWorker-9, started daemon)>, <Process(PoolWorker-10, started daemon)>], <Thread(Thread-1, started daemon -1341648896)>, <Thread(Thread-2, started daemon -1341116416)>, {}) and kwargs {} 
[DEBUG/MainProcess] finalizing pool 
[DEBUG/MainProcess] helping task handler/workers to finish 
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished 
[DEBUG/MainProcess] task handler got sentinel 
[DEBUG/MainProcess] task handler sending sentinel to result handler 
[DEBUG/MainProcess] task handler sending sentinel to workers 
[DEBUG/MainProcess] task handler exiting 
[DEBUG/MainProcess] result handler got sentinel 
[DEBUG/MainProcess] ensuring that outqueue is not full 
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0 
[DEBUG/PoolWorker-2] worker got sentinel -- exiting 
[DEBUG/PoolWorker-1] worker got sentinel -- exiting 
[INFO/PoolWorker-2] process shutting down 
[DEBUG/PoolWorker-7] worker got sentinel -- exiting 
[INFO/PoolWorker-1] process shutting down 
[INFO/PoolWorker-7] process shutting down 
[DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0 
[DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0 
[DEBUG/PoolWorker-7] running the remaining "atexit" finalizers 
[INFO/PoolWorker-7] process exiting with exitcode 0 
[DEBUG/PoolWorker-1] running the remaining "atexit" finalizers 
[INFO/PoolWorker-1] process exiting with exitcode 0 
[DEBUG/PoolWorker-5] worker got sentinel -- exiting 
[DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0 
[INFO/PoolWorker-5] process shutting down 
[DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0 
[DEBUG/PoolWorker-2] running the remaining "atexit" finalizers 
[DEBUG/PoolWorker-5] running the remaining "atexit" finalizers 
[INFO/PoolWorker-2] process exiting with exitcode 0 
[INFO/PoolWorker-5] process exiting with exitcode 0 
[DEBUG/PoolWorker-6] worker got sentinel -- exiting 
[INFO/PoolWorker-6] process shutting down 
[DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0 
[DEBUG/PoolWorker-6] running the remaining "atexit" finalizers 
[INFO/PoolWorker-6] process exiting with exitcode 0 
[DEBUG/PoolWorker-4] worker got sentinel -- exiting 
[DEBUG/PoolWorker-9] worker got sentinel -- exiting 
[INFO/PoolWorker-9] process shutting down 
[DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0 
[DEBUG/PoolWorker-9] running the remaining "atexit" finalizers 
[INFO/PoolWorker-9] process exiting with exitcode 0 
[INFO/PoolWorker-4] process shutting down 
[DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0 
[DEBUG/PoolWorker-4] running the remaining "atexit" finalizers 
[INFO/PoolWorker-4] process exiting with exitcode 0 
[DEBUG/PoolWorker-10] worker got sentinel -- exiting 
[INFO/PoolWorker-10] process shutting down 
[DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0 
[DEBUG/PoolWorker-10] running the remaining "atexit" finalizers 
[INFO/PoolWorker-10] process exiting with exitcode 0 
[DEBUG/PoolWorker-8] worker got sentinel -- exiting 
[INFO/PoolWorker-8] process shutting down 
[DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0 
[DEBUG/PoolWorker-8] running the remaining "atexit" finalizers 
[INFO/PoolWorker-8] process exiting with exitcode 0 
[DEBUG/PoolWorker-3] worker got sentinel -- exiting 
[INFO/PoolWorker-3] process shutting down 
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0 
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers 
[INFO/PoolWorker-3] process exiting with exitcode 0 
[DEBUG/MainProcess] terminating workers 
[DEBUG/MainProcess] joining task handler 
[DEBUG/MainProcess] joining result handler 
[DEBUG/MainProcess] joining pool workers 

et quand il ne tout ce que je reçois est:

[DEBUG/MainProcess] created semlock with handle 6 
[DEBUG/MainProcess] created semlock with handle 7 
[DEBUG/MainProcess] created semlock with handle 10 
[DEBUG/MainProcess] created semlock with handle 11 
[INFO/PoolWorker-1] child process calling self.run() 
[INFO/PoolWorker-2] child process calling self.run() 
[INFO/PoolWorker-3] child process calling self.run() 
[INFO/PoolWorker-8] child process calling self.run() 
[INFO/PoolWorker-5] child process calling self.run() 
[INFO/PoolWorker-4] child process calling self.run() 
[INFO/PoolWorker-9] child process calling self.run() 
[INFO/PoolWorker-6] child process calling self.run() 
[INFO/PoolWorker-7] child process calling self.run() 
[INFO/PoolWorker-10] child process calling self.run() 
+0

Je ne vois pas comment ce code pourrait fonctionner. Vous passez 'mapGlob' à' pool.map', mais 'mapGlob' prend 3 paramètres et les fonctions passées à' pool.map' doivent prendre 1 paramètre. – interjay

+0

désolé, oui bien sûr ... J'ai réécrit l'exemple à partir de zéro et une erreur. l'a corrigé. – Cyclone

Répondre

1

Pas une solution complète, mais j'ai trouvé un moyen de faire fonctionner le code dans l'une ou l'autre guise: de l'interpréteur ou comme code dans un script en cours d'exécution. Je pense que le problème a à voir avec la note suivante dans les documents multiprocessing:

La fonctionnalité dans ce package nécessite que la méthode principale soit importable par les enfants. Ceci est couvert dans les directives de programmation, mais il convient de souligner ici. Cela signifie que certains exemples, tels que les exemples multiprocessing.Pool, ne fonctionneront pas dans l'interpréteur interactif. Je ne suis pas sûr pourquoi cette limitation existe, et pourquoi je peux encore parfois utiliser un pool de l'interpréteur interactif et parfois non, mais oh bien ....

pour la contourner, je fais ce qui suit dans un module qui pourrait utiliser multitraitement:

import __main__ 
__SHOULD_MULTITHREAD__ = False 
if hasattr(__main__,'__file__'): 
    __SHOULD_MULTITHREAD__ = True 

le reste du code dans ce module peut alors vérifier ce drapeau pour voir si elle doit utiliser une piscine ou tout simplement exécuter sans parallélisation. Pour ce faire, je peux toujours utiliser et tester des fonctions parallélisées dans des modules de l'interpréteur interactif, ils fonctionnent simplement beaucoup plus lentement.

0

Si je ne me trompe pas, ne devrait pas ressembler à la test2.py ce

from test1 import findAllMyPaths 
allmypaths = findAllMyPaths 

puis

from test2 import allmypaths 
allmypaths() 
+0

J'essaye de simuler avoir un global qui contient tous les chemins ... Il est seulement peuplé une fois quand la classe est utilisée la première fois ... Dans mon code actuel ce global est stocké comme une variable de classe, et cette classe peut ensuite utiliser ces chemins pour instancier un objet en utilisant ce chemin sans avoir à faire une nouvelle analyse à chaque fois. – Cyclone