Je suis en train de mettre en œuvre un Akka personnalisé Sink, mais je ne pouvais pas trouver un moyen de gérer l'avenir à l'intérieur.Comment faire face à l'avenir à l'intérieur d'un évier aka personnalisé?
class EventSink(...) {
val in: Inlet[EventEnvelope2] = Inlet("EventSink")
override val shape: SinkShape[EventEnvelope2] = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
// This requests one element at the Sink startup.
override def preStart(): Unit = pull(in)
setHandler(in, new InHandler {
override def onPush(): Unit = {
val future = handle(grab(in))
Await.ready(future, Duration.Inf)
/*
future.onComplete {
case Success(_) =>
logger.info("pulling next events")
pull(in)
case Failure(failure) =>
logger.error(failure.getMessage, failure)
throw failure
}*/
pull(in)
}
})
}
}
private def handle(envelope: EventEnvelope2): Future[Unit] = {
val EventEnvelope2(query.Sequence(offset), _/*persistenceId*/, _/*sequenceNr*/, event) = envelope
...
db.run(statements.transactionally)
}
}
Je dois aller bloquer l'avenir en ce moment, ce qui ne semble pas bon. Le non-bloquant que j'ai commenté ne fonctionne que pour le premier événement. Quelqu'un pourrait-il m'aider?
Mise à jour Merci @ViktorKlang. Cela semble fonctionner maintenant.
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
{
new GraphStageLogic(shape) {
val callback = getAsyncCallback[Try[Unit]] {
case Success(_) =>
//completeStage()
pull(in)
case Failure(error) =>
failStage(error)
}
// This requests one element at the Sink startup.
override def preStart(): Unit = {
pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val future = handle(grab(in))
future.onComplete { result =>
callback.invoke(result)
}
}
})
}
}
Je suis en train de mettre en œuvre un récepteur d'événements Rational DB connnecting à ReadJournal.eventsByTag. Donc, c'est un flux continu, qui ne finira jamais à moins qu'il y ait une erreur - C'est ce que je veux. Est-ce que mon approche est correcte?
Deux autres questions:
Est-ce que le GraphStage jamais fin à moins que j'invoque manuellement completeStage ou failStage?
Ai-je raison ou normal de déclarer le rappel à l'extérieur méthode prédémarrage? et ai-je raison d'invoquer pull (in) dans preStart dans ce cas?
Merci, Cheng
http://doc.akka.io/docs/akka/current/scala/ stream/stream-customise.html # using-asynchronous-side-channels –
Merci @ViktorKlang, j'ai déjà lu cela avant. Je n'ai rien trouvé d'utile. – Cheng
Pourquoi le segment que j'ai lié n'est-il pas utile? Est-ce que tu l'as essayé? –