2009-12-14 3 views
5

J'ai actuellement un problème de perte de message. Cette erreur se produit rarement, mais arrive assez souvent pour être ennuyante. Voici le contexte du problème:Est-il possible de perdre des messages en utilisant MSMQ MessageQueue.Peek avec un délai d'expiration?

  • J'ai activé le journal des messages sur goldmine_service_queue, un MSMQ sur le serveur Windows 2003.
  • Je peux prouver que le message est en train d'être inséré dans goldmine_service_queue puisque le message apparaît dans le journal des messages. Cette information donne des informations sur le moment où le message a disparu.
  • Les fonctions de journalisation utilisent http://logging.apache.org/log4net/index.html
  • Les journaux ne présentent pas d'erreurs.
  • La fonction worker (illustrée ci-dessous) s'exécute dans un thread d'un service Windows. Il est responsable de jeter un coup d'œil aux messages (éléments de travail) de la file d'attente et de les traiter. À partir des journaux, je soupçonne fortement que mon problème peut être lié au comportement MessageQueue.Peek et au délai d'expiration.

Est-il possible que le timeout et la réception de message se produisent en même temps? Y a-t-il une meilleure façon pour moi de gérer les arrêts de service pour éviter cette erreur?

 private void workerFunction() 
    { 

     logger.Info("Connecting to queue: " + Settings.Default.goldmine_service_queue); 
     MessageQueue q = new MessageQueue(Settings.Default.goldmine_service_queue); 
     q.Formatter = new ActiveXMessageFormatter(); 

     while (serviceStarted) 
     { 

      Message currentMessage = null; 

      try 
      { 
       currentMessage = q.Peek(new TimeSpan(0,0,30)); 
      } 
      catch (System.Messaging.MessageQueueException mqEx) 
      { 
       if (mqEx.ToString().Contains("Timeout for the requested operation has expired")) 
       { 
        logger.Info("Check for service stop request"); 
       } 
       else 
       { 
        logger.Error("Exception while peeking into MSMQ: " + mqEx.ToString()); 
       } 
      } 
      catch (Exception e) 
      { 
       logger.Error("Exception while peeking into MSMQ: " + e.ToString()); 
      } 

      if (currentMessage != null) 
      { 

       logger.Info(currentMessage.Body.ToString()); 
       try 
       { 
        ProcessMessage(currentMessage); 
       } 
       catch (Exception processMessageException) 
       { 
        logger.Error("Error in process message: " + processMessageException.ToString()); 
       } 

       //Remove message from queue. 
       logger.Info("Message removed from queue."); 
       q.Receive(); 
       //logPerformance(ref transCount, ref startTime); 
      } 


     }//end while 

     Thread.CurrentThread.Abort(); 

    } 

Répondre

5

Je ne pense pas que les messages devraient manquer d'après un examen rapide, mais vous travaillez de manière très étrange avec beaucoup de possibilités pour les conditions de course. Pourquoi ne pas simplement recevoir le message et le passer à ProcessMessage (si ProcessMessage échoue, vous effectuez une lecture quand même). Si vous devez gérer plusieurs destinataires, effectuez la réception dans une transaction MSMQ afin que le message ne soit pas disponible pour les autres destinataires, mais ne soit pas supprimé de la file d'attente tant que la transaction n'est pas validée. De même, plutôt que d'interroger la file d'attente, pourquoi ne pas effectuer une réception asynchrone et laisser le pool de threads gérer l'achèvement (où vous devez appeler EndReceive). Cela évite d'attacher un thread et vous n'avez pas besoin d'un arrêt de service spécial (fermez la file d'attente des messages, puis appelez le MessageQueue.ClearConnectionCache();).

Également, abandonner le thread est un très mauvais moyen de quitter, juste revenir de la fonction de démarrage du fil.

+0

Merci pour la réponse: - en ce qui concerne les conditions de course: Je ne suis pas sûr de voir les conditions de course que vous mentionnez. Selon http://msdn.microsoft.com/en-us/library/t5te2tk0.aspx, MessageQueue.Peek (Timeout) est un appel bloquant. - Je suis préoccupé par l'utilisation d'un modèle asynchrone car ProcessMessage effectue certaines opérations COM. Je ne sais pas si ces opérations sont thread safe. Je suis d'accord pour essayer d'utiliser Receive() au lieu de peak, sauf que je ne sais pas comment créer l'opportunité d'arrêter la boucle while en raison d'un arrêt de service. –

+0

Re: COM dans 'ProcessMessage': ne devrait pas poser de problème si un seul thread exécute' ProcessMessage' (ce qui peut être réalisé dans le cas asynchrone en démarrant seulement une nouvelle réception asynchrone après la fin de 'ProcessMessage'.) En async Si vous fermez la file d'attente (et vider le cache --- en fonction d'une recherche, cela semble être nécessaire pour vraiment fermer la connexion) et toute réception en attente sera annulée Il n'y a pas de boucle à annuler – Richard

1

Je suis vouloir que ce changement currentMessage = q.Peek(new TimeSpan(0,0,30)); à currentMessage = q.Receive(); va résoudre votre problème. J'ai utilisé MSMQ pour passer des messages dans le même contexte, mais seulement utiliser peek (et attendre une exception timeout) pour déterminer si la file d'attente est vide. L'appel à Receive est bloquant si vous planifiez en conséquence. Editer: aussi pour votre exception attraper - vous pouvez vérifier si votre exception est une exception de délai d'expiration en comparant le code d'erreur. Où mqe est votre exception de file d'attente de messages. Vous pourriez aimer cela mieux qu'une comparaison de chaîne.

+0

J'utilise 'Message rsp = colaStatus.ReceiveByCorrelationId (correlationId, nouveau TimeSpan (0, timeOut, 0)); 'Je suis confus si 5 minutes est un niveau très élevé pour le délai d'attente A mon humble avis, je pense que mieux vaut moins d'une minute. – Kiquenet

3

Juste quelques commentaires pour clarifier comment MSMQ fonctionne ici.

"Je peux prouver que le message est en cours d'insertion dans goldmine_service_queue car le message apparaît dans le journal des messages.

Le message est placé dans la file d'attente du journal lorsque le message d'origine est supprimé de la file d'attente goldmine_service_queue. Vous pouvez donc dire que le message a été remis avec succès à la file d'attente ET qu'il a été supprimé avec succès de la file d'attente. "Je suspecte fortement que mon problème puisse être lié au comportement MessageQueue.Peek et au délai d'attente."

Un coup d'oeil ne fait rien pour supprimer le message de la file d'attente. Seulement "q.Receive()" est-ce que. Dans votre code, il n'y a pas de connexion explicite entre le message en cours de lecture et celui qui est reçu. "q.Receive();" dit simplement "recevoir le message du haut de la file". Dans un environnement multithread, vous pouvez vous attendre à ce que les messages soient lus de manière incohérente: certains peuvent être consultés et traités plusieurs fois. Vous devriez obtenir l'ID du message "Peeked" et utiliser ReceiveByID pour que vous puissiez seulement recevoir le message "peeked".

Questions connexes