Je crois d'obtenir exactement ce que vous désirez, vous devez créer votre propre opérateur. Rompant RxJS un peu, vous pouvez obtenir quelque chose comme (avertissement, n'ont pas testé) ...
export class BusyBuffer<T> {
private itemQueue = new Subject<T>();
private bufferTrigger = new Subject<{}>();
private busy = false;
constructor(consumerCallback: (items: T[]) => Promise<void>) {
this.itemQueue.buffer(this.bufferTrigger).subscribe(items => {
this.busy = true;
consumerCallback(items).then(() => {
this.busy = false;
this.bufferTrigger.next(null);
});
});
}
submitItem(item: T) {
this.itemQueue.next(item);
if(!busy) {
this.bufferTrigger.next(null);
}
}
}
qui peut ensuite être utilisé comme
let busyBuffer = new BusyBuffer<T>(items => {
return database.insertRecords(items);
});
items.subscribe(item => busyBuffer.submitItem(item));
Il est pas exactement purement réactif et si quelqu'un peut être capable de trouver quelque chose de mieux.
Et quel est le problème? Il y a les opérateurs 'buffer',' bufferToggle' ou 'bufferWhen'. – martin
Le problème est que je n'ai aucune idée, comment les utiliser. Essayer de comprendre, mais je ne sais pas encore comment. –
Utilisez 'concatMap', renvoyant la promesse de la fonction de projet. 'concatMap' fera le tampon pour vous, mais il n'y a pas de contre-pression dans RxJS, donc si vos données arrivent plus vite que vous ne pouvez les écrire, vous épuiserez la mémoire. – cartant