2011-10-11 4 views
3

Je construis un service basé sur l'acteur à Scala où les consommateurs peuvent demander si les clients sont autorisés et peuvent également autoriser les clients.Scala acteur: receiveWithin() ne reçoit pas de messages

Si un consommateur interroge l'état d'autorisation d'un client et que ce client n'est pas encore autorisé, l'acteur doit attendre les messages Authorize entrants dans un délai d'attente spécifié, puis envoyer une réponse. IsAuthorized devrait pouvoir être exécuté de manière synchrone dans le code du consommateur afin qu'il bloque et attende une réponse. Quelque chose comme

service !? IsAuthorized(client) => { 
    case IsAuthorizedResponse(_, authorized) => // do something 
} 

Cependant receiveWithin() dans mon acteur ne reçoit jamais un message et fonctionne toujours dans le délai d'attente.

Voici mon code

case object WaitingForAuthorization 
case class WaitingForAuthorizationResponse(clients: immutable.Set[Client]) 
case class IsAuthorized(client: Client) 
case class IsAuthorizedResponse(client: Client, authorized: Boolean) 
case class Authorize(client: Client) 

class ClientAuthorizationService { 
    private val authorized: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client] 
    private val waiting: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client] 

    def actor = Actor.actor { 
    loop { 
     react { 
     case IsAuthorized(client: Client) => reply { 
      if (authorized contains client) { 
      IsAuthorizedResponse(client, true) 
      } else { 
      waiting += client 
      var matched = false; 
      val end = Instant.now.plus(ClientAuthorizationService.AUTH_TIMEOUT) 

      while (!matched && Instant.now.isBefore(end)) { 
       // ERROR HERE: Never receives Authorize messages 
       receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) { 
       case Authorize(authorizedClient: Client) => { 
        authorizeClient(authorizedClient) 
        if (authorizedClient == client) matched = true 
       } 
       case TIMEOUT => // do nothing since we handle the timeout in the while loop 
       } 
      } 

      IsAuthorizedResponse(client, matched) 
      } 
     } 

     case Authorize(client: Client) => authorizeClient(client) 
     case WaitingForAuthorization => reply { 
      WaitingForAuthorizationResponse(immutable.Set() ++ waiting) 
     } 
     } 
    } 
    } 

    private def authorizeClient(client: Client) = synchronized { 
    authorized += client 
    waiting -= client 
    } 
} 

object ClientAuthorizationService { 
    val AUTH_TIMEOUT: Long = 60 * 1000; 
} 

Quand j'envoie un message Authorize à l'acteur alors qu'il est dans le receiveWithin bloquer les messages est pris par la deuxième déclaration de cas ci-dessous qui devrait en fait que rattraper ces messages lorsqu'aucune on attend une réponse à ce moment-là.

Quel est le problème avec mon code?

Mise à jour:

Voici une version abrégée du code correspondant qui représentent en fait une logique beaucoup plus simple et différent, mais peut-être une meilleure définition du problème:

loop { 
    react { 
    case IsAuthorized(client: Client) => reply { 
     var matched = false 

     // In the "real" logic we would actually loop here until either the 
     // authorized client matches the requested client or the timeout is hit. 
     // For the sake of the demo we only take the first Authorize message. 

     receiveWithin(60*1000) { 
     // Although Authorize is send to actor it's never caught here 
     case Authorize(authorizedClient: Client) => matched = authorizedClient == client 
     case TIMEOUT => 
     } 

     IsAuthorizedResponse(client, matched) 
    } 

    case Authorize(client: Client) => // this case is hit 
    } 
} 

Mise à jour 2:

J'ai finalement résolu le problème. Je pense que le problème était que l'acteur bloquait en essayant de recevoir un message Authorize dans la réponse au précédent message IsAuthorized.

J'ai réécrit le code de sorte qu'un acteur anonyme est démarré lorsque nous attendons un Authorized. Voici le code pour ceux qui sont intéressés. waiting est un Map[Client, Actor].

loop { 
    react { 
    case IsAuthorized(client: Client) => 
     if (authorized contains client) { 
     sender ! IsAuthorizedResponse(client, true) 
     } else { 
     val receipient = sender 
     // Start an anonymous actor that waits for an Authorize message 
     // within a given timeout and sends a reply to the consumer. 
     // The actor will be notified by the parent actor below. 
     waiting += client -> Actor.actor { 
      val cleanup =() => { 
      waiting -= client 
      exit() 
      } 

      receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) { 
      case Authorize(c) => 
       receipient ! IsAuthorizedResponse(client, true) 
       cleanup() 
      case TIMEOUT => 
       receipient ! IsAuthorizedResponse(client, false) 
       cleanup() 
      } 
     } 
     } 

    case Authorize(client: Client) => 
     authorized += client 

     waiting.get(client) match { 
     case Some(actor) => actor ! Authorize(client) 
     case None => 
     } 

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet) 
    } 
} 

S'il existe de meilleurs moyens pour résoudre ce problème, faites-le moi savoir!

+0

Pouvez-vous nettoyer/raccourcir le code et afficher uniquement les pièces pertinentes? – Jus12

+0

J'ai mis à jour mon message. –

+0

Pourquoi ne pas utiliser 'reactWithin'? – Jus12

Répondre

0

J'ai finalement résolu le problème. Je pense que le problème était que l'acteur bloquait en essayant de recevoir un message Authorize dans la réponse au précédent message IsAuthorized.

J'ai réécrit le code de sorte qu'un acteur anonyme est démarré lorsque nous attendons un Authorized.Voici le code pour ceux qui sont intéressés. waiting est un Map[Client, Actor].

loop { 
    react { 
    case IsAuthorized(client: Client) => 
     if (authorized contains client) { 
     sender ! IsAuthorizedResponse(client, true) 
     } else { 
     val receipient = sender 
     // Start an anonymous actor that waits for an Authorize message 
     // within a given timeout and sends a reply to the consumer. 
     // The actor will be notified by the parent actor below. 
     waiting += client -> Actor.actor { 
      val cleanup =() => { 
      waiting -= client 
      exit() 
      } 

      receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) { 
      case Authorize(c) => 
       receipient ! IsAuthorizedResponse(client, true) 
       cleanup() 
      case TIMEOUT => 
       receipient ! IsAuthorizedResponse(client, false) 
       cleanup() 
      } 
     } 
     } 

    case Authorize(client: Client) => 
     authorized += client 

     waiting.get(client) match { 
     case Some(actor) => actor ! Authorize(client) 
     case None => 
     } 

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet) 
    } 
} 

S'il existe de meilleurs moyens pour résoudre ce problème, faites-le moi savoir!

0

Ne répond pas le problème? Dans

case IsAuthorized(client: Client) => reply { ... } 

tout le code est en argument pour répondre bloc, il est exécuté (inclus le receiveWithing) avant que la réponse soit envoyé. Ce qui signifie que lorsque votre client traitera votre réponse, vous n'attendrez plus son.

Dans votre code d'origine, il devrait probablement être quelque chose comme

case IsAuthorized(client: Client) => 
    if(ok) reply(AuthorizedReply(client, true)) 
    else { 
    reply(AuthorizedReply(client, false)) 
    receiveWithin(...) 
    } 
+0

"ainsi il est exécuté (inclus le receiveWithing) avant que la réponse soit réellement envoyée" C'est exactement ce que je veux :-) La réponse devrait être bloquante jusqu'à ce que le client ait été autorisé ou le délai d'expiration. Ce code est inspiré de cet exemple [ici] (http://www.scala-lang.org/node/242#SecondExample) (voir le troisième extrait de code du chapitre "Deuxième exemple"). Votre code n'est pas correct non plus car si le client n'est pas encore autorisé, vous enverrez instantanément une réponse avec "false" et attendez les messages Authorize entrants. –

Questions connexes