2017-02-28 2 views
0

Je me bats depuis un moment avec ce que je crois être une question assez simple.Pull réactif de soutien observable

J'ai un Flowable qui récupère un ensemble d'éléments du réseau et les émet.

Flowable 
    .create(new FlowableOnSubscribe<Item>() { 
     @Override 
     public void subscribe(FlowableEmitter<Item> emitter) throws Exception { 
      Item item = retrieveNext(); 

      while (item != null) { 
       emitter.onNext(item); 

       if (item.isLast) { 
        break; 
       } 

       item = retrieveNext(); 
      } 

      emitter.onComplete(); 
     } 
    }, BackpressureStrategy.BUFFER) 
    .doOnRequest(new LongConsumer() { 
     @Override 
     public void accept(long t) throws Exception { 
      // ... 
     } 
    }); 

Je dois travailler à la demande. C'est-à-dire, je garde un Subscription et appelle subscription.request(n) si nécessaire.

L'article sur Backpressure (section "traction réactive") décrit la perspective de Observer et juste mentionne brièvement que

Pour travailler [traction réactive], bien que, Observables A et B doivent répondre correctement la demande() < ...> un tel soutien est pas une exigence pour observables

Il ne va pas dans les détails sur la façon dont ce soutien peut en principe être accompli.

Y a-t-il un modèle commun pour implémenter un tel Observable/Flowable? Je n'ai pas trouvé un bon exemple ou une référence. Une mauvaise option que je peux penser est d'avoir un moniteur que je verrouillerais à l'intérieur de ma boucle while quand emitter.requested() == 0 et déverrouillerai dans doOnRequest() d'un autre fil. Mais cette approche semble juste lourde.

Alors comment ce "ralentissement" est-il généralement atteint?

Merci.

+0

Pour clarifier, les opérateurs d'écriture pour 2.x sont décrits [dans le wiki] (https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0). – akarnokd

Répondre

2

Utilisation Flowable.generate:

Flowable.generate(
() -> generateItemRetriever(), 
    (Retriever<Item> retriever, Emitter<Item> emitter) -> { 
    Item item = retriever.retrieveNext(); 
    if(item!=null && !item.isLast()) { 
     emitter.onNext(item); 
    } else { 
     emitter.onComplete(); 
    } 
    } 
) 

Thgenerated Flowable sera-contre-pression au courant et conscient désabonnement. Si votre Retriever a un état qui doit être éliminé, utilisez la surcharge generate à 3 arguments. Cependant, si vous faites des appels HTTP, je vous suggérerais de chercher quelque chose comme Retrofit, qui s'intègre très bien avec RxJava.