2009-11-27 5 views
3

J'utilise py-amqplib pour accéder à RabbitMQ en Python. L'application reçoit des demandes d'écoute sur certains sujets MQ de temps en temps.Comment attendre des messages sur plusieurs files d'attente en utilisant py-amqplib

La première fois qu'il reçoit une telle demande, il crée une connexion AMQP et un canal et commence un nouveau thread pour écouter les messages:

connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False) 
    channel = connection.channel() 

    listener = AMQPListener(channel) 
    listener.start() 

AMQPListener est très simple:

class AMQPListener(threading.Thread): 
    def __init__(self, channel): 
     threading.Thread.__init__(self) 
     self.__channel = channel 

    def run(self): 
     while True: 
      self.__channel.wait() 

Après avoir créé la connexion, il s'abonne au sujet qui vous intéresse, comme suit:

channel.queue_declare(queue = queueName, exclusive = False) 
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True) 
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination) 

def receive_callback(msg): 
    self.queue.put(msg.body) 

channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback) 

La première fois que tout cela fonctionne très bien. Cependant, il échoue lors d'une demande ultérieure de s'abonner à un autre sujet. Sur les demandes suivantes, je réutilise la connexion AMQP et le thread AMQPListener (puisque je ne veux pas démarrer un nouveau thread pour chaque rubrique) et quand j'appelle le bloc de code au-dessus de l'appel de méthode channel.queue_declare() ne retourne jamais. J'ai également essayé de créer un nouveau canal à ce moment-là et l'appel connection.channel() ne revient jamais non plus. La seule façon dont j'ai pu le faire fonctionner est de créer un nouveau thread de connexion, de canal et d'écoute par sujet (par exemple, routing_key), mais ce n'est vraiment pas idéal. Je pense que c'est la méthode wait() qui bloque toute la connexion, mais je ne sais pas quoi faire. Je devrais sûrement être capable de recevoir des messages avec plusieurs clés de routage (ou même sur plusieurs canaux) en utilisant un seul thread d'écoute?

Une question connexe est: comment puis-je arrêter le thread écouteur lorsque ce sujet n'est plus d'intérêt? L'appel channel.wait() semble bloquer définitivement s'il n'y a aucun message. La seule façon dont je peux penser est d'envoyer un message factice à la file d'attente qui "l'empoisonner", c'est à dire. être interprété par l'auditeur comme un signal d'arrêt.

Répondre

1

Si vous voulez plus d'un comsumer par canal il suffit de connecter un autre en utilisant basic_consume() et utiliser channel.wait() après. Il écoutera toutes les files d'attente jointes via basic_consume(). Assurez-vous de définir des tags de consommateur différents pour chaque basic_consume().

Utilisez channel.basic_cancel (consumer_tag) si vous souhaitez annuler un client spécifique dans une file d'attente (annulation de l'écoute d'un sujet spécifique).

+0

Merci, mais le problème est que je mai besoin de s'abonner à de nouveaux sujets à tout moment, à savoir. après que 'channel.wait()' a été appelé. – EMP

Questions connexes