2011-08-24 2 views
3

Quelle est la manière la plus correcte d'établir une communication bidirectionnelle entre les processus utilisant 0mq? J'ai besoin de créer plusieurs processus d'arrière-plan qui vont attendre les commandes du processus principal, effectuer quelques calculs et renvoyer le résultat au processus principal.Connexion un-à-plusieurs 0mq

Répondre

5

Il y a quelques façons de le faire. L'approche la plus directe pourrait consister à utiliser les sockets REQ/REP. Chaque processus d'arrière-plan/travailleur aurait une prise REP, et vous utilisez une prise REQ pour communiquer avec eux:

import zmq 

def worker(addr): 
    context = zmq.Context() 
    socket = context.socket(zmq.REP) 
    socket.bind(addr) 
    while True: 
     # get message from boss 
     msg = socket.recv() 
     # ...do smth 
     # send back results 
     socket.send(msg) 

if __name__ == '__main__': 
    # spawn 5 workers 
    from multiprocessing import Process 
    for i in range(5): 
     Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start() 

Il faudrait se connecter à chaque travailleur de leur envoyer un message et retourner les résultats:

context = zmq.Context() 
socket = context.socket(zmq.REQ) 
socket.connect(worker_addr) 
socket.send('message') 
msg = socket.recv() 

Une autre approche serait d'utiliser PUB/SUB pour déclencher des messages aux travailleurs et PUSH/PULL pour récolter les résultats:

import zmq 

def worker(worker_id, publisher_addr, results_addr): 
    context = zmq.Context() 
    sub = context.socket(zmq.SUB) 
    sub.connect(publisher_addr) 
    sub.setsockopt(zmq.SUBSCRIBE, worker_id) 
    push = context.socket(zmq.PUSH) 
    push.connect(results_addr) 

    while True: 
     msg = sub.recv_multipart()[1] 
     # do smth, send off results 
     push.send_multipart([worker_id, msg]) 

if __name__ == '__main__': 
    publisher_addr = 'tcp://127.0.0.1:5000' 
    results_addr = 'tcp://127.0.0.1:5001' 

    # launch some workers into space 
    from multiprocessing import Process 
    for i in range(5): 
     Process(target=worker, args=('worker-%d' % i, publisher_addr, results_addr,)).start() 

Pour diffuser une commande à un travailleur spécifique, vous feriez quelque chose comme:

context = zmq.Context() 
pub = context.socket(zmq.PUB) 
pub.bind(publisher_addr) 
# send message to worker-1 
pub.send_multipart(['worker-1', 'hello']) 

Pull dans les résultats:

context = zmq.Context() 
pull = context.socket(zmq.PULL) 
pull.bind(results_addr) 

while True: 
    worker_id, result = pull.recv_multipart() 
    print worker_id, result 
3

Envisagez d'utiliser Request Reply Broker mais remplacez le socket REQ par le DEALER. Le DEALER n'est pas bloquant pour l'envoi et équilibrera automatiquement le trafic vers vos employés.

Dans l'image Client serait votre main process et Service A/B/C sont background processes (workers). Main process doit se lier à un point de terminaison. Workers doit se connecter au point de terminaison du processus principal pour recevoir les éléments de travail.

Dans main process conserver la liste des éléments de travail et envoyer l'heure. S'il n'y a pas de réponse pendant un certain temps, renvoyez simplement l'élément de travail, car worker est probablement mort.

Request Reply Broker

Questions connexes