2

J'ai un acteur à distance, Bar et un acteur local, Foo. Je souhaite utiliser Foo pour transmettre des messages à Bar à chaque appel d'une CLI.Comment puis-je envoyer des messages à un acteur distant via CLI avec Akka Remoting?

Bar Les messages peuvent être transmis avec succès, mais Foo se bloque en attendant un message. Pour résoudre ce problème, j'ai ajouté un sys.exit(0) à la fin de Foo principal. Cela provoque un problème d'association avec le système Foo. Comment puis-je arrêter mon acteur local entre les versions successives du CLI sans tuer mon acteur local manuellement?

Tais-toi et donne moi le code!


Foo:

build.sbt

name := "Foo" 

version := "1.0" 

scalaVersion := "2.11.8" 

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11" 
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11" 
libraryDependencies += "com.github.scopt" %% "scopt" % "3.5.0" 

fork in run := true 

Main.scala

import akka.actor._ 
import com.typesafe.config.ConfigFactory 

case class Config(mode: String = "", greeting: String="") 

class Foo extends Actor { 
    // create the remote actor 
    val BarActor = context.actorSelection("akka.tcp://[email protected]:2552/user/BarActor") 

    def receive = { 
    case method: String => BarActor ! method 
    } 
} 

object CommandLineInterface { 

    val config = ConfigFactory.load() 
    val system = ActorSystem("FooSystem", config.getConfig("FooApp")) 

    val FooActor = system.actorOf(Props[Foo], name = "FooActor") 

    val parser = new scopt.OptionParser[Config]("Foo") { 
    head("foo", "1.x") 

    help("help").text("prints usage text") 

    opt[String]('m', "method").action((x, c) => 
     c.copy(greeting = x)).text("Bar will greet with <method>") 
    } 
} 

object Main extends App { 
    import CommandLineInterface.{parser, FooActor} 

    parser.parse(args, Config()) match { 
    case Some(config) => FooActor ! config.greeting 
    case None => sys.error("Bad news...") 
    } 
    /* 
    When sys.exit(0) commented, this hangs and Bar greet. 
    When sys.exit(0) uncommented, this doesn't hang, but also Bar doesn't greet. 
    */ 

    //sys.exit(0) 
} 

application.conf

FooApp { 
    akka { 
    loglevel = "INFO" 
    actor { 
     provider = "akka.remote.RemoteActorRefProvider" 
    } 
    remote { 
     enabled-transports = ["akka.remote.netty.tcp"] 
     netty.tcp { 
     hostname = "127.0.0.1" 
     port = 0 
     } 
     log-sent-messages = on 
     log-received-messages = on 
    } 
    } 
} 

Bar:

build.sbt

name := "Bar" 

version := "1.0" 

scalaVersion := "2.11.8" 

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11" 
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11" 

Main.scala

import akka.actor._ 
import com.typesafe.config.ConfigFactory 

class Bar extends Actor { 
    def receive = { 
    case greeting: String => Bar.greet(greeting) 
    } 
} 

object Bar { 
    val config = ConfigFactory.load() 
    val system = ActorSystem("BarSystem", config.getConfig("BarApp")) 
    val BarActor = system.actorOf(Props[Bar], name = "BarActor") 

    def greet(greeting: String) = println(greeting) 

    def main(args: Array[String]): Unit = { 
    /* Intentionally empty */ 
    } 
} 

application.conf

BarApp { 
    akka { 
    loglevel = "INFO" 
    actor { 
     provider = remote 
    } 
    remote { 
     enabled-transports = ["akka.remote.netty.tcp"] 
     netty.tcp { 
     hostname = "127.0.0.1" 
     port = 2552 
     } 
     log-sent-messages = on 
     log-received-messages = on 
    } 
    } 
} 

Run Foo avec sbt 'run-main Main -m hello' et exécuter Bar avec sbt 'run-main Main'.

Désolé pour le code long, mais c'est le MVCE pour mon problème.

Comment puis-je atteindre mon comportement souhaité - l'acteur CLI meurt entre les invocations CLI successives avec l'acteur distant attendant de nouveaux messages.

+1

Pourquoi pensez-vous que 'bar' est mort? Y a-t-il quelque chose dans le journal indiquant cela? –

+0

@ PawełBartkiewicz J'ai essayé de clarifier ma signification. Désolé pour la faute. :) J'espère que c'est plus clair. – erip

Répondre

3

Cela se produit parce que vous appelez sys.exit(0) immédiatement après l'envoi d'un message à FooActor, donc il y a une chance significative que les sorties de l'application avant FooActor a la chance de lire le message même, et encore moins à l'avant BarActor.

Il semble y avoir many possible solutions, l'un d'entre eux étant:

class Foo extends Actor { 
    // create the remote actor 
    val BarActor = context.actorSelection("akka.tcp://[email protected]:2552/user/BarActor") 

    override def receive = { 
    case method: String => { 
     BarActor ! method 
     self ! PoisonPill 
    } 
    } 

    override def postStop = { 
    context.system.terminate 
    } 
} 

Malheureusement, il se trouve que le système est encore arrêté avant d'envoyer le message à Bar.

Je n'ai pas trouvé de solution raisonnable à ce problème si vous voulez envoyer un message dans un style «fire and forget». Cependant, dans la plupart des cas, il est souhaitable d'obtenir une sorte de réponse de l'acteur à distance, vous pouvez donc faire:

class Foo extends Actor { 
    // create the remote actor 
    val BarActor = context.actorSelection("akka.tcp://[email protected]:2552/user/BarActor") 

    override def receive = { 
    case method: String => { 
     BarActor ! method 
     context.become(waitingToKillMyself) 
    } 
    } 

    def waitingToKillMyself: Receive = { 
    case response: String => { 
     println(response) 
     self ! PoisonPill 
    } 
    } 

    override def postStop = { 
    context.system.terminate 
    } 
} 

// ... 

object Main extends App { 
    import CommandLineInterface.{parser, FooActor, system} 
    import system.dispatcher 

    parser.parse(args, Config()) match { 
    case Some(config) => { 
     FooActor ! config.greeting 
     system.scheduler.scheduleOnce(10.seconds, FooActor, PoisonPill) 
    } 

    case None => sys.error("Bad news...") 
    } 
} 

Bar:

class Bar extends Actor { 
    def receive = { 
    case greeting: String => { 
     Bar.greet(greeting) 
     sender() ! "OK" 
    } 
    } 
} 
+0

Cela semble être une solution solide si c'est un message ponctuel, mais je veux que Bar soit toujours en train d'attendre un message. Est-ce que cela atteindra cette fin? – erip

+1

'BarSystem' n'est pas un système d'acteur séparé? –

+0

Désolé, vous n'avez pas vu votre commentaire après l'avoir modifié. Oui, je crois que cela devrait fonctionner correctement (je ne peux pas essayer moi-même en ce moment, je peux le faire plus tard si vous voulez). Puisque 'FooSystem' et' BarSystem' sont des systèmes d'acteur séparés, 'shutdown' ou' terminate' devrait arrêter seulement 'FooSystem' ([documentation] (http://doc.akka.io/api/akka/2.4/index.html #[email protected](): scala.concurrent.Future [akka.actor.Terminated])). –