2017-07-01 3 views
0

Maintenant que Source.actorPublisher est déprécié, je voulais trouver un remplacement adéquat.Remplacement de Source.actorPublisher déprécié par Source.fromGraph - comment limiter?

Attention: je suis toujours un Akb newb, essayant de trouver mon chemin!

Fondamentalement ce que j'ai est un websocket, où le serveur pousse un nouveau message toutes les secondes.

Code pertinent:

// OLD, deprecated way 
//val source: Source[TextMessage.Strict, ActorRef] = Source.actorPublisher[String](Props[KeepAliveActor]).map(i => TextMessage(i)) 

// NEW way 
val sourceGraph: Graph[SourceShape[TextMessage.Strict], NotUsed] = new KeepAliveSource 
val source: Source[TextMessage.Strict, NotUsed] = Source.fromGraph(sourceGraph) 

val requestHandler: HttpRequest => HttpResponse = 
{ 
    case req @ HttpRequest(GET, Uri.Path("/ws"), _, _, _) => 
    req.header[UpgradeToWebSocket] match 
    { 
     case Some(upgrade) => upgrade.handleMessagesWithSinkSource(Sink.ignore, source) 
     case None => HttpResponse(400, entity = "Not a valid websocket request") 
    } 
    case r: HttpRequest => 
    r.discardEntityBytes() // important to drain incoming HTTP Entity stream 
    HttpResponse(404, entity = "Unknown resource!") 
} 

l'ancien acteur: (essentiellement pris de: Actorpublisher as source in handleMessagesWithSinkSource)

case class KeepAlive() 

class KeepAliveActor extends ActorPublisher[String] 
{ 
    import scala.concurrent.duration._ 
    implicit val ec = context.dispatcher 

    val tick = context.system.scheduler.schedule(1 second, 1 second, self, KeepAlive()) 

    var cnt = 0 
    var buffer = Vector.empty[String] 

    override def receive: Receive = 
    { 
    case KeepAlive() => 
    { 
     cnt = cnt + 1 
     if (buffer.isEmpty && totalDemand > 0) 
     { 
     onNext(s"${cnt}th Message from server!") 
     } 
     else { 
     buffer :+= cnt.toString 
     if (totalDemand > 0) { 
      val (use,keep) = buffer.splitAt(totalDemand.toInt) 
      buffer = keep 
      use foreach onNext 
     } 
     } 
    } 
    } 

    override def postStop() = tick.cancel() 
} 

L'ancienne fonctionne comme un charme.

Maintenant, le nouveau code, basé sur un GraphStage

class KeepAliveSource extends GraphStage[SourceShape[TextMessage.Strict]] 
{ 
    import scala.concurrent.duration._ 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    { 
    new TimerGraphStageLogic(shape) 
    { 
     // All state MUST be inside the GraphStageLogic, 
     // never inside the enclosing GraphStage. 
     // This state is safe to access and modify from all the 
     // callbacks that are provided by GraphStageLogic and the 
     // registered handlers. 

     private var counter = 1 
     setHandler(out, new OutHandler 
     { 
     override def onPull(): Unit = 
     { 
      push(out, TextMessage(counter.toString)) 
      counter += 1 
      schedulePeriodically(None, 1 second) 
     } 
     }) 
    } 
    } 

    val out: Outlet[TextMessage.Strict] = Outlet("KeepAliveSource") 
    override def shape: SourceShape[TextMessage.Strict] = SourceShape(out) 
} 

Pour une raison quelconque, cela me inonde encore, même si j'avais pris la schedulePeriodically(None, 1 second) ajouterait un délai de 1 seconde entre chaque message. Je me trompe évidemment cependant.

L'augmentation de cette valeur ne change pas le fait que mon pauvre navigateur ne peut pas gérer les demandes et les accidents (je peux le voir dans le journal de l'simple websocket client)

Répondre

1

Ce schedulePeriodically appel n'affecte pas le comportement de votre étape. Chaque fois qu'une étape en aval demande un message, le gestionnaire onPull est appelé et un message est push immédiatement supprimé. C'est pourquoi vous ne voyez aucune limitation.

Bien que le DSL GraphStage (celui que vous avez choisi) est très flexible, il est également plus difficile à obtenir. Pour les tâches simples comme celle-ci, il serait préférable de tirer parti des étapes de niveau supérieur qu'Akka fournit. Comme Source.tick (docs).

val tickingSource: Source[String, Cancellable] = 
    Source.tick(initialDelay = 1.second, interval = 5.seconds, tick = "hello") 

Dans votre exemple, vous avez besoin d'un compteur incrémentée à publier, afin que vous puissiez attacher plus logique à la source à retardement, par exemple

val tickingSource: Source[Strict, Cancellable] = 
    Source.tick(initialDelay = 1.second, interval = 5.seconds, tick = NotUsed) 
     .zipWithIndex 
     .map{ case (_, counter) ⇒ TextMessage(counter.toString) } 

Si vous êtes intéressé par la façon dont les GraphStage œuvres sous-jacentes, vous pouvez toujours jeter un oeil à un code TickSource lui-même (voir github). La principale différence est que TickSource appelle push dans le rappel onTimer (qui provient de TimerGraphStageLogic et que vous pouvez remplacer).

+0

Et comment faire avec 'GraphStage' et' TimerGraphStageLogic'? – Sorona

+0

a ajouté info à la réponse –

+0

Ne fonctionne pas pour moi comme dans 'TimerGraphStageLogic' il dit' pas encore initialisé: seulement setHandler est autorisé dans le constructeur GraphStageLogic' – Sorona