J'utilise Akka Cluster (version 2.4.10) avec quelques nœuds désignés pour le rôle "front-end" et peu d'autres comme "workers". Les travailleurs sont sur des machines distantes. Le travail entrant est distribué par l'acteur frontal aux travailleurs par routage circulaire. Le problème est de renvoyer la réponse des «travailleurs» à l'acteur principal. Je peux voir que le travail est complété par les travailleurs. Mais le message envoyé par les ouvriers au front n'atteint pas et finit comme des lettres mortes. Je vois l'erreur ci-dessous dans le journal.Akka round-robin: Envoi d'une réponse depuis les routes distantes à l'expéditeur
[Cluster-akka.actor.default-dispatcher-21] [akka://Cluster/deadLetters] Message [scala.collection.immutable.$colon$colon] from Actor[akka://Cluster/user] to Actor[akka://Cluster/deadLetters] was not delivered. [6] dead letters encountered.
Je l'ai vu et je suis this suivant le même dans mon code. J'ai également vu this, mais la solution proposée ne s'applique pas dans ce cas, car je ne connais pas les routes à l'avance. Cela vient de la configuration et cela peut changer. La configuration du routeur round-robin est comme ci-dessous.
akka.actor.deployment {
/frontEnd/hm = {
router = round-robin-group
nr-of-instances = 5
routees.paths = ["/user/hmWorker"]
cluster {
enabled = on
use-role = backend
allow-local-routees = on
}
}
}
Le routeur est instancié dans l'acteur frontal comme ci-dessous.
val router = context.actorOf(FromConfig.props(), name = "hm")
val controller = context.actorOf(Props(classOf[Controller], router))
Le contrôleur et les codes du travailleur sont indiqués ci-dessous.
// Node 1 : Controller routes requests using round-robin
class Controller(router: ActorRef) extends Actor {
val list = List("a", "b") // Assume this is a big list
val groups = list.grouped(500)
override def receive: Actor.Receive = {
val futures = groups.map(grp => (router ? Message(grp)).mapTo[List[String]]))
val future = Future.sequence(futures).map(_.flatten)
val result = Await.result(future, 50 seconds)
println(s"Result is $result")
}
}
// Node 2
class Worker extends Actor {
override def receive: Actor.Receive = {
case Message(lst) =>
val future: Future[List[String]] = // Do Something asynchronous
future onComplete {
case Success(r) => sender.!(r)(context.parent) // This message is not delivered to Controller actor.
case Failure(th) => // Error handling
}
}
}
Veuillez me faire savoir ce que je fais mal ici. Apprécier ton aide.
Oui. Cela fonctionne avec "pipeTo". Merci. – Jegan