2015-12-04 3 views
1

J'utilise Rx.js pour traiter le contenu d'un fichier, faire une requête http pour chaque ligne, puis agréger les résultats. Cependant, le fichier source contient des milliers de lignes et je surcharge l'API http distante à laquelle j'effectue la requête http. Je dois m'assurer que j'attends la requête http existante pour rappeler avant de commencer une autre. Je serais ouvert à la mise en lots et à l'exécution des requêtes n à la fois, mais pour ce script, l'exécution des requêtes en série est suffisante.Rx.js attendre la fin du rappel

Je donne les résultats suivants:

const fs = require('fs'); 
const rx = require('rx'); 
const rxNode = require('rx-node'); 

const doHttpRequest = rx.Observable.fromCallback((params, callback) => { 
    process.nextTick(() => { 
    callback('http response'); 
    }); 
}); 

rxNode.fromReadableStream(fs.createReadStream('./source-file.txt')) 
    .flatMap(t => t.toString().split('\r\n')) 
    .take(5) 
    .concatMap(t => { 
    console.log('Submitting request'); 

    return doHttpRequest(t); 
    }) 
    .subscribe(results => { 
    console.log(results); 
    }, err => { 
    console.error('Error', err); 
    },() => { 
    console.log('Completed'); 
    }); 

Cependant, cela ne fonctionne pas les requêtes http en série. Il produit:

 
Submitting request 
Submitting request 
Submitting request 
Submitting request 
Submitting request 
http response 
http response 
http response 
http response 
http response 
Completed 

Si je supprime l'appel à concatAll() alors les demandes sont en série, mais ma fonction s'abonner est de voir les avant que les demandes observables http sont revenus.

Comment puis-je effectuer les demandes HTTP en série afin que la sortie soit comme ci-dessous?

 
Submitting request 
http response 
Submitting request 
http response 
Submitting request 
http response 
Submitting request 
http response 
Submitting request 
http response 
Completed 
+2

En note, vous pouvez réduire la complexité en fusionnant les opérateurs. 'map' +' flatMap' => 'flatMap',' map' + 'concatAll' =>' concatMap'. – paulpdaniels

+0

Merci, j'ai mis à jour l'exemple pour refléter cette – toby

Répondre

1

Le problème ici est sans doute que lorsque vous utilisez rx.Observable.fromCallback, la fonction que vous avez passé en argument est exécuté immédiatement. L'observable renvoyé conservera la valeur transmise au rappel à un moment ultérieur. Pour avoir une meilleure vision de ce qui se passe, vous devez utiliser une simulation un peu plus complexe: numérotez vos requêtes, faites-leur retourner un résultat réel (différent pour chaque requête) que vous pouvez observer à travers l'abonnement.

Ce que je pose arrive ici:

  • take(5) questions 5 valeurs
  • map problèmes 5 messages du journal, exécute 5 fonctions et passe sur 5 observables
  • ces 5 observables sont traitées par concatAll et les valeurs émis par ces observables seront en ordre comme prévu. Ce que vous commandez ici est le résultat de l'appel aux fonctions, pas des appels aux fonctions elles-mêmes.

Pour atteindre votre objectif, vous devez appeler votre usine observable (rx.Observable.fromCallback) que lorsque concatAll est abonnée à elle et non au moment de la création. Pour cela, vous pouvez utiliser defer: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/defer.md

donc votre code se transformerait en:

rxNode.fromReadableStream(fs.createReadStream('./path-to-file')) 
    .map(t => t.toString().split('\r\n')) 
    .flatMap(t => t) 
    .take(5) 
    .map(t => { 
    console.log('Submitting request'); 

    return Observable.defer(function(){return doHttpRequest(t);}) 
    }) 
    .concatAll() 
    .subscribe(results => { 
    console.log(results); 
    }, err => { 
    console.error('Error', err); 
    },() => { 
    console.log('Completed'); 
    }); 

Vous pouvez voir un problème similaire avec une excellente explication ici: How to start second observable *only* after first is *completely* done in rxjs

Votre journal est susceptible de montrer encore 5 messages consécutifs «Demande de soumission». Mais votre demande devrait être exécutée l'une après l'autre a terminé comme vous le souhaitez.