2017-08-21 6 views
2

Je travaille sur l'implémentation d'Akka Alpakka pour la consommation depuis et la production de files d'attente ActiveMQ, en Java. Je peux consommer de la file d'attente avec succès, mais je n'ai pas encore été en mesure d'implémenter l'accusé de réception de message au niveau de l'application.Reconnaissance manuelle des messages ActiveMQ avec Alpakka

Mon but est de consommer des messages d'une file d'attente et de les envoyer à un autre acteur pour traitement. Lorsque cet acteur a terminé le traitement, je veux qu'il puisse contrôler l'accusé de réception du message dans ActiveMQ. Vraisemblablement, cela serait fait en envoyant un message à un autre acteur qui peut faire l'accusé de réception, en appelant une fonction d'accusé de réception sur le message lui-même, ou d'une autre manière.

Dans mon test, 2 messages sont placés dans la file d'attente AlpakkaTest, puis ce code tente de les consommer et de les acquitter. Cependant, je ne vois pas un moyen de définir la session ActiveMQ à CLIENT_ACKNOWLEDGE, et je ne vois aucune différence de comportement avec ou sans l'appel à m.acknowledge();. Pour cette raison, je pense que les messages sont toujours auto-reconnus.

Est-ce que quelqu'un connaît la manière acceptée de configurer des sessions ActiveMQ pour CLIENT_ACKNOWLEDGE et de reconnaître manuellement les messages ActiveMQ dans les systèmes Java Akka en utilisant Alpakka?

La fonction de test pertinente est:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2999"); // An embedded broker running in the test. 

Source<Message, NotUsed> jmsSource = JmsSource.create(
    JmsSourceSettings.create(connectionFactory) 
     .withQueue("AlpakkaTest") 
     .withBufferSize(2) 
); 

Materializer materializer = ActorMaterializer.create(system); // `system` is an ActorSystem passed to the function. 

try { 
    List<Message> messages = jmsSource 
     .take(2) 
     .runWith(Sink.seq(), materializer) 
     .toCompletableFuture().get(4, TimeUnit.SECONDS); 

    for(Message m:messages) { 
     System.out.println("Found Message ID: " + m.getJMSMessageID()); 

     try { 
      m.acknowledge(); 
     } catch(JMSException jmsException) { 
      System.out.println("Acknowledgement Failed for Message ID: " + m.getJMSMessageID() + " (" + jmsException.getLocalizedMessage() + ")"); 
     } 
    } 
} catch (InterruptedException e1) { 
    e1.printStackTrace(); 
} catch (ExecutionException e1) { 
    e1.printStackTrace(); 
} catch (TimeoutException e1) { 
    e1.printStackTrace(); 
} catch (JMSException e) { 
    e.printStackTrace(); 
} 

Ce code imprime:

Found Message ID: ID:jmstest-43178-1503343061195-1:26:1:1:1 
Found Message ID: ID:jmstest-43178-1503343061195-1:27:1:1:1 

Répondre

3

Mise à jour: Le mode d'accusé de réception est configurable dans le connecteur JMS depuis Alpakka 0.15. De la documentation liée:

Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings 
    .create(connectionFactory) 
    .withQueue("test") 
    .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge()) 
); 

CompletionStage<List<String>> result = jmsSource 
    .take(msgsIn.size()) 
    .map(message -> { 
     String text = ((ActiveMQTextMessage)message).getText(); 
     message.acknowledge(); 
     return text; 
    }) 
    .runWith(Sink.seq(), materializer); 

de la version 0.11, le connecteur JMS de Alpakka ne prend pas en charge la reconnaissance des messages au niveau de l'application. Alpakka crée en interne un Session avec le CLIENT_ACKNOWLEDGE mode here et accuse chaque message here dans le MessageListener interne. L'API n'expose pas ces paramètres pour le remplacement.

Il existe un ticket ouvert qui traite de l'activation de l'accusé de réception en aval des sources basées sur des files d'attente, mais ce ticket a été inactif pendant un certain temps.

Actuellement, vous ne pouvez pas empêcher Alpakka d'accuser réception des messages au niveau JMS. Cependant, cela ne vous empêche pas d'ajouter une étape à votre flux qui envoie chaque message à un acteur pour traitement et utilise les réponses de l'acteur comme signaux de contre-pression. Le Akka Streams documentation décrit comment faire cela avec une combinaison de mapAsync et ask ou avec Sink.actorRefWithAck. Par exemple, pour utiliser l'ancien:

Timeout askTimeout = Timeout.apply(4, TimeUnit.SECONDS); 

jmsSource 
    .mapAsync(2, msg -> ask(processorActor, msg, askTimeout)) 
    .runWith(Sink.seq(), materializer); 

(Side note: Dans le projet StreamZ lié, il a été récemment ouvert ticket pour permettre la reconnaissance de niveau d'application.Streamz remplace l'ancien module akka-camel et, comme Alpakka, est construit sur Akka Streams. Streamz a également une API Java et est listed dans la documentation Alpakka en tant que connecteur externe.)

+0

Merci. L'exigence d'accusé de réception concerne plus l'intégrité du message, mais l'application de la contre-pression de cette manière est certainement une bonne information. Je vais enquêter sur Streamz aussi. –

2

En regardant le code source pour le Alpakka JmsSourceStage il reconnaît déjà chaque message entrant pour vous (et il est la session est une session Client Ack). De ce que je peux dire de la source il n'y a pas de mode qui vous permet de faire la reconnaissance des messages.

Vous pouvez afficher le code source de Alpakka here.