2017-10-09 8 views
1

Je travaille actuellement sur une application avec un processus d'inscription. Ce processus d'inscription communiquera, à un moment donné, avec des systèmes externes de manière asynchrone. Pour garder cette question concise, je vous montre deux acteurs importants que j'ai écrit:Acteurs enfants, futures et exceptions

SignupActor.scala

class SignupActor extends PersistentFSM[SignupActor.State, Data, DomainEvt] { 
    private val apiActor = context.actorOf(ExternalAPIActor.props(new HttpClient)) 

    // At a certain point, a CreateUser(data) message is sent to the apiActor 
} 

ExternalAPIActor.scala

class ExternalAPIActor(apiClient: HttpClient) extends Actor { 
    override def preRestart(reason: Throwable, message: Option[Any]) = { 
     message.foreach(context.system.scheduler.scheduleOnce(3 seconds, self, _)) 
     super.preRestart(reason, message) 
    } 

    def receive: Receive = { 
     case CreateUser(data) => 
      Await.result(
       apiClient.post(data) 
        .map(_ => UserCreatedInAPI()) 
        .pipeTo(context.parent), 
       Timeout(5 seconds).duration 
      ) 
    } 
} 

Cette la configuration semble fonctionner comme prévu. Lorsqu'il y a un problème avec l'API externe (par exemple un problème de délai d'attente ou de réseau), le Future renvoyé par HttpClient::post échoue et génère une exception grâce à Await.result. Ceci, à son tour grâce au SupervisorStrategy de l'acteur parent SignupActor, va redémarrer le ExternalAPIActor où nous pouvons renvoyer le dernier message à lui-même avec un petit délai pour éviter les interblocages.

Je vois quelques problèmes avec cette configuration:

  • Dans la méthode receive de ExternalAPIActor, le blocage se produit. Pour autant que je comprends, le blocage au sein des Acteurs est considéré comme un anti-pattern.
  • Le délai utilisé pour renvoyer le message est statique. Si l'API est indisponible pour des périodes plus longues, nous continuerons à envoyer des requêtes HTTP toutes les 3 secondes. Je voudrais plutôt une sorte de mécanisme exponentiel de recul ici.

Pour continuer avec ce dernier, je l'ai essayé ce qui suit dans le SignupActor:

SignupActor.scala

val supervisor = BackoffSupervisor.props(
    Backoff.onFailure(
     ExternalAPIActor.props(new HttpClient), 
     childName = "external-api", 
     minBackoff = 3 seconds, 
     maxBackoff = 30 seconds, 
     randomFactor = 0.2 
    ) 
) 

private val apiActor = context.actorOf(supervisor) 

Malheureusement, cela ne semble pas faire quoi que ce soit à tout - la méthode preRestart de ExternalAPIActor n'est pas appelée du tout. Lors du remplacement de Backoff.onFailure par Backoff.onStop, la méthode est appelée, mais sans aucun type de remise exponentielle du tout.

Compte tenu de ce qui précède, mes questions sont les suivantes:

  • Est-ce en utilisant Await.result le recommandé (la seule?) Façon de vous assurer les exceptions lancées dans un Future retour des services appelés au sein des acteurs sont capturés et manipulés en conséquence ? Une partie particulièrement importante de mon cas d'utilisation particulier est le fait que les messages ne doivent pas être supprimés mais retentés lorsque quelque chose ne va pas. Ou existe-t-il une autre manière (idiomatique) de gérer les exceptions dans des contextes asynchrones dans Actors? Comment utiliser le BackoffSupervisor comme prévu dans ce cas? Encore une fois: il est très important que le message responsable de l'exception ne soit pas abandonné, mais réessayé jusqu'à un nombre N fois de fois (à déterminer par l'argument maxRetries de SupervisorStrategy.
+0

J'aime le titre. – Beta

Répondre

3

est à l'aide Await.result la recommandation (la seule?) Façon de faire sûr des exceptions lancées dans un avenir retour de services appelés au sein acteurs sont capturés et manipulés en conséquence?

Non. Généralement, ce n'est pas comme cela que vous voulez gérer les échecs dans Akka. Une meilleure alternative est de tuyau l'absence de votre propre acteur, évitant ainsi la nécessité d'utiliser Await.result du tout:

def receive: Receive = { 
    case CreateUser(data) => 
    apiClient.post(data) 
     .map(_ => UserCreatedInAPI()) 
     .pipeTo(self) 
    case Success(res) => context.parent ! res 
    case Failure(e) => // Invoke retry here 
} 

Cela signifie pas de redémarrage est nécessaire pour gérer l'échec, ils font partie du flux normal de votre acteur.

Un moyen supplémentaire de gérer cela peut être de créer un «futur supervisé». REPRISES DE this blog post:

object SupervisedPipe { 

    case class SupervisedFailure(ex: Throwable) 
    class SupervisedPipeableFuture[T](future: Future[T])(implicit executionContext: ExecutionContext) { 
    // implicit failure recipient goes to self when used inside an actor 
    def supervisedPipeTo(successRecipient: ActorRef)(implicit failureRecipient: ActorRef): Unit = 
     future.andThen { 
     case Success(result) => successRecipient ! result 
     case Failure(ex) => failureRecipient ! SupervisedFailure(ex) 
     } 
    } 

    implicit def supervisedPipeTo[T](future: Future[T])(implicit executionContext: ExecutionContext): SupervisedPipeableFuture[T] = 
    new SupervisedPipeableFuture[T](future) 

    /* `orElse` with the actor receive logic */ 
    val handleSupervisedFailure: Receive = { 
    // just throw the exception and make the actor logic handle it 
    case SupervisedFailure(ex) => throw ex 
    } 

    def supervised(receive: Receive): Receive = 
    handleSupervisedFailure orElse receive 
} 

De cette façon, vous ne conduit à l'auto une fois que vous obtenez un Failure, et par ailleurs l'envoyer à l'acteur le message était destiné à envoyer, en évitant la nécessité de case Success j'ai ajouté la méthode receive. Tout ce que vous devez faire est de remplacer supervisedPipeTo avec le cadre d'origine fourni pipeTo.

+0

Merci pour votre réponse Yuval. Transférer le résultat à l'acteur lui-même semble être la bonne voie.Cependant, dans votre exemple de "futur supervisé", le message original responsable du message "SupervisedFailure" ne serait-il pas perdu? –

+1

Dans la méthode 'receive' du premier extrait de code,' pipeTo (self) 'envoie le _result_ de' Future' à 'self', donc vous devriez faire correspondre le pattern sur' UserCreatedInAPI' et 'akka.actor.Status. Failure' au lieu de 'Success' et' Failure'. – chunjef

0

D'accord, j'ai fait plus de réflexion et de bricolage et j'ai trouvé ce qui suit.

ExternalAPIActor.scala

class ExternalAPIActor(apiClient: HttpClient) extends Actor with Stash { 
     import ExternalAPIActor._ 

     def receive: Receive = { 
      case msg @ CreateUser(data) => 
       context.become(waitingForExternalServiceReceive(msg)) 
       apiClient.post(data) 
        .map(_ => UserCreatedInAPI()) 
        .pipeTo(self) 
     } 

     def waitingForExternalServiceReceive(event: InputEvent): Receive = LoggingReceive { 
      case Failure(_) => 
       unstashAll() 
       context.unbecome() 
       context.system.scheduler.scheduleOnce(3 seconds, self, event) 

      case msg:OutputEvent => 
       unstashAll() 
       context.unbecome() 
       context.parent ! msg 

      case _ => stash() 
     } 
} 

object ExternalAPIActor { 
    sealed trait InputEvent 
    sealed trait OutputEvent 

    final case class CreateUser(data: Map[String,Any]) extends InputEvent 
    final case class UserCreatedInAPI() extends OutputEvent 
} 

Je l'ai utilisé cette technique pour empêcher le message d'origine d'être perdu dans le cas où il y a un problème avec le service externe que nous appelons. Au cours du processus d'une demande à un service externe, je change de contexte, j'attends soit une réponse d'échec, soit un retour par la suite. Grâce au trait Stash, je peux m'assurer que les autres requêtes aux services externes ne sont pas perdues.

Depuis que j'ai plusieurs acteurs dans ma demande d'appel des services externes, j'abstraire le waitingForExternalServiceReceive à son propre trait:

WaitingForExternalService.scala

trait WaitingForExternalServiceReceive[-tInput, +tOutput] extends Stash { 

    def waitingForExternalServiceReceive(event: tInput)(implicit ec: ExecutionContext): Receive = LoggingReceive { 
    case akka.actor.Status.Failure(_) => 
     unstashAll() 
     context.unbecome() 
     context.system.scheduler.scheduleOnce(3 seconds, self, event) 

    case msg:tOutput => 
     unstashAll() 
     context.unbecome() 
     context.parent ! msg 

    case _ => stash() 
    } 
} 

Maintenant, le ExternalAPIActor peut étendre ce trait :

ExternalAPIActor.scala

class ExternalAPIActor(apiClient: HttpClient) extends Actor with WaitingForExternalServiceReceive[InputEvent,OutputEvent] { 
     import ExternalAPIActor._ 

     def receive: Receive = { 
      case msg @ CreateUser(data) => 
       context.become(waitingForExternalServiceReceive(msg)) 
       apiClient.post(data) 
        .map(_ => UserCreatedInAPI()) 
        .pipeTo(self) 
     } 
} 

object ExternalAPIActor { 
    sealed trait InputEvent 
    sealed trait OutputEvent 

    final case class CreateUser(data: Map[String,Any]) extends InputEvent 
    final case class UserCreatedInAPI() extends OutputEvent 
} 

Maintenant, l'acteur ne sera pas redémarré en cas de défaillance/erreurs et le message est pas perdu. De plus, le flux entier de l'acteur est maintenant non-bloquant.

Cette configuration est (très probablement) loin d'être parfaite, mais elle semble fonctionner exactement comme j'en ai besoin.