2012-10-13 4 views
0

Contexte: Clojure + RabbitMQ (via Langohr), suite à this question.Clojure + RabbitMQ/consommation de messages multithread

Je reçois des résultats bizarres en consommant des messages à partir d'un RabbitMQ mq (obtenir les messages d'un échange direct et publier à un échange de fanout après le traitement du message). Je ne comprends pas pourquoi les messages se retrouvent dans threads distincts pendant la consommation (à chaque fois qu'un message passe un thread).

Le consommateur démarre dans un thread séparé (pour éviter qu'il ne plante le thread principal si des exceptions d'E/S surviennent), mais cela n'explique pas la commutation.

; Message handler 
(defn message-handler 
    [pub-name ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     content (string/join " " (map msg '("title" "link" "body"))) 
     tags (pluck-tags content)] 
    (println (format "HANDLER %s: Message: %s | found tags: %s" 
        (Thread/currentThread) 
        (msg "title") 
        (tags-to-csv tags))) 
    (nil))) 
    ; (lb/publish ch pub-name "" (json/generate-string (assoc msg "tags" (tags-to-csv tags)))))) 


(defn -main 
    [& args] 
    (let [conn   (rmq/connect {:uri (System/getenv "MSGQ")}) 
     ch   (lch/open conn) 
     q-name  "q.events.tagger" 
     e-sub-name "e.events.preproc" 
     e-pub-name "e.events" 
     routing-key "tasks.taggify"] 
    (lq/declare ch q-name :exclusive false :auto-delete false) 
    (le/declare ch e-pub-name "fanout" :durable false) 
    (lq/bind ch q-name e-sub-name :routing-key routing-key) 
    (.start (Thread. (fn [] 
     (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true)))))) 

Le gestionnaire de messages affiche simplement l'unité d'exécution en cours et la charge utile reçue. Voilà ce que je reçois:

HANDLER in Thread[pool-1-thread-2,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ... 

NOTE

Je l'ai remarqué lors de la lecture avec des agents. Je voulais traiter chaque message dans son propre pool de threads lié à la CPU, et le publier dans un pool de threads non borné (IO). Mais, après avoir imprimé le fil de discussion actuel, j'ai remarqué que même sans utiliser d'agents (ou de futurs), les messages sont traités par différents threads.

Répondre

0

Auteur de Langohr ici.

Il doit y avoir quelque chose qui manque dans le code. Si vous obtenez cette sortie avec des agents, c'est facile: les agents Clojure (aussi, futurs et promesses) utilisent un pool de threads. Langohr.consumers/subscribe de Langohr ou QueueingConsumer sous-jacent dans le client Java RabbitMQ ne le font pas.

+0

Merci pour la réponse. J'ai modifié la question (ajouté le gestionnaire de messages). Donc, comme vous pouvez le voir, pas d'agents là-bas. – neektza

1

1) Vous avez un échange en éventail, ce qui signifie que la clé de routage n'est pas utilisée du tout lors du routage des messages. Un échange de fanout achemine les messages vers chaque file d'attente. Si vous souhaitez utiliser des clés de routage, utilisez des échanges directs ou de rubrique.

2) Vous utilisez toujours le même nom de file d'attente, ce qui signifie que votre code ne fait qu'ajouter plusieurs consommateurs à la même file d'attente. Cela implique que rabbitmq va juste arrondir les messages autour de vos consommateurs.

+0

L'échange de fanout (e-pub-name) est utilisé pour publier les messages traités. L'échange direct (e-sous-nom) est utilisé pour écouter les messages dans la file d'attente de pré-paiement. Donc, le processus de clojure est un intermédiaire. La grande image: ** producteurs ** => échange direct -> file d'attente préproc => le processus clojure (ou plusieurs processeurs, en fonction de la charge) => échange de fanout -> ** consommateurs ** – neektza

+0

The Clojure le processus est moyen d'être évolutif, c'est pourquoi j'utilise un échange direct pour écouter - j'ai juste besoin de démarrer plus de processus et les messages sont routés via round-robin. Après le traitement du message, je viens de le publier à d'autres consommateurs. – neektza