7

J'ai eu quelques problèmes avec une chaîne récursive d'observables.RxJS: liste récursive d'observables et observateur unique

Je travaille avec RxJS, qui est actuellement en version 1.0.10621, et contient la plupart des fonctionnalités Rx de base, en conjonction avec Rx pour jQuery. Permettez-moi de présenter un exemple de scénario pour mon problème: J'interroge la réponse Twitter search API (réponse JSON) pour les tweets/mises à jour contenant un mot-clé donné. La réponse comprend également un "refresh_url" que l'on devrait utiliser pour générer une demande de suivi. La réponse à cette demande de suivi contiendra de nouveau un nouveau refresh_url, etc.

Rx.jQuery permet de faire de l'appel de l'API de recherche Twitter un événement observable, qui produit un onNext et se termine ensuite. Ce que j'ai essayé jusqu'à présent, c'est que le gestionnaire onNext se souvienne du refresh_url et l'utilise dans le gestionnaire onCompleted pour produire à la fois un observateur observable et correspondant pour la requête suivante. De cette façon, une paire observable + observateur suit l'autre indéfiniment.

Le problème avec cette approche est la suivante:

  1. L'observateur/observable suivi sont déjà en vie quand n'ont pas encore été résorbés leurs prédécesseurs.

  2. Je dois faire beaucoup de mauvaise comptabilité pour maintenir une référence valide à l'observateur actuellement vivant, dont il peut effectivement y en avoir deux. (Un dans l'un terminé et l'autre ailleurs dans son cycle de vie) Cette référence est, bien sûr, nécessaire pour désinscrire/disposer de l'observateur. Une alternative à la comptabilité serait de mettre en œuvre un effet secondaire par le biais d'un "booléen", comme je l'ai fait dans mon exemple.

code Exemple:

  running = true; 
      twitterUrl = "http://search.twitter.com/search.json"; 
      twitterQuery = "?rpp=10&q=" + encodeURIComponent(text); 
      twitterMaxId = 0; //actually twitter ignores its since_id parameter 

      newTweetObserver = function() { 
       return Rx.Observer.create(
         function (tweet) { 
          if (tweet.id > twitterMaxId) { 
           twitterMaxId = tweet.id; 
           displayTweet(tweet); 
          } 
         } 
        ); 
      } 

      createTwitterObserver = function() { 
       twitterObserver = Rx.Observer.create(
         function (response) { 
          if (response.textStatus == "success") { 
           var data = response.data; 
           if (data.error == undefined) { 
            twitterQuery = data.refresh_url; 
            var tweetObservable; 
            tweetObservable = Rx.Observable.fromArray(data.results.reverse()); 
            tweetObservable.subscribe(newTweetObserver()); 
           } 
          } 
         }, 
         function(error) { alert(error); }, 
         function() { 
          //create and listen to new observer that includes a delay 
          if (running) { 
           twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000); 
           twitterObservable.subscribe(createTwitterObserver()); 
          } 
         } 
        ); 
       return twitterObserver; 
      } 
      twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery); 
      twitterObservable.subscribe(createTwitterObserver()); 

Ne vous laissez pas berner par la double couche de/observateurs de observables demandes de tweets. Mon exemple concerne principalement la première couche: demander des données de Twitter. Si, en résolvant ce problème, la deuxième couche (conversion des réponses en tweets) peut devenir une avec la première, ce serait fantastique; Mais je pense que c'est une toute autre chose. Pour l'instant.

Erik Meijer m'a indiqué l'opérateur Expand (voir l'exemple ci-dessous), et a suggéré Join patterns comme alternative.

var ys = Observable.Expand 
(new[]{0}.ToObservable() // initial sequence 
        , i => (i == 10 ? Observable.Empty<int>() // terminate 
     : new[]{i+1}.ToObservable() // recurse 
) 
); 

ys.ToArray().Select(a => string.Join(",", a)).DumpLive(); 

Ceci devrait pouvoir être copié dans LINQPad. Il suppose des observables singleton et produit un observateur final. Donc, ma question est: Comment puis-je faire le plus grand tour d'expansion dans RxJS?

EDIT:
L'opérateur expand peut probablement être implémenté comme indiqué dans this thread. Mais il faudrait generators (et je n'ai que JS < 1.6).
Malheureusement, RxJS 2.0.20304-beta n'implémente pas la méthode Extend.

+2

Je suis venu à la conclusion que la solution à ce problème est en effet le [Expand opérateur] (http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/2746e373- bf43-4381-834c-8cc182704ae9) qui n'est pas encore implémenté dans RxJS jusqu'à la version 2.0.20304-beta. – derabbink

+1

expand est exactement ce dont vous avez besoin. Je me rends compte que ce post est vieux, mais il est possible de simplement transplanter les opérateurs dont vous avez besoin d'une future version de Rx dans votre propre version de Rx. Cela peut nécessiter une manipulation, mais la base du code n'a pas beaucoup changé depuis la version précédente. –

+0

En outre, la mise à niveau de Rx (si possible) est une bonne décision. Il y a beaucoup de corrections de bugs et d'améliorations dans les dernières versions. –

Répondre

4

Donc, je vais essayer de résoudre votre problème légèrement différent de ce que vous avez fait et de prendre quelques libertés que vous pouvez résoudre plus facilement.

donc 1 chose que je ne peux pas dire est que vous essayez de ce qui suit les étapes

  • Obtenez la première liste de tweet, qui contient l'URL suivante
  • Tweet Une fois la liste reçue, OnNext l'observateur en cours et obtenir la prochaine série de tweets
    • faire indéfiniment

Ou y a-t-il une action de l'utilisateur (la commande get more/scrolling to bottom). De toute façon, c'est vraiment le même problème. Cependant, je peux lire votre problème incorrectement. Voici la réponse pour cela.

function getMyTweets(headUrl) { 
    return Rx.Observable.create(function(observer) { 

     innerRequest(headUrl); 
     function innerRequest(url) { 
      var next = ''; 

      // Some magic get ajax function 
      Rx.get(url).subscribe(function(res) { 
       observer.onNext(res); 
       next = res.refresh_url; 
      }, 
      function() { 
       // Some sweet handling code 
       // Perhaps get head? 
      }, 
      function() { 
       innerRequest(next); 
      }); 
     } 
    }); 
} 

Ceci peut ne pas être la réponse que vous demandiez. Si non, désolé! Editer: Après avoir parcouru votre code, il semblerait que vous vouliez prendre des résultats sous forme de tableau et que vous puissiez l'observer.

// From the results perform a select then a merge (if ordering does not matter). 
getMyTweets('url') 
    .selectMany(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }); 

// Ensures ordering 
getMyTweets('url') 
    .select(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }) 
    .concat(); 
Questions connexes