2015-11-21 4 views
2

Je souhaite traiter un tableau d'objets en les déplaçant via une série d'opérations asynchrones/réseau (requêtes HTTP distantes).Rx.js concurrence avec promesses

Dans certaines de ces opérations, je voudrais m'assurer que pas plus de X éléments sont traités en même temps.

Comment puis-je y parvenir?

code Exemple:

function someAsyncOp(item) {...} // returns a promise 

var source = Rx.Observable.from([{item1},{item2},...]) 
source 
    .flatMap((item) => { 

    // I WANT THE FOLLOWING OPERATION TO BE EXECUTING 
    // ON AT MAX 10 ITEMS AT A TIME, NEXT ITEM SHOULD 
    // BE SUBMITTED ONLY WHEN A SLOT GETS FREED AS A 
    // RESULT OF THE PROMISE SUCCEEDING OR FAILING 

    return Rx.Observable.fromPromise(someAsyncOp(item)) 

    }) 
    .subscribe(
    console.log, 
    console.error, 
    () => console.log('completed') 
) 
+0

prendre de vente (X) avant .flatMap [voir diagramm interactive] (http://rxmarbles.com/#take) –

+0

@ valery.sntx ce qu'arrêter va mon flux source après le premier X qui signifie que tous les articles suivants ne seront jamais traités .. Je suppose. Je veux que tous les articles soient traités. – mettjus

Répondre

5

Il y a un frère de flatMap appelé flatMapWithMaxConcurrent qui prend un argument de concurrence. Il est fonctionnellement similaire à map(fn).merge(n) qui a été suggéré par la réponse de Benjamin.

function someAsyncOp(item) {...} // returns a promise 

var source = Rx.Observable.from([{item1},{item2},...]) 
source 
    //Only allow a max of 10 items to be subscribed to at once 
    .flatMapWithMaxConcurrent(10, (item) => { 

    //Since a promise is eager you need to defer execution of the function 
    //that produces it until subscription. Defer will implicitly accept a promise 
    return Rx.Observable.defer(() => someAsyncOp(item)) 

    //If you want the whole thing to continue regardless of exceptions you should also 
    //catch errors from the individual processes 
         .catch(Rx.Observable.empty()) 
    }) 
    .subscribe(
    console.log, 
    console.error, 
    () => console.log('completed') 
) 
+0

Je pense que la partie '.defer (...' est ce que je cherchais aussi la partie de gestion des erreurs sera utile – mettjus

+0

une idée sur la façon de gérer la concurrence en cas de 'withLatestFrom' à la place de' flatMap'? – mettjus

0

Vous pouvez utiliser merge avec map au lieu de flatMap:

var concurrency = 10; 
source.map(someAsyncOp).merge(concurrency).subscribe(x => console.log(x)); 

Notez que depuis les promesses sont avides et observables sont fromPromise paresseux ne couperait pas (et Rx peut assimiler des promesses sans elle de toute façon). Je recommande de l'emballer dans un create.

var delay = function(ms){ return new Promise(function(r){ setTimeout(r, 2000, ms) }); } 
 

 
var log = function(msg){ document.body.innerHTML += msg + "<br />"; } 
 

 
Rx.Observable.range(1000, 10).map(delay).merge(2).subscribe(log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>