2017-07-16 3 views
0

J'ai 2 files d'attente, disons q1 et q2, qui correspondent aux échanges e1 et e2 avec la clé de liaison b1 et b2. Je veux exécuter des fonctions de consommation en parallèle, disons c1 et c2 qui écouteront respectivement q1 et q2. J'ai essayé de la manière suivante:Consommation multiple dans rabbitmq pour plusieurs files d'attente

def c1(): 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp)) 
    channel = connection.channel() 
    channel.exchange_declare(exchange='e1', durable='true', 
         type='topic') 
    result = channel.queue_declare(durable='false', queue='q1') 
    queue_name = result.method.queue 
    binding_key = "b1" 
    channel.queue_bind(exchange='e1', 
         queue=queue_name, 
         routing_key=binding_key) 
    channel.basic_consume(callback,queue=queue_name,no_ack=False) 
    channel.start_consuming() 

def c2(): 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp)) 
    channel = connection.channel() 
    channel.exchange_declare(exchange='e2', durable='true', 
         type='topic') 
    result = channel.queue_declare(durable='false', queue='q2') 
    queue_name = result.method.queue 
    binding_key = "b2" 
    channel.queue_bind(exchange=e1, 
         queue=queue_name, 
         routing_key=binding_key) 
    channel.basic_consume(callback,queue=queue_name,no_ack=False) 
    channel.start_consuming() 

if __name__ == '__main__': 
    c1() 
    c2() 

Cependant, il écoute uniquement la fonction c1 et la fonction c2, il ne reçoit pas exécuté. Comment puis-je exécuter les deux fonctions? Merci d'avance.

EDIT: J'ai méthode c1 et c1 dans 2 module différent (fichier)

+0

Vous devriez utiliser le module de thread python ou une alternative au blocage de la connexion. – alphiii

Répondre

1

Pour exécuter les deux fonctions en même temps une méthode de multithreading doit être en ordre. S'il vous plaît jeter un oeil here pour quelques exemples de python.

Voici votre code modifié avec la classe Process. Il peut également utiliser thread ou l'exécuter explicitement à partir du système d'exploitation.

import pika 
from multiprocessing import Process 


def callback(): 
    print 'callback got data' 


class c1(): 
    def __init__(self): 
     self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
     self.channel = self.connection.channel() 
     self.channel.exchange_declare(exchange='e1', durable='true', type='topic') 
     result = self.channel.queue_declare(durable='false', queue='q1') 
     queue_name = result.method.queue 
     binding_key = "b1" 
     self.channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key) 
     self.channel.basic_consume(callback,queue=queue_name,no_ack=False) 

    def run(self): 
     self.channel.start_consuming() 


class c2(): 
    def __init__(self): 
     self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
     self.channel = self.connection.channel() 
     self.channel.exchange_declare(exchange='e2', durable='true', type='topic') 
     result = self.channel.queue_declare(durable='false', queue='q2') 
     queue_name = result.method.queue 
     binding_key = "b2" 
     self.channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key) 

     self.channel.basic_consume(callback,queue=queue_name,no_ack=False) 

    def run(self): 
     self.channel.start_consuming() 

if __name__ == '__main__': 
    subscriber_list = [] 
    subscriber_list.append(c1()) 
    subscriber_list.append(c2()) 

    # execute 
    process_list = [] 
    for sub in subscriber_list: 
     process = Process(target=sub.run) 
     process.start() 
     process_list.append(process) 

    # wait for all process to finish 
    for process in process_list: 
     process.join()