2015-09-10 1 views
1

Je développe une application multithread, où plusieurs "processeurs" (Runnables dans ThreadPools) envoient des messages les uns aux autres. Ils communiquent en utilisant l'interface BlockingQueue: lorsque le processeur A est terminé avec la tâche T1, il le pousse à la file d'attente Q1 (par exemple, BlockingQueue<MyTask> si T1 est représenté par la classe MyTask); après cela, le processeur B tire la tâche de Q1, effectue des calculs et pousse le résultat résultat à Q2; etc.Liaison RabbitMQ à BlockingQueue

J'utilise LinkedBlockingQueue, car mon application est monolithique et tous les processeurs "vivent" dans la même JVM. Cependant, je veux que mon application devienne modulaire (Microservice Architecture), j'ai donc décidé d'utiliser RabbitMQ comme un courtier de messages.

Le problème est de migrer des implémentations java des files d'attente vers RabbitMQ avec des changements minimes dans le code source du client. De ce fait, j'essaie de trouver une sorte de liaison entre les abstractions RabbitMQ et l'interface BlockingQueue. Ainsi, quand quelqu'un envoie un message à la file d'attente d'amqp, il devrait apparaître dans une file d'attente java. Et vice versa: quand quelqu'un pousse un objet dans la file d'attente java, il doit être propagé à l'échange d'un amqp.

Un exemple de mise en œuvre de l'interrogation (à partir de la file d'attente d'amqp, en utilisant spring-amqp) est présenté ci-dessous.

<T> BlockingQueue<T> createQueue(Class<T> elementType, MessageListenerContainer listenerContainer) { 
    LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(); 

    MessageConverter messageConverter = listenerContainer.getMessageConverter(); 
    listenerContainer.setupMessageListener((MessageListener) message -> { 
     Object task = messageConverter.fromMessage(message); 
     queue.offer(elementType.cast(task)); 
    }); 

    return queue; 
} 

Je ne peux pas trouver un cadre qui implémente l'interface BlockingQueue utilisant les files d'attente RabbitMQ maintenant. Si ce type de cadre n'existe pas, mon idée est-elle erronée d'un point de vue architectural, ou est-ce que personne ne l'a pas encore implémenté?

Répondre

1

Je ne suis pas sûr que vous voulez vraiment le faire de la façon que vous décrivez - les messages entrants seront remis à la file d'attente et s'asseoir en mémoire, pas dans RabbitMQ.

Je pense une implémentation simple BlockingQueue qui utilise un RabbitTemplate dessous pour tirer les messages de la file d'attente de lapin (en utilisant ou receiveAndConvert()) pourrait être mieux pour prendre des/opérations sondage - il laissera le message dans RabbitMQ jusqu'à ce que nécessaire, et simplement RabbitTemplate.convertAndSend() pour les opérations d'offre/vente.

Bien que très simple, cela pourrait être un ajout utile au framework; Considérez contributing.

+1

En plus de la réponse de Gary, je suggère de jeter un oeil à la Spring Integration Framework qui fournit le 'PollableAmqpChannel': http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp- canaux –

+0

Gary, merci pour votre réponse. En fait, je voulais dire ce genre de mise en œuvre, mais je pensais que cela existait déjà. Je considérerai contribuer après intégration cette implémentation dans mon projet (si l'intégration au printemps ne fonctionnera pas dans mon cas). –

+0

@ArtemBilan merci, je vais essayer printemps-intégration. La première impression est plutôt positive. –