2016-10-20 1 views
0

J'essaie d'utiliser ConsistentHashingRoutingLogic Akka pour garantir que les messages avec la même clé sont routés vers le même acteur. Il est important que les messages avec la même clé soient traités dans la commande FIFO. Les messages avec des clés différentes peuvent être acheminés vers différents acteurs et traités en parallèle librement. Je n'utilise pas Akka en mode distribué.Akka ConsistentHashingRoutingLogic ne routage pas toujours le même thread Dispatcher

Les messages sont en fait des messages JSON lus à partir d'un courtier RabbitMQ afin que mon acteur maître reçoive un message AMQP et utilise la clé de routage comme clé de message. La même clé est également dans le message lui-même. L'acteur fait partie d'une application Spring.

Mon acteur principal ressemble à ceci:

@Named("MessageHandlerMaster") 
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 
public class MessageHandlerMaster extends UntypedActor { 

    private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class); 

    private Router router; 

    @Autowired 
    public MessageHandlerMaster(final SpringProps springProps) { 

    List<Routee> routees = Stream.generate(() -> { 
     ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class)); 
     getContext().watch(worker); 
     return new ActorRefRoutee(worker); 
    }).limit(5) //todo: configurable number of workers 
     .collect(Collectors.toList()); 

    router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees); 
    } 

    public void onReceive(Object message) { 
    if (message instanceof Message) { 
     Message amqpMessage = (Message) message; 
     String encoding = getMessageEncoding(amqpMessage); 
     try { 
     String json = new String(amqpMessage.getBody(), encoding); 
     String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey(); 
     log.debug("Routing message based on routing key " + routingKey); 
     router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender()); 
     } catch (UnsupportedEncodingException e) { 
     log.warn("Unknown content encoding sent in message! {}", encoding); 
     } 
    } else if (message instanceof Terminated) { 
     //if one of the routee's died, remove it and replace it 
     log.debug("Actor routee terminated!"); 
     router.removeRoutee(((Terminated) message).actor()); 
     ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class)); 
     getContext().watch(r); 
     router = router.addRoutee(new ActorRefRoutee(r)); 
    } 
    } 

    private static String getMessageEncoding(Message message) { 
    String encoding = message.getMessageProperties().getContentEncoding(); 
    if ((encoding == null) || (encoding.equals(""))) { 
     encoding = "UTF-8"; 
    } 
    return encoding; 
    } 
} 

Je suis d'abord RemettRe le maître une fois par:

this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master"); 

et puis juste soumettre les messages que lui:

master.tell(message, ActorRef.noSender()); 

Mais lorsque j'imprime les journaux à partir de onReceive() de mon travailleur, je vois que différents threads du répartiteur sont étant utilisé parfois pour la même clé.

De même, il n'est pas clair pourquoi parfois le même thread Dispatcher est utilisé pour l'acteur maître et pour un acteur Worker. Est-ce que ce ne devrait pas être un message asynchrone passant entre les threads?

16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 

Comme vous pouvez le voir ici, le fil répartiteur pour le message de traitement des travailleurs avec la clé 10.420.186 était parfois 9 et parfois 10. L'acteur principal est parfois aussi utilisé ces 2 fils.

Comment puis-je être sûr que le ConsistentHashingRoutingLogic fonctionne réellement et que le même thread traite les messages avec la même clé? Est-ce que je fais quelque chose de mal dans l'initialisation de mon routeur?

+1

Je pense que les acteurs ne sont pas liés aux threads. Il n'y a donc rien de mal dans le journal joint. – vrudkovsk

Répondre

0

Donc @vrudkovsk a raison avec son commentaire. Je pense que vous êtes confus entre les threads et les acteurs. Les acteurs sont juste des objets en mémoire qui ont une adresse et une boîte aux lettres. Les répartiteurs sont essentiellement des pools de threads qui effectuent des actions avec l'acteur. actions Exemple sont:

  • dequeue un message d'une boîte aux lettres pour le traiter dans un acteur
  • enqueue un message à une boîte aux lettres.

Différents threads peuvent effectuer des actions pour le même acteur. C'est décidé par le répartiteur. Akka s'assure qu'un seul thread à la fois traitera un message dans un acteur. Cela ne veut pas dire que ce sera toujours le même fil.

Si vous voulez vous assurer qu'ils arrivent au même acteur, je recommanderais connecter le chemin d'acteur ou de l'adresse à l'aide context.self.path ou context.self.path.address puisque ceux-ci sont des identifiants uniques au sein de la même ActorSystem.

+0

Merci pour votre réponse.Donc, dans ce cas, ai-je toujours l'assurance que si j'ai 5 acteurs Worker, quel que soit le Thread utilisé pour l'acteur 1, puisque j'utilise 'ConsistentHashingRoutingLogic', il n'y a aucun moyen qu'un deuxième thread de dispatching décroche un autre message qui devrait être acheminé à l'acteur 1 avant le traitement du message précédent? – jbx

+0

C'est exact. Le 'ConsistentHashingRoutingLogic' s'assure que votre message finira dans le bon acteur indépendamment de quel thread fait le travail. – hveiga