2016-12-19 3 views
0

J'ai une couche de données dans mon application qui est soutenue par un service de mise à niveau. (Jusqu'à présent, il n'y a que la persistance sur le réseau.Quand je suis en développement, j'ajouterai le premier stockage local hors ligne)Requérir et mettre à jour des abonnés avec RxJava2

Retrofit me renvoie un Observable<List<Item>> lorsque j'appelle le serveur. Cela fonctionne bien. Dans mon abonné, je reçois la liste quand je m'abonne et peux alors remplir mon interface avec le Items.

Le problème que je suis est: Si la liste est modifiée (par un mécanisme externe) comment puis-je rendre l'observable re-interroger le service de rénovation et d'émettre une nouvelle liste d'éléments. I serait être conscient que les données sont périmées, mais je ne suis pas sûr de savoir comment lancer une nouvelle requête.

Voici une version épurée de mon DataManager

class DataManager { 

    // Retrofit 
    RetrofitItemsService itemsService; 

    // The observalble provided by retrofit 
    Observable<List<Item>> itemsObservable; 

    //ctor 
    public DataManager(RetrofitItemsService itemsService) { 
     this.itemsService = itemsService; 
    } 

    /* Creates and stores an observable if one has not been created yet. 
    * Returns the observable so that it can be subscribed to by the function caller 
    */ 
    public Observable<List<Item>> getItems(){ 
     if(itemsObservable == null){ 
      itemsObservable = itemsService.getItems(); 
     } 

     return itemsObservable; 
    } 

    /* Adds a new Item to the list. 
    */ 
    public Completable addItem(Item item){ 
     Completable call = itemsService.addItem(item); 

     call.subscribe(()->{ 
      /* 
      < < <Here> > > 
      If someone has previously called getItems before this item was added, they now have stale data. 

      How can I call something like: 

      itemsObservable.refreshAllSubscribers() 
      */ 
     }); 

     return call; 
    } 
} 

Répondre

2

Le problème que vous combattez ici réside dans la différence entre chaud et froid observable. Il y a beaucoup d'excellents articles que vous pouvez google out qui décrivent les différences dans les détails, alors permettez-moi de décrire les bases.

froide observable crée un nouveau producteur pour chaque abonné. Cela signifie que lorsque deux abonnés distincts s'abonnent à le même froid observables, ils reçoivent chacun des instances différentes de ces émissions. Ils pourraient (!) Être égaux mais ce sont des objets différents. Appliqué à votre cas ici, chaque abonné obtient son propre producteur qui demande au serveur des données et les transmet dans le flux. Chaque abonné reçoit des données de son propre producteur.

Chaud observable partage un producteur avec tous ses observateurs. Si le producteur est par exemple en train de parcourir une collection d'objets, sauter avec un second abonné au milieu des émissions signifie qu'il n'obtiendra que les éléments émis par la suite (s'il n'est pas modifié par des opérateurs comme replay). Chaque objet reçu par un abonné est également la même instance pour tous les observateurs puisqu'il provient d'un seul producteur. Par conséquent, vous devez disposer d'un observable à chaud pour répartir vos données, de sorte que lorsque vous savez qu'elles ne sont plus valides, vous pouvez les émettre une seule fois à travers cette observable chaude et chaque observateur sera satisfait d'une mise à jour. .

Heureusement, transformer un froid observable en chaud n'est généralement pas une grosse affaire. Vous pouvez soit créer votre propre producteur qui imite ce comportement, utilisez l'un des opérateurs populaires comme share ou vous pouvez simplement transformer le flux afin qu'il se comporte comme un.

Je conseillerais à l'aide PublishSubject pour rafraîchir les données et la fusion avec l'observable à froid d'origine comme ceci:

class DataManager { 

    ..... 

    PublishSubject<Boolean> refreshSubject = PublishSubject.create(); 

    // The observable for retrieving always fresh data 
    Observable<List<Item>> itemsObservable; 

    //ctor 
    public DataManager(RetrofitItemsService itemsService) { 
     this.itemsService = itemsService; 
     itemsObservable = itemsService.getItems() 
           .mergeWith(refreshSubject.flatMap(refresh -> itemsService.getItems())) 
    } 


    public Observable<List<Item>> getItems(){ 
     return itemsObservable; 
    } 

    /* Adds a new Item to the list. 
    */ 
    public Completable addItem(Item item){ 
     Completable call = itemsService.addItem(item); 

     call.subscribe(()->{ 
      refreshSubject.onNext(true); 
     }); 

     return call; 
    } 
} 
1

Je suppose que le itemsService.getItems() retourne un seul élément Observable ainsi les consommateurs doivent réabonner pour obtenir des données fraîches de toute façon et ils l'obtiendront comme Retrofit Observables sont différés/paresseux aussi bien.

Vous pourriez avoir une procédure distincte, « long » Observable, via l'aide de PublishSubject que vous pouvez déclencher lorsque les changements de données:

final Subject<Object> onItemsChanged = PublishSubject.create().toSerialized(); 

public Observable<Object> itemsChanged() { 
    return onItemsChanged; 
} 

public Completable addItem(Item item){ 
    Completable call = itemsService.addItem(item); 

    // prevent triggering the addItem multiple times 
    // Needs RxJava 2 Extensions library for now 
    // as there is no Completable.cache() or equivalent in 2.0.3 
    CompletableSubject cs = CompletableSubject.create(); 

    call.doOnComplete(() -> onItemsChanged.onNext("changed")) 
    .subscribe(cs); 

    return cs; 
}