L'objectif est d'envoyer le WSConnectEvent
une fois qu'un client est connecté et que le flux commence. Avec des flux akka-1.0, je suis en mesure d'accomplir ce qui suit:Comment envoyer un message à ActorRef au début du flux Akka-Stream 2.0?
Flow(Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)) {
implicit builder =>
sdpSource =>
// Incoming SDP offer flow
val fromWebsocket = builder.add(Flow[Message].collect {
case TextMessage.Strict(txt) => {
val event = txt.parseJson.convertTo[WSResponseEvent]
WSMessageEvent(callUUID, userUUID, event.id, event.data)
}
})
// Outgoing SDP answer flow
val toWebsocket = builder.add(Flow[WSResponseEvent].map {
case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
})
val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))
// Join events, also sends actor for sending stuff
val merge = builder.add(Merge[CallControlEvent](2))
val actorAsSource = builder.materializedValue.map(actor => WSConnectEvent(callUUID, userUUID, actor))
fromWebsocket ~> merge.in(0)
actorAsSource ~> merge.in(1)
merge ~> callActorSink
sdpSource ~> toWebsocket
(fromWebsocket.inlet, toWebsocket.outlet)
}
En essayant de mettre à jour ce de travailler avec Akka-Streams 2.0.1 j'ai changé le code suivant, mais je ne suis pas loner recevoir la WSConnectEvent
message. Je ne suis pas sûr si c'est parce que ma source est installée incorrectement, ou je ne matérialise pas le ActorRef
correctement.
val sdpSource = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)
Flow.fromGraph(
GraphDSL.create() { implicit builder =>
// Incoming SDP offer flow
val fromWebsocket = builder.add(Flow[Message].collect {
case TextMessage.Strict(txt) => {
val event = txt.parseJson.convertTo[WSResponseEvent]
WSMessageEvent(callUUID, userUUID, event.id, event.data)
}
})
// Outgoing SDP answer flow
val toWebsocket = builder.add(Flow[WSResponseEvent].map {
case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
})
val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))
// Join events, also sends actor for sending stuff
val merge = builder.add(Merge[CallControlEvent](2))
val actorAsSource = sdpSource.mapMaterializedValue(WSConnectEvent(callUUID, userUUID, _))
fromWebsocket ~> merge.in(0)
actorAsSource ~> merge.in(1)
merge ~> callActorSink
sdpSource ~> toWebsocket
FlowShape(fromWebsocket.in, toWebsocket.out)
}
)
Je pense que vous pourriez faire référence à builder.materializedValue, car Source n'expose pas cette propriété (du moins pas avec akka-streams 2.0.1). Après un peu plus de recherche, je ne suis pas sûr qu'il est possible de construire une Source à partir d'un Source.actorRef tel qu'il puisse émettre le premier message. –
Oui, désolé si je n'étais pas clair, il est disponible depuis le constructeur, (c'est aussi une méthode sur une instance de Source, c'est pourquoi je me suis référé à ça comme ça). – johanandren
Vous pouvez voir un exemple qui fait quelque chose comme ce que vous essayez de faire qui pourrait être utile ici: https://github.com/johanandren/scala-stockholm-cluster-message-broker/blob/master/src/main/ scala/WebServer.scala # L82 – johanandren