2017-09-06 2 views
1

J'ai une quantité variable de promesses et je veux les exécuter en séquence avec le résultat de la promesse précédente comme paramètre du suivant. Actuellement, je réussi à les concaténer afin de les exécuter dans l'ordre:concatMap quantité variable de promesses et résultats de tuyau en tant que paramètre pour la promesse suivante

const promises = [p1, p2, p3, p4, ....]; 
const source$ = promises.map(p => Rx.Observable.defer(p)); 
const combination$ = Rx.Observable.concat(...source$); 
combination.subscribe((x) => console.log(x)); 

Mais comment puis-je gérer maintenant les paramètres tuyau dans chaque promesse? J'ai lu que je pourrais utiliser concatMap. quelque chose comme ça:

Observable.from(p1).concatMap(res => p2(res)).concatMap(res => p3(res)) 

Je pense que je dois toujours retourner un nouveau concatMap dans Observable. Et aussi comment enchaîner concatMaps si un certain nombre de promesses devrait être variable?

Est-ce que quelqu'un peut me diriger dans la bonne direction? Je suis novice en matière de programmation réactive, mais pour les parties que je comprends déjà, c'est tout à fait génial!

Merci

Répondre

0

Ok les gars, j'ai enfin le résultat je comptais avoir

const promises = [ 
    (i=0) => Promise.resolve(i+2), 
    (i=2) => Promise.resolve(i+4), 
    (i=2) => Promise.resolve(i+8) 
]; 

const initial$ = Rx.Observable.from(promises[0]()); 

promises.reduce((o, p) => o.concatMap(p), initial$) 
.subscribe(console.log); 
0

Il semble que vous pouvez utiliser l'opérateur expand() qui projette récursive une valeur émise par une Observable intérieure.

Par exemple ici, j'ai un tableau de Observables et je veux ajouter la valeur cumulée avec la valeur actuelle:

const arr = [1, 2, 3, 4, 5].map(v => Observable.of(v)); 

Observable.of(arr) 
    .concatMap(arr => { 
     return Observable 
      .concat(arr[0]) // get the first item 
      .expand((prev, i) => { 
       i = i + 1; // We skipped the first Observable in `arr` 
       if (i === arr.length) { 
        return Observable.empty(); 
       } else { 
        return arr[i].map(result => prev + result) 
       } 
      }); 
    }) 
    .subscribe(console.log); 

Cette impression:

1 
3 
6 
10 
15 

Ou vous pouvez obtenir que le dernier valeur si vous utilisez takeLast(1):

return Observable 
    .concat(arr[0]) 
    .expand((prev, i) => { 
     ... 
    }) 
    .takeLast(1); 

Ou il est encore plus facile solutio n en utilisant mergeScan, mais je ne pense pas qu'il est facile de comprendre:

Observable.of(arr) 
    .concatAll() 
    .mergeScan((acc, observable) => { 
     return observable.map(result => result + acc); 
    }, 0) 
    .subscribe(console.log); 

Le résultat est le même que dans l'exemple précédent.

+0

merci pour votre réponse. J'ai adapté ta réponse à mes besoins, mais ça jette une erreur (arr [i] n'est pas une fonction) http://jsbin.com/vocomebini/edit?js,console – vacetahanna

+0

@vacetahanna Tu tournes chaque fonction dans 'arr 'dans un Observable avec' .map (v => Rx.Observable.defer (v)); 'c'est pourquoi il lance l'erreur. Quand vous appelez 'arr [i] (prev)' 'arr [i]' n'est pas un tableau, c'est un observable. – martin

+0

ah ok, mais quand j'essaie d'utiliser .of ou .from, alors ça ne marchera pas non plus. J'ai pensé que je pourrais tenir l'exécution des promesses jusqu'à un appel explicite. – vacetahanna