2011-01-25 4 views
2

Je présente ci-dessous un exemple de multi-traitement. Ceci est un modèle de pool de processus. Ce n'est pas aussi simple que cela pourrait l'être, mais il est relativement proche de la structure du code que j'utilise actuellement. Il utilise aussi sqlalchemy, désolé. Ma question est - J'ai actuellement une situation où j'ai un script Python relativement long qui exécute un certain nombre de fonctions qui ressemblent chacune au code ci-dessous, donc le processus parent est le même dans tous les cas. En d'autres termes, plusieurs pools sont créés par un script python. (Je n'ai pas à le faire de cette façon, je suppose, mais l'alternative est d'utiliser quelque chose comme os.system et subprocess.) Le problème est que ces processus traînent et restent en mémoire. Les docs disent que ces processus démons sont censés rester en place jusqu'à ce que le processus parent se termine, mais que se passe-t-il si le processus parent continue alors à générer un autre pool ou processus et ne quitte pas immédiatement. Appeler terminate() fonctionne, mais cela ne semble pas terriblement poli. Existe-t-il un bon moyen de demander aux processus de se terminer correctement? C'est à dire. nettoyer après vous et partez maintenant, je dois commencer la prochaine piscine?Processus démon final avec module multi-traitement

J'ai également essayé d'appeler join() sur les processus. Selon la documentation, cela signifie attendre que les processus se terminent. Et s'ils ne prévoient pas de se terminer? Ce qui se passe réellement, c'est que le processus est bloqué.

Merci d'avance.

Cordialement, Faheem.

import multiprocessing, time 

class Worker(multiprocessing.Process): 
    """Process executing tasks from a given tasks queue""" 
    def __init__(self, queue, num): 
     multiprocessing.Process.__init__(self) 
     self.num = num 
     self.queue = queue 
     self.daemon = True 

    def run(self): 
     import traceback 
     while True: 
      func, args, kargs = self.queue.get() 
      try: 
       print "trying %s with args %s"%(func.__name__, args) 
       func(*args, **kargs) 
      except: 
       traceback.print_exc() 
      self.queue.task_done() 

class ProcessPool: 
    """Pool of threads consuming tasks from a queue""" 
    def __init__(self, num_threads): 
     self.queue = multiprocessing.JoinableQueue() 
     self.workerlist = [] 
     self.num = num_threads 
     for i in range(num_threads): 
      self.workerlist.append(Worker(self.queue, i)) 

    def add_task(self, func, *args, **kargs): 
     """Add a task to the queue""" 
     self.queue.put((func, args, kargs)) 

    def start(self): 
     for w in self.workerlist: 
      w.start() 

    def wait_completion(self): 
     """Wait for completion of all the tasks in the queue""" 
     self.queue.join() 
     for worker in self.workerlist: 
      print worker.__dict__ 
      #worker.terminate()  <--- terminate used here 
      worker.join()    <--- join used here 

start = time.time() 

from sqlalchemy import * 
from sqlalchemy.orm import * 

dbuser = '' 
password = '' 
dbname = '' 
dbstring = "postgres://%s:%[email protected]:5432/%s"%(dbuser, password, dbname) 
db = create_engine(dbstring, echo=True) 
m = MetaData(db) 

def make_foo(i): 
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True)) 

conn = db.connect() 
for i in range(10): 
    conn.execute("DROP TABLE IF EXISTS foo%s"%i) 
conn.close() 

for i in range(10): 
    make_foo(i) 

m.create_all() 

def do(i, dbstring): 
    dbstring = "postgres://%s:%[email protected]:5432/%s"%(dbuser, password, dbname) 
    db = create_engine(dbstring, echo=True) 
    Session = scoped_session(sessionmaker()) 
    Session.configure(bind=db) 
    Session.execute("ALTER TABLE foo%s SET (autovacuum_enabled = false);"%i) 
    Session.execute("ALTER TABLE foo%s SET (autovacuum_enabled = true);"%i) 
    Session.commit() 

pool = ProcessPool(5) 
for i in range(10): 
    pool.add_task(do, i, dbstring) 
pool.start() 
pool.wait_completion() 

Répondre

3

Vous savez multiprocessing vous avez déjà des classes pour les pools de travailleurs, n'est-ce pas?

La méthode standard est d'envoyer vos fils un signal de fin:

queue.put(("QUIT", None, None)) 

vérifier cela vaut le coup:

if func == "QUIT": 
    return 
+0

Bonjour Thomas. Merci pour la réponse utile. Oui, je pensais à utiliser multiprocess.Pool. Peut-être que ce serait mieux qu'une solution faite maison. S'il vous plaît commenter si vous pensez que c'est le cas. Hmm, map_async ressemble au ticket. Je peux faire pool.map_async (do, range (10), callback = results.append). Bien que j'aimerais être capable de passer plus d'un argument. Merci également pour la suggestion de queue.put. Je regardais les signaux et les tuyaux, ce qui est inutilement compliqué. Blog intéressant, btw. Cordialement, Faheem. –

+0

De rien. En général, j'opte toujours pour une solution prête à l'emploi à moins qu'il y ait une raison pour laquelle elle ne fait pas ce que vous voulez. Vous pouvez seulement passer un argument avec n'importe quel type de fonction 'map', mais il y a des façons de le contourner. –

+0

Eh bien, la chose la plus simple à faire est d'envelopper tous les arguments dans un argument, par exemple. en utilisant un dictionnaire. Je ne suis pas clair sur les avantages entre apply_async et map_async, mais je pense qu'un objet résultat est meilleur que beaucoup, donc je suppose que je vais utiliser map_async. Merci. –

0

Ma façon de faire face à c'était:

import multiprocessing 

for prc in multiprocessing.active_children(): 
    prc.terminate() 

J'aime ça plus que je n'ai pas à polluer la fonction de travail avec une clause if.

Questions connexes