2016-02-02 4 views
2

J'ai une observable qui extrait les événements d'un serveur, filtre les événements pour le type d'application, puis souscrit et envoie l'événement à un ou plusieurs gestionnaires à gérer. Les gestionnaires se mettent alors à faire une mise à jour asynchrone de la base de données, et je trouve que l'observable déclenchera des événements si rapidement que les mises à jour se chevaucheront les unes les autres. Ce que j'aurais dû attendre.rx.js comment chaîner les observables

Donc je pense que j'ai besoin de mes gestionnaires pour chacun employer son propre observable pour agir comme une file d'attente qui va gérer un événement et attendre un accusé de réception. Donc, ma question est, comment puis-je créer une observable qui reçoit des messages en continu et distribue un message à la fois en attente d'un accusé de réception avant de libérer le message suivant.

Les observables doivent également être froids. Je pense, comme je ne peux pas perdre des messages.

Merci,

Raif

Répondre

2

Je pense que l'opérateur concatMap fait quelque chose à ce que vous recherchez. Vous pouvez passer en revue une ancienne réponse ici sur le SO pour illustrer un cas d'utilisation similaire pour concatMap: RxJS queueing dependent tasks

Il est proche mais pas exactement ce que vous voulez car il n'y a pas d'attente pour un signal ACK pour libérer la valeur suivante. A la place, concatMap utilise le signal d'achèvement de l'observable actuellement 'exécuté' pour s'abonner au suivant. Si votre observable contient quelque part l'exécution d'une mise à jour sur un db alors ces mises à jour seront exécutées dans l'ordre. Par exemple:

function handler (source$) { 
    // source$ is your source of events from which you generate the update calls 
    return source$.concatMap(function (event){ 
    return updateDB(event); 
    }) 
} 

function updateDB(event) { 
    return Rx.Observable.create(function(observer){ 
    // do the update in the db 
    // you probably have a success and error handler 
    // you plug the observer notification into those handlers 
    if (success) { 
     // if you need to pass down some value from the update 
     observer.onNext(someValue); 
     // In any case, signal completion to allow concatMap to move to next update 
     observer.onCompleted(); 
    } 
    if (error) {observer.onError(error);} 
    }) 
} 

Ceci est un code générique à se spécialiser dans la bibliothèque que vous utilisez. Vous pouvez utiliser directement l'opérateur fromNodeCallback ou fromCallback en fonction de l'API de votre fonction de mise à jour de base de données. Toutefois, n'oubliez pas qu'il peut y avoir une mise en mémoire tampon pour conserver l'observable suivant alors que l'actuel est en cours d'exécution, et que ce tampon ne peut être fini que si vous avez des différences significatives de vitesse entre producteur et consommateur, ou la limitation de la mémoire, vous voudrez peut-être gérer les choses différemment.

En outre, si vous utilisez RxJS v5, onError devient error, onComplete devient complete, onNext devient next (cf. new observer interface). Dernier commentaire, la nature lossy/lossless de votre stream est un concept différent de la nature hot/cold du stream. Vous pouvez jeter un oeil à illustrated subscription and data flows pour les deux types de flux.

+0

Très bien, merci. C'est proche quand même. Donc, si j'ai un observable, je peux régler le flux en appelant onNext (ou suivant selon le cas) pour obtenir l'élément suivant de la file d'attente. Cependant, la connexion entre les observables est toujours un problème, je pense. J'ai un module qui traite les messages et envoie ensuite chacun à 2 ou 3 autres modules. Le producteur observable ne doit pas attendre ou être limité par l'un de ces modules de gestionnaire, mais en interne, les modules de gestionnaire doivent agir de manière séquentielle. – Raif

+0

whow, c'est jusqu'à v5? On dirait que j'utilise la version 2.5.3. Pas que ce soit le problème pour le moment mais je suis sûr que ce sera le cas, je vais le mettre à niveau. – Raif

+0

v5 est en version bêta, donc je recommanderais la mise à niveau vers la version 4, sauf si vous savez ce que vous faites – user3743222