0

J'ai un flux configuré DSL Spring Integration:DSL Spring Integration Java: mise en mémoire tampon le flux de messages et gestionnaire mis en thread séparé

// A custom channel Bean 
@Autowired 
@Qualifier(INPUT_DATA_CHANNEL) 
private PublishSubscribeChannel publishSubscribeChannel; 

//A Service that can do database recording 
@Autowired 
private DatabaseActivator databaseActivator; 

@Bean 
public IntegrationFlow setupDatabaseFlow() { 

    return IntegrationFlows.from(publishSubscribeChannel) 
      .handle((p, h) -> databaseActivator.recordToDatabase(p)) 
      .get(); 
} 

Selon les journaux, tout se passe de façon séquentielle dans le thread "principal". BTW, j'utilise publishSubscribeChannel comme en parallèle j'ai éditeur/gestionnaire de lapin qui écoute de la même manière à ce canal.

Étant donné que le fonctionnement de la base de données prend du temps, comment dois-je aborder correctement la manipulation afin que "main" ne soit pas ralentie. De préférence, le thread principal doit être débloqué dès que possible et la manipulation doit se poursuivre dans un thread de travail. Ai-je raison? Puis-je introduire un tampon dans le flux, qui va collecter des paquets de messages de publishSubscribeChannel?

En outre, je préfère un autre thread (pool) pour gérer l'envoi réel afin de supprimer la charge du thread principal qui exécute le flux. Je suis bien au courant de ThreadPoolTaskExecutor au printemps que les deux ont un tampon et un pool de threads. Est-ce un bon moyen de l'utiliser, et comment utiliser ThreadPoolTaskExecutor en Java DSL façon?

Répondre

0

Il est le cteur en la matière:

/** 
* Create a PublishSubscribeChannel that will use an {@link Executor} 
* to invoke the handlers. If this is null, each invocation will occur in 
* the message sender's thread. 
* 
* @param executor The executor. 
*/ 
public PublishSubscribeChannel(Executor executor) { 

http://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#channel-configuration-pubsubchannel

Avec Java DSL, vous pouvez le déclarer comme:

@Bean 
PublishSubscribeChannel publishSubscribeChannel(Executor executor) { 
    return Channels.publishSubscribe(executor).get(); 
}