2017-09-21 5 views
0

Je ne connais pas RxJava, alors j'essaie toujours de comprendre. J'ai un Observable représentant un flux de clics sur les boutons, donc il fait chaud. Chaque fois que l'on clique sur ce bouton, je veux faire des E/S. Si cela échoue, je veux répéter et essayer de faire à nouveau cette E/S, jusqu'à ce qu'elle réussisse. Cela semble être une excellente occasion d'utiliser retry() ou repeat(), mais ceux-ci ne fonctionnent que sur des observables chauds, et non froids.Bonne façon de répéter une opération déclenchant un flux d'événements

Voici quelques pseudo-code pour obtenir ce que je suis en train de faire:

buttonRequests 
    .map(actionEvent -> doIO()) 
    .repeatAboveIfFailedUntilIOSucceeds() 
    .subscribe(...); 

J'ai pensé à utiliser flatMap pour dupliquer l'événement et que l'utilisation skip d'ignorer les autres si elle réussit, mais ne m'obtiendra pas proprement un nombre indéterminé de tentatives.

Quelle serait la bonne façon d'y penser?

+0

Voulez-vous répéter une action a échoué pour chaque bouton-clic, jusqu'à ce qu'elle réussisse? Que devrait-il se passer si l'on réessaie et si un nouveau flappe? –

+0

Oui, je réessayerais l'action pour chaque clic de bouton jusqu'à ce qu'il réussisse. Je désactiverais le bouton pour l'empêcher d'être recliqué pendant que ça se passe. – Vultan

Répondre

1

Veuillez jeter un coup d'œil au test. À chaque événement, une nouvelle demande d'E/S sera lancée. Switch-Map est comme Flat-Map, mais il se désinscrit de l'abonnement récent, lorsqu'un nouvel événement en amont entre en jeu. Flat-Map ne ferait que commencer un nouveau, si vous travaillez avec la concurrence. Supposons donc que votre observable à chaud déclenche un événement et que flatMap commence à exécuter votre travail d'E/S sur un autre thread (subscribeOn). Si un autre événement arrive, alors que le dernier est toujours en cours d'exécution, il commencera à exécuter une autre tâche E/S. Switch-Map se désinscrit du dernier et en lance un pour l'événement en cours. Jetons un coup d'oeil à l'opérateur retry(). Retry se réabonnera simplement à l'observable fourni par 'ioWorkWrapped' jusqu'à ce que l'observable se termine avec onComplete. C'est probablement très dangereux, car imaginez qu'il échouera à chaque tentative. Cela tournerait pour toujours. Il est recommandé d'utiliser 'exponentielle-backoff' et de fournir une sauvegarde de secours après X essaye. Pour l'utilisation de « retryWhen » s'il vous plaît jeter un oeil à cet excellent livre : Reactive Programming with RxJava

public class LibraryTest { 
    private AtomicInteger idx; 

    @Before 
    public void setUp() throws Exception { 
     idx = new AtomicInteger(0); 
    } 

    @Test 
    public void name() throws Exception { 
     Observable<String> stringObservable = Observable.just(1) 
       .switchMap(integer -> ioWorkWrapped() 
         .doOnError(throwable -> System.out.println("Something went wrong.")) 
         .retry() 
       ); 

     stringObservable.test() 
       .await() 
       .assertResult("value"); 


    } 

    private Observable<String> ioWorkWrapped() { 
     return Observable.defer(() -> { 
      try { 
       Thread.sleep(500); // IO Work 
       if (idx.getAndIncrement() < 5) { // for testing... 
        return Observable.error(new IllegalStateException("Wurst")); 
       } 
       return Observable.just("value"); 
      } catch (Exception ex) { 
       return Observable.error(ex); 
      } 
     }); 
    } 
} 
+0

Merci! C'était exactement ce que je cherchais. J'ai été capable d'intégrer cette idée dans mon code; J'ai aussi beaucoup appris en travaillant dessus. – Vultan

0

Vous devez utiliser l'opérateur retryWhen dans le cas où votre opération E/S échouez, vous pouvez lancer une exception Runnable qui est vérifiée dans l'opérateur . Et dans le cas où ce type d'exception vous réessayez.

Dans cet exemple, nous réessayerons 4 fois. Mais cette condition pourrait être changée par le type de lanceable que nous recevons.

int count=0; 

@Test 
public void retryWhenConnectionError() { 
    Subscription subscription = Observable.just(null) 
      .map(connection -> { 
       System.out.println("Trying to open connection"); 
       connection.toString(); 
       return connection; 
      }) 
      .retryWhen(errors -> errors.doOnNext(o -> count++) 
          .flatMap(t -> count > 3 ? Observable.error(t) : 
            Observable.just(null).delay(100, TimeUnit.MILLISECONDS)), 
        Schedulers.newThread()) 
      .subscribe(s -> System.out.println(s)); 
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); 
} 

Vous pouvez voir d'autres exemples ici https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

+0

Merci J'ai trouvé une autre réponse ci-dessus qui correspond mieux à mes besoins, mais j'ai passé du temps avec celui-ci aussi et cela m'a aidé à améliorer ma compréhension. – Vultan