Une bonne approche pour combiner le dernier est de commencer par IObservable<IObservable<T>>
et de le transformer en IObservable<T[]>
. Cela devient une manière très dynamique de combiner autant de valeurs dont vous avez besoin.
est ici une méthode d'extension pour ce faire:
public static IObservable<T[]> CombineLatest<T>(this IObservable<IObservable<T>> sources)
{
return
sources.Publish(ss =>
Observable.Create<T[]>(o =>
{
var composite = new CompositeDisposable();
var list = new List<T>();
composite.Add(
ss.Subscribe(source =>
{
var index = list.Count;
list.Add(default(T));
composite.Add(source.Subscribe(x => list[index] = x));
}));
composite.Add(ss.Merge().Select(x => list.ToArray()).Subscribe(o));
return composite;
}));
}
Cela crée bien et suit tous les abonnements et utilise une fermeture pour définir la index
que chaque abonnement doit utiliser pour mettre à jour sa valeur dans le list
qui est utilisé pour la sortie.
Si vous l'utilisez comme ceci:
var sources = new Subject<IObservable<bool>>();
var output = sources.CombineLatest();
output.Subscribe(x => Console.WriteLine(x));
var s1 = new Subject<bool>();
sources.OnNext(s1);
s1.OnNext(true);
var s2 = new Subject<bool>();
sources.OnNext(s2);
s2.OnNext(false);
var s3 = new Subject<bool>();
sources.OnNext(s3);
s3.OnNext(true);
s2.OnNext(true);
s1.OnNext(false);
Ensuite, vous obtenez cette sortie:
Si vous modifiez la définition de output
à var output = sources.CombineLatest().Select(xs => xs.Aggregate((x, y) => x & y));
alors vous obtenez la sortie que je pense vous êtes après:
True
False
False
True
False
Par "all d'autres fois ", voulez-vous dire que le résultat Observable devrait émettre une valeur lorsque l'un des flux fournit une valeur? – supertopi
Le nombre de flux est-il constant? Ou dynamique? – Shlomo
@shlomo oui le nombre de flux est constant – Schneider