2017-10-19 23 views
2

I ont deux observables:RxJs comment fusionner deux observable se chevauchent dans une

-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-| 
-13--14--15--16--17--18--19-----20---------21--------------22------23--24--> 

Le premier contient un certain nombre croissant, mais arrête après un certain temps (ce sont les résultats d'un curseur à partir de la base de données) Les seconds sont émettant en continu un nombre croissant. Contient un certain nombre de la première, mais n'arrête pas d'émettre. (Ce sont les nouvelles données insérées dans la base de données)

Je veux que ce deux observables à regarder observable continue comme ceci:

-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-16-17-18-19-20-21-----22------23--24--> 

Cette observable contient chaque numéro qu'une seule fois, en gardant l'ordre d'émission. Comment peut-il être résolu en utilisant aussi peu de mémoire que possible?

+0

Vous vous demandez comment avez-vous obtenu comme deux Observables se chevauchent les cours d'eau? Quelle est leur signification dans votre problème? –

+1

J'utilise Rethinkdb. J'ai d'anciennes données dans la base de données qui sont lues par le curseur, et des données nouvellement insérées, qui sont émises par le changement de vitesse. Pendant que je lis les données du curseur, les données nouvellement insérées sont également lues par le curseur. Cela provoque le chevauchement –

Répondre

2

Je pense que la meilleure approche ici est de mettre en tampon b $ jusqu'à ce qu'un flux $ atteigne b $, puis d'émettre tous les éléments bufférisés de b $ et de passer à b $. Quelque chose comme ceci:

const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15'; 
 
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24'; 
 

 
const fromMarble = str => Rx.Observable.from(str.split('-')).concatMap(x => Rx.Observable.of(x).delay(1)).filter(v => v.length).map(x => parseInt(x)); 
 

 
const a$ = fromMarble(a).share(); 
 
const b$ = fromMarble(b).share(); 
 

 
const switchingSignal$ = Rx.Observable.combineLatest(a$, b$.take(1), (a, b) => a >= b).filter(x => x).take(1).share(); 
 

 
const distinct$ = Rx.Observable.merge(
 
\t a$.takeUntil(switchingSignal$).map(x => x + '(from a)'), 
 
\t b$.buffer(switchingSignal$).take(1).mergeMap(buffered => Rx.Observable.from(buffered)).map(x => x + '(from b$ buffer)'), 
 
\t b$.skipUntil(switchingSignal$).map(x => x + '(from b$)') 
 
); 
 

 
distinct$.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.0/Rx.js"></script>

2

Vous pouvez le faire en prenant tous les éléments du premier flux concaténé (.concat) avec le second courant, sauf (.skipWhile inclus) éléments avant le dernier (.last)

const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15' 
 
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24' 
 
const fromMarble = str => Rx.Observable.defer(() => { 
 
    console.log('side effect from subscribing to: ' + str); 
 
    return Rx.Observable.from(str.split('-').filter(v => v.length)); 
 
}); 
 

 
const a$ = fromMarble(a); 
 
const b$ = fromMarble(b); 
 

 
const distinct$ = Rx.Observable.concat(
 
    a$, 
 
    a$.last().switchMap(latest => 
 
    // .skipWhile + .skip(1) => skipWhile but inclusive 
 
    b$.skipWhile(v => v !== latest).skip(1) 
 
), 
 
); 
 

 
distinct$.subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

Aussi, si vous avez des effets secondaires lors de l'abonnement (par exemple, lorsque vous vous abonnez, le nouveau curseur sera créé), vous pouvez partager cet effet secondaire pour tous les abonnés en utilisant par exemple const a$ = fromMarble(a).shareReaplay().

Vous pouvez lire plus sur le partage des effets secondaires:

+0

Belle solution. Mais, dans le cas où le flux b $ émet après le flux a $, le flux $ distinct sera retardé inutilement. Je ne sais pas si c'est un problème, mais il pourrait être résolu sans délai. – ZahiC

+0

Nice catch @ ZahiC et un autre problème est quand 'b $' est vide - vous n'obtiendrez rien. J'ai donc modifié ma réponse pour gérer ces cas aussi –

+0

Cela résout définitivement le délai si b $ émet après un $, mais maintenant vous pouvez vous abonner à b $ trop tard (le flux concaténé est souscrit lorsque le premier flux est terminé). Si l'abonnement b $ prend du temps (par exemple, se connecte à un db), le passage à b $ sera plus lent. Vous pouvez résoudre ce problème en vous connectant à b $ plus tôt (hot Observable), mais vous risquez de perdre quelques objets. Je vais ajouter une autre solution juste pour référence – ZahiC