2016-11-22 1 views
4

je fait une méthode d'extension:Compresser avec une séquence infinie qui est vrai, toujours faux

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
              TimeSpan minDelay) 
{ 
    return 
     source.TimeInterval() 
      .Select(
       (x, i) => 
        Observable.Return(x.Value) 
         .Delay(i == 0 
          ? TimeSpan.Zero 
          : TimeSpan.FromTicks(
            Math.Max(minDelay.Ticks - x.Interval.Ticks, 0)))) 
      .Concat(); 
} 

cela crée une nouvelle observable qui permet uniquement les éléments par une séparation minimale dans le temps.

Pour supprimer la latence initiale, il est nécessaire de traiter différemment le premier élément.

Comme on peut le voir, il y a un test pour voir si nous traitons le premier élément en testant i == 0. Le problème ici est que si nous traitons plus de int.MaxValue éléments, cela échouera.

Au lieu de cela, je pensais à la séquence suivante

var trueThenFalse = Observable.Return(true) 
        .Concat(Observable.Repeat(false)) 

et passer comme un éclair vers le haut à côté de ma source:

source.TimeInterval().Zip(trueThenFalse, ... 

mais en passant cette séquence infinie Zip, nous semblons entrer dans une boucle serrée où trueThenFalse émet tous les éléments en une fois (à l'infini). Échouer.

Je pourrais facilement coder autour de cela avec des effets secondaires (un bool dans la lunette extérieure, par exemple), mais cela représenterait une perte de pureté dont je ne serais pas content.

Des suggestions?

EDIT

Bien que pas tout à fait le même comportement, le code suivant présente quelques traits désagréables

var trueThenFalse = Observable.Return(true) 
    .Concat(Observable.Repeat(false)); 
var src = Observable.Interval(TimeSpan.FromSeconds(1)); //never completes 
src.Zip(trueThenFalse,(i,tf)=>tf).ForEach(x=>Trace.TraceInformation("{0}",x)); 

et finit par mourir avec un oome. En effet, trueThenFalse semble désorganiser toutes ses valeurs, mais elles ne sont pas consommées par le Zip en temps opportun.

+0

Semble fonctionner pour moi avec trueThenFalse et Zip. – Evk

+0

Alors peut-être que cela a quelque chose à voir avec la façon dont les suites sont prévues? Étant donné que 'IObservable' ne fournit vraiment que la méthode' Subscribe', il m'est difficile de comprendre comment une séquence infinie (froide) ne tenterait pas de désimmobiliser tous ses éléments immédiatement si elle était abonnée dans ce modèle "push". – spender

+0

Mais le zip récupère l'élément suivant à partir de deux séquences. D'abord, il saisit "vrai" de la première séquence et la première valeur de la seconde (pas de retard). Ensuite, il saisit "false" et deuxième valeur de la deuxième séquence, mais ici il y a un retard maintenant. Je ne sais pas pourquoi il devrait spool tous les articles immédiatement. Peut-être que vous pouvez poster du code avec trueThenFalse et Zip qui a échoué? – Evk

Répondre

3

Il s'avère donc que Zip a another overload qui peut compresser ensemble une séquence IObservable avec une séquence IEnumerable. En combinant la sémantique de push de IObservable avec la sémantique de pull de IEnumerable, il est possible de faire fonctionner mon cas de test.

Ainsi, avec la méthode suivante:

private IEnumerable<T> Inf<T>(T item) 
{ 
    for (;;) 
    { 
     yield return item; 
    } 
} 

nous pouvons faire un IEnumerable:

var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false)); 

puis il Zip avec une source observable:

var src = Observable.Interval(TimeSpan.FromSeconds(1)); 
src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x)); 

... et tout fonctionne comme prévu.

J'ai maintenant la mise en œuvre suivante pour ma méthode de RateLimiter:

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
              TimeSpan minDelay) 
{ 
    var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false)); 
    return 
     source.TimeInterval() 
      .Zip(trueThenFalse, (item, firstTime) => Observable.Return(item.Value) 
       .Delay(firstTime 
        ? TimeSpan.Zero 
        : TimeSpan.FromTicks(
         Math.Max(minDelay.Ticks - item.Interval.Ticks, 0)))) 

      .Concat(); 
}