2014-09-15 3 views
4

Im en utilisant RabbitMQ pour envoyer des informations int simple et bref, tout d'abord j'envoyer id à un projet comme ça:RabbitMQ et SharedQueue fermé

private void SendPgcIdToRabbitMQ(string id) 
    { 
     var factory = new ConnectionFactory() { HostName = "localhost" }; 
     using (var connection = factory.CreateConnection()) 
     { 
      using (var channel = connection.CreateModel()) 
      { 
       bool durable = true; 
       channel.QueueDeclare("XQueue", durable, false, false, null); 

       var body = Encoding.UTF8.GetBytes(id); 

       channel.BasicPublish("", "XQueue", null, body); 
       Console.WriteLine(" [x] Sent {0}", id); 
      } 
     } 
    } 

et auditeur de celui-ci:

public void Listener() 
    { 
     var factory = new ConnectionFactory() { HostName = "localhost" }; 
     using (var connection = factory.CreateConnection()) 
     { 
      using (var channel = connection.CreateModel()) 
      { 
       channel.QueueDeclare("XQueue", true, false, false, null); 

       var consumer = new QueueingBasicConsumer(channel); 
       channel.BasicConsume("XQueue", false, consumer); 

       Console.WriteLine(" [*] Waiting for messages. " + 
            "To exit press CTRL+C"); 
       while (true) { 
        var ea = 
         (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 

        var body = ea.Body; 
        var message = Encoding.UTF8.GetString(body); 
        Console.WriteLine(" [x] Received {0}", message); 

        AddPGCFileID(message); 

        channel.BasicAck(ea.DeliveryTag, false); 

        Thread.Sleep(500); 
       } 
      } 
     } 
    } 

Il fonctionne bien, donc après avoir reçu le message que je suis en train de traiter une opération, alors je reçois deuxième ID et créer une autre file d'attente pour faire de même:

 private void SendSurveyIdToRabbitMQ(int yID) 
    { 
     var factory = new ConnectionFactory() { HostName = "localhost" }; 
     using (var connection = factory.CreateConnection()) { 
      using (var channel = connection.CreateModel()) { 
       bool durable = true; 
       channel.QueueDeclare("YQueue", durable, false, false, null); 

       var body = Encoding.UTF8.GetBytes(yID.ToString()); 

       channel.BasicPublish("", "YQueue", null, body); 
       Console.WriteLine(" [x] Sent {0}", yID); 
      } 
     } 
    } 

et recevoir:

 public void InquiryListener() 
    { 
     var factory = new ConnectionFactory() { HostName = "localhost" }; 
     using (var connection = factory.CreateConnection()) { 
      using (var channel = connection.CreateModel()) { 
       channel.QueueDeclare("YQueue", true, false, false, null); 

       var consumer = new QueueingBasicConsumer(channel); 
       channel.BasicConsume("YQueue", false, consumer); 

       Console.WriteLine(" [*] Waiting for messages. " + 
            "To exit press CTRL+C"); 
       while (true) { 
        var ea = 
         (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 

        var body = ea.Body; 
        var message = Encoding.UTF8.GetString(body); 
        Console.WriteLine(" [x] Received {0}", message); 

        StartProcessing(Convert.ToInt32(message)); 

        channel.BasicAck(ea.DeliveryTag, false); 

        Thread.Sleep(500); 
       } 
      } 
     } 
    } 

file d'attente Première envoi et la réception fonctionne très bien, mais en deuxième je reçois: enter image description here

Il est étrange parce qu'il travaillait cette façon, depuis quelque temps, je suis geting ce problème. J'ai réinitialisé rabbitmq, supprimer toutes les files d'attente, etc. ne trouve pas où est un problème. Des idées? Je voudrais déboguer pour savoir si le deuxième processus se termine correctement (crash accidentel sur les deuxièmes processus ne cause pas de problème avec rabbitmq) et il est passé, je suis supriced parce qu'aucune erreur ne se produit sur YQueue, mais après environ minutes de travailler mon processus (seulement en attente, message non-entrant, non-traitement) Je gey cette même exception sur XQueue

Répondre

1

Vérifiez d'abord si la file d'attente est vide avant d'exécuter while(true){ ... }.