Un acteur initialise un flux Akka qui se connecte à un socket Web. Ceci est fait en utilisant un Source.actorRef
auquel les messages peuvent être envoyés, qui sont ensuite traités par le webSocketClientFlow
et consommés par un Sink.foreach
. Cela peut être vu dans le code suivant (dérivé de akka docs):Pourquoi le framework Play ne ferme-t-il pas Akka Stream?
class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging {
final implicit val system: ActorSystem = ActorSystem()
final implicit val materializer: ActorMaterializer = ActorMaterializer()
def receive = {
case _ =>
}
// Consume the incoming messages from the websocket.
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case misc => println(misc)
}
// Source through which we can send messages to the websocket.
val outgoing: Source[TextMessage, ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com"))
// Materialized the stream
val ((ws,upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// Check whether the server has accepted the websocket request.
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Failed: ${upgrade.response.status}")
}
}
// When the connection has been established.
connected.onComplete(println)
// When the stream has closed
closed.onComplete {
case Success(_) => println("Test Websocket closed gracefully")
case Failure(e) => log.error("Test Websocket closed with an error\n", e)
}
}
Lorsque le cadre de jeu recompile il ferme la TestActor mais ne ferme pas le flux Akka. Uniquement lorsque les délais d'attente websocket le flux est fermé.
Est-ce que cela signifie que je dois fermer le flux manuellement par exemple, l'envoi de l'acteur créé avec Source.actorRef
un PoisonPill
dans la TestActor fonction PostStop
?
Remarque: J'ai aussi essayé d'injecter la Materializer
et la Actorsystem
-à-dire:
@Inject()(implicit ec: ExecutionContext, implicit val mat: Materializer, implicit val system: ActorSystem)
Lorsque la lecture recompile, le flux est fermé, mais produit également une erreur:
[error] a.a.ActorSystemImpl - Websocket handler failed with
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]]
terminated abruptly