0

J'essaie de créer un outil pour analyser les cours boursiers. J'ai un flux de données de prix pour différents stocks, et je veux avoir un observable pour émettre des événements chaque fois qu'il reçoit un nouvel ensemble de prix distincts et complets.Comment recombiner correctement des observables groupés?

Mon plan: regrouper le flux en différents sous-flux pour différents stocks, et recombiner leurs dernières valeurs.

Disons que j'ai un flux d'événements comme celui-ci:

from rx import Observable 

stock_events = [ 
    {'stock': 'A', 'price': 15}, 
    {'stock': 'A', 'price': 16}, 
    {'stock': 'B', 'price': 24}, 
    {'stock': 'C', 'price': 37}, 
    {'stock': 'A', 'price': 18}, 
    {'stock': 'D', 'price': 42}, 
    {'stock': 'B', 'price': 27}, 
    {'stock': 'B', 'price': 27}, 
    {'stock': 'C', 'price': 31}, 
    {'stock': 'D', 'price': 44} 
] 

price_source = Observable.from_list(stock_events) 

Voici ma première (naïve) approche:

a_source = price_source.filter(lambda x: x['stock'] == 'A').distinct_until_changed() 
b_source = price_source.filter(lambda x: x['stock'] == 'B').distinct_until_changed() 
c_source = price_source.filter(lambda x: x['stock'] == 'C').distinct_until_changed() 
d_source = price_source.filter(lambda x: x['stock'] == 'D').distinct_until_changed() 

(Observable 
    .combine_latest(a_source, b_source, c_source, d_source, lambda *x: x) 
    .subscribe(print)) 

Ce bien me donne:

({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 24}, {'stock': 'C', 'price': 37}, {'stock': 'D', 'price': 42}) 
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 37}, {'stock': 'D', 'price': 42}) 
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 31}, {'stock': 'D', 'price': 42}) 
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 31}, {'stock': 'D', 'price': 44}) 

Pourtant, je pense que cela devrait être mieux géré par group_by, au lieu de plusieurs filtrages, alors voici Est un re-écriture:

(price_source 
.group_by(lambda e: e['stock']) 
.map(lambda obs: obs.distinct_until_changed()) 
.combine_latest(lambda *x: x) 
.subscribe(print)) 

Mais cette fois-ci, je reçois:

(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000105EA20>,) 
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776AB00>,) 
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776A438>,) 
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000775E7F0>,) 

Qu'ai-je manqué ici? Comment puis-je "déballer" les observables imbriqués?

+0

Je ne suis pas sûr que GroupBy est la bonne façon de procéder dans votre situation. Puisque vous ne voulez pas sortir un élément jusqu'à ce que tous les stocks aient été émis, cela signifie que vous savez ce que sont les tickers boursiers. Je pense que Combine latest est parfait pour votre cas d'utilisation. Vous pourriez vouloir partager le sous-jacent sous-jacent avec Publier. – user630190

Répondre

0

Si vous vouliez utiliser groupby, ce serait comme ci-dessous en C#. Cela ne répond pas à votre exigence d'un ensemble "complet" cependant. Selon les commentaires, suspect CombineLatest serait mieux ici.

price_source.GroupBy(x => x.Stock) 
      .Select(gp => gp.DistinctUntilChanged(x => x.Price)) 
      .SelectMany(x => x) 
      .Subscribe(s => Console.WriteLine($"{s.Stock} : {s.Price}"));