2016-06-07 2 views
0

L'exemple de code suivant (que vous pouvez copier et exécuter) affiche un MyParentActor qui crée un MyChildActor.Akka: Ordre des messages après le redémarrage d'Akka

Le MyChildActor lève une exception pour son premier message qui provoque son redémarrage.

Cependant, ce que je veux réaliser, c'est que le "Message 1" soit toujours traité avant "Message 2" au redémarrage du MyChildActor. Au lieu de cela, ce qui se passe est que le message 1 est ajouté à la queue de la file d'attente de boîte aux lettres, et le message 2 est donc traité en premier.

Comment puis-je commander les messages originaux au redémarrage d'un acteur, sans avoir à créer ma propre boîte aux lettres, etc.?

object TestApp extends App { 
    var count = 0 
    val actorSystem = ActorSystem() 


    val parentActor = actorSystem.actorOf(Props(classOf[MyParentActor])) 
    parentActor ! "Message 1" 
    parentActor ! "Message 2" 

    class MyParentActor extends Actor with ActorLogging{ 
    var childActor: ActorRef = null 

    @throws[Exception](classOf[Exception]) 
    override def preStart(): Unit = { 
     childActor = context.actorOf(Props(classOf[MyChildActor])) 
    } 

    override def receive = { 
     case message: Any => { 
     childActor ! message 
     } 
    } 

    override def supervisorStrategy: SupervisorStrategy = { 
     OneForOneStrategy() { 
      case _: CustomException => Restart 
      case _: Exception   => Restart 
     } 
    } 
    } 

    class MyChildActor extends Actor with ActorLogging{ 


    override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
     message match { 
     case Some(e) => self ! e 
     } 
    } 

    override def receive = { 
     case message: String => { 
     if (count == 0) { 
      count += 1 
      throw new CustomException("Exception occurred") 
     } 
     log.info("Received message {}", message) 
     } 
    } 
    } 

    class CustomException(message: String) extends RuntimeException(message) 
} 

Répondre

1

Vous pouvez marquer le message à défaut d'une enveloppe spéciale et tout planquer à la réception de ce message (voir la mise en œuvre de l'acteur de l'enfant). Définissez simplement un comportement dans lequel l'acteur stocke tous les messages à l'exception de l'enveloppe spécifique, traite sa charge utile, puis annule tous les autres messages et retourne à son comportement normal.

Cela me donne:

INFO TestApp$MyChildActor - Received message Message 1 
INFO TestApp$MyChildActor - Received message Message 2 

object TestApp extends App { 
    var count = 0 
    val actorSystem = ActorSystem() 


    val parentActor = actorSystem.actorOf(Props(classOf[MyParentActor])) 
    parentActor ! "Message 1" 
    parentActor ! "Message 2" 

    class MyParentActor extends Actor with ActorLogging{ 
    var childActor: ActorRef = null 

    @throws[Exception](classOf[Exception]) 
    override def preStart(): Unit = { 
     childActor = context.actorOf(Props(classOf[MyChildActor])) 
    } 

    override def receive = { 
     case message: Any => { 
      childActor ! message 
     } 
    } 

    override def supervisorStrategy: SupervisorStrategy = { 
     OneForOneStrategy() { 
      case e: CustomException => Restart 
      case _: Exception => Restart 
     } 
    } 
    } 

    class MyChildActor extends Actor with Stash with ActorLogging{ 


    override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
     message match { 
      case Some(e) => 
       self ! Unstash(e) 
     } 
    } 

    override def postRestart(reason: Throwable): Unit = { 
     context.become(stashing) 
     preStart() 
    } 

    override def receive = { 
     case message: String => { 
      if (count == 0) { 
       count += 1 
       throw new CustomException("Exception occurred") 
      } 
      log.info("Received message {}", message) 
     } 
    } 

    private def stashing: Receive = { 
     case Unstash(payload) => 
      receive(payload) 
      unstashAll() 
      context.unbecome() 
     case m => 
      stash() 
    } 
    } 

    case class Unstash(payload: Any) 
    class CustomException(message: String) extends RuntimeException(message) 
}