2015-10-18 4 views
2

J'ai besoin de zip groupables observables (pour former le produit cartésien des groupes liés, mais ce n'est pas pertinent pour la question). Lorsque vous exécutez le code ci-dessous, seuls les groupes observables enfants émettent réellement des valeurs à l'intérieur du #zip - Pourquoi est-ce?RxJS #zip groupes créés en utilisant #groupBy

https://jsbin.com/coqeqaxoci/edit?js,console

var parent = Rx.Observable.from([1,2,3]).publish(); 
var child = parent.map(x => x).publish(); 
var groupedParent = parent.groupBy(x => x); 
var groupedChild = child.groupBy(x => x); 

Rx.Observable.zip([groupedChild, groupedParent]) 
    .map(groups => { 
    groups[0].subscribe(x => console.log('zipped child ' + x)); // -> emitting 
    groups[1].subscribe(x => console.log('zipped parent ' + x)); // -> not emitting 
    }) 
    .subscribe(); 

groupedChild.subscribe(group => { 
    group.subscribe(value => console.log('child ' + value)); // -> emitting 
}); 

groupedParent.subscribe(group => { 
    group.subscribe(value => console.log('parent ' + value)); // -> emitting 
}); 

child.connect(); 
parent.connect(); 

Modifier: Comme expliqué dans la réponse par user3743222, les groupes émis par groupBy sont hot et l'abonnement au groupe parent (groupes [1]) se produit après que les premières valeurs ont déjà été émis. Cela se produit lorsque #zip attend à la fois que groupedChild et groupedParent émettent, ce dernier émettant plus tôt (ce qui signifie que ses groupes émettent des valeurs avant l'exécution de la fonction #zip).

Répondre

1

I modifié votre code comme suit:

var countChild = 0, countParent = 0; 
function emits (who) { 
    return function (x) {console.log(who + " emits : " + x);}; 
} 
function checkCount (who) { 
    return function () { 
    if (who === "parent") { 
     countParent++; 
    } 
    else { 
     countChild++; 
    } 
    console.log("Check : Parent groups = " + countParent + ", Child groups = " + countChild); 
    }; 
} 
function check (who, where) { 
    return function (x) { 
    console.log("Check : " + who + " : " + where + " :" + x); 
    }; 
} 
function completed (who) { 
    return function() { console.log(who + " completed!");}; 
} 
function zipped (who) { 
    return function (x) { console.log('zipped ' + who + ' ' + x); }; 
} 
function plus1 (x) { 
    return x + 1; 
} 
function err() { 
    console.log('error'); 
} 

var parent = Rx.Observable.from([1, 2, 3, 4, 5, 6]) 
    .do(emits("parent")) 
    .publish(); 
var child = parent 
    .map(function (x) {return x;}) 
    .do(emits("child")) 
// .publish(); 

var groupedParent = parent 
    .groupBy(function (x) { return x % 2;}, function (x) {return "P" + x;}) 
    .do(checkCount("parent")) 
    .share(); 

var groupedChild = child 
    .groupBy(function (x) { return x % 3;}, function (x) {return "C" + x;}) 
    .do(checkCount("child")) 
    .share(); 

Rx.Observable.zip([groupedChild, groupedParent]) 
// .do(function (x) { console.log("zip args : " + x);}) 
    .subscribe(function (groups) { 
       groups[0] 
        .do(function (x) { console.log("Child group observable emits : " + x);}) 
        .subscribe(zipped('child'), err, completed('Child Group Observable')); 
       groups[1] 
        .do(function (x) { console.log("Parent group observable emits : " + x);}) 
        .subscribe(zipped('parent'), err, completed('Parent Group Observable')); 
       }, err, completed('zip')); 

//child.connect(); 
parent.connect(); 

est ici la sortie:

"parent emits : 1" 
"child emits : 1" 
"Check : Parent groups = 0, Child groups = 1" 
"Check : Parent groups = 1, Child groups = 1" 
"Parent group observable emits : P1" 
"zipped parent P1" 
"parent emits : 2" 
"child emits : 2" 
"Check : Parent groups = 1, Child groups = 2" 
"Check : Parent groups = 2, Child groups = 2" 
"Parent group observable emits : P2" 
"zipped parent P2" 
"parent emits : 3" 
"child emits : 3" 
"Check : Parent groups = 2, Child groups = 3" 
"Parent group observable emits : P3" 
"zipped parent P3" 
"parent emits : 4" 
"child emits : 4" 
"Child group observable emits : C4" 
"zipped child C4" 
"Parent group observable emits : P4" 
"zipped parent P4" 
"parent emits : 5" 
"child emits : 5" 
"Child group observable emits : C5" 
"zipped child C5" 
"Parent group observable emits : P5" 
"zipped parent P5" 
"parent emits : 6" 
"child emits : 6" 
"Parent group observable emits : P6" 
"zipped parent P6" 
"Child Group Observable completed!" 
"Child Group Observable completed!" 
"Parent Group Observable completed!" 
"Parent Group Observable completed!" 
"zip completed!" 

Il y a deux points à souligner ici:

  1. Comportement du zip et le groupe par rapport au moment de l'abonnement

    • groupBy crée observables comme prévu, à la fois dans le parent et l'enfant

    Avec ces valeurs, vous pouvez vérifier dans le journal que Child crée trois groupes, Parent crée deux

    • Zip attendra à avoir une valeur dans chacune des sources que vous transmettez en tant que paramètres. Dans votre cas, cela signifie que vous serez abonné à l'enfant et au parent regroupés par observables quand ils auront tous les deux été émis. Dans le journal, vous verrez "Parent group observable emits : P1" seulement après les numéros correspondants sur "Check : Parent groups = 1, Child groups = 1". Vous pouvez ensuite vous abonner à des observables groupés par, et consigner tout ce qui sort de là. Le problème ici est que le parent groupé - par observable a une valeur à transmettre, MAIS le groupe 'enfant' par 'observable a été créé avant et a déjà passé sa valeur, donc quand vous vous abonnez après le fait, vous ne pouvez pas voir cette valeur - mais vous verrez les prochains. Donc, les valeurs de [1-3] généreront 3 nouvelles observables groupées par enfant, et vous n'en verrez aucune, car vous vous abonnez trop tard. Mais vous verrez les valeurs dans [4-6]. Vous pouvez consulter le journal: "zipped child C4" etc.

    • Vous verrez toutes les valeurs dans le parent regroupées par observables, parce que vous vous y abonnez immédiatement après leur création.

  2. connecter et publier

    • Je n'ai pas comprendre complètement claire de connexion et de publier, mais en tant que votre enfant a le parent comme source, vous ne avez pas besoin de connexion à retarder il. Si vous vous connectez au parent, l'enfant commencera automatiquement à émettre ses valeurs. D'où ma modification de votre code.

    • Cela devrait répondre à votre question immédiate, mais pas à votre objectif original de produit cartésien. Peut-être devriez-vous plutôt formuler cette question comme une question et voir quelles réponses les gens peuvent apporter.

+0

Merci! Pouvez-vous expliquer pourquoi l'enfant observable a déjà transmis sa valeur lors de l'inscription dans #zip? Je ne comprends pas comment cela se passe ... En ce qui concerne 2 .: publier n'est pas nécessaire dans l'exemple que j'ai donné, je l'ai simplement inclus car il sera nécessaire dans le script complet. –

+0

'groupBy' est lié à' groupByUntil': 'https: // github.com/Reactive-Extensions/RxJS/blob/maître/src/core/linq/observable/groupbyuntil.js' Chaque fois qu'il y a une nouvelle clé (ie groupe), une observable est créée. Mais cet observable est un 'Rx.Subject' qui est aussi une source chaude: il émet sa valeur immédiatement (cf L35). Cette observable est emballée et émise à l'observateur (L41, L49). Ensuite, la valeur de cette clé (groupe) est émise (immédiatement) à travers le sujet (L75). En bref, si vous ne vous abonnez pas à cet observable dès qu'il est transmis, vous perdrez toujours la première valeur. – user3743222

+0

Si vous n'êtes pas familier avec les observables chauds et froids, voici deux ressources traitant de la question: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#cold- vs-observables à chaud; http://jaredforsyth.com/2015/03/06/visualizing-reactive-streams-hot-and-cold/ – user3743222