2012-04-04 1 views
2

J'essaie d'utiliser Akka pour implémenter un serveur TCP pour un protocole d'application personnalisé. J'essaie de suivre l'exemple donné ici: http://doc.akka.io/docs/akka/2.0/scala/io.html pour faire des E/S non bloquantes dans une boucle de rendement pour ....Bizarre try/catch comportement avec Scala + Akka

Je trouve que lorsque je lance une exception depuis l'intérieur du bloc de rendement, je ne peux pas l'attraper de l'extérieur du bloc. Je pense que j'ai un malentendu fondamental sur la façon dont Akka ou Scala travaille ici et j'apprécierais tous les conseils.

J'ai résumaient le code à ceci:

import akka.actor._ 
import java.net.InetSocketAddress 

class EchoServer(port: Int) extends Actor { 

    val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher) 

    override def preStart { 
    IOManager(context.system) listen new InetSocketAddress(port) 
    } 

    def receive = { 
    case IO.NewClient(server) => 
     val socket = server.accept() 
     state(socket) flatMap (_ => EchoServer.processRequest(socket)) 
    case IO.Read(socket, bytes) => 
     state(socket)(IO.Chunk(bytes)) 
    case IO.Closed(socket, cause) => 
     state(socket)(IO.EOF(None)) 
     state -= socket 
    } 
} 

object EchoServer extends App 
{ 
    def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] = 
    { 
    println("In process request") 
    try { 
     for { 
     bs <- IO take 1 
     } yield { 
     println("I'll get here") 
     throw new Exception("Hey-o!") 
     println("But not here ... as expected") 
     } 
    } catch { 
     case e: Exception => println("And not here ... wtf?"); IO.Done() // NEVER GETS HERE 
    } 
    } 

    ActorSystem().actorOf(Props(new EchoServer(8080))) 
} 

Peut-être plus pratique pour saisir l'essentiel ici: https://gist.github.com/2296554

Quelqu'un peut-il expliquer pourquoi le contrôle ne parvient pas à mon bloc catch dans cette situation?

je remarquai que si je tourne la consignation de débogage dans Akka, je vois ce message dans la sortie:

[DEBUG] [04/03/2012 22:42:25.106] [EchoServerActorSystem-akka.actor.default-dispatcher-1] [Future] Hey-o! 

donc je suppose que l'exception est gérée par le se dispatcher Akka? Quelqu'un peut-il expliquer comment c'est possible?

Répondre

6

Le point d'E/S non bloquant est bien sûr qu'il n'y a aucune garantie quand et où il est exécuté. Rappelez-vous que l'on peut écrire pour la compréhension comme

(IO take 1).map(bs => { 
    println("I'll get here"); throw // ... 
} 

Que fait ce code? IO take 1 renvoie un objet non-bloquant Future, qui est ensuite ajouté une fonction de transformation via la méthode map. C'est à dire. quand (et où) IO take 1 est prêt, il appliquera le map sur le résultat.

Tout cela se passe dans un autre thread (ou à l'aide d'une autre manière de mettre en œuvre la sémantique non-blocage), donc il n'y a aucun moyen pour le try - réagir sur les Exception s être jeté catch. La méthode bs => println(…) … ne connaîtrait pas non plus la gestion des exceptions. Tout ce qu'il sait qu'il devrait transformer une entrée bs et avoir un résultat quand il est fini. Leçon à retenir: Lorsque vous utilisez un code non bloquant, évitez les effets secondaires. Surtout si les effets secondaires sont utilisés pour modifier le flux d'exécution.

Pour attraper en fait l'exception, je pense que vous devrez structurer comme suit (non testé, voir API):

def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] = 
    { 
    println("In process request") 
    (
     for { 
     bs <- IO take 1 
     } yield { 
     println("I'll get here") 
     throw new Exception("Hey-o!") 
     println("But not here ... as expected") 
     } 
    ) recover { 
     case e: Exception => println("And here ...?"); IO.Done() 
    } 
    } 
+0

Merci pour la réponse très utile, il efface vraiment les choses. –