2017-09-30 8 views
1

Un acteur initialise un flux Akka qui se connecte à un socket Web. Ceci est fait en utilisant un Source.actorRef auquel les messages peuvent être envoyés, qui sont ensuite traités par le webSocketClientFlow et consommés par un Sink.foreach. Cela peut être vu dans le code suivant (dérivé de akka docs):Pourquoi le framework Play ne ferme-t-il pas Akka Stream?

class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging { 

    final implicit val system: ActorSystem = ActorSystem() 
    final implicit val materializer: ActorMaterializer = ActorMaterializer() 

    def receive = { 
    case _ => 
    } 

    // Consume the incoming messages from the websocket. 
    val incoming: Sink[Message, Future[Done]] = 
    Sink.foreach[Message] { 
    case message: TextMessage.Strict => 
     println(message.text) 
    case misc => println(misc) 
    } 

    // Source through which we can send messages to the websocket. 
    val outgoing: Source[TextMessage, ActorRef] = 
    Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail) 

    // flow to use (note: not re-usable!) 
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com")) 

    // Materialized the stream 
    val ((ws,upgradeResponse), closed) = 
    outgoing 
    .viaMat(webSocketFlow)(Keep.both) 
    .toMat(incoming)(Keep.both) // also keep the Future[Done] 
    .run() 

    // Check whether the server has accepted the websocket request. 
    val connected = upgradeResponse.flatMap { upgrade => 
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
     Future.successful(Done) 
    } else { 
     throw new RuntimeException(s"Failed: ${upgrade.response.status}") 
    } 
    } 

    // When the connection has been established. 
    connected.onComplete(println) 

    // When the stream has closed 
    closed.onComplete { 
    case Success(_) => println("Test Websocket closed gracefully") 
    case Failure(e) => log.error("Test Websocket closed with an error\n", e) 
    } 

} 

Lorsque le cadre de jeu recompile il ferme la TestActor mais ne ferme pas le flux Akka. Uniquement lorsque les délais d'attente websocket le flux est fermé.

Est-ce que cela signifie que je dois fermer le flux manuellement par exemple, l'envoi de l'acteur créé avec Source.actorRef un PoisonPill dans la TestActor fonction PostStop?

Remarque: J'ai aussi essayé d'injecter la Materializer et la Actorsystem-à-dire:

@Inject()(implicit ec: ExecutionContext, implicit val mat: Materializer, implicit val system: ActorSystem) 

Lorsque la lecture recompile, le flux est fermé, mais produit également une erreur:

[error] a.a.ActorSystemImpl - Websocket handler failed with 
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]] 
terminated abruptly 

Répondre

1

En votre premier exemple, vous créez un système d'acteur dans votre acteur. Vous ne devriez pas faire cela - les systèmes d'acteur sont chers, en créant un moyen de démarrer les pools de threads, en démarrant les planificateurs, etc. De plus, vous ne le fermez jamais, ce qui signifie que vous avez un problème beaucoup plus important que le flux - vous avez une fuite de ressources, les pools de threads créés par le système d'acteur ne sont jamais fermés. Donc, essentiellement, chaque fois que vous recevez une connexion WebSocket, vous créez un nouveau système d'acteur avec un nouveau jeu de pools de threads, et vous ne les fermez jamais. En production, même avec une petite charge (quelques requêtes par seconde), votre application va manquer de mémoire en quelques minutes.

En général dans Play, vous ne devez jamais créer votre propre système d'acteur, mais plutôt en injecter un. De l'intérieur d'un acteur, vous n'avez même pas besoin de l'avoir injecté parce qu'il est automatiquement - context.system vous donne accès au système d'acteur qui a créé l'acteur. De même pour les matérialisateurs, ceux-ci ne sont pas aussi lourds, mais si vous en créez un par connexion, vous pourriez aussi manquer de mémoire si vous ne le fermez pas, alors vous devriez l'avoir injecté. Donc quand vous l'avez injecté, vous obtenez une erreur - c'est difficile à éviter, mais pas impossible. La difficulté est qu'Akka elle-même ne peut pas vraiment savoir automatiquement quel ordre les choses doivent être fermées pour fermer les choses gracieusement, si elle éteint d'abord votre acteur, afin qu'il puisse fermer les flux gracieusement, ou devrait-il fermer les flux vers le bas, afin qu'ils puissent informer votre acteur qu'ils sont fermés et répondre en conséquence?

Akka 2.5 a une solution pour cela, une séquence d'arrêt géré, où vous pouvez enregistrer des choses à être arrêté avant que le système d'acteur commence à tuer les choses dans un ordre un peu au hasard:

https://doc.akka.io/docs/akka/2.5/scala/actors.html#coordinated-shutdown

Vous pouvez utiliser En combinaison avec Akka, les flux kill switches permettent d'arrêter vos flux avant que le reste de l'application ne soit arrêté.

Mais généralement, les erreurs d'arrêt sont assez bénignes, donc si c'était moi, je ne m'en soucierais pas.