2014-08-27 5 views
1

J'ai enlevé le passe-partout pour arriver au pointrxjs zip n'est pas paresseux?

// a.js

// My observables from stream and event 
this.a = Rx.Node.fromStream(this.aStream()); 
this.itemSource = Rx.Observable.fromEvent(ee, 'addItem'); 

// Zip 'em 
this.itemcombo = Rx.Observable.zip(this.a, this.itemSource, function (s1, s2) { 
    return {item: s2, a: s1.toString()}; 
}); 

// Streams the lowercase alphabet 
rb.prototype.aStream = function aStream() { 
var rs = Readable(); 

var c = 97; 
rs._read = function() { 
    rs.push(String.fromCharCode(c++)); 
    console.log('Hit!'); 
    if (c > 'z'.charCodeAt(0)) { 
     rs.push(null); 
    } 
}; 

return rs; 
}; 

// b.js (nécessite le module exporté ci-dessus)

rb.enqueue('a'); // The method simply does an ee.emit('addItem', ...) in the module to trigger the itemSource observable 

Ce que je pensais voir:

{item: 'a', a: 'a'} imprimé dans le console

Qu'est-il arrivé:

Hit! a été imprimé 24 fois avant {item: 'a', a: 'a'}. Cela signifie que zip a pris toutes les valeurs de aStream, les a mises en mémoire tampon et a ensuite fait ce qu'il était censé faire.

Comment puis-je obtenir la même fonctionnalité zip offre mais paresseusement? Mon but est d'utiliser un flux infini/observable et de le compresser avec un flux fini (asynchrone).

Modifier

Voir/Modifier via runnable: RX Zip testEdit 2 code mis à jour en fonction de réponse -> pas de sortie maintenant.

+0

S'il vous plaît rajouter le passe-partout et de simplifier l'exemple. –

+0

http://www.yoda.arachsys.com/csharp/complete.html –

+0

@DaveSexton Voir: http://pastebin.com/mnc82KuV et http://pastebin.com/8HxURWYc pour une version copier/coller/exécuter - Merci! Je ne pense pas que l'exemple puisse être simplifié davantage. C'est 2 flux et la fonction zip. J'ai inclus le flux que j'ai utilisé pour référence, il aurait pu être omis, il est prudent d'ignorer cette partie, mais j'ai pensé que cela pourrait être utile. – rollingBalls

Répondre

1

zip est en effet paresseux. Il s'abonne simplement à a et b et fait son travail chaque fois que produit une nouvelle valeur.

Votre problème est que fromStream émet toutes ses valeurs de façon synchrone dès que zip s'y abonne. Cela arrive parce que votre coutume Readable dit constamment "Il y a plus de données disponibles!"

Rendez votre Readable asynchrone et vous obtiendrez le comportement souhaité.

Essayez quelque chose comme ça (non testé)

var rs = Readable(); 
var subscription = null; 
rs._read = function() { 
    if (!subscription) { 
     // produce the values once per second 
     subscription = Rx.Observable 
      .generateWithRelativeTime(
       97, // start value 
       function (c) { return c > 'z'.charCodeAt(0); }, // end condition 
       function (c) { return c + 1; }, // step function 
       function (c) { return String.fromCharCode(c); }, // result selector 
       function() { return 1000; }) // 1000ms between values 
      .subscribe(
       function (s) { 
        rs.push(s); 
        console.log("Hit!"); 
       }, 
       function (error) { rs.push(null); }, 
       function() { rs.push(null); }); 
    } 
}; 
+0

Merci pour votre réponse. J'ai mis à jour le violon runnable pour utiliser votre code, il sort simplement "Terminé" et rien d'autre ne se passe maintenant. Aussi, même si nous corrigeons cela en modifiant ce flux spécifique en interne, ma question est, est-il possible de prendre n'importe quel flux arbitraire et de ne pas déborder de 'zip' (pour pouvoir utiliser ce pattern dans n'importe quel projet)? – rollingBalls

+0

J'accepte cette réponse. Après avoir appris un peu plus sur la programmation réactive, je pense que je dois refactoriser mon approche. – rollingBalls