2016-11-10 4 views
0

Jemémoire tampon abonnés multiples à rx.js observables

var subject = new rx.Subject(); 
    var stream =  rx.Observable.fromEvent(blah, 'event') 
        .filter(blah) 
        .map(blah) 
        .subscribe(subject); 

       return subject; 

alors je passe soumis à plusieurs gestionnaires différents qui vont traiter l'événement de différentes façons et à des vitesses différentes.
donc ce que j'ai dans chaque gestionnaire est

subject.subscribe(async function (x) { 
     const func = self[x.eventName]; 
     if (func) { 
      await eventHandlerWrapper(self.handlerName, func, x); 
     } 
     }) 

J'ai deux questions, a) si les événements sont disponibles en super rapide est le gestionnaire va les traiter de manière synchrone et dans l'ordre donné la façon dont je il? et b) si les différents gestionnaires gèrent l'événement à des vitesses différentes, vont-ils tous attendre que le gestionnaire le plus lent soit passé avant que l'événement suivant ne soit fourni? ou vont-ils tous tampon et gérer à leur propre rythme?

merci, R

Répondre

1

Tout d'abord, la création du sujet peut être simplifié comme ceci:

const subject = rx.Observable.fromEvent(blah, 'event') 
       .filter(blah) 
       .map(blah) 
       .share(); 

La méthode de partage va créer un sujet du flux. Si vous renvoyez cette instance de sujet à chaque abonné, vous obtiendrez le même comportement et cela sera mieux.

a) if the events come in super fast is the handler going to process 
them synchronously and in the right order given the way I have it? 

Les événements vont être poussés dans toute la chaîne un par un et dans le bon ordre. En d'autres termes, un événement qui arrive à travers le 'fromEvent' sera poussé à travers toute la chaîne jusqu'au moment où vous y êtes abonné, avant de gérer la valeur suivante (sauf s'il y a un opérateur async entre :)). Ben Lesh a expliqué cela à angular connect 2015: https://www.youtube.com/watch?v=KOOT7BArVHQ (vous pouvez regarder toute la conversation mais c'est autour de min 17 où il compare les tableaux aux observables).

b) if the different handlers handle the event at different speeds are 
they all going to wait till the slowest handler is through before the  
next event is provided? or will they all sort of buffer and handle at 
they're own pace? 

Ils géreront les événements à leur propre rythme. Vérifiez l'exemple suivant:

let interval$ = Rx.Observable.interval(1000).share(); 

interval$.concatMap((val) => { 
    console.log('called'); 
    return Rx.Observable.of(val).delay(3000) 
    }) 
    .subscribe((val) => console.log("slow ", val)); 

interval$.subscribe((val) => console.log("fast ", val)); 

Ici, j'utilise un intervalle observable que je convertis en un sujet. Donc, il va envoyer un événement chaque seconde. J'ai un abonnement qui prend une valeur, manipulant cette valeur (qui prend 2secondes) et prenant ensuite le prochain (avec le concatMap). Et un autre abonnement qui les traite immédiatement. Si vous exécutez ce code (jsbin ici: https://jsbin.com/zekalab/edit?js,console), vous verrez qu'ils traitent tous deux les événements à leur propre rythme. Donc, ils n'attendent pas le gestionnaire le plus lent et il sera tamponné en interne. La situation que vous décrivez pourrait avoir une situation potentiellement dangereuse si le processeur le plus lent est plus lent que la fréquence à laquelle les événements sont lancés. Dans ce cas, votre tampon continuerait de croître et votre application finirait par tomber en panne. C'est un concept appelé pression de retour. Vous obtenez des événements plus rapidement que vous les traitez. Dans ce cas, vous devez utiliser des opérateurs comme 'buffer' ou 'window' sur les processeurs les plus lents pour éviter cette situation.

+0

génial. très bonne réponse. Je vous remercie. J'ai regardé tampon et il semblait s'agir d'événements de regroupement. Ce dont j'ai besoin est fondamentalement une file d'attente à la fin que je puisse me connecter au sujet et qui sera tampon. Tamponnerait-il pour ça? – Raif

+0

Yep devrait faire l'affaire – KwintenP