2017-10-05 6 views
0

Je suis nouveau sur RxJava, et j'ai besoin d'utiliser la fonction Observable de manière asynchrone.Timeout dans RxJava

Je dois également utiliser des délais d'attente: dans mon exemple, je veux que chaque processus se termine en 1 seconde ou moins.

Voici ce que je l'ai fait pour l'instant:

public static void hello(String name) throws IOException { 
Observable<String> obs2 = Observable.just(name).timeout(1000, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()); 
    obs2.subscribe(new Action1<String>() { 
     @Override 
     public void call(String s) { 
      if("CCCCC".equals(s)){ 
       try { 
        Thread.sleep(3200); 
       } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     System.out.println(s + " " + new Date() +" "+Thread.currentThread().getName()); 
     } 
    }); 
} 

public static void main(final String[] args) throws InterruptedException, IOException {  
    hello("AAAAA"); 
    hello("CCCCC"); 
    hello("BBBBBB"); 
    System.in.read(); 
} 

Résultat:

AAAAA Thu Oct 05 09:43:46 CEST 2017 RxIoScheduler-2 
BBBBBB Thu Oct 05 09:43:46 CEST 2017 RxIoScheduler-4 
CCCCC Thu Oct 05 09:43:49 CEST 2017 RxIoScheduler-3 

Je me attendais en fait d'obtenir un TimeoutException du fil nommé "RxIoScheduler-3", car il a dormait depuis 3 secondes.

Quel est le problème avec mon code et mon approche des délais d'attente dans RxJava?

Merci de votre aide.

Répondre

2

Selon the docs l'opérateur timeout sera:

miroir

la source Observable, mais émettre une notification d'erreur si une période de temps écoulé sans éléments émis

Ainsi, un délai d'attente est réputé avoir eu lieu s'il y a un retard dans émettant des événements mais que vous avez mis un délai dans en consommant événements et que cela ne causera pas de délai d'expiration.

Si vous modifiez votre code pour faire une pause pendant l'émission, un délai d'expiration se produit. Par exemple:

public static void hello(String name) throws IOException { 
    Observable<String> obs2 = Observable.fromCallable(() -> { 
       if ("CCCCC".equals(name)) { 
        // pause for 150ms before emitting "CCCCC" 
        try { 
         Thread.sleep(150); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
       } 
       return name; 
      } 
    ).timeout(100, MILLISECONDS) // timeout if there is a pause in emission of more than 100ms 
      .subscribeOn(Schedulers.io()); 

    obs2.subscribe(s -> System.out.println(s + " " + new Date() + " " + Thread.currentThread().getName()), 
      throwable -> System.err.println(throwable.getClass().getSimpleName() + " " + new Date() + " " + Thread.currentThread().getName())); 
} 

En utilisant le formulaire ci-dessus hello() vous obtiendrez la sortie suivante écrite à la console:

AAAAA Thu Oct 05 10:10:33 IST 2017 RxIoScheduler-2 
BBBBBB Thu Oct 05 10:10:33 IST 2017 RxIoScheduler-4 
TimeoutException Thu Oct 05 10:10:33 IST 2017 RxComputationScheduler-1 
+0

vous remercie pour votre aide, il fonctionne bien! – tlebras