2016-01-01 1 views
3

L'objectif est d'envoyer le WSConnectEvent une fois qu'un client est connecté et que le flux commence. Avec des flux akka-1.0, je suis en mesure d'accomplir ce qui suit:Comment envoyer un message à ActorRef au début du flux Akka-Stream 2.0?

Flow(Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)) { 
    implicit builder => 
    sdpSource => 

     // Incoming SDP offer flow 
     val fromWebsocket = builder.add(Flow[Message].collect { 
     case TextMessage.Strict(txt) => { 
      val event = txt.parseJson.convertTo[WSResponseEvent] 
      WSMessageEvent(callUUID, userUUID, event.id, event.data) 
     } 
     }) 

     // Outgoing SDP answer flow 
     val toWebsocket = builder.add(Flow[WSResponseEvent].map { 
     case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint) 
     }) 

     val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}") 
     val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf); 
     val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID)) 

     // Join events, also sends actor for sending stuff 
     val merge = builder.add(Merge[CallControlEvent](2)) 
     val actorAsSource = builder.materializedValue.map(actor => WSConnectEvent(callUUID, userUUID, actor)) 

     fromWebsocket ~> merge.in(0) 
     actorAsSource ~> merge.in(1) 

     merge ~> callActorSink 
     sdpSource ~> toWebsocket 
     (fromWebsocket.inlet, toWebsocket.outlet) 
} 

En essayant de mettre à jour ce de travailler avec Akka-Streams 2.0.1 j'ai changé le code suivant, mais je ne suis pas loner recevoir la WSConnectEvent message. Je ne suis pas sûr si c'est parce que ma source est installée incorrectement, ou je ne matérialise pas le ActorRef correctement.

val sdpSource = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail) 

Flow.fromGraph(
    GraphDSL.create() { implicit builder => 

    // Incoming SDP offer flow 
    val fromWebsocket = builder.add(Flow[Message].collect { 
     case TextMessage.Strict(txt) => { 
     val event = txt.parseJson.convertTo[WSResponseEvent] 
     WSMessageEvent(callUUID, userUUID, event.id, event.data) 
     } 
    }) 

    // Outgoing SDP answer flow 
    val toWebsocket = builder.add(Flow[WSResponseEvent].map { 
     case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint) 
    }) 

    val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}") 
    val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf); 
    val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID)) 

    // Join events, also sends actor for sending stuff 
    val merge = builder.add(Merge[CallControlEvent](2)) 
    val actorAsSource = sdpSource.mapMaterializedValue(WSConnectEvent(callUUID, userUUID, _)) 

    fromWebsocket ~> merge.in(0) 
    actorAsSource ~> merge.in(1) 

    merge ~> callActorSink 
    sdpSource ~> toWebsocket 
    FlowShape(fromWebsocket.in, toWebsocket.out) 
    } 
) 

Répondre

1

Merci à l'aide de johanandren, mapMaterializedValue n'a pas été la bonne approche, au lieu que je devais construire un flux pour envoyer le WSConnectEvent et connecter la sortie builder.materializeValue à travers elle jusqu'à la 'fusion' dans le port, comme ceci:

// Join events, also sends actor for sending stuff 
val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _)) 

builder.materializedValue ~> actorConnected ~> merge.in(1) 

l'exemple de travail complet:

val sdpSource: Source[WSResponseEvent, ActorRef] = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail) 

Flow.fromGraph(GraphDSL.create(sdpSource) { 
    implicit builder => 
    { (responseSource) => 
     import GraphDSL.Implicits._ 

     // Incoming SDP offer flow 
     val fromWebsocket = builder.add(Flow[Message].mapAsync(1)(_ match { 
     case tm: TextMessage => tm.textStream.runFold("")(_ + _).map(Some(_)) 
     case bm: BinaryMessage => 
      bm.dataStream.runWith(Sink.ignore) 
      Future.successful(None) 
     }).collect { 
     case Some(txt) => { 
      val event = txt.parseJson.convertTo[WSResponseEvent] 
      WSMessageEvent(callUUID, userUUID, event.id, event.data) 
     } 
     }) 

     // Outgoing SDP answer flow 
     val toWebsocket = builder.add(Flow[WSResponseEvent].map { 
     case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint) 
     }) 

     val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}") 
     val callActorRef = Await.result(callActorSelection.resolveOne(), 2.minutes); 
     val toCallActor = builder.add(Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))) 

     // Join events, also sends actor for sending stuff 
     val merge = builder.add(Merge[CallControlEvent](2)) 
     val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _)) 

     fromWebsocket ~> merge.in(0) 
     builder.materializedValue ~> actorConnected ~> merge.in(1) 

     merge ~> toCallActor 
     responseSource ~> toWebsocket 

     FlowShape.of(fromWebsocket.in, toWebsocket.out) 
    } 
}) 
0

L'appel à sdpSource.mapMaterializedValue(...) transforme seulement la valeur d'un type matérialisée (ActorRef-WSConnectEvent), il n'a pas de quelque façon émet comme un élément de la Source. Toutefois, fournit une source qui émettra la valeur matérialisée une fois que le graphique est matérialisé.

Alors, ce que vous voulez faire est:

fromWebsocket     ~> merge.in(0) 
actorAsSource.materializedValue ~> merge.in(1) 
+0

Je pense que vous pourriez faire référence à builder.materializedValue, car Source n'expose pas cette propriété (du moins pas avec akka-streams 2.0.1). Après un peu plus de recherche, je ne suis pas sûr qu'il est possible de construire une Source à partir d'un Source.actorRef tel qu'il puisse émettre le premier message. –

+0

Oui, désolé si je n'étais pas clair, il est disponible depuis le constructeur, (c'est aussi une méthode sur une instance de Source, c'est pourquoi je me suis référé à ça comme ça). – johanandren

+0

Vous pouvez voir un exemple qui fait quelque chose comme ce que vous essayez de faire qui pourrait être utile ici: https://github.com/johanandren/scala-stockholm-cluster-message-broker/blob/master/src/main/ scala/WebServer.scala # L82 – johanandren