2017-06-13 1 views
2

J'ai travaillé à travers les exemples dans le livre Reactive Programming with RxJava, qui est destiné à la version 1 et non 2. Une introduction aux flux infinis a l'exemple suivant (et note il y a de meilleurs moyens de traiter le simultanéité):RxJava 2 équivalent à isUnsubscribed

Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> { 
    Runnabler =() -> { 
     BigInteger i = ZERO; 
     while (!subscriber.isUnsubscribed()) { 
      subscriber.onNext(i); 
      i = i.add(ONE); 
     } 
    }; 
    new Thread(r).start(); 
}); 

... 

Subscription subscription = naturalNumbers.subscribe(x -> log(x)); 
/* after some time... */ 
subscription.unsubscribe(); 

Cependant, dans RxJava 2, l'expression lambda transmis à la méthode create() est de type ObservableEmitter ce qui ne dispose pas d'un procédé isUnsubscribed(). J'ai jeté un oeil dans What's Different in 2.0 et a également effectué une recherche du référentiel, mais ne peux pas trouver une telle méthode.

Comment cette même fonctionnalité serait-elle atteinte dans la version 2.0?

modifié pour inclure la solution comme indiqué ci-dessous (en utilisant N.B. Kotlin):

val naturalNumbers = Observable.create<BigInteger> { emitter -> 
    Thread({ 
     var int: BigInteger = BigInteger.ZERO 
     while (!emitter.isDisposed) { 
      emitter.onNext(int) 
      int = int.add(BigInteger.ONE) 
     } 
    }).start() 
} 

val first = naturalNumbers.subscribe { log("First: $it") } 
val second = naturalNumbers.subscribe { log("Second: $it") } 

Thread.sleep(5) 
first.dispose() 
Thread.sleep(5) 
second.dispose() 

Répondre

2

Après vous vous abonnez à Observable, Disposable est retourné. Vous pouvez l'enregistrer dans votre variable locale et vérifier disposable.isDisposed() pour voir s'il est encore souscrit ou non.

+1

Super, ça a fait l'affaire. Merci. – amb85