2016-10-06 4 views
0

J'utilise Akka Cluster (version 2.4.10) avec quelques nœuds désignés pour le rôle "front-end" et peu d'autres comme "workers". Les travailleurs sont sur des machines distantes. Le travail entrant est distribué par l'acteur frontal aux travailleurs par routage circulaire. Le problème est de renvoyer la réponse des «travailleurs» à l'acteur principal. Je peux voir que le travail est complété par les travailleurs. Mais le message envoyé par les ouvriers au front n'atteint pas et finit comme des lettres mortes. Je vois l'erreur ci-dessous dans le journal.Akka round-robin: Envoi d'une réponse depuis les routes distantes à l'expéditeur

[Cluster-akka.actor.default-dispatcher-21] [akka://Cluster/deadLetters] Message [scala.collection.immutable.$colon$colon] from Actor[akka://Cluster/user] to Actor[akka://Cluster/deadLetters] was not delivered. [6] dead letters encountered. 

Je l'ai vu et je suis this suivant le même dans mon code. J'ai également vu this, mais la solution proposée ne s'applique pas dans ce cas, car je ne connais pas les routes à l'avance. Cela vient de la configuration et cela peut changer. La configuration du routeur round-robin est comme ci-dessous.

akka.actor.deployment { 
    /frontEnd/hm = { 
    router = round-robin-group 
    nr-of-instances = 5 
    routees.paths = ["/user/hmWorker"] 
    cluster { 
     enabled = on 
     use-role = backend 
     allow-local-routees = on 
    } 
    } 
} 

Le routeur est instancié dans l'acteur frontal comme ci-dessous.

val router = context.actorOf(FromConfig.props(), name = "hm") 
val controller = context.actorOf(Props(classOf[Controller], router)) 

Le contrôleur et les codes du travailleur sont indiqués ci-dessous.

// Node 1 : Controller routes requests using round-robin 
class Controller(router: ActorRef) extends Actor { 

    val list = List("a", "b") // Assume this is a big list 

    val groups = list.grouped(500) 

    override def receive: Actor.Receive = { 
     val futures = groups.map(grp => (router ? Message(grp)).mapTo[List[String]])) 
     val future = Future.sequence(futures).map(_.flatten) 
     val result = Await.result(future, 50 seconds) 
     println(s"Result is $result") 
    } 
} 

// Node 2 
class Worker extends Actor { 

    override def receive: Actor.Receive = { 
     case Message(lst) => 
      val future: Future[List[String]] = // Do Something asynchronous 
      future onComplete { 
       case Success(r) => sender.!(r)(context.parent) // This message is not delivered to Controller actor. 
       case Failure(th) => // Error handling 
      } 
    } 
} 

Veuillez me faire savoir ce que je fais mal ici. Apprécier ton aide.

Répondre

2

Vous ne devez pas utiliser sender() dans le rappel sur un Future. Au moment où le rappel est traité, le sender() fait probablement référence à quelque chose de différent de ce qu'il était lorsque vous avez reçu le message.

Pensez soit sauver la référence en dehors de la fonction de rappel d'abord comme:

override def receive: Actor.Receive = { 
    case Message(lst) => 
     val future: Future[List[String]] = // Do Something asynchronous 
     val replyTo: ActorRef = sender() 
     future onComplete { 
      case Success(r) => replyTo.!(r)(context.parent) // This message is not delivered to Controller actor. 
      case Failure(th) => // Error handling 
     } 
} 

Ou mieux encore, utiliser le modèle de tuyau:

import akka.pattern.pipe 
override def receive: Actor.Receive = { 
    case Message(lst) => 
    val future: Future[List[String]] = // Do Something asynchronous 
    future.pipeTo(sender()) 
} 
+0

Oui. Cela fonctionne avec "pipeTo". Merci. – Jegan