J'ai travaillé à travers les exemples dans le livre Reactive Programming with RxJava, qui est destiné à la version 1 et non 2. Une introduction aux flux infinis a l'exemple suivant (et note il y a de meilleurs moyens de traiter le simultanéité):RxJava 2 équivalent à isUnsubscribed
Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
Runnabler =() -> {
BigInteger i = ZERO;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
i = i.add(ONE);
}
};
new Thread(r).start();
});
...
Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();
Cependant, dans RxJava 2, l'expression lambda transmis à la méthode create()
est de type ObservableEmitter
ce qui ne dispose pas d'un procédé isUnsubscribed()
. J'ai jeté un oeil dans What's Different in 2.0 et a également effectué une recherche du référentiel, mais ne peux pas trouver une telle méthode.
Comment cette même fonctionnalité serait-elle atteinte dans la version 2.0?
modifié pour inclure la solution comme indiqué ci-dessous (en utilisant N.B. Kotlin):
val naturalNumbers = Observable.create<BigInteger> { emitter ->
Thread({
var int: BigInteger = BigInteger.ZERO
while (!emitter.isDisposed) {
emitter.onNext(int)
int = int.add(BigInteger.ONE)
}
}).start()
}
val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }
Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()
Super, ça a fait l'affaire. Merci. – amb85