2016-11-08 5 views
0

Je cherche à créer une classe LocationHandler qui renvoie un observable<Location> dont je peux envoyer un nouvel emplacement et les abonnés obtiennent le dernier ajouté et toutes les valeurs suivantes.RX Java 2, Observable qui accepte de nouvelles valeurs à ajouter

J'ai écrit cette classe, cela fonctionne mais je ne sais pas si c'est la bonne façon de le faire parce que j'ai ajouté un rappel et je le sens mal.

Merci pour toute aide.

public class LocationHandler { 
    private MessageHandler<Location> onNewItem; 
    private Observable<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     locationObservable = getHookedObservable() 
       .mergeWith(locationInitBuilder.build()) 
       .replay(1).autoConnect(); 
    } 


    private Observable<Location> getHookedObservable() { 
     return Observable.create(new ObservableOnSubscribe<Location>() { 
      @Override 
      public void subscribe(ObservableEmitter<Location> e) throws Exception { 
       onNewItem = location -> e.onNext(location); 
      } 
     }); 
    } 

    public Observable<Location> getLocation(){ 
     return locationObservable; 
    } 

    public void setLocation(Location address){ // <---------- add new values 
     if (onNewItem != null){ 
      onNewItem.handleMessage(address); 
     } else { 
      throw new IllegalStateException("Cannot add an item to a never subscribed stream"); 
     } 
    } 
} 

Après des conseils @Blackbelt J'ai modifié avec un ReplaySubject.

public class LocationHandler { 
    private ReplaySubject<Location> inputStream = ReplaySubject.create(1); 
    private Observable<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     locationObservable = locationInitBuilder.build() 
       .mergeWith(inputStream) 
       .replay(1).autoConnect(); 
    } 

    public Observable<Location> getLocation(){ 
     return locationObservable; 
    } 

    public void setLocation(Location address){ 
     inputStream.onNext(address); 
    } 
} 

Répondre

2

vous pouvez utiliser un Subject au lieu de MessageHandler. Le sujet peut agir comme observable et abonné en même temps. Vous pouvez avoir une méthode dans votre LocationHandler qui renvoie Subject#asObservable auquel vous vous abonnerez. En interne, lorsque setLocation, vous devrez appeler Subject#onNext en indiquant l'emplacement. Il existe différents types de sujets disponibles. Veuillez vous référer à la documentation pour choisir celle qui convient le mieux à vos besoins. Par exemple.

public class LocationHandler { 
    BehaviorSubject<GeevLocation> mLocationSubject = BehaviorSubject.create(); 

    public Observable<GeevLocation> getLocation() { 
     return mLocationSubject.asObservable(); 
    } 

    public void setLocation(GeevLocation address){ 
     mLocationSubject.onNext(address); 
    } 
} 

de l'appel extérieur getLocation et abonnez-vous à la Observable retourné. Quand un setLocation est appelé, vous obtiendrez l'objet onNext

1

comme Blackbelt vous l'avait déjà dit, vous utiliseriez un sujet. En particulier j'utiliserais un BehaviorSubject. Les sujets sont chauds par défaut, mais ils peuvent rejouer les événements par abonnement. BehaviorSubject vous donnera la dernière valeur émise ou la valeur init, si vous vous abonnez. Chaque abonné recevra les valeurs en entrée. Le flux ne finira jamais parce qu'il fait chaud. S'il vous plaît n'oubliez pas de gérer les erreurs, car le second onError sera avalé.

Exemple-Code

class Location { 

} 

class LocationInitializationBuilder { 
    static Location build() { 
     return new Location(); 
    } 
} 

class LocationHandler { 
    private Subject<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     Location initialValue = LocationInitializationBuilder.build(); 

     locationObservable = BehaviorSubject.<Location>createDefault(initialValue).toSerialized(); 
    } 

    public Observable<Location> getLocation() { 
     return locationObservable.hide(); 
    } 

    public void setLocation(Location address) { // <---------- add new values 
     locationObservable.onNext(address); 
    } 
} 

public class LocationTest { 
    @Test 
    public void name() throws Exception { 
     LocationHandler locationHandler = new LocationHandler(new LocationInitializationBuilder()); 

     TestObserver<Location> test = locationHandler.getLocation().test(); 

     locationHandler.setLocation(new Location()); 

     test.assertValueCount(2); 
    } 
} 
+0

En fait, je ne peux pas utiliser un comportement parce qu'il est un flux que je reçois via LocationInitializationBuilder.build. Un comportement a besoin d'une valeur définie que je ne peux pas fournir à la création. –

+0

Oui, je vois où est le problème. Votre solution semble légitime. –

+1

'BehaviourSubject' a une méthode statique' create' qui crée un 'BehaviourSubject' vide – Blackbelt