Contexte: Clojure + RabbitMQ (via Langohr), suite à this question.Clojure + RabbitMQ/consommation de messages multithread
Je reçois des résultats bizarres en consommant des messages à partir d'un RabbitMQ mq (obtenir les messages d'un échange direct et publier à un échange de fanout après le traitement du message). Je ne comprends pas pourquoi les messages se retrouvent dans threads distincts pendant la consommation (à chaque fois qu'un message passe un thread).
Le consommateur démarre dans un thread séparé (pour éviter qu'il ne plante le thread principal si des exceptions d'E/S surviennent), mais cela n'explique pas la commutation.
; Message handler
(defn message-handler
[pub-name ch metadata ^bytes payload]
(let [msg (json/parse-string (String. payload "UTF-8"))
content (string/join " " (map msg '("title" "link" "body")))
tags (pluck-tags content)]
(println (format "HANDLER %s: Message: %s | found tags: %s"
(Thread/currentThread)
(msg "title")
(tags-to-csv tags)))
(nil)))
; (lb/publish ch pub-name "" (json/generate-string (assoc msg "tags" (tags-to-csv tags))))))
(defn -main
[& args]
(let [conn (rmq/connect {:uri (System/getenv "MSGQ")})
ch (lch/open conn)
q-name "q.events.tagger"
e-sub-name "e.events.preproc"
e-pub-name "e.events"
routing-key "tasks.taggify"]
(lq/declare ch q-name :exclusive false :auto-delete false)
(le/declare ch e-pub-name "fanout" :durable false)
(lq/bind ch q-name e-sub-name :routing-key routing-key)
(.start (Thread. (fn []
(lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true))))))
Le gestionnaire de messages affiche simplement l'unité d'exécution en cours et la charge utile reçue. Voilà ce que je reçois:
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
NOTE
Je l'ai remarqué lors de la lecture avec des agents. Je voulais traiter chaque message dans son propre pool de threads lié à la CPU, et le publier dans un pool de threads non borné (IO). Mais, après avoir imprimé le fil de discussion actuel, j'ai remarqué que même sans utiliser d'agents (ou de futurs), les messages sont traités par différents threads.
Merci pour la réponse. J'ai modifié la question (ajouté le gestionnaire de messages). Donc, comme vous pouvez le voir, pas d'agents là-bas. – neektza