2015-10-11 1 views
3

Je veux lire 10000 messages de Websphere MQ dans des groupes dans l'ordre séquentiel, j'utilise le code ci-dessous pour faire la même chose, mais il prend beaucoup de temps pour lire tous les messages. Même j'ai essayé d'utiliser des concepts multi-thread, mais parfois 2 threads consomment le même groupe et la condition de course qui se passe. Voici l'extrait de code ci-dessous. J'essaie d'utiliser 3 threads pour lire séquentiellement 10 000 messages de MQ, mais deux de mes threads accèdent au même groupe à la fois. Comment éviter cela? Quelle est la meilleure façon de lire un grand volume de messages en séquentiel? Mon exigence est que je veux lire 10000 messages séquentiellement. S'il vous plaît aider.comment lire un grand volume de messages de Websphere MQ

MQConnectionFactory factory = new MQConnectionFactory(); 
factory.setQueueManager("QM_host") 
MQQueue destination = new MQQueue("default"); 
Connection connection = factory.createConnection(); 
connection.start(); 
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 

MessageConsumer lastMessageConsumer = 
    session.createConsumer(destination, "JMS_IBM_Last_Msg_In_Group=TRUE"); 
TextMessage lastMessage = (TextMessage) lastMessageConsumer.receiveNoWait(); 
lastMessageConsumer.close(); 

if (lastMessage != null) { 

    int groupSize = lastMessage.getIntProperty("JMSXGroupSeq"); 
    String groupId = lastMessage.getStringProperty("JMSXGroupID"); 

    boolean failed = false; 

    for (int i = 1; (i < groupSize) && !failed; i++) { 

     MessageConsumer consumer = session.createConsumer(destination, 
      "JMSXGroupID='" + groupId + "'AND JMSXGroupSeq=" + i); 
     TextMessage message = (TextMessage)consumer.receiveNoWait(); 

     if (message != null) { 
      System.out.println(message.getText()); 
     } else { 
      failed = true; 
     } 

     consumer.close(); 

    } 

    if (failed) { 
     session.rollback(); 
    } else { 
     System.out.println(lastMessage.getText()); 
     session.commit(); 
    } 

} 

connection.close(); 

Répondre

0

Je pense une meilleure façon serait d'avoir un fil de coordinateur dans votre application, qui écouter les derniers messages des groupes et pour chacun commencerait un nouveau thread pour obtenir des messages appartenant au groupe assigné à cette fil. (Dans les threads recevant les messages appartenant à un groupe, vous n'avez pas besoin d'utiliser une boucle for pour récupérer chaque message séparément, mais vous devriez prendre n'importe quel message appartenant à un groupe. groupe, tout en conservant un compteur de groupe et en mettant hors service les messages hors service. Ce serait sûr tant que vous ne commettiez votre session qu'après avoir reçu et traité tous les messages du groupe. (Cela donnerait plus de performance, car chaque groupe serait traité par un thread séparé, et ce thread n'accéderait à chaque message qu'une seule fois dans MQ.)

0

Veuillez consulter la documentation d'IBM au sequential retrieval of messages. Au cas où la page se déplace ou est modifiée, je citerai la partie la plus pertinente. Pour le traitement séquentiel soit garanti, les conditions suivantes doivent être remplies:

  • Toutes les demandes de vente ont été réalisées à partir de la même application.
  • Toutes les demandes de mise provenaient soit de la même unité de travail, soit toutes les demandes de put étaient faites en dehors d'une unité de travail.
  • Les messages ont tous la même priorité.
  • Les messages ont tous la même persistance. Pour la mise en file d'attente distante, la configuration est telle qu'il ne peut y avoir qu'un chemin de l'application effectuant la demande put, via son gestionnaire de files d'attente , par intercommunication, vers le gestionnaire de file d'attente de destination et la file d'attente cible.
  • Les messages ne sont pas placés dans une file d'attente de rebut (par exemple, si une file d'attente est temporairement saturée).
  • L'application qui reçoit le message ne modifie pas délibérément l'ordre de récupération, par exemple en spécifiant un MsgId particulier ou CorrelId ou en utilisant des priorités de message.
  • Une seule application effectue des opérations pour récupérer les messages de la file d'attente de destination. S'il y a plus d'une application , ces applications doivent être conçues pour obtenir tous les messages dans chaque séquence placée par une application émettrice.

Bien que la page ne précise pas explicitement, quand ils disent « une application » ce que l'on entend est un seul fil de cette seule application. Si une application a des threads concurrents, l'ordre de traitement n'est pas garanti.

En outre, la lecture 10.000 messages en une seule unité de travail comme l'a suggéré dans une autre réponse est pas recommandé comme un moyen de préserver l'ordre de message! Seulement le faire si les 10 000 messages doivent réussir ou échouer en tant qu'unité atomique, ce qui n'a rien à voir avec le fait qu'ils aient été reçus dans l'ordre. Dans le cas où un grand nombre de messages doivent être traités dans une seule unité de travail, il est absolument nécessaire de régler la taille des fichiers journaux, et très probablement quelques autres paramètres. Préserver l'ordre de séquence est une torture suffisante pour tout transport de messagerie asynchrone avec thread sans introduire également des transactions massives qui durent très longtemps.

0

Vous pouvez faire ce que vous voulez avec les classes MQ pour Java (non-JMS) et cela peut être possible avec les classes MQ pour JMS mais être très compliqué.

Commencez par lire ceci page à partir du Knowledge MQ.

J'ai converti le pseudo-code (de la page Web ci-dessus) en classes MQ pour Java et l'ai changé d'un accès à un get destructif.

En outre, je préfère faire chaque groupe de messages sous un point de synchronisation (en supposant un groupe de taille raisonnable). Tout d'abord, il vous manque plusieurs drapeaux pour le champ 'options' de GMO (GetMessageOptions) et le champ MatchOptions doit être défini sur 'MQMO_MATCH_MSG_SEQ_NUMBER', afin que tous les threads récupèrent toujours le premier message du groupe le premier message. c'est-à-dire ne pas saisir le 2ème message du groupe pour le premier message comme indiqué ci-dessus.

MQGetMessageOptions gmo = new MQGetMessageOptions(); 
MQMessage rcvMsg = new MQMessage(); 

/* Get the first message in a group, or a message not in a group */ 
gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_ALL_MSGS_AVAILABLE | CMQC.MQGMO_WAIT | CMQC.MQGMO_SYNCPOINT; 
gmo.MatchOptions = CMQC.MQMO_MATCH_MSG_SEQ_NUMBER; 
rcvMsg.messageSequenceNumber = 1; 
inQ.get(rcvMsg, gmo); 

/* Examine first or only message */ 
... 

gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_SYNCPOINT; 
do while ((rcvMsg.messageFlags & CMQC.MQMF_MSG_IN_GROUP) == CMQC.MQMF_MSG_IN_GROUP) 
{ 
    rcvMsg.clearMessage(); 
    inQ.get(rcvMsg, gmo); 
    /* Examine each remaining message in the group */ 
    ... 
} 
qMgr.commit();