2011-03-15 2 views
1

Je développe une application client-serveur simple en python. J'utilise un gestionnaire pour configurer des files d'attente partagées, mais je n'arrive pas à comprendre comment passer un objet arbitraire du serveur au client. Je soupçonne qu'il a quelque chose à voir avec la fonction manager.register, mais ce n'est pas très bien expliqué dans le multiprocessing documentation. Le seul exemple utilise des files d'attente et rien d'autre.Comment transmettre un objet python à l'aide d'un gestionnaire distant?

Voici mon code:

#manager demo.py 
from multiprocessing import Process, Queue, managers 
from multiprocessing.managers import SyncManager 
import time 

class MyObject(): 
    def __init__(self, p, f): 
     self.parameter = p 
     self.processor_function = f 

class MyServer(): 
    def __init__(self, server_info, obj): 
     print '=== Launching Server ... =====' 
     (ip, port, pw) = server_info 
     self.object = obj  #Parameters for task processing 

     #Define queues 
     self._process_queue = Queue()  #Queue of tasks to be processed 
     self._results_queue = Queue()  #Queue of processed tasks to be stored 

     #Set up IS_Manager class and register server functions 
     class IS_Manager(managers.BaseManager): pass 
     IS_Manager.register('get_processQ', callable=self.get_process_queue) 
     IS_Manager.register('get_resultsQ', callable=self.get_results_queue) 
     IS_Manager.register('get_object', callable=self.get_object) 

     #Initialize manager and server 
     self.manager = IS_Manager(address=(ip, port), authkey=pw) 
     self.server = self.manager.get_server() 

     self.server_process = Process(target=self.server.serve_forever) 
     self.server_process.start() 

    def get_process_queue(self): return self._process_queue 
    def get_results_queue(self): return self._results_queue 
    def get_object(self): return self.object 

    def runUntilDone(self, task_list): 
     #Fill the initial queue 
     for t in task_list: 
      self._process_queue.put(t) 

     #Main loop 
     total_tasks = len(task_list) 
     while not self._results_queue.qsize()==total_tasks: 
      time.sleep(.5) 
      print self._process_queue.qsize(), '\t', self._results_queue.qsize() 
      if not self._results_queue.empty(): 
       print '\t', self._results_queue.get() 
      #Do stuff 
      pass 

class MyClient(): 
    def __init__(self, server_info): 
     (ip, port, pw) = server_info 
     print '=== Launching Client ... =====' 

     class IS_Manager(managers.BaseManager): pass 

     IS_Manager.register('get_processQ') 
     IS_Manager.register('get_resultsQ') 
     IS_Manager.register('get_object') 

     #Set up manager, pool 
     print '\tConnecting to server...' 
     manager = IS_Manager(address=(ip, port), authkey=pw) 
     manager.connect() 

     self._process_queue = manager.get_processQ() 
     self._results_queue = manager.get_resultsQ() 
     self.object = manager.get_object() 

     print '\tConnected.' 

    def runUntilDone(self):#, parameters): 
     print 'Starting client main loop...' 

     #Main loop 
     while 1: 
      if self._process_queue.empty(): 
       print 'I\'m bored here!' 
       time.sleep(.5) 
      else: 
       task = self._process_queue.get() 
       print task, '\t', self.object.processor_function(task, self.object.parameter) 

     print 'Client process is quitting. Bye!' 
     self._clients_queue.get() 

Et un simple serveur ...

from manager_demo import * 

def myProcessor(x, parameter): 
    return x + parameter 

if __name__ == '__main__': 
    my_object = MyObject(100, myProcessor) 
    my_task_list = range(1,20) 
    my_server_info = ('127.0.0.1', 8081, 'my_pw') 

    my_crawl_server = MyServer(my_server_info, my_object) 
    my_crawl_server.runUntilDone(my_task_list) 

Et un simple client ...

from manager_demo import * 
if __name__ == '__main__': 
    my_server_info = ('127.0.0.1', 8081, 'my_pw') 
    my_client = MyClient(my_server_info) 
    my_client.runUntilDone() 

Quand je lance ce qu'il se bloque sur :

[email protected]:~/Desktop$ python client.py 
=== Launching Client ... ===== 
    Connecting to server... 
    Connected. 
Starting client main loop... 
2 Traceback (most recent call last): 
    File "client.py", line 5, in <module> 
    my_client.runUntilDone() 
    File "/home/erin/Desktop/manager_demo.py", line 84, in runUntilDone 
    print task, '\t', self.object.processor_function(task, self.object.parameter) 
AttributeError: 'AutoProxy[get_object]' object has no attribute 'parameter' 

Pourquoi python n'a-t-il aucun problème avec les files d'attente ou la fonction processeur, mais s'étouffe avec le paramètre objet? Merci!

Répondre

2

Vous rencontrez ce problème car l'attribut parameter sur votre classe MyObject() n'est pas un appelable. Le documentation indique que _exposed_ est utilisé pour spécifier une séquence de noms de méthodes qui sont des proxies pour ce typeid. Dans le cas où aucune liste exposée n'est spécifiée, toutes les "méthodes publiques" de l'objet partagé seront accessibles. (Ici, une « méthode publique » désigne tout attribut qui a un __ appel méthode __() et dont le nom ne commence pas par « _ ».)

Ainsi, vous devrez exposer manuellement l'parameter attribut MyObject, probablement, comme méthode, en changeant votre MyObject():

class MyObject(): 
    def __init__(self, p, f): 
     self._parameter = p 
     self.processor_function = f 

    def parameter(self): 
     return self._parameter 

en outre, vous devrez changer votre tâche:

self.object.processor_function(task, self.object.parameter()) 

HTH.

Questions connexes