2017-10-19 29 views
2

J'ai un flux de données, avec des données rapidement entrantes. Je veux les insérer dans une base de données en gardant l'ordre. J'ai une base de données, qui renvoie une promesse, qui est résolue quand une insertion est réussie.RxJs tampon jusqu'à ce que la base de données d'insertion (promesse)

Je souhaite faire un flux Rx, qui tamponne les nouvelles données, jusqu'à ce que les données en mémoire tampon est insérée.

Comment puis-je faire cela?

+0

Et quel est le problème? Il y a les opérateurs 'buffer',' bufferToggle' ou 'bufferWhen'. – martin

+0

Le problème est que je n'ai aucune idée, comment les utiliser. Essayer de comprendre, mais je ne sais pas encore comment. –

+1

Utilisez 'concatMap', renvoyant la promesse de la fonction de projet. 'concatMap' fera le tampon pour vous, mais il n'y a pas de contre-pression dans RxJS, donc si vos données arrivent plus vite que vous ne pouvez les écrire, vous épuiserez la mémoire. – cartant

Répondre

2

Je crois d'obtenir exactement ce que vous désirez, vous devez créer votre propre opérateur. Rompant RxJS un peu, vous pouvez obtenir quelque chose comme (avertissement, n'ont pas testé) ...

export class BusyBuffer<T> { 
    private itemQueue = new Subject<T>(); 
    private bufferTrigger = new Subject<{}>(); 
    private busy = false; 

    constructor(consumerCallback: (items: T[]) => Promise<void>) { 
    this.itemQueue.buffer(this.bufferTrigger).subscribe(items => { 
     this.busy = true; 
     consumerCallback(items).then(() => { 
     this.busy = false; 
     this.bufferTrigger.next(null); 
     }); 
    }); 
    } 

    submitItem(item: T) { 
    this.itemQueue.next(item); 
    if(!busy) { 
     this.bufferTrigger.next(null); 
    } 
    } 

} 

qui peut ensuite être utilisé comme

let busyBuffer = new BusyBuffer<T>(items => { 
    return database.insertRecords(items); 
}); 
items.subscribe(item => busyBuffer.submitItem(item)); 

Il est pas exactement purement réactif et si quelqu'un peut être capable de trouver quelque chose de mieux.

+2

Merci! J'ai créé le même, mais j'espère que quelqu'un peut venir avec une solution pure et réactive :) –

+0

Pas de sueur, bonne chance. Je pensais prendre une sorte de signal occupé/libre de la base de données et le renvoyer dans la méthode tampon, mais vous devez également ajouter dans la logique que le tampon devrait émettre immédiatement si la base de données ne fait rien. – Pace

+0

N'accepte pas ma réponse si tu veux quelque chose de mieux. Cela ne me dérange pas. – Pace