J'ai un PublishSubject qui appelle onNext()
sur un événement de l'interface utilisateur. L'abonné prend généralement 2 secondes pour terminer son travail. Je dois ignorer tous les appels à onNext()
sauf le dernier alors que l'abonné est occupé. J'ai essayé ce qui suit, mais je suis incapable de contrôler le flux. Les demandes semblent être mises en file d'attente et chaque demande est traitée (et donc la pression inverse ne semble pas fonctionner). Comment puis-je faire en sorte qu'il ignore toutes les demandes, sauf la dernière? (Je ne veux pas utiliser debounce
car le code doit réagir immédiatement et tout délai raisonnablement court ne fonctionnera pas).RxJava Sujet avec Backpressure - seulement laisser la dernière valeur émise une fois que aval a fini de consommer
En outre, je réalise en utilisant subscribeOn
avec un sujet n'a aucun effet, donc j'utilise observeOn
pour faire un travail asynchrone dans l'un des opérateurs. Est-ce la bonne approche?
Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();
loadingQueue
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(AndroidSchedulers.mainThread())
.map(discarded -> {
// PRE-LOADING
Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
return discarded;
})
.observeOn(Schedulers.computation())
.map(b -> {
Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
Thread.sleep(2000);
return b;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(b -> {
Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
});
loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....
La sortie que je vois est:
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main
lieu Je pense que le code pour faire une fois les éléments suivants (c.-à-charge, et pendant son chargement, de contre-pression pour retenir sur toutes les demandes et émettent le dernier, une fois que le premier observateur a fini - donc au total, il devrait idéalement que charger au maximum deux fois):
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
Cela fonctionne à merveille, merci beaucoup. Je dois admettre que je n'aurais jamais pu arriver à ça. Je vais maintenant devoir étudier le délai et rebatchRequests plus en détail. – strangetimes
@akarnokd J'étais curieux, j'ai trouvé que remplacer le premier 'observerOn' dans le code original par' observeOn (Schedulers.single(), false, 1) 'aboutit au même comportement - est-ce une solution alternative valide, ou est Y at-il un inconvénient à le faire de cette façon? Merci! – ahmedre
Si les articles pouvaient être distingués, alors oui; il stockerait l'article # 2 tandis que le dernier court devant l'article #N. Lorsque le traitement se termine à la fin, une nouvelle demande traitera l'élément 2 et le premier observOn stockera ensuite l'élément #N. Vous n'obtiendrez pas les instantanés les plus récents mais des instantanés. – akarnokd