2017-07-03 1 views
0

J'utilise Apache ActiveMQ pour mettre en file d'attente une grande quantité de messages, puis les supprimer en fin de journée. Cependant, je suis confus au sujet du mode de fonctionnement d'ActiveMQ. Sur mon PC, je n'ai pas installé ActiveMQ en tant que service, ni j'ai installé un serveur quelque part. Je viens comprenait la « ActiveMQ-all-5.14.5.jar » comme une dépendance Maven sur mon projet et je suis en utilisant le code suivant à ce jour:Configurer Apache ActiveMQ par programme

public static void main(String[] args) throws URISyntaxException, Exception { 
    BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:4848)")); 
    broker.start(); 
    Connection connection = null; 
    try { 
     // Producer 
     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:4848"); 
     connection = connectionFactory.createConnection(); 
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     Queue queue = session.createQueue("customerQueue"); 
     String payload = "Important Task"; 
     Message msg = session.createTextMessage(payload); 
     MessageProducer producer = session.createProducer(queue); 
     System.out.println("Sending text '" + payload + "'"); 
     msg.setLongProperty("_AMQ_SCHED_DELIVERY", System.currentTimeMillis() + 5000L); 
     producer.send(msg); 

     // Consumer 
     MessageConsumer consumer = session.createConsumer(queue); 
     connection.start(); 
     QueueBrowser browser = session.createBrowser(queue); 
     while (browser.getEnumeration().hasMoreElements()) { 
      TextMessage textMsg = (TextMessage) consumer.receive(); 
      browser.getEnumeration().nextElement(); 
      System.out.println(textMsg); 
      System.out.println("Received: " + textMsg.getText()); 
     } 

     session.close(); 
    } finally { 
     if (connection != null) { 
      connection.close(); 
     } 
     broker.stop(); 
    } 
} 

Comme vous pouvez le voir, je veux retarder une message de 5 secondes (ou plus, cela peut varier) mais dans chaque guide que j'ai trouvé, je suis chargé de configurer le fichier de configuration XML. Mais, il s'agit d'un fichier utilisé lorsque vous exécutez ActiveMQ en tant que service. J'emploie actuellement juste la bibliothèque de pot. Initialement, j'ai installé le serveur Glassgfish pour utiliser JMS afin de mettre en file d'attente tous les messages mais depuis j'ai abandonné le projet mais l'adresse IP est encore utilisée depuis ActiveMQ (localhost: 4848).

Notez que ce qui suit est un exemple parfaitement fonctionnel -KahaDB est également utilisé pour stocker les messages en cas de panne du serveur.

En ce qui me concerne, ActiveMQ démarre un serveur local à partir du STS que j'utilise ce code mais où est le fichier de configuration? Puis-je modifier ses propriétés par programme?

+0

Avez-vous essayé quelque chose comme 'broker.setSchedulerSupport (true)'? (voir http://activemq.apache.org/maven/5.11.0/apidocs/org/apache/activemq/broker/BrokerService.html#setSchedulerSupport(boolean)) – Tome

+0

Je viens de le faire, ça n'a pas marché. – Lefteris008

+0

Êtes-vous sûr de la propriété '_AMQ_SCHED_DELIVERY' que vous utilisez? La propriété ActiveMQ serait plutôt quelque chose comme 'AMQ_SCHEDULED_DELAY' (voir http://activemq.apache.org/delay-and-schedule-message-delivery.html) – Tome

Répondre

2

Cela devrait fonctionner (fonctionne pour nous avec ActiveMQ 5.12.3). Assurez-vous d'abord de nettoyer votre magasin KahaDB, afin d'éviter que les messages précédents soient lus.

public static void main(String[] args) throws Exception { 
    BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:4848)")); 
    broker.setSchedulerSupport(true); 
    broker.start(); 
    Connection connection = null; 
    try { 
     // Producer 
     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:4848"); 
     connection = connectionFactory.createConnection(); 
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     Queue queue = session.createQueue("customerQueue"); 
     String payload = "Important Task"; 
     Message msg = session.createTextMessage(payload); 
     MessageProducer producer = session.createProducer(queue); 
     System.out.println("Sending text '" + payload + "'"); 
     msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L); 
     producer.send(msg); 
     connection.start(); 

     // Consumer 
     MessageConsumer consumer = null; 
     consumer = session.createConsumer(queue); 

     QueueBrowser browser = session.createBrowser(queue); 
     while (browser.getEnumeration().hasMoreElements()) { 
      TextMessage textMsg = (TextMessage) consumer.receive(); 
      browser.getEnumeration().nextElement(); 
      System.out.println(textMsg); 
      System.out.println("Received: " + textMsg.getText()); 
     } 

     session.close(); 
    } finally{ 
     if (connection != null) { 
      connection.close(); 
     } 
     broker.stop(); 
    } 
} 

La première course propre (avec un magasin KahaDB vide) ne doit pas sortie

"Reçu: tâche importante"

, alors que le second, si vous ne supprimez pas les fichiers de données entre.

Retrait de la ligne `

msg.setLongProperty (ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L);

rendrait la première sortie de course propre "Reçu: tâche importante"

+0

La suppression de 'KahaDB' a résolu le problème. Merci! – Lefteris008