2017-06-26 1 views
2

je voudrais chaîne une série de Flow « s de la forme a -> Try[b], où chaque étage successif gère l'Success cas de la précédente, et le Sink à la fin gère l'ensemble Failure générique.monadique court-circuit à Acre Streams

Est-ce que ceci ou quelque chose d'équivalent peut être encodé succinctement? C'est en fait un flux linéaire, mais je ne suis pas sûr de savoir à court de diffusion et de fusion à chaque étape.

+0

Ne pouvez-vous pas simplement mapper 'recover' et flatMap le résultat? Peut-être que je suis un malentendu. :) – erip

Répondre

1

Une façon de résoudre est de définir une étape sortance partitionner les Try dans deux cours d'eau, en fonction de son résultat

object PartitionTry { 
    def apply[T]() = GraphDSL.create[FanOutShape2[Try[T], Throwable, T]]() { implicit builder ⇒ 
     import GraphDSL.Implicits._ 

     val success = builder.add(Flow[Try[T]].collect { case Success(a) ⇒ a }) 
     val failure = builder.add(Flow[Try[T]].collect { case Failure(t) ⇒ t }) 
     val partition = builder.add(Partition[Try[T]](2, _.fold(_ ⇒ 0, _ ⇒ 1))) 

     partition ~> failure 
     partition ~> success 

     new FanOutShape2[Try[T], Throwable, T](partition.in, failure.out, success.out) 
    } 
    } 

Ensuite, votre flux générique peut ingérer Try s et envoyer les Failure s à un évier de choix, tout en passant les Success es sur

object ErrorHandlingFlow { 
    def apply[T, MatErr](errorSink: Sink[Throwable, MatErr]): Flow[Try[T], T, MatErr] = Flow.fromGraph(
     GraphDSL.create(errorSink) { implicit builder ⇒ sink ⇒ 
     import GraphDSL.Implicits._ 

     val partition = builder.add(PartitionTry[T]()) 

     partition.out0 ~> sink 

     new FlowShape[Try[T], T](partition.in, partition.out1) 
     } 
    ) 
    } 

exemple d'utilisation ci-dessous

val source  : Source[String, NotUsed]   = Source(List("1", "2", "hello")) 
    val convert  : Flow[String, Try[Int], NotUsed] = Flow.fromFunction((s: String) ⇒ Try{s.toInt}) 
    val errorsSink : Sink[Throwable, Future[Done]]  = Sink.foreach[Throwable](println) 
    val handleErrors: Flow[Try[Int], Int, Future[Done]] = ErrorHandlingFlow(errorsSink) 

    source.via(convert).via(handleErrors).runForeach(println) 

Notez que

  • les 2 étapes définies ci-dessus sont réutilisables pour tout type (écrire une fois, utilisez partout)
  • cette approche peut être réutilisé pour d'autres classes de type - comme Either, etc.
+0

Merci beaucoup! –