Je travaille sur l'implémentation d'Akka Alpakka pour la consommation depuis et la production de files d'attente ActiveMQ, en Java. Je peux consommer de la file d'attente avec succès, mais je n'ai pas encore été en mesure d'implémenter l'accusé de réception de message au niveau de l'application.Reconnaissance manuelle des messages ActiveMQ avec Alpakka
Mon but est de consommer des messages d'une file d'attente et de les envoyer à un autre acteur pour traitement. Lorsque cet acteur a terminé le traitement, je veux qu'il puisse contrôler l'accusé de réception du message dans ActiveMQ. Vraisemblablement, cela serait fait en envoyant un message à un autre acteur qui peut faire l'accusé de réception, en appelant une fonction d'accusé de réception sur le message lui-même, ou d'une autre manière.
Dans mon test, 2 messages sont placés dans la file d'attente AlpakkaTest, puis ce code tente de les consommer et de les acquitter. Cependant, je ne vois pas un moyen de définir la session ActiveMQ à CLIENT_ACKNOWLEDGE, et je ne vois aucune différence de comportement avec ou sans l'appel à m.acknowledge();
. Pour cette raison, je pense que les messages sont toujours auto-reconnus.
Est-ce que quelqu'un connaît la manière acceptée de configurer des sessions ActiveMQ pour CLIENT_ACKNOWLEDGE et de reconnaître manuellement les messages ActiveMQ dans les systèmes Java Akka en utilisant Alpakka?
La fonction de test pertinente est:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2999"); // An embedded broker running in the test.
Source<Message, NotUsed> jmsSource = JmsSource.create(
JmsSourceSettings.create(connectionFactory)
.withQueue("AlpakkaTest")
.withBufferSize(2)
);
Materializer materializer = ActorMaterializer.create(system); // `system` is an ActorSystem passed to the function.
try {
List<Message> messages = jmsSource
.take(2)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().get(4, TimeUnit.SECONDS);
for(Message m:messages) {
System.out.println("Found Message ID: " + m.getJMSMessageID());
try {
m.acknowledge();
} catch(JMSException jmsException) {
System.out.println("Acknowledgement Failed for Message ID: " + m.getJMSMessageID() + " (" + jmsException.getLocalizedMessage() + ")");
}
}
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e1) {
e1.printStackTrace();
} catch (TimeoutException e1) {
e1.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
Ce code imprime:
Found Message ID: ID:jmstest-43178-1503343061195-1:26:1:1:1
Found Message ID: ID:jmstest-43178-1503343061195-1:27:1:1:1
Merci. L'exigence d'accusé de réception concerne plus l'intégrité du message, mais l'application de la contre-pression de cette manière est certainement une bonne information. Je vais enquêter sur Streamz aussi. –