2012-05-11 2 views
1

Laissez-moi expliquer ce que je veux atteindre en premier.mise en mémoire tampon basée sur les données dans Rx

Disons que je la forme d'entrée de données suivant le flux d'événements

var data = new string[] { 
       "hello", 
       "Using", 
       "ok:michael", 
       "ok", 
       "begin:events", 
       "1:232", 
       "2:343", 
       "end:events", 
       "error:dfljsdf", 
       "fdl", 
       "error:fjkdjslf", 
       "ok" 
      }; 

Quand je souscris à la source de données, je voudrais obtenir le résultat suivant

"ok:michael" 
"ok" 
"begin:events 1:232 2:343 end:events" 
"error:dfljsdf" 
"error:fjkdjslf" 
"ok" 

Fondamentalement, je veux obtenir toutes les données que commencent par ok ou erreur et les données entre début et fin.

J'ai essayé jusqu'à présent ..

var data = new string[] { 
       "hello", 
       "Using", 
       "ok:michael", 
       "ok", 
       "begin:events", 
       "1:232", 
       "2:343", 
       "end:events", 
       "error:dfljsdf", 
       "fdl", 
       "error:fjkdjslf", 
       "ok" 
      }; 



      var dataStream = Observable.Generate(
           data.GetEnumerator(), 
           e => e.MoveNext(), 
           e => e, 
           e => e.Current.ToString(), 
           e => TimeSpan.FromSeconds(0.1));   

      var onelineStream = from d in dataStream 
           where d.StartsWith("ok") || d.StartsWith("error") 
           select d; 

      // ??? 
      // may be need to buffer? I want to get data like "begin:events 1:232 2:343 end:events" 
      // but it is not working... 
      var multiLineStream = from list in dataStream.Buffer<string, string, string>(
           bufferOpenings: dataStream.Where(d => d.StartsWith("begin")), 
           bufferClosingSelector: b => dataStream.Where(d => d.StartsWith("end"))) 
           select String.Join(" ", list); 

      // merge two stream???? 
      // but I have no clue how to merge these twos :(

      mergeStream .Subscribe(d => 
      { 
       Console.WriteLine(d); 
       Console.WriteLine(); 
      }); 

Depuis que je suis très nouveau à la programmation réactive, je ne peux pas me faire penser de manière réactive. :(

Merci à l'avance.

Répondre

6

Tu étais si, si près de la bonne réponse!

Essentiellement, vous aviez les onelineStream & multiLineStream requêtes à peu près droit.

les fusionner ensemble est Très simple: faites simplement ceci:

onelineStream.Merge(multiLineStream) 

Cependant, où vos questions sont tombées t était dans le Observable.Generate que vous avez utilisé pour introduire le délai entre les valeurs. Cela crée un observable que, si vous avez plusieurs abonnés, sorte de "ventilateurs" les valeurs.

Compte tenu de vos données et votre définition pour dataStream voir comment ce code se comporte:

dataStream.Select(x => "!" + x).Subscribe(Console.WriteLine); 
dataStream.Select(x => "@" + x).Subscribe(Console.WriteLine); 

Vous obtenez ces valeurs:

!hello 
@Using 
!ok:michael 
@ok 
@1:232 
!begin:events 
@2:343 
!end:events 
!fdl 
@error:dfljsdf 
!error:fjkdjslf 
@ok 

Notez que certains ont eu manutentionnés par une souscription et les autres se sont traitées par l'autre. Cela signifie que même si vos requêtes onelineStream & multiLineStream étaient à peu près correctes, elles ne verront que certaines données et ne se comporteront donc pas comme prévu.

Vous pouvez également obtenir des conditions de course qui peuvent ignorer et dupliquer des valeurs. Donc, il vaut mieux éviter ce genre d'observable.

Une meilleure approche pour introduire un délai entre les valeurs est de le faire:

var dataStream = data.ToObservable().Do(_ => Thread.Sleep(100)); 

Maintenant, cela crée un « froid » observable, ce qui signifie que chaque nouvel abonné recevra un abonnement frais de l'observable si à partir de la première valeur.

Votre requête multiLineStream ne fonctionnera pas correctement sur un observable froid.

Pour rendre le flux de données observable "à chaud" (qui partage des valeurs parmi les abonnés), nous utilisons l'opérateur Publish.

Alors, multiLineStream ressemble maintenant à ceci:

var multiLineStream = 
    dataStream.Publish(ds => 
     from list in ds.Buffer(
      ds.Where(d => d.StartsWith("begin")), 
      b => ds.Where(d => d.StartsWith("end"))) 
     select String.Join(" ", list)); 

Vous pouvez alors obtenir vos résultats comme ceci:

onelineStream.Merge(multiLineStream).Subscribe(d => 
{ 
    Console.WriteLine(d); 
    Console.WriteLine(); 
}); 

C'est ce que je suis:

ok:michael 
ok 
begin:events 1:232 2:343 end:events 
error:dfljsdf 
error:fjkdjslf 
ok 

Permettez-moi savoir si cela fonctionne pour vous.

+0

Je souhaiterais pouvoir faire la promotion 10 fois. :) –

+1

pour les informations pour les autres visiteurs, si 'dataStream' est créé à partir d'événements (dans mon cas, méthode d'usine' Observable.FromEvent'), la méthode 'Publish' n'est pas nécessaire. –

+1

C'est vraiment une excellente réponse qui couvre complètement toutes les questions dans le code original dans un style très éducatif. Crédit complet. – yamen

Questions connexes