J'ai eu quelques problèmes avec une chaîne récursive d'observables.RxJS: liste récursive d'observables et observateur unique
Je travaille avec RxJS, qui est actuellement en version 1.0.10621, et contient la plupart des fonctionnalités Rx de base, en conjonction avec Rx pour jQuery. Permettez-moi de présenter un exemple de scénario pour mon problème: J'interroge la réponse Twitter search API (réponse JSON) pour les tweets/mises à jour contenant un mot-clé donné. La réponse comprend également un "refresh_url" que l'on devrait utiliser pour générer une demande de suivi. La réponse à cette demande de suivi contiendra de nouveau un nouveau refresh_url, etc.
Rx.jQuery permet de faire de l'appel de l'API de recherche Twitter un événement observable, qui produit un onNext et se termine ensuite. Ce que j'ai essayé jusqu'à présent, c'est que le gestionnaire onNext se souvienne du refresh_url et l'utilise dans le gestionnaire onCompleted pour produire à la fois un observateur observable et correspondant pour la requête suivante. De cette façon, une paire observable + observateur suit l'autre indéfiniment.
Le problème avec cette approche est la suivante:
L'observateur/observable suivi sont déjà en vie quand n'ont pas encore été résorbés leurs prédécesseurs.
Je dois faire beaucoup de mauvaise comptabilité pour maintenir une référence valide à l'observateur actuellement vivant, dont il peut effectivement y en avoir deux. (Un dans l'un terminé et l'autre ailleurs dans son cycle de vie) Cette référence est, bien sûr, nécessaire pour désinscrire/disposer de l'observateur. Une alternative à la comptabilité serait de mettre en œuvre un effet secondaire par le biais d'un "booléen", comme je l'ai fait dans mon exemple.
code Exemple:
running = true;
twitterUrl = "http://search.twitter.com/search.json";
twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
twitterMaxId = 0; //actually twitter ignores its since_id parameter
newTweetObserver = function() {
return Rx.Observer.create(
function (tweet) {
if (tweet.id > twitterMaxId) {
twitterMaxId = tweet.id;
displayTweet(tweet);
}
}
);
}
createTwitterObserver = function() {
twitterObserver = Rx.Observer.create(
function (response) {
if (response.textStatus == "success") {
var data = response.data;
if (data.error == undefined) {
twitterQuery = data.refresh_url;
var tweetObservable;
tweetObservable = Rx.Observable.fromArray(data.results.reverse());
tweetObservable.subscribe(newTweetObserver());
}
}
},
function(error) { alert(error); },
function() {
//create and listen to new observer that includes a delay
if (running) {
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
twitterObservable.subscribe(createTwitterObserver());
}
}
);
return twitterObserver;
}
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
twitterObservable.subscribe(createTwitterObserver());
Ne vous laissez pas berner par la double couche de/observateurs de observables demandes de tweets. Mon exemple concerne principalement la première couche: demander des données de Twitter. Si, en résolvant ce problème, la deuxième couche (conversion des réponses en tweets) peut devenir une avec la première, ce serait fantastique; Mais je pense que c'est une toute autre chose. Pour l'instant.
Erik Meijer m'a indiqué l'opérateur Expand (voir l'exemple ci-dessous), et a suggéré Join patterns comme alternative.
var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
, i => (i == 10 ? Observable.Empty<int>() // terminate
: new[]{i+1}.ToObservable() // recurse
)
);
ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
Ceci devrait pouvoir être copié dans LINQPad. Il suppose des observables singleton et produit un observateur final. Donc, ma question est: Comment puis-je faire le plus grand tour d'expansion dans RxJS?
EDIT:
L'opérateur expand peut probablement être implémenté comme indiqué dans this thread. Mais il faudrait generators (et je n'ai que JS < 1.6).
Malheureusement, RxJS 2.0.20304-beta n'implémente pas la méthode Extend.
Je suis venu à la conclusion que la solution à ce problème est en effet le [Expand opérateur] (http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/2746e373- bf43-4381-834c-8cc182704ae9) qui n'est pas encore implémenté dans RxJS jusqu'à la version 2.0.20304-beta. – derabbink
expand est exactement ce dont vous avez besoin. Je me rends compte que ce post est vieux, mais il est possible de simplement transplanter les opérateurs dont vous avez besoin d'une future version de Rx dans votre propre version de Rx. Cela peut nécessiter une manipulation, mais la base du code n'a pas beaucoup changé depuis la version précédente. –
En outre, la mise à niveau de Rx (si possible) est une bonne décision. Il y a beaucoup de corrections de bugs et d'améliorations dans les dernières versions. –