Une façon de résoudre est de définir une étape sortance partitionner les Try
dans deux cours d'eau, en fonction de son résultat
object PartitionTry {
def apply[T]() = GraphDSL.create[FanOutShape2[Try[T], Throwable, T]]() { implicit builder ⇒
import GraphDSL.Implicits._
val success = builder.add(Flow[Try[T]].collect { case Success(a) ⇒ a })
val failure = builder.add(Flow[Try[T]].collect { case Failure(t) ⇒ t })
val partition = builder.add(Partition[Try[T]](2, _.fold(_ ⇒ 0, _ ⇒ 1)))
partition ~> failure
partition ~> success
new FanOutShape2[Try[T], Throwable, T](partition.in, failure.out, success.out)
}
}
Ensuite, votre flux générique peut ingérer Try
s et envoyer les Failure
s à un évier de choix, tout en passant les Success
es sur
object ErrorHandlingFlow {
def apply[T, MatErr](errorSink: Sink[Throwable, MatErr]): Flow[Try[T], T, MatErr] = Flow.fromGraph(
GraphDSL.create(errorSink) { implicit builder ⇒ sink ⇒
import GraphDSL.Implicits._
val partition = builder.add(PartitionTry[T]())
partition.out0 ~> sink
new FlowShape[Try[T], T](partition.in, partition.out1)
}
)
}
exemple d'utilisation ci-dessous
val source : Source[String, NotUsed] = Source(List("1", "2", "hello"))
val convert : Flow[String, Try[Int], NotUsed] = Flow.fromFunction((s: String) ⇒ Try{s.toInt})
val errorsSink : Sink[Throwable, Future[Done]] = Sink.foreach[Throwable](println)
val handleErrors: Flow[Try[Int], Int, Future[Done]] = ErrorHandlingFlow(errorsSink)
source.via(convert).via(handleErrors).runForeach(println)
Notez que
- les 2 étapes définies ci-dessus sont réutilisables pour tout type (écrire une fois, utilisez partout)
- cette approche peut être réutilisé pour d'autres classes de type - comme
Either
, etc.
Ne pouvez-vous pas simplement mapper 'recover' et flatMap le résultat? Peut-être que je suis un malentendu. :) – erip