J'essaie d'utiliser ConsistentHashingRoutingLogic
Akka pour garantir que les messages avec la même clé sont routés vers le même acteur. Il est important que les messages avec la même clé soient traités dans la commande FIFO. Les messages avec des clés différentes peuvent être acheminés vers différents acteurs et traités en parallèle librement. Je n'utilise pas Akka en mode distribué.Akka ConsistentHashingRoutingLogic ne routage pas toujours le même thread Dispatcher
Les messages sont en fait des messages JSON lus à partir d'un courtier RabbitMQ afin que mon acteur maître reçoive un message AMQP et utilise la clé de routage comme clé de message. La même clé est également dans le message lui-même. L'acteur fait partie d'une application Spring.
Mon acteur principal ressemble à ceci:
@Named("MessageHandlerMaster")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MessageHandlerMaster extends UntypedActor {
private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class);
private Router router;
@Autowired
public MessageHandlerMaster(final SpringProps springProps) {
List<Routee> routees = Stream.generate(() -> {
ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class));
getContext().watch(worker);
return new ActorRefRoutee(worker);
}).limit(5) //todo: configurable number of workers
.collect(Collectors.toList());
router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
}
public void onReceive(Object message) {
if (message instanceof Message) {
Message amqpMessage = (Message) message;
String encoding = getMessageEncoding(amqpMessage);
try {
String json = new String(amqpMessage.getBody(), encoding);
String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();
log.debug("Routing message based on routing key " + routingKey);
router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender());
} catch (UnsupportedEncodingException e) {
log.warn("Unknown content encoding sent in message! {}", encoding);
}
} else if (message instanceof Terminated) {
//if one of the routee's died, remove it and replace it
log.debug("Actor routee terminated!");
router.removeRoutee(((Terminated) message).actor());
ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class));
getContext().watch(r);
router = router.addRoutee(new ActorRefRoutee(r));
}
}
private static String getMessageEncoding(Message message) {
String encoding = message.getMessageProperties().getContentEncoding();
if ((encoding == null) || (encoding.equals(""))) {
encoding = "UTF-8";
}
return encoding;
}
}
Je suis d'abord RemettRe le maître une fois par:
this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master");
et puis juste soumettre les messages que lui:
master.tell(message, ActorRef.noSender());
Mais lorsque j'imprime les journaux à partir de onReceive()
de mon travailleur, je vois que différents threads du répartiteur sont étant utilisé parfois pour la même clé.
De même, il n'est pas clair pourquoi parfois le même thread Dispatcher est utilisé pour l'acteur maître et pour un acteur Worker. Est-ce que ce ne devrait pas être un message asynchrone passant entre les threads?
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
Comme vous pouvez le voir ici, le fil répartiteur pour le message de traitement des travailleurs avec la clé 10.420.186 était parfois 9 et parfois 10. L'acteur principal est parfois aussi utilisé ces 2 fils.
Comment puis-je être sûr que le ConsistentHashingRoutingLogic
fonctionne réellement et que le même thread traite les messages avec la même clé? Est-ce que je fais quelque chose de mal dans l'initialisation de mon routeur?
Je pense que les acteurs ne sont pas liés aux threads. Il n'y a donc rien de mal dans le journal joint. – vrudkovsk