2017-01-31 2 views
2

J'ai un PublishSubject qui appelle onNext() sur un événement de l'interface utilisateur. L'abonné prend généralement 2 secondes pour terminer son travail. Je dois ignorer tous les appels à onNext() sauf le dernier alors que l'abonné est occupé. J'ai essayé ce qui suit, mais je suis incapable de contrôler le flux. Les demandes semblent être mises en file d'attente et chaque demande est traitée (et donc la pression inverse ne semble pas fonctionner). Comment puis-je faire en sorte qu'il ignore toutes les demandes, sauf la dernière? (Je ne veux pas utiliser debounce car le code doit réagir immédiatement et tout délai raisonnablement court ne fonctionnera pas).RxJava Sujet avec Backpressure - seulement laisser la dernière valeur émise une fois que aval a fini de consommer

En outre, je réalise en utilisant subscribeOn avec un sujet n'a aucun effet, donc j'utilise observeOn pour faire un travail asynchrone dans l'un des opérateurs. Est-ce la bonne approche?

Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized(); 

loadingQueue 
    .toFlowable(BackpressureStrategy.LATEST) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .map(discarded -> { 
    // PRE-LOADING 
    Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName()); 
    return discarded; 
    }) 
    .observeOn(Schedulers.computation()) 
    .map(b -> { 
    Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName()); 
    Thread.sleep(2000); 
    return b; 
    }) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(b -> { 
     Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n"); 
    }); 


loadingQueue.onNext(true); 
loadingQueue.onNext(true); 
loadingQueue.onNext(true); 
.... 

La sortie que je vois est:

PRE-LOADING: main 
PRE-LOADING: main 
LOADING: RxComputationThreadPool-1 
PRE-LOADING: main 
PRE-LOADING: main 
PRE-LOADING: main 
PRE-LOADING: main 
PRE-LOADING: main 
PRE-LOADING: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
FINISHED: main 

lieu Je pense que le code pour faire une fois les éléments suivants (c.-à-charge, et pendant son chargement, de contre-pression pour retenir sur toutes les demandes et émettent le dernier, une fois que le premier observateur a fini - donc au total, il devrait idéalement que charger au maximum deux fois):

PRE-LOADING: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 

PRE-LOADING: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 

Répondre

4

Vous ne pouvez pas le faire avec observeOn car elle tampon au moins 1 élément et donc toujours exécuter l'étape "PRE-LOADING" s'il y a déjà un "LOADING" qui se passe.

Cependant, vous pouvez le faire avec delay car il ne manipule pas les montants de demande sur la chaîne et les horaires chaque OnNext individuellement sur le planificateur sans faire la queue sur son propre:

public static void main(String[] args) throws Exception { 
    Subject<Boolean> loadingQueue = 
     PublishSubject.<Boolean>create().toSerialized(); 

    loadingQueue 
     .toFlowable(BackpressureStrategy.LATEST) 
     .delay(0, TimeUnit.MILLISECONDS, Schedulers.single())  // <------- 
     .map(discarded -> { 
     // PRE-LOADING 
     System.out.println("PRE-LOADING: " 
      + Thread.currentThread().getName()); 
     return discarded; 
     }) 
     .delay(0, TimeUnit.MILLISECONDS, Schedulers.computation()) // <------- 
     .map(b -> { 
      System.out.println("LOADING: " 
      + Thread.currentThread().getName()); 
     Thread.sleep(2000); 
     return b; 
     }) 
     .delay(0, TimeUnit.MILLISECONDS, Schedulers.single())  // <------- 
     .rebatchRequests(1)    // <----------------------------------- one-by-one 
     .subscribe(b -> { 
      System.out.println("FINISHED: " 
       + Thread.currentThread().getName() + "\n\n"); 
     }); 


    loadingQueue.onNext(true); 
    loadingQueue.onNext(true); 
    loadingQueue.onNext(true); 

    Thread.sleep(10000); 
} 
+0

Cela fonctionne à merveille, merci beaucoup. Je dois admettre que je n'aurais jamais pu arriver à ça. Je vais maintenant devoir étudier le délai et rebatchRequests plus en détail. – strangetimes

+0

@akarnokd J'étais curieux, j'ai trouvé que remplacer le premier 'observerOn' dans le code original par' observeOn (Schedulers.single(), false, 1) 'aboutit au même comportement - est-ce une solution alternative valide, ou est Y at-il un inconvénient à le faire de cette façon? Merci! – ahmedre

+0

Si les articles pouvaient être distingués, alors oui; il stockerait l'article # 2 tandis que le dernier court devant l'article #N. Lorsque le traitement se termine à la fin, une nouvelle demande traitera l'élément 2 et le premier observOn stockera ensuite l'élément #N. Vous n'obtiendrez pas les instantanés les plus récents mais des instantanés. – akarnokd