Maintenant que Source.actorPublisher
est déprécié, je voulais trouver un remplacement adéquat.Remplacement de Source.actorPublisher déprécié par Source.fromGraph - comment limiter?
Attention: je suis toujours un Akb newb, essayant de trouver mon chemin!
Fondamentalement ce que j'ai est un websocket, où le serveur pousse un nouveau message toutes les secondes.
Code pertinent:
// OLD, deprecated way
//val source: Source[TextMessage.Strict, ActorRef] = Source.actorPublisher[String](Props[KeepAliveActor]).map(i => TextMessage(i))
// NEW way
val sourceGraph: Graph[SourceShape[TextMessage.Strict], NotUsed] = new KeepAliveSource
val source: Source[TextMessage.Strict, NotUsed] = Source.fromGraph(sourceGraph)
val requestHandler: HttpRequest => HttpResponse =
{
case req @ HttpRequest(GET, Uri.Path("/ws"), _, _, _) =>
req.header[UpgradeToWebSocket] match
{
case Some(upgrade) => upgrade.handleMessagesWithSinkSource(Sink.ignore, source)
case None => HttpResponse(400, entity = "Not a valid websocket request")
}
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
l'ancien acteur: (essentiellement pris de: Actorpublisher as source in handleMessagesWithSinkSource)
case class KeepAlive()
class KeepAliveActor extends ActorPublisher[String]
{
import scala.concurrent.duration._
implicit val ec = context.dispatcher
val tick = context.system.scheduler.schedule(1 second, 1 second, self, KeepAlive())
var cnt = 0
var buffer = Vector.empty[String]
override def receive: Receive =
{
case KeepAlive() =>
{
cnt = cnt + 1
if (buffer.isEmpty && totalDemand > 0)
{
onNext(s"${cnt}th Message from server!")
}
else {
buffer :+= cnt.toString
if (totalDemand > 0) {
val (use,keep) = buffer.splitAt(totalDemand.toInt)
buffer = keep
use foreach onNext
}
}
}
}
override def postStop() = tick.cancel()
}
L'ancienne fonctionne comme un charme.
Maintenant, le nouveau code, basé sur un GraphStage
class KeepAliveSource extends GraphStage[SourceShape[TextMessage.Strict]]
{
import scala.concurrent.duration._
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
{
new TimerGraphStageLogic(shape)
{
// All state MUST be inside the GraphStageLogic,
// never inside the enclosing GraphStage.
// This state is safe to access and modify from all the
// callbacks that are provided by GraphStageLogic and the
// registered handlers.
private var counter = 1
setHandler(out, new OutHandler
{
override def onPull(): Unit =
{
push(out, TextMessage(counter.toString))
counter += 1
schedulePeriodically(None, 1 second)
}
})
}
}
val out: Outlet[TextMessage.Strict] = Outlet("KeepAliveSource")
override def shape: SourceShape[TextMessage.Strict] = SourceShape(out)
}
Pour une raison quelconque, cela me inonde encore, même si j'avais pris la schedulePeriodically(None, 1 second)
ajouterait un délai de 1 seconde entre chaque message. Je me trompe évidemment cependant.
L'augmentation de cette valeur ne change pas le fait que mon pauvre navigateur ne peut pas gérer les demandes et les accidents (je peux le voir dans le journal de l'simple websocket client
)
Et comment faire avec 'GraphStage' et' TimerGraphStageLogic'? – Sorona
a ajouté info à la réponse –
Ne fonctionne pas pour moi comme dans 'TimerGraphStageLogic' il dit' pas encore initialisé: seulement setHandler est autorisé dans le constructeur GraphStageLogic' – Sorona