2017-08-05 3 views
0

Je suis en train de mettre en œuvre un Akka personnalisé Sink, mais je ne pouvais pas trouver un moyen de gérer l'avenir à l'intérieur.Comment faire face à l'avenir à l'intérieur d'un évier aka personnalisé?

class EventSink(...) { 

    val in: Inlet[EventEnvelope2] = Inlet("EventSink") 
    override val shape: SinkShape[EventEnvelope2] = SinkShape(in) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { 
    new GraphStageLogic(shape) { 

     // This requests one element at the Sink startup. 
     override def preStart(): Unit = pull(in) 

     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val future = handle(grab(in)) 
      Await.ready(future, Duration.Inf) 
      /* 
      future.onComplete { 
      case Success(_) => 
       logger.info("pulling next events") 
       pull(in) 
      case Failure(failure) => 
       logger.error(failure.getMessage, failure) 
       throw failure 
      }*/ 
      pull(in) 
     } 
     }) 
    } 
    } 

    private def handle(envelope: EventEnvelope2): Future[Unit] = { 
    val EventEnvelope2(query.Sequence(offset), _/*persistenceId*/, _/*sequenceNr*/, event) = envelope 
    ... 
    db.run(statements.transactionally) 
    } 
} 

Je dois aller bloquer l'avenir en ce moment, ce qui ne semble pas bon. Le non-bloquant que j'ai commenté ne fonctionne que pour le premier événement. Quelqu'un pourrait-il m'aider?


Mise à jour Merci @ViktorKlang. Cela semble fonctionner maintenant.

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
{ 
    new GraphStageLogic(shape) { 
     val callback = getAsyncCallback[Try[Unit]] { 
     case Success(_) => 
      //completeStage() 
      pull(in) 
     case Failure(error) => 
      failStage(error) 
     } 

     // This requests one element at the Sink startup. 
     override def preStart(): Unit = { 
     pull(in) 
     } 

     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val future = handle(grab(in)) 
      future.onComplete { result => 
      callback.invoke(result) 
      } 
     } 
     }) 
    } 
    } 

Je suis en train de mettre en œuvre un récepteur d'événements Rational DB connnecting à ReadJournal.eventsByTag. Donc, c'est un flux continu, qui ne finira jamais à moins qu'il y ait une erreur - C'est ce que je veux. Est-ce que mon approche est correcte?

Deux autres questions:

  1. Est-ce que le GraphStage jamais fin à moins que j'invoque manuellement completeStage ou failStage?

  2. Ai-je raison ou normal de déclarer le rappel à l'extérieur méthode prédémarrage? et ai-je raison d'invoquer pull (in) dans preStart dans ce cas?

Merci, Cheng

+0

http://doc.akka.io/docs/akka/current/scala/ stream/stream-customise.html # using-asynchronous-side-channels –

+0

Merci @ViktorKlang, j'ai déjà lu cela avant. Je n'ai rien trouvé d'utile. – Cheng

+0

Pourquoi le segment que j'ai lié n'est-il pas utile? Est-ce que tu l'as essayé? –

Répondre

0

contournements personnalisés étapes

En général, vous devriez essayer d'épuiser toutes les possibilités avec les méthodes données de la bibliothèque de Source, Flow et Sink. Les étapes personnalisées ne sont presque jamais nécessaires et rendent votre code difficile à maintenir.

Rédaction de votre « Custom » étape utilisant des méthodes standard

Basé sur les détails de votre code d'exemple de question que je ne vois aucune raison pour laquelle vous utiliseriez une coutume Sink pour commencer.

Étant donné votre méthode handle, vous pouvez légèrement modifier pour faire l'enregistrement que vous avez spécifié dans la question:

val loggedHandle : (EventEnvelope2) => Future[Unit] = 
    handle(_) transform { 
    case Success(_)  => { 
     logger.info("pulling next events") 
     Success(Unit) 
    } 
    case Failure(failure) => { 
     logger.error(failure.getMessage, failure) 
     Failure(failure) 
    } 
    } 

Ensuite, il suffit d'utiliser Sink.foreachParallel pour gérer les enveloppes:

val createEventEnvelope2Sink : (Int) => Sink[EventEnvelope2, Future[Done]] = 
    (parallelism) => 
    Sink[EventEnvelope2].foreachParallel(parallelism)(handle _) 

Maintenant, même si vous voulez que chaque EventEnvelope2 à envoyer à la db afin que vous pouvez simplement utiliser 1 pour le parallélisme:

val inOrderDBInsertSink : Sink[EventEnvelope2, Future[Done]] = 
    createEventEnvelope2Sink(1) 

En outre, si la base de données lance une exception, vous pouvez toujours obtenir une prise de celui-ci lorsque le foreachParallel termine:

val someEnvelopeSource : Source[EventEnvelope2, _] = ??? 

someEnvelopeSource 
    .to(createEventEnvelope2Sink(1)) 
    .run() 
    .andThen { 
    case Failure(throwable) => { /*deal with db exception*/ } 
    case Success(_)   => { /*all inserts succeeded*/ } 
    }