2017-07-03 1 views
0

Je suis une classe de configuration JmsConfig qui gère les événements JMS à partir d'un sujet de la manière suivante:meilleure façon propre à migrer événement JMS écouter Spring Integration avec Spring Boot

  • Il définit un @Bean ConnectionFactory, contenant une mise en œuvre ActiveMQ
  • il définit un @Bean JmsListenerContainerFactory instancier un DefaultJmsListenerContainerFactory et passer à travers de Boot DefaultJmsListenerContainerFactoryConfigurer
  • il définit un @Bean MessageConverter contenant un paramètre MappingJackson2MessageConverter et une coutume ObjectMapper
  • J'utilise @JmsListener annotation pointant vers myfactory sur une méthode de mon service. C'est la seule utilisation que j'ai pour le sujet, l'abonnement seul.

Maintenant, je veux passer à Spring Integration. Après avoir lu beaucoup, et à condition que je n'ai pas besoin d'une utilisation bidirectionnelle (en rejetant Passerelles) ni un mécanisme d'interrogation (rejeter @InboundChannelAdapter), je vais pour un message-driven-channel-adapter, en langage de configuration XML traditionnel. J'ai trouvé que l'idiome de Java devrait être accompli au moyen de la nouvelle bibliothèque DSL d'intégration de ressort, et ainsi, je recherche l'extrait approprié.

Il semble JmsMessageDrivenChannelAdapter est le bon équivalent, et je l'ai trouvé un moyen:

IntegrationFlows.from(Jms.messageDriverChannelAdapter(...))

Mais le problème est que cela n'accepte que le ActiveMQ ConnectionFactory ou un AbstractMessageListenerContainer, mais pas ma botte préconfiguré JmsListenerContainerFactory !

Comment cela devrait-il être mis en œuvre de manière ultime?

Répondre

3

JmsListenerContainerFactory est spécifique pour le @JmsListener, il s'agit d'une abstraction de niveau supérieur utilisée pour configurer un DefaultMessageListenerContainer. L'amorçage ne fournit pas d'option de configuration automatique pour un DefaultMessageListenerContainer brut; vous devez le câbler vous-même. Mais vous pouvez toujours utiliser les propriétés de démarrage ...

@Bean 
public IntegrationFlow flow(ConnectionFactory connectionFactory, 
          JmsProperties properties) { 
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(container(connectionFactory, properties))) 
      ... 
      .get(); 
} 

private DefaultMessageListenerContainer container(ConnectionFactory connectionFactory, 
                JmsProperties properties) { 
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(); 
    container.setConcurrentConsumers(properties.getListener().getConcurrency()); 
    container.setMaxConcurrentConsumers(properties.getListener().getMaxConcurrency()); 
    ... 
    return container; 
} 
+0

Grande réponse merci !!!!!!!!!!! – Whimusical

3

Il y a même une meilleure approche. Je suis surpris que Gary ne l'ait pas commenté. Il existe un constructeur prêt à l'emploi appelé Jms.container(...).

@Bean 
public IntegrationFlow jmsMyServiceMsgInboundFlow(ConnectionFactory connectionFactory, MessageConverter jmsMessageConverter, MyService myService, JmsProperties jmsProperties, @Value("${mycompany.jms.destination.my-topic}") String topicDestination){ 

     JmsProperties.Listener jmsInProps = jmsProperties.getListener(); 

     return IntegrationFlows.from(
           Jms.messageDrivenChannelAdapter( Jms.container(connectionFactory, topicDestination) 
                    .pubSubDomain(false) 
                    .sessionAcknowledgeMode(jmsInProps .getAcknowledgeMode().getMode()) 
                    .maxMessagesPerTask(1) 
                    .errorHandler(e -> e.printStackTrace()) 
                    .cacheLevel(0) 
                    .concurrency(jmsInProps.formatConcurrency()) 
                    .taskExecutor(Executors.newCachedThreadPool()) 
                    .get())) 
            ) 
            .extractPayload(true) 
            .jmsMessageConverter(jmsMessageConverter) 
            .destination(topicDestination) 
            .autoStartup(true) 
            //.errorChannel("NOPE") 
          ) 
          .log(LoggingHandler.Level.DEBUG) 
          .log() 
          .handle(myService, "myMethod", e -> e.async(true).advice(retryAdvice())) 
          .get();