2009-08-24 14 views
2

J'essaie de comprendre comment écrire un programme en python qui utilise la file d'attente multi-traitement.Multiprocessing avec file d'attente renouvelable

Je dispose de plusieurs serveurs et un d'entre eux fournira la file d'attente à distance avec ceci:

from multiprocessing.managers import BaseManager 
import Queue 
import daemonme 

queue = Queue.Queue() 

class QueueManager(BaseManager): 
    pass 

daemonme.createDaemon() 
QueueManager.register('get_job', callable=lambda:queue) 
m = QueueManager(address=('', 50000), authkey='') 
s = m.get_server() 
s.serve_forever() 

Maintenant, je veux utiliser mon double Xeon, serveur quad core pour traiter les travaux hors de cette file d'attente à distance. Les emplois sont totalement indépendants les uns des autres. Donc, si j'ai 8 cœurs, je voudrais commencer 7 processus qui choisissent un travail de la file d'attente, le traiter, puis revenir à la suivante. Chacun des 7 processus va le faire, mais je ne peux pas me concentrer sur la structure de ce programme.

Quelqu'un peut-il me fournir quelques idées éclairées sur la structure de base de ce?

Merci d'avance.

Répondre

0

Vous devez utiliser le modèle maître-esclave (ou agriculteur-travailleur). Le processus initial serait le maître et crée les emplois. Il

  1. crée une file d'attente
  2. crée 7 processus esclaves, en passant la file d'attente en tant que paramètre
  3. commence à écrire des emplois dans la file d'attente

Les processus esclaves lues en continu à partir de la file d'attente, et effectuer les travaux (peut-être jusqu'à ce qu'ils reçoivent un message d'arrêt de la file d'attente). Il n'est pas nécessaire d'utiliser les objets Manager dans ce scénario, AFAICT.

+0

Comment gérer cette mise en œuvre * files d'attente à distance *? Je pense que multiprocessing.managers est un très bon choix s'il a besoin de partager des ressources à distance. – drAlberT

2

Regardez la doc comment retreive une file d'attente du manager (paragraphe 17.6.2.7) qu'avec un pool (paragraphe 17.6.2.9) des travailleurs lancent 7 emplois qui passent la file d'attente à chacun.

en alternative, vous pouvez penser à quelque chose comme un problème producteur/consommateur:

from multiprocessing.managers import BaseManager 
import random 

class Producer(): 
def __init__(self): 
    BaseManager.register('queue') 
    self.m = BaseManager(address=('hostname', 50000), authkey='jgsjgfdjs') 
    self.m.connect() 
    self.cm_queue = self.m.queue() 
    while 1: 
     time.sleep(random.randint(1,3)) 
     self.cm_queue.put(<PUT-HERE-JOBS>) 

from multiprocessing.managers import BaseManager 
import time 
import random 
class Consumer(): 
def __init__(self): 
    BaseManager.register('queue') 

    self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs') 
    self.m.connect() 
    self.queue = self.m.queue() 
    while 1: 
     <EXECUTE(job = self.queue.get())> 


from multiprocessing.managers import BaseManager, Queue 
class Manager(): 

def __init__(self): 

    self.queue = QueueQueu() 

    BaseManager.register('st_queue', callable=lambda:self.queue) 

    self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs') 
    self.s = self.m.get_server() 

    self.s.serve_forever() 
+0

J'ai ce travail (merci). Ce que j'ai besoin de savoir, c'est dans votre section quelle est la meilleure façon de traiter ces tâches? Ce sont tous des fichiers python, donc serait-il préférable de les exécuter en tant que module? Ou devraient-ils être exécutés sous un processus python séparé avec le module de sous-processus? – WeWatchYourWebsite

+0

regardez le module OS et sa méthode exec par exemple – DrFalk3n