2017-10-19 31 views
0

Je suis abonné à chaud observable, puis en appliquant opérateur timeout à elle, mais je ne veux pas se désabonner lorsque TimeoutException est levé, produire uniquement un élément spécial (Je sais que la source émettra finalement de nouvelles articles). Comment puis-je y parvenir?RxJava timeout sans se désabonner de la source

J'essaie de combiner le délai d'attente avec onErrorReturn mais encore une fois cela provoque onComplete appel sur l'abonné.

Répondre

0

publier, délai d'attente et réessayer (adapté de my older answer):

Observable<Long> source = 
    Observable.just(100L, 200L, 500L, 1000L, 5000L, 5500L, 6000L) 
    .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(a -> v)); 

source.publish(co -> 
    co.timeout(750, TimeUnit.MILLISECONDS, 
     Observable.just(-1L) 
     .concatWith(Observable.error(new RuntimeException())) 
    ) 
    .retry() 
).blockingForEach(System.out::println); 
+0

fonctionne comme un charme. Dans mon cas, la source est déjà chaude, donc pas besoin d'utiliser publier. Ce qui me manquait, c'est la combinaison de concatWith() et retry() qui est la clé je pense. – O10