2015-07-24 2 views

Applying a custom action in SupervisorStrategy after maxNrOfRetries?

My parent actor looks like this:

import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Restart
import akka.event.LoggingReceive
import scala.concurrent.duration._

case object StartRemoteProcessor

class ConnectorActor extends Actor with ActorLogging {
    // Restart a failing child at most 3 times within 20 seconds.
    override def supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 20.seconds) {
        case e: OutOfMemoryError =>
            log.error("Exception received => " + e.getMessage)
            Restart
        case e: IllegalArgumentException =>
            log.error("Exception received => " + e.getMessage)
            Restart
    }

    def receive = LoggingReceive {
        case StartRemoteProcessor =>
            val remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
            log.info("Starting Remote Processor")
            remoteProcessor ! "Start"
        case "ProcessingStopped" =>
            notifyFailure()
    }

    def notifyFailure() = {
        log.info("notifying failure to server")
    }
}

As per the docs:

The child actor is stopped if the limit is exceeded.

Requirement

  • Once maxNrOfRetries is exhausted, I want to run a custom action, notifyFailure, when the actor is finally stopped. This will send an email.

In my child actor I have:

import akka.actor.{Actor, ActorLogging}
import akka.event.LoggingReceive

class ProcessingActor extends Actor with ActorLogging {

    // After a restart, re-send "Start" to self (with the parent as sender) to retry processing.
    override def aroundPostRestart(reason: Throwable): Unit = self.tell("Start", context.parent)
    override def preStart(): Unit = ()
    // Tell the parent that this actor has stopped.
    override def postStop(): Unit = context.parent ! "ProcessingStopped"

    def receive = LoggingReceive {
        case "Start" =>
            log.info("ProcessingActor path => " + self.path)
            startProcessing()
    }

    def startProcessing() = {
        println("executing startProcessing")
        throw new IllegalArgumentException("not implemented by choice")
    }
}

But in the logs I see that notifyFailure is called on every Restart:

[INFO] [07/24/2015 11:57:50.107] [main] [Remoting] Starting remoting 
[INFO] [07/24/2015 11:57:50.265] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2554] 
[INFO] [07/24/2015 11:57:50.266] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2554] 
ConnectorSystem Started 
[INFO] [07/24/2015 11:57:50.277] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:2554/user/connectorActor] Starting Remote Processor 
[ERROR] [07/24/2015 11:57:50.509] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2554/user/connectorActor] Exception received => not implemented by choice 
[ERROR] [07/24/2015 11:57:50.511] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2552/remote/akka.tcp/[email protected]:2554/user/connectorActor/processingActor] not implemented by choice 
java.lang.IllegalArgumentException: not implemented by choice 
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23) 
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467) 
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

[INFO] [07/24/2015 11:57:50.526] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2554/user/connectorActor] notifying failure to server 
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2554/user/connectorActor] Exception received => not implemented by choice 
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2552/remote/akka.tcp/[email protected]:2554/user/connectorActor/processingActor] not implemented by choice 
java.lang.IllegalArgumentException: not implemented by choice 
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23) 
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467) 
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

[INFO] [07/24/2015 11:57:50.533] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:2554/user/connectorActor] notifying failure to server 
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2554/user/connectorActor] Exception received => not implemented by choice 
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2552/remote/akka.tcp/[email protected]:2554/user/connectorActor/processingActor] not implemented by choice 
java.lang.IllegalArgumentException: not implemented by choice 
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23) 
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467) 
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

[INFO] [07/24/2015 11:57:50.538] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:2554/user/connectorActor] notifying failure to server 
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2554/user/connectorActor] Exception received => not implemented by choice 
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2552/remote/akka.tcp/[email protected]:2554/user/connectorActor/processingActor] not implemented by choice 
java.lang.IllegalArgumentException: not implemented by choice 
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23) 
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467) 
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

[INFO] [07/24/2015 11:57:50.545] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2554/user/connectorActor] notifying failure to server 

How can I achieve this behavior?

Answers


Daydreamer's answer will work, but another approach is to watch the child actor from the parent and run notifyFailure when you receive the Terminated message:

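var remoteProcessor: ActorRef = _

def receive = LoggingReceive {
    case StartRemoteProcessor =>
        remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
        // Register for a Terminated message when the child is finally stopped.
        context.watch(remoteProcessor)
        log.info("Starting Remote Processor")
        remoteProcessor ! "Start"

    // Note: this pattern binds a fresh name, so it matches Terminated from any
    // watched actor; that is fine here because only one child is watched.
    case Terminated(remoteProcessor) =>
        notifyFailure()
}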

This way, you don't need to customize the Actor lifecycle methods, which I find can be a rich source of bugs.
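For reference, here is a minimal self-contained sketch of the watch-based approach. It assumes a single, always-failing child as in the question; the use of postRestart to re-send "Start", the log wording, and the Demo object are illustrative choices of mine, not part of the original code. The parent reacts only to Terminated, which is delivered once, when the supervisor gives up and stops the child, so no lifecycle hook in the child has to notify the parent.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy, Terminated}
import akka.actor.SupervisorStrategy.Restart
import scala.concurrent.duration._

case object StartRemoteProcessor

// Child that always fails, mirroring the question's ProcessingActor.
class ProcessingActor extends Actor with ActorLogging {
  // Retry after each restart (illustrative; the question used aroundPostRestart).
  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason)
    self ! "Start"
  }

  def receive: Receive = {
    case "Start" =>
      throw new IllegalArgumentException("not implemented by choice")
  }
}

class ConnectorActor extends Actor with ActorLogging {
  // Allow at most 3 restarts within 20 seconds; after that the child is stopped.
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 20.seconds) {
      case _: IllegalArgumentException => Restart
    }

  def receive: Receive = {
    case StartRemoteProcessor =>
      val child = context.actorOf(Props[ProcessingActor](), "processingActor")
      context.watch(child) // ask for a Terminated message when the child stops
      child ! "Start"

    case Terminated(ref) =>
      // Reached once the child has actually stopped, not on each restart.
      log.info("child {} stopped for good, notifying failure", ref.path)
  }
}

// Hypothetical entry point for trying the sketch; the system is left running.
object Demo extends App {
  val system = ActorSystem("ConnectorSystem")
  system.actorOf(Props[ConnectorActor](), "connectorActor") ! StartRemoteProcessor
}
```

With maxNrOfRetries = 3 the child should be restarted up to three times; the next failure stops it and the parent receives a single Terminated message. If the parent watches several children, compare the ActorRef carried by Terminated with the one you stored.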


I believe your approach is much better. I'll try it and accept it once I confirm this behavior. Thanks @mattinbits – daydreamer


I just tried it and your change works well. Two things to note: (1) context.watch(remoteProcessor) after creating the actor, and (2) case Terminated(remoteProcessor) – daydreamer


As recommended by @rkuhn in the Gitter chat, the following worked for me:

override def preRestart(reason: Throwable, message: Option[Any]): Unit =() 

The complete code:

override def aroundPostRestart(reason: Throwable): Unit = self.tell("Start", context.parent)
override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()
override def postStop(): Unit = context.parent ! "ProcessingStopped"
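Why this helps: the default preRestart implementation stops the actor's children and then calls postStop(), so a postStop that messages the parent fires on every restart. Making preRestart a no-op leaves postStop to run only when the actor is really stopped. Below is a sketch of the child actor with the overrides in context; it uses postRestart rather than aroundPostRestart, which is my substitution, and it assumes the actor has no children of its own that need stopping on restart.

```scala
import akka.actor.{Actor, ActorLogging}

class ProcessingActor extends Actor with ActorLogging {

  // The default preRestart stops children and calls postStop().
  // A no-op here keeps postStop from running on every restart.
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()

  // Retry processing on the fresh instance after a restart.
  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason)
    self ! "Start"
  }

  // Runs only when the actor is actually stopped, for example once the
  // supervisor's maxNrOfRetries limit has been exceeded.
  override def postStop(): Unit = context.parent ! "ProcessingStopped"

  def receive: Receive = {
    case "Start" =>
      log.info("ProcessingActor path => " + self.path)
      throw new IllegalArgumentException("not implemented by choice")
  }
}
```

Keep in mind that postStop also runs on any ordinary stop, such as context.stop or system shutdown, so the parent receives "ProcessingStopped" in those cases too.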