J'ai un flux de données comme ceci:Comment passer des exceptions silencieusement dans RxJava2?
Observable
.fromFuture(
CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>>
listOf(1, 2, 3, 57005, 5)
},
Schedulers.computation()
)
.flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one
.map {
CompletableFuture.supplyAsync { // This remote call may fail if it does not like the input. I want to skip that failures and continue the stream like the fail never occurred.
if (it == 0xDEAD) {
throw IOException("Dead value!")
}
it
}
}
.flatMap {
Observable.fromFuture(it) // Turn that Futures into a stream of Observables once again
}
.doOnNext {
println(it) // Debug
}
.blockingSubscribe()
J'ai remplacé la logique métier (qui renvoient en fait Future
s) avec CompletableFuture.supplyAsync
. Et, oui, c'est Kotlin, mais je suppose que vous avez l'intention.
Quand je commente la valeur "morte" (57005
), la sortie est:
1
4
9
25
Mais si cette valeur "morte" apparaît dans le flux, il échoue:
1
4
9
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.io.IOException: Dead value!
at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45)
at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:86)
at io.reactivex.Observable.blockingSubscribe(Observable.java:5035)
at by.dev.madhead.rx.TestKt.main(test.kt:41)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Dead value!
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
...
Je suis un débutant en RX, si rapidement googlé pour une solution: onExceptionResumeNext
: Observable.fromFuture(it)
->Observable.fromFuture(it).onExceptionResumeNext { Observable.empty<Int>() }
. Mais maintenant mon application est suspendue pour toujours (après avoir produit la sortie que j'attends). On dirait que le flux ne finit jamais.
Dois-je "éteindre" que Observable
en quelque sorte ou quoi? Ou, plus généralement, est-ce une bonne approche lorsque vous travaillez avec RX? Dois-je le repenser d'une autre manière?
Cela a effectivement fait l'affaire, me stupide. Merci! – madhead