2017-09-06 1 views
10

Je voudrais empêcher qu'une émission se produise si et seulement si le même élément exact a été émis au cours des x dernières millisecondes. J'ai regardé les opérateurs d'accélérateur et de débogage mais je ne suis pas sûr qu'ils puissent m'aider ici. Y a-t-il un autre opérateur que je peux utiliser, ou puis-je les composer d'une manière ou d'une autre?Empêcher l'émission si la même émission a eu lieu il y a quelques millisecondes

+0

Pouvez-vous fournir plus d'informations? Par exemple, avec quels objets travaillez-vous, à quelle vitesse les éléments sont-ils émis? D'où émettez-vous ces articles? Est-ce important pour vous que ces articles soient émis un par un, ou peuvent-ils être émis par lots? –

Répondre

1

Vous pouvez horodater et associer chaque élément, puis vérifier vos conditions de limite de temps et d'égalité.

randomSource 
       .timestamp() 
       .pairwise() 
       .where(pair => pair[0].timestamp - pair[1].timestamp < limit && pair[0].value === pair[1].value); 

Puis appliquez un .select(pair => pair[0].value) pour récupérer votre élément d'origine.

Exemple de travail en C#, avec une source qui génère des éléments aléatoires entre 1 et 5 espacées sur des temps aléatoires:

static IObservable<T[]> Pairwise<T>(this IObservable<T> source) 
    { 
     source = source.Publish().RefCount(); 
     return source.Skip(1).Zip(source, (a, b) => new[] { a, b }); 
    } 

    static void Main(string[] args) 
    { 

     var randomSource = 
      Observable.Defer(() => Observable.Timer(TimeSpan.FromSeconds(new Random().NextDouble() * 2))).Repeat().Publish().RefCount().Select(_ => new Random().Next(1, 5)); 

     var limit = TimeSpan.FromSeconds(1); 

     var sameDebounce = 
     randomSource 
      .Timestamp() 
      .Pairwise() 
      .Where(pair => pair[0].Timestamp - pair[1].Timestamp < limit && pair[0].Value == pair[1].Value); 


     sameDebounce.Subscribe(c => Console.WriteLine("{0} {1}", c[0], c[1])); 
     Console.ReadLine(); 

    } 

Sortie:

[email protected]/7/2017 5:00:04 AM +00:00 [email protected]/7/2017 5:00:04 AM +00:00 
[email protected]/7/2017 5:00:09 AM +00:00 [email protected]/7/2017 5:00:08 AM +00:00 
[email protected]/7/2017 5:00:23 AM +00:00 [email protected]/7/2017 5:00:23 AM +00:00 
[email protected]/7/2017 5:00:33 AM +00:00 [email protected]/7/2017 5:00:32 AM +00:00 
1

Comme votre question n'explique pas complètement le scénario comme comparer la prochaine valeur émise à la dernière valeur émise ou toute dernière valeur émise ou autre chose. Je voudrais prendre un moyen général d'atteindre la solution. L'exemple est en RxJava.

Vous pouvez utiliser timestamp() avec filter() opérateur comme suit:

ArrayList<String> list = new ArrayList<>(); 
     final long[] timeOfSubscribe = {-1}; 
     final long timeDuration = 2 * 1000; // 2 seconds 
     Observable.fromIterable(list) 
       .timestamp() 
       .filter(item -> item.time() > (timeDuration + timeOfSubscribe[0]) && item.value().equals("your last value")) 
       .doOnSubscribe(__ -> timeOfSubscribe[0] = Calendar.getInstance().getTimeInMillis()) 
       .subscribe(); 

Je suppose que cet extrait peut vous aider avez juste besoin de changer votre comparaison de la valeur émise connexion qui est dans l'opérateur filter(). Si vous cherchez la dernière valeur émise, vous pouvez arrêter la dernière valeur émise en utilisant l'opérateur doOnNext() (pour avoir un cas simple) ou si vous cherchez toutes les dernières valeurs émises, vous devez stocker les valeurs émises dans la liste et vérifier.

J'espère que ça aide.

3

Vous pouvez le faire avec groupByUntil pour l'essentiel des éléments individuels

anti-rebond
o 
    .groupByUntil(x => x, x => x, x => Observable.timer(1000)) 
    .flatMap(grp => grp.first()) 
+0

Voilà comment je le ferais. –

+0

Cela semble bon pour RxJS - RxJava2 ne semble pas avoir l'opérateur groupByUntil, tirer :) –

+0

ouais ne savait pas pourquoi vous avez répertorié à la fois rxjs et rx-java. Vous pouvez essayer de porter la version C# vers Java: https://github.com/Reactive-Extensions/Rx.NET/blob/83710cfa9395355af6eb908f96261ea632a49009/Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs – Brandon