2010-09-21 5 views
8

Je souhaite envoyer un message à un serveur RabbitMQ, puis attendre un message de réponse (dans une file d'attente de "réponse à"). Bien sûr, je ne veux pas attendre indéfiniment dans le cas où l'application traitant ces messages est en panne - il doit y avoir un délai d'expiration. Cela semble être une tâche très basique, mais je ne trouve pas de moyen de le faire. J'ai maintenant rencontré ce problème avec l'API Java.RabbitMQ Attendre un message avec un délai d'expiration

Répondre

0

com.rabbitmq.client.QueueingConsumer a une méthode nextDelivery(long timeout), qui fera ce que vous voulez. Cependant, cela a été déprécié. L'écriture de votre propre délai d'attente n'est pas si difficile, bien qu'il soit préférable d'avoir un thread continu et une liste d'identifiants en temps réel, plutôt que d'ajouter et de supprimer des consommateurs et des threads de timeout associés tout le temps.

Editer pour ajouter: A noté la date sur ce après avoir répondu!

0

J'ai abordé ce problème en utilisant C# en créant un objet pour garder une trace de la réponse à un message particulier. Il configure une file d'attente de réponse unique pour un message et s'y abonne. Si la réponse n'est pas reçue dans un laps de temps spécifié, un compte à rebours annule l'abonnement, ce qui supprime la file d'attente. Séparément, j'ai des méthodes qui peuvent être synchrones à partir de mon thread principal (utilise un sémaphore) ou asynchrone (utilise un rappel) pour utiliser cette fonctionnalité.

Fondamentalement, la mise en œuvre ressemble à ceci:

//Synchronous case: 
//Throws TimeoutException if timeout happens 
var msg = messageClient.SendAndWait(theMessage); 

//Asynchronous case 
//myCallback receives an exception message if there is a timeout 
messageClient.SendAndCallback(theMessage, myCallback); 
2

La bibliothèque cliente Java RabbitMQ maintenant supports a timeout argument to its QueueConsumer.nextDelivery() method.

Par exemple, le tutoriel RPC utilise le code suivant:

channel.basicPublish("", requestQueueName, props, message.getBytes()); 

while (true) { 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    if (delivery.getProperties().getCorrelationId().equals(corrId)) { 
     response = new String(delivery.getBody()); 
     break; 
    } 
} 

Maintenant, vous pouvez utiliser consumer.nextDelivery(1000) attendre au maximum une seconde. Si le délai d'expiration est atteint, la méthode renvoie null.

channel.basicPublish("", requestQueueName, props, message.getBytes()); 

while (true) { 
    // Use a timeout of 1000 milliseconds 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000); 

    // Test if delivery is null, meaning the timeout was reached. 
    if (delivery != null && 
     delivery.getProperties().getCorrelationId().equals(corrId)) { 
     response = new String(delivery.getBody()); 
     break; 
    } 
}