2017-06-16 2 views
0

Je cherche à combiner de nombreux flux IObservable<bool> de sorte que lorsque la dernière valeur pour tous est vrai, un vrai est émis, et sinon un faux est émis.Combiner efficacement de nombreux flux IObservable <bool> avec des opérateurs booléens

CombinedLast me permettrait de construire quelque chose comme ça pour deux flux facilement, mais a si c'était possible.

AllAll est un peu similaire à ce que je veux, sauf que je suppose que cela fonctionne sur une seule séquence et une fois faux ne peut pas revenir dynamiquement à la vérité.

De même, j'ai besoin que les valeurs soient "distinctes jusqu'à ce qu'elles soient modifiées", bien que l'opérateur DistintUntilChanged ne soit pas efficace pour cela? J'espere pour un algorithme O (1).

+0

Par "all d'autres fois ", voulez-vous dire que le résultat Observable devrait émettre une valeur lorsque l'un des flux fournit une valeur? – supertopi

+0

Le nombre de flux est-il constant? Ou dynamique? – Shlomo

+0

@shlomo oui le nombre de flux est constant – Schneider

Répondre

1

Une bonne approche pour combiner le dernier est de commencer par IObservable<IObservable<T>> et de le transformer en IObservable<T[]>. Cela devient une manière très dynamique de combiner autant de valeurs dont vous avez besoin.

est ici une méthode d'extension pour ce faire:

public static IObservable<T[]> CombineLatest<T>(this IObservable<IObservable<T>> sources) 
{ 
    return 
     sources.Publish(ss => 
      Observable.Create<T[]>(o => 
      { 
       var composite = new CompositeDisposable(); 
       var list = new List<T>(); 
       composite.Add(
        ss.Subscribe(source => 
        { 
         var index = list.Count; 
         list.Add(default(T)); 
         composite.Add(source.Subscribe(x => list[index] = x)); 
        })); 
       composite.Add(ss.Merge().Select(x => list.ToArray()).Subscribe(o)); 
       return composite; 
      })); 
} 

Cela crée bien et suit tous les abonnements et utilise une fermeture pour définir la index que chaque abonnement doit utiliser pour mettre à jour sa valeur dans le list qui est utilisé pour la sortie.

Si vous l'utilisez comme ceci:

var sources = new Subject<IObservable<bool>>(); 

var output = sources.CombineLatest(); 

output.Subscribe(x => Console.WriteLine(x)); 

var s1 = new Subject<bool>(); 
sources.OnNext(s1); 
s1.OnNext(true); 
var s2 = new Subject<bool>(); 
sources.OnNext(s2); 
s2.OnNext(false); 
var s3 = new Subject<bool>(); 
sources.OnNext(s3); 
s3.OnNext(true); 
s2.OnNext(true); 
s1.OnNext(false); 

Ensuite, vous obtenez cette sortie:

output

Si vous modifiez la définition de output à var output = sources.CombineLatest().Select(xs => xs.Aggregate((x, y) => x & y)); alors vous obtenez la sortie que je pense vous êtes après:

 
True 
False 
False 
True 
False 
+0

Merci. On dirait que ça marcherait.Mais j'ai l'intuition qu'il y a une approche fonctionnelle intelligente à cela qui ne vous obligerait pas à réévaluer constamment l'opérateur booléen à travers votre tableau sur chaque OnNext. @schlomo utilise le suivi d'état mutable (counting), ce qui devrait fonctionner mais j'ai l'impression qu'il doit y avoir une manière intelligente d'aborder cela, peut-être même celle qui fonctionnerait avec tous les différents opérateurs booléens. Mais peut-être pas ¯ \ _ (ツ) _/¯ – Schneider

+0

@Schneider - Qu'entendez-vous par "réévaluer constamment l'opérateur booléen à travers votre tableau sur chaque OnNext"? – Enigmativity

+1

Pour chaque OnNext dans l'un des n flots enfants, vous appelez .Aggregate, une opération O (n). – Shlomo

1

Je ne sais pas comment le faire d'une manière classique et fonctionnelle et encore atteindre O (1). Cet état utilisé mutable, et O (1) pour l'observation de chaque message, mais O (n) de la mémoire:

public IObservable<bool> CombineBooleans(this IObservable<bool>[] source) 
{ 
    return source.Select((o, i) => o.Select(b => (value: b, index: i))) 
     .Merge() 
     .Scan((array: new bool[source.Length], countFalse: source.Length), (state, item) => 
     { 
      var countFalse = state.countFalse; 

      if (state.array[item.index] == item.value) 
       return (state.array, countFalse);   //nothing to change, emit same state 
      else if (state.array[item.index])    //previous/current state is true, becoming false 
      { 
       countFalse++; 
       state.array[item.index] = false; 
      } 
      else           //previous/current state is false, becoming true 
      { 
       countFalse--; 
       state.array[item.index] = true; 
      } 
      return (state.array, countFalse); 
     }) 
     .Scan((countFalse: source.Length, oldCountFalse: source.Length), (state, item) => (countFalse: item.countFalse, oldCountFalse: state.countFalse)) 
     .SelectMany(state => 
      state.countFalse == 1 && state.oldCountFalse == 0 
       ? Observable.Return(false) 
       : state.countFalse == 0 && state.oldCountFalse == 1 
        ? Observable.Return(true) 
        : Observable.Empty<bool>() 
     ) 
     .Publish() 
     .RefCount(); 
} 

EDIT: Ajout .Publish().Refcount() pour éliminer des bugs multiples abonnés.

+0

Attention à l'utilisation de '.Publish () .Refcount() 'car il rend l'observable seulement produire des valeurs une fois. Ce n'est pas un bon modèle à utiliser tout le temps. Quels bugs d'abonnés voyez-vous? – Enigmativity

+0

Mon erreur; n'a pas vraiment d'importance ici. Je pensais que si vous avez un double abonné, alors vous obtenez une double mutation. Mais cela n'a aucun effet sur l'état de toute façon. – Shlomo