2011-08-16 5 views
8

J'essaie d'obtenir un comportement tolérant aux fautes chez les Acteurs akka. Je travaille sur un code qui dépend de la disponibilité des acteurs dans le système pour une longue période de traitement. Je constate que mon traitement s'arrête après quelques heures (cela devrait prendre environ 10 heures) et il ne se passe pas grand-chose. Je crois que mes acteurs ne récupèrent pas d'exceptions.Comment configurer la tolérance aux pannes akka Actor?

Que dois-je faire pour redémarrer Actors de façon permanente? Je pense que cela peut être fait à partir de cette documentation http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

Je travaille avec akka 1.1.3 et scala 2,9

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached 
import akka.dispatch.Dispatchers 
import akka.routing.CyclicIterator 
import akka.routing.LoadBalancer 
import akka.config.Supervision._ 


object TestActor { 
    val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool") 
        .setCorePoolSize(100) 
        .setMaxPoolSize(100) 
        .build 
} 

class TestActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.dispatcher = TestActor.dispatcher 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure") 
     println("Actor: " + name + " Received: " + num) 
     //Thread.sleep(100) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    //callback method for restart handling 
    override def preRestart(reason: Throwable){ 
    println("TestActor "+ name + " restaring after shutdown because of " + reason) 
    } 

    //callback method for restart handling 
    override def postRestart(reason: Throwable){ 
    println("Restaring TestActor "+name+"after shutdown because of " + reason) 
    } 
} 

trait CyclicLoadBalancing extends LoadBalancer { this: Actor => 
    val testActors: List[ActorRef] 
    val seq = new CyclicIterator[ActorRef](testActors) 
} 

trait TestActorManager extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000) 
    val testActors: List[ActorRef] 
    override def preStart = testActors foreach { self.startLink(_) } 
    override def postStop = { System.out.println("postStop") } 
} 


    object FaultTest { 
    def main(args : Array[String]) : Unit = { 
     println("starting FaultTest.main()") 
     val numOfActors = 5 
     val supervisor = actorOf(
     new TestActorManager with CyclicLoadBalancing { 
      val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i))); 
     } 
    ) 

     supervisor.start(); 

     println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length) 

     val testActor = Actor.registry.actorsFor(classOf[TestActor]).head 

     (1 until 200 toList) foreach { testActor ! _ } 

    } 
    } 

Ce code met en place 5 acteurs derrière un LoadBalancer qui vient imprimer Entiers qui sont envoyé à eux, sauf qu'ils lancent des exceptions sur les nombres pairs pour simuler des fautes. Les entiers 0 à 200 sont envoyés à ces acteurs. Je m'attends à ce que les nombres impairs obtiennent la sortie mais tout semble s'arrêter après quelques défauts sur les nombres pairs. L'exécution de ce code avec les résultats de SBT dans cette sortie:

[info] Running FaultTest 
starting FaultTest.main() 
Loading config [akka.conf] from the application classpath. 
Number of Actors: 5 
Actor: 2 Received: 1 
Actor: 2 Received: 9 
Actor: 1 Received: 3 
Actor: 3 Received: 7 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM 

Ce que je pense qui se passe ici est que 5 acteurs commencent, et les 5 premiers nombres pairs les mettre hors de l'entreprise et ils ne reçoivent pas redémarrés.

Comment ce code peut-il être modifié afin que les acteurs récupèrent des exceptions?

Je m'attends à ce que cela imprime tous les nombres impairs de 1 à 200. Je pense que chaque acteur échouerait sur les nombres pairs mais serait redémarré avec une boîte aux lettres intacte sur les exceptions. Je m'attends à voir le println de preRestart et postRestart. Qu'est-ce qui doit être configuré dans cet exemple de code pour que cela se produise?

Voici quelques hypothèses supplémentaires à propos de l'akka et des acteurs qui pourraient mener à mon malentendu. Je suppose qu'un acteur peut être configuré avec un superviseur ou un faultHandler afin qu'il soit redémarré et continue d'être disponible lorsqu'une exception est levée pendant la réception. Je suppose que le message qui a été envoyé à l'acteur sera perdu s'il déclenche une exception lors de la réception. Je suppose que les commandes preRestart() et postRestart() sur l'acteur qui lance l'exception seront appelées.

L'exemple de code représente ce que je suis en train de faire et est basé sur Why is my Dispatching on Actors scaled down in Akka?

** Un autre exemple de code **

Voici un autre exemple de code qui est plus simple. Je commence un acteur qui jette des exceptions sur les nombres pairs. Il n'y a pas d'équilibreur de charge ou d'autres choses sur le chemin. J'essaie d'imprimer des informations sur l'acteur. J'attends de sortir du programme une minute après que les messages aient été envoyés à l'acteur et que je surveille ce qui se passe.

Je m'attends à ce que cela imprime les nombres impairs, mais il semble que l'acteur se trouve avec des messages dans sa boîte aux lettres.

L'ensemble OneForOneStrategy est-il incorrect? Ai-je besoin de lier l'acteur à quelque chose? Ce type de configuration est-il fondamentalement mal orienté de ma part? Est-ce qu'un Dispatcher doit être configuré avec une tolérance de panne d'une certaine manière? Pourrais-je me tromper sur les threads dans le Dispatcher?

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.ActorRegistry 
import akka.config.Supervision._ 

class SingleActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000) 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure, where does this get logged?") 
     println("Actor: " + name + " Received: " + num) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    override def preRestart(reason: Throwable){ 
    println("TestActor "+ name + " restaring after shutdown because of " + reason) 
    } 

    override def postRestart(reason: Throwable){ 
    println("Restaring TestActor "+name+"after shutdown because of " + reason) 
    } 
} 

object TestSingleActor{ 

    def main(args : Array[String]) : Unit = { 
     println("starting TestSingleActor.main()") 

     val testActor = Actor.actorOf(new SingleActor(1)).start() 

     println("number of actors: " + registry.actors.size) 
     printAllActorsInfo 

     (1 until 20 toList) foreach { testActor ! _ } 

     for(i <- 1 until 120){ 
     Thread.sleep(500) 
     printAllActorsInfo 
     } 
    } 

    def printAllActorsInfo() ={ 
    registry.actors.foreach((a) => 
     println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b " 
       .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted))) 
    } 
} 

Je reçois une sortie comme:

[info] Running TestSingleActor 
starting TestSingleActor.main() 
Loading config [akka.conf] from the application classpath. 
number of actors: 1 
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1 
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeted ... 

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM 

Répondre

5

Le problème était que j'étais avec mon fichier akka.conf. J'utilisais le fichier akka.conf de référence 1.1.3 à l'exception de la ligne qui configurait les gestionnaires d'événements.

mien (le qui est brisé):

event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 

référence 1.1.3 (celui qui fonctionne):

event-handlers = ["akka.event.EventHandler$DefaultListener"] 

Avec mes gestionnaires d'événements-ligne config, redémarre acteur ne se produisent pas. Avec la référence 1.1.3 les redémarrages de ligne se passent à merveille.

J'ai fait ce changement en fonction de ces instructions http://akka.io/docs/akka/1.1.3/general/slf4j.html

Ainsi, en se débarrassant des suggestions dans cette page et revenir à la référence 1.1.3 akka.conf je suis en mesure d'obtenir des acteurs tolérants aux pannes.

1

Je crois que votre problème se termine après que les messages sont envoyés, vous n'êtes pas essayer de garder votre application asynchrone en vie, et donc les principales sorties de fil et prend tout avec.

+0

Si j'ajoute un Trhead.sleep (100000) à la fin de main() je reçois: '[info] Exécution FaultTest à partir FaultTest.main() Chargement config [akka.conf] du classpath de l'application. Nombre d'acteurs: 5 Acteur: 0 Reçu: 1 Acteur: 4 Reçu: 3 Acteur: 1 Reçu: 7 Acteur: 1 Reçu: 9' et les pauses de sortie, mais les numéros supplémentaires ne sont pas imprimés. Je n'ai pas attendu l'application pour sortir mais après 30-40sec il n'y avait rien. De plus, si j'enlève la faute, les numéros s'impriment très rapidement, en moins de 2 secondes. –

Questions connexes