2011-06-05 3 views
5

Voici mon script.zéro mq pub/sub avec multipart ne fonctionne pas


#!/usr/bin/env python 

import traceback 
import sys 
import zmq 
from time import sleep 

print "Creating the zmq.Context" 
context = zmq.Context() 

print "Binding the publisher to the local socket at port 5557" 
sender = context.socket(zmq.PUB) 
sender.bind("tcp://*:5557") 

print "Binding the subscriber to the local socket at port 5557" 
receiver = context.socket(zmq.SUB) 
receiver.connect("tcp://*:5557") 

print "Setting the subscriber option to get only those originating from \"B\"" 
receiver.setsockopt(zmq.SUBSCRIBE, "B") 

print "Waiting a second for the socket to be created." 
sleep(1) 

print "Sending messages" 
for i in range(1,10): 
    msg = "msg %d" % (i) 
    env = None 
    if i % 2 == 0: 
     env = ["B", msg] 
    else: 
     env = ["A", msg] 
    print "Sending Message: ", env 
    sender.send_multipart(env) 

print "Closing the sender." 
sender.close() 

failed_attempts = 0 
while failed_attempts < 3: 
    try: 
     print str(receiver.recv_multipart(zmq.NOBLOCK)) 
    except: 
     print traceback.format_exception(*sys.exc_info()) 
     failed_attempts += 1 

print "Closing the receiver." 
receiver.close() 

print "Terminating the context." 
context.term() 

""" 
Output: 

Creating the zmq.Context 
Binding the publisher to the local socket at port 5557 
Binding the subscriber to the local socket at port 5557 
Setting the subscriber option to get only those originating from "B" 
Waiting a second for the socket to be created. 
Sending messages 
Sending Message: ['A', 'msg 1'] 
Sending Message: ['B', 'msg 2'] 
Sending Message: ['A', 'msg 3'] 
Sending Message: ['B', 'msg 4'] 
Sending Message: ['A', 'msg 5'] 
Sending Message: ['B', 'msg 6'] 
Sending Message: ['A', 'msg 7'] 
Sending Message: ['B', 'msg 8'] 
Sending Message: ['A', 'msg 9'] 
Closing the sender. 
['B', 'msg 2'] 
['B', 'msg 4'] 
['B', 'msg 6'] 
['B', 'msg 8'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
Closing the receiver. 
Terminating the context. 
""" 

Et, la question est ... pourquoi ce code ne fonctionne-t-il pas? [EDITION] Après avoir reçu une réponse super rapide sur la liste de diffusion zeromq, j'ai mis à jour le code ci-dessus.

Répondre

8

Crédit: Chuck Remes

Vous pouvez avoir besoin d'un "sommeil" entre les étapes de création de socket (bind, connectez-vous, setsockopt) et la transmission effective des messages. Les opérations de connexion bind & sont asynchrones, elles peuvent donc ne pas se terminer lorsque vous arrivez à la logique qui envoie tous les messages. Dans ce cas, tous les messages envoyés via le socket PUB seront abandonnés car une opération zmq_bind() ne crée pas de file d'attente tant qu'un autre socket ne s'est pas connecté avec succès. En remarque, il n'est pas nécessaire de créer 2 contextes dans cet exemple. Les deux sockets peuvent être créés dans le même contexte. Cela ne fait pas de mal, mais ce n'est pas non plus nécessaire.

Crédit: Pieter

Il y a un "résolution de problèmes" à la fin de CH1 qui explique cela.

Certains types de socket (ROUTER et PUB) laisseront silencieusement tomber les messages pour qu'ils n'ont pas de destinataires. La connexion est, comme l'a dit Chuck, asynchrone et prend environ 100msec. Si vous démarrez deux threads, liez d'un côté, connectez l'autre, puis commencez immédiatement à envoyer des données sur un tel type de socket, vous perdrez les 100 premiers ms de données (environ).

Faire un sommeil est une option brutale de «prouver que ça marche». Vous pouvez synchroniser d'une manière ou d'une autre (plus généralement) la perte de messages lors du démarrage normal (c'est-à-dire voir les données publiées sous la forme d'une diffusion pure sans début ni fin définitifs). Voir l'exemple de mise à jour de la météo, syncpub et syncsub pour plus de détails.

Questions connexes