0

J'ai un flux de données comme ceci:Comment passer des exceptions silencieusement dans RxJava2?

Observable 
    .fromFuture(
     CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>> 
      listOf(1, 2, 3, 57005, 5) 
     }, 
     Schedulers.computation() 
    ) 
    .flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one 
    .map { 
     CompletableFuture.supplyAsync { // This remote call may fail if it does not like the input. I want to skip that failures and continue the stream like the fail never occurred. 
      if (it == 0xDEAD) { 
       throw IOException("Dead value!") 
      } 

      it 
     } 
    } 
    .flatMap { 
     Observable.fromFuture(it) // Turn that Futures into a stream of Observables once again 
    } 
    .doOnNext { 
     println(it) // Debug 
    } 
    .blockingSubscribe() 

J'ai remplacé la logique métier (qui renvoient en fait Future s) avec CompletableFuture.supplyAsync. Et, oui, c'est Kotlin, mais je suppose que vous avez l'intention.

Quand je commente la valeur "morte" (57005), la sortie est:

1 
4 
9 
25 

Mais si cette valeur "morte" apparaît dans le flux, il échoue:

1 
4 
9 
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.io.IOException: Dead value! 
    at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45) 
    at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:86) 
    at io.reactivex.Observable.blockingSubscribe(Observable.java:5035) 
    at by.dev.madhead.rx.TestKt.main(test.kt:41) 
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Dead value! 
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
... 

Je suis un débutant en RX, si rapidement googlé pour une solution: onExceptionResumeNext: Observable.fromFuture(it) ->Observable.fromFuture(it).onExceptionResumeNext { Observable.empty<Int>() }. Mais maintenant mon application est suspendue pour toujours (après avoir produit la sortie que j'attends). On dirait que le flux ne finit jamais.

Dois-je "éteindre" que Observable en quelque sorte ou quoi? Ou, plus généralement, est-ce une bonne approche lorsque vous travaillez avec RX? Dois-je le repenser d'une autre manière?

Répondre

1

exceptions Swallow comme ceci:

Observable.fromFuture(it).onErrorResumeNext(Observable.empty()) 
+0

Cela a effectivement fait l'affaire, me stupide. Merci! – madhead