2017-01-06 1 views
0

Pourquoi l'opérateur RxJava 1.x flatMap() est implémenté avec la fusion?Pourquoi flatMap est implémenté avec fusion dans RxJava?

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) { 
     if (getClass() == ScalarSynchronousObservable.class) { 
      return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func); 
     } 
     return merge(map(func)); 
    } 

De flatMap() appellent je peux retourner une seule Observable conforme à <? extends Observable<? extends R>>. Than appel de carte (func) l'enveloppe dans un autre Observable, de sorte que nous avons quelque chose comme ceci Observable<? extends Observable<? extends R>>. Cela me fait penser que l'appel de merge() après map (func) est inutile.

L'opérateur merge() est dit pour effectuer ce qui suit:

aplatit une observable qui émet observables en une seule observable qui émet les articles émis par les observables, sans aucune transformation .

Maintenant à l'intérieur de la carte, nous pouvons seulement avoir un Observable qui émet un Observable. Pourquoi fusionner? Qu'est-ce que j'oublie ici?

Merci.

Répondre

3

En regardant les signatures peuvent vous aider:

Imaginez que, avec un Observable<String> vous voulez flatMap aux caractères individuels. La façon de le faire avec flatMap est:

Observable.just("foo", "hello") 
      .flatMap(s -> Observable.from(s.split(""))) 

Quel est le type de cet Observable? C'est un Observable<String>.

Maintenant, au lieu d'utiliser flatMap, utilisez avec la même fonction. Quel est le type?

Observable.just("foo", "hello") 
      .map(s -> Observable.from(s.split(""))) 

Vous verrez qu'il est en fait Observable<Observable<String>> ... Et si nous adhérons à cette observable et imprimer les éléments émis, nous obtenons ceci:

[email protected] 
[email protected] 

Pas très utile. Pire encore, ces Observable s n'ont pas été souscrites, afin qu'ils n'émettre aucune donnée :(

Nous voyons que l'objectif de flatMap est de laisser la fonction produire un Observable<T> intérieur par élément source, puis abonnez-vous à ces observables internes et aplatir leur émission ensemble dans la sortie Observable<T> et merge fait juste cela!

Pour vérifier que, enroulez le résultat de carte ci-dessus dans un Observable.merge(...):

Observable<Observable<String>> mapped = 
    Observable.just("foo", "hello") 
       .map(s -> Observable.from(s.split(""))); 

Observable.merge(mapped) 
      .subscribe(System.out::println); 

Ce sorties:

f 
o 
o 
h 
e 
l 
l 
o 
1

Maintenant à l'intérieur de la carte plane, nous ne pouvons avoir qu'un observable qui émet un observable.

Non - vous avez une observable qui émet une observable par source de l'élément observable. Donc, si votre source observable a plus d'éléments, vous allez avoir plusieurs observables émises. C'est pourquoi vous avez besoin de merge().