Je me bats depuis un moment avec ce que je crois être une question assez simple.Pull réactif de soutien observable
J'ai un Flowable
qui récupère un ensemble d'éléments du réseau et les émet.
Flowable
.create(new FlowableOnSubscribe<Item>() {
@Override
public void subscribe(FlowableEmitter<Item> emitter) throws Exception {
Item item = retrieveNext();
while (item != null) {
emitter.onNext(item);
if (item.isLast) {
break;
}
item = retrieveNext();
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER)
.doOnRequest(new LongConsumer() {
@Override
public void accept(long t) throws Exception {
// ...
}
});
Je dois travailler à la demande. C'est-à-dire, je garde un Subscription
et appelle subscription.request(n)
si nécessaire.
L'article sur Backpressure (section "traction réactive") décrit la perspective de Observer
et juste mentionne brièvement que
Pour travailler [traction réactive], bien que, Observables A et B doivent répondre correctement la demande() < ...> un tel soutien est pas une exigence pour observables
Il ne va pas dans les détails sur la façon dont ce soutien peut en principe être accompli.
Y a-t-il un modèle commun pour implémenter un tel Observable
/Flowable
? Je n'ai pas trouvé un bon exemple ou une référence. Une mauvaise option que je peux penser est d'avoir un moniteur que je verrouillerais à l'intérieur de ma boucle while
quand emitter.requested() == 0
et déverrouillerai dans doOnRequest()
d'un autre fil. Mais cette approche semble juste lourde.
Alors comment ce "ralentissement" est-il généralement atteint?
Merci.
Pour clarifier, les opérateurs d'écriture pour 2.x sont décrits [dans le wiki] (https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0). – akarnokd