2017-10-15 4 views
1

J'ai un flux comme celui-ciComment joindre deux demi dans RxJs

---ab---ab---a---ba---bab---ab---ab---ab---> 

Et je veux que cela.

---ab---ab------ab----ab-ab-ab---ab---ab---> 

Le point est que j'ai données avec début et la fin (JSON) et parfois les données est coupée dans la moitié dans le flux, et je veux les rejoindre à nouveau. Comment puis je faire ça?

+0

Comment distinguez-vous '' ab' et bien a'? Je pensais à [bufferCount] (http://rxmarbles.com/#bufferCount) mais ce n'est pas la bonne chose, j'en suis sûr. –

Répondre

0

Voilà comment je résolu:

import Rx from 'rxjs/Rx'; 
import {last} from 'lodash'; 

const data$ = Rx.Observable.of('ab','ab','a','ba','bab','aba','b','ab'); 
const line$ = data$.flatMap(data => { 
    const lines = data.match(/[^b]+b?|b/g); // https://stackoverflow.com/a/36465144/598280 https://stackoverflow.com/a/25221523/598280 
    return Rx.Observable.from(lines); 
}); 

const isComplete$ = line$.scan((acc, value) => { 
    const isLineEndingLast = last(acc.value) === 'b'; 
    const id = isLineEndingLast ? acc.id + 1 : acc.id; 
    const complete = last(value) === 'b'; 
    return {value, id, complete}; 
}, {value: 'b', id: 0, complete: true}); 

const grouped$ = isComplete$ 
    .groupBy(data => data.id, data => data, group => group.first(data => data.complete)) 
    .flatMap(group => group.reduce((acc, data) => acc + data.value, '')); 

grouped$.subscribe(console.log); 
1

On dirait un emploi pour l'opérateur scan

// substitute appropriate real-world logic 
const isProperlyFormed = (x) => x === 'ab' 
const isIncomplete = (x) => x[0] === 'a' && x.length === 1 
const startsWithEnding = (x) => x[0] === 'b' 
const getCorrected = (buffer, x) => buffer.prev + x[0] 
const getTail = (buffer, x) => x.slice(1) 

const initialBuffer = { 
    emit: [], 
    prev: null 
} 

const result = source 
    .scan((buffer, x) => { 
    if (isProperlyFormed(x)) { 
     buffer = {emit: [x], prev:null} 
    } 
    if (isIncomplete(x)) { 
     buffer = {emit: [], prev:x} 
    } 
    if (startsWithEnding(x)) { 
     const corrected = getCorrected(buffer, x) 
     const tail = getTail(buffer, x) 
     if (isProperlyFormed(tail)) { 
     buffer = {emit: [corrected, tail], prev: null} 
     } else { 
     buffer = {emit: [corrected], prev: tail} 
     } 
    } 
    return buffer 
    }, initialBuffer) 
    .flatMap(x => x.emit) 

travail CodePen

Modifier

regardant le flux d'entrée de test, je pense qu'un cas est manquant, qui sera casser ce qui précède.

j'ai changé le test de

---ab---ab---a---ba---bab---ab---ab---ab---> 

à

---ab---ab---a---ba---bab---aba---b---ab---> 

et aussi amincie l'algorithme

const getNextBuffer = (x) => { 
    const items = x.split(/(ab)/g).filter(y => y) // get valid items plus tail 
    return { 
    emit: items.filter(x => x === 'ab'), // emit valid items 
    save: items.filter(x => x !== 'ab')[0] // save tail 
    } 
} 

const initialBuffer = { 
    emit: [], 
    save: null 
} 

const result = source 
    .scan((buffer, item) => { 
    const bufferAndItem = (buffer.save ? buffer.save : '') + item 
    return getNextBuffer(bufferAndItem) 
    }, initialBuffer) 
    .flatMap(x => x.emit) 

exemple de travail CodePen

1

Fi Premièrement, diviser le flux en réponses complètes et partielles. Ensuite, vérifiez si la réponse est complète. Les réponses complètes sont bonnes en tant que telles. Les réponses partielles doivent être synchronisées, donc nous divisons leur flux en première et seconde moitiés et juste zip ces flux ensemble.

L'aspect étrange Rx.Observable.of(g.partition(x => x[0] === 'a')) est dû au fait que l'opérateur partition renvoie une paire d'observables qui ne peuvent pas être chaînés.

const testStream = Rx.Observable.of('a1', 'a2', '_ab', 'b1', 'a3', 'b2', '_ab', 'a4', 'b3', '_ab', 'b4', 'a5', 'b5', '_ab') 
 

 
testStream 
 
    .groupBy(x => (x[0] === '_' && 'full') || 'partial') 
 
    .mergeMap(g => 
 
    Rx.Observable.if(
 
    () => g.key == 'full', 
 
     g, 
 
     Rx.Observable.of(g.partition(x => x[0] === 'a')) 
 
     .mergeMap(([as, bs]) => Rx.Observable.zip(as, bs)) 
 
    ) 
 
) 
 
    .do(x => console.log(x)) 
 
    .subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.min.js"></script>