2017-09-25 5 views
0

règles de limitation:Throttling dans RxJava

  1. s'il n'y a pas d'autre élément émis dans 100ms récentes, élément suivant serait livrer à l'abonné immédiatement
  2. Sinon, tampon éléments entrants, jusqu'à ce que 1000ms délai d'attente ou la taille du tampon atteindre 500, puis fournir la liste des éléments mis en mémoire tampon à l'abonné

Est-ce que RxJava peut atteindre cet objectif?

Répondre

0

Il existe un opérateur (théorique) appelé conflate() qui fait une partie de cela. J'ai vu plusieurs implémentations, plus récemment sur this discussion thread. L'adaptation de la réponse de valeriyo, nous développons l'opérateur suivant:

public static <T> Transformer<T, List<T>> adaptiveSample(long time, TimeUnit unit, Scheduler scheduler) { 
    return source -> source 
     .publish(shared -> concat(
      shared.take(1).toList(), 
      shared.buffer(500, time, unit, scheduler)) 
      .repeatWhen(a -> shared.debounce(time, unit, scheduler))); 
} 

Vous pouvez ensuite l'appliquer en utilisant les éléments suivants:

observable 
    .compose(adaptiveSample(1000, TimeUnit.MILLISECONDS, scheduler)) 
    .subscribe(listOfSamples -> doStuff(listOfSamples)); 

Vous devrez tester vous-même. Je ne suis pas clair sur quelques-unes de vos hypothèses.