J'utilise py-amqplib pour accéder à RabbitMQ en Python. L'application reçoit des demandes d'écoute sur certains sujets MQ de temps en temps.Comment attendre des messages sur plusieurs files d'attente en utilisant py-amqplib
La première fois qu'il reçoit une telle demande, il crée une connexion AMQP et un canal et commence un nouveau thread pour écouter les messages:
connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
channel = connection.channel()
listener = AMQPListener(channel)
listener.start()
AMQPListener est très simple:
class AMQPListener(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.__channel = channel
def run(self):
while True:
self.__channel.wait()
Après avoir créé la connexion, il s'abonne au sujet qui vous intéresse, comme suit:
channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)
def receive_callback(msg):
self.queue.put(msg.body)
channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)
La première fois que tout cela fonctionne très bien. Cependant, il échoue lors d'une demande ultérieure de s'abonner à un autre sujet. Sur les demandes suivantes, je réutilise la connexion AMQP et le thread AMQPListener (puisque je ne veux pas démarrer un nouveau thread pour chaque rubrique) et quand j'appelle le bloc de code au-dessus de l'appel de méthode channel.queue_declare() ne retourne jamais. J'ai également essayé de créer un nouveau canal à ce moment-là et l'appel connection.channel() ne revient jamais non plus. La seule façon dont j'ai pu le faire fonctionner est de créer un nouveau thread de connexion, de canal et d'écoute par sujet (par exemple, routing_key), mais ce n'est vraiment pas idéal. Je pense que c'est la méthode wait() qui bloque toute la connexion, mais je ne sais pas quoi faire. Je devrais sûrement être capable de recevoir des messages avec plusieurs clés de routage (ou même sur plusieurs canaux) en utilisant un seul thread d'écoute?
Une question connexe est: comment puis-je arrêter le thread écouteur lorsque ce sujet n'est plus d'intérêt? L'appel channel.wait() semble bloquer définitivement s'il n'y a aucun message. La seule façon dont je peux penser est d'envoyer un message factice à la file d'attente qui "l'empoisonner", c'est à dire. être interprété par l'auditeur comme un signal d'arrêt.
Merci, mais le problème est que je mai besoin de s'abonner à de nouveaux sujets à tout moment, à savoir. après que 'channel.wait()' a été appelé. – EMP