Je construis un service basé sur l'acteur à Scala où les consommateurs peuvent demander si les clients sont autorisés et peuvent également autoriser les clients.Scala acteur: receiveWithin() ne reçoit pas de messages
Si un consommateur interroge l'état d'autorisation d'un client et que ce client n'est pas encore autorisé, l'acteur doit attendre les messages Authorize
entrants dans un délai d'attente spécifié, puis envoyer une réponse. IsAuthorized
devrait pouvoir être exécuté de manière synchrone dans le code du consommateur afin qu'il bloque et attende une réponse. Quelque chose comme
service !? IsAuthorized(client) => {
case IsAuthorizedResponse(_, authorized) => // do something
}
Cependant receiveWithin()
dans mon acteur ne reçoit jamais un message et fonctionne toujours dans le délai d'attente.
Voici mon code
case object WaitingForAuthorization
case class WaitingForAuthorizationResponse(clients: immutable.Set[Client])
case class IsAuthorized(client: Client)
case class IsAuthorizedResponse(client: Client, authorized: Boolean)
case class Authorize(client: Client)
class ClientAuthorizationService {
private val authorized: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]
private val waiting: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]
def actor = Actor.actor {
loop {
react {
case IsAuthorized(client: Client) => reply {
if (authorized contains client) {
IsAuthorizedResponse(client, true)
} else {
waiting += client
var matched = false;
val end = Instant.now.plus(ClientAuthorizationService.AUTH_TIMEOUT)
while (!matched && Instant.now.isBefore(end)) {
// ERROR HERE: Never receives Authorize messages
receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
case Authorize(authorizedClient: Client) => {
authorizeClient(authorizedClient)
if (authorizedClient == client) matched = true
}
case TIMEOUT => // do nothing since we handle the timeout in the while loop
}
}
IsAuthorizedResponse(client, matched)
}
}
case Authorize(client: Client) => authorizeClient(client)
case WaitingForAuthorization => reply {
WaitingForAuthorizationResponse(immutable.Set() ++ waiting)
}
}
}
}
private def authorizeClient(client: Client) = synchronized {
authorized += client
waiting -= client
}
}
object ClientAuthorizationService {
val AUTH_TIMEOUT: Long = 60 * 1000;
}
Quand j'envoie un message Authorize
à l'acteur alors qu'il est dans le receiveWithin bloquer les messages est pris par la deuxième déclaration de cas ci-dessous qui devrait en fait que rattraper ces messages lorsqu'aucune on attend une réponse à ce moment-là.
Quel est le problème avec mon code?
Mise à jour:
Voici une version abrégée du code correspondant qui représentent en fait une logique beaucoup plus simple et différent, mais peut-être une meilleure définition du problème:
loop {
react {
case IsAuthorized(client: Client) => reply {
var matched = false
// In the "real" logic we would actually loop here until either the
// authorized client matches the requested client or the timeout is hit.
// For the sake of the demo we only take the first Authorize message.
receiveWithin(60*1000) {
// Although Authorize is send to actor it's never caught here
case Authorize(authorizedClient: Client) => matched = authorizedClient == client
case TIMEOUT =>
}
IsAuthorizedResponse(client, matched)
}
case Authorize(client: Client) => // this case is hit
}
}
Mise à jour 2:
J'ai finalement résolu le problème. Je pense que le problème était que l'acteur bloquait en essayant de recevoir un message Authorize
dans la réponse au précédent message IsAuthorized
.
J'ai réécrit le code de sorte qu'un acteur anonyme est démarré lorsque nous attendons un Authorized
. Voici le code pour ceux qui sont intéressés. waiting
est un Map[Client, Actor]
.
loop {
react {
case IsAuthorized(client: Client) =>
if (authorized contains client) {
sender ! IsAuthorizedResponse(client, true)
} else {
val receipient = sender
// Start an anonymous actor that waits for an Authorize message
// within a given timeout and sends a reply to the consumer.
// The actor will be notified by the parent actor below.
waiting += client -> Actor.actor {
val cleanup =() => {
waiting -= client
exit()
}
receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
case Authorize(c) =>
receipient ! IsAuthorizedResponse(client, true)
cleanup()
case TIMEOUT =>
receipient ! IsAuthorizedResponse(client, false)
cleanup()
}
}
}
case Authorize(client: Client) =>
authorized += client
waiting.get(client) match {
case Some(actor) => actor ! Authorize(client)
case None =>
}
case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
}
}
S'il existe de meilleurs moyens pour résoudre ce problème, faites-le moi savoir!
Pouvez-vous nettoyer/raccourcir le code et afficher uniquement les pièces pertinentes? – Jus12
J'ai mis à jour mon message. –
Pourquoi ne pas utiliser 'reactWithin'? – Jus12