2009-03-20 3 views
126

Je pourrais utiliser du pseudo-code, ou mieux, Python. J'essaye d'implémenter une file d'attente de limitation de débit pour un bot Python IRC, et cela fonctionne partiellement, mais si quelqu'un déclenche moins de messages que la limite (par exemple, la limite de débit est de 5 messages par 8 secondes, et le déclencheur suivant est sur les 8 secondes (par exemple, 16 secondes plus tard), le bot envoie le message, mais la file d'attente devient pleine et le bot attend 8 secondes, même si ce n'est pas nécessaire depuis la période de 8 secondes.Qu'est-ce qu'un bon algorithme de limitation de débit?

Répondre

192

Voici le simplest algorithm, si vous voulez juste déposer des messages quand ils arrivent trop vite (au lieu de les faire la queue, ce qui est logique car la file d'attente peut obtenir arbitrairement grand):

rate = 5.0; // unit: messages 
per = 8.0; // unit: seconds 
allowance = rate; // unit: messages 
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds 

when (message_received): 
    current = now(); 
    time_passed = current - last_check; 
    last_check = current; 
    allowance += time_passed * (rate/per); 
    if (allowance > rate): 
    allowance = rate; // throttle 
    if (allowance < 1.0): 
    discard_message(); 
    else: 
    forward_message(); 
    allowance -= 1.0; 

Il y a pas datastructures, minuteries, etc. dans cette solution et cela fonctionne proprement :) Pour voir cela, « indemnité » à la vitesse croît 5/8 unités par seconde au plus, soit au maximum cinq unités par huit secondes. Chaque message transféré déduit une unité, donc vous ne pouvez pas envoyer plus de cinq messages toutes les huit secondes.

Notez que rate doit être un nombre entier, c'est-à-dire sans partie décimale non nulle, sinon l'algorithme ne fonctionnera pas correctement (le taux réel ne sera pas rate/per). Par exemple. rate=0.5; per=1.0; ne fonctionne pas car allowance ne passera jamais à 1.0. Mais rate=1.0; per=2.0; fonctionne bien.

+0

C'est un beau travail, c'est-à-dire. Est-ce le vôtre, ou un algorithme standard? – skaffman

+2

Il convient également de souligner que la dimension et l'échelle de 'time_passed' doivent être les mêmes que 'per', par ex. secondes. – skaffman

+2

Salut Skaffman, merci pour les compliments --- Je l'ai sorti de ma manche mais avec une probabilité de 99,9% quelqu'un a déjà trouvé une solution similaire :) –

23

Un Token Bucket est assez simple à implémenter.

Commencez avec un seau avec 5 jetons.

Toutes les 5/8 secondes: Si le seau a moins de 5 jetons, en ajouter un.

Chaque fois que vous souhaitez envoyer un message: Si le compartiment a un jeton ≥1, retirez un jeton et envoyez le message. Sinon, attendez/déposez le message/peu importe.

(évidemment, dans le code réel, vous utiliseriez un compteur entier au lieu de jetons réels et vous pouvez optimiser le tout 5/8s étape par horodatages stockage)


En relisant la question, si la limite de taux est entièrement réinitialisée toutes les 8 secondes, alors voici une modification:

Commencez avec un horodatage, last_send, il ya un moment (par exemple, à l'époque). Commencez également avec le même compartiment à 5 jetons.

Frappez la règle toutes les 5/8 secondes.

Chaque fois que vous envoyez un message: Vérifiez d'abord si last_send ≥ 8 secondes. Si oui, remplissez le seau (réglez-le sur 5 jetons). Deuxièmement, s'il y a des jetons dans le compartiment, envoyez le message (sinon, drop/wait/etc.). Troisièmement, définissez last_send à maintenant.

Cela devrait fonctionner pour ce scénario. J'ai effectivement écrit un bot IRC en utilisant une stratégie comme celle-ci (la première approche). C'est en Perl, pas en Python, mais voici un code à illustrer:

La première partie traite ici de l'ajout de jetons au godet. Vous pouvez voir l'optimisation d'ajouter des jetons en fonction du temps (2 à la dernière ligne), puis la dernière ligne serre-joint le contenu du seau au maximum (message_burst)

my $start_time = time; 
    ... 
    # Bucket handling 
    my $bucket = $conn->{fujiko_limit_bucket}; 
    my $lasttx = $conn->{fujiko_limit_lasttx}; 
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL; 
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST; 

$ conn est une structure de données qui est passé autour. C'est à l'intérieur d'une méthode qui s'exécute de façon routinière (elle calcule quand la prochaine fois elle aura quelque chose à faire, et dort aussi longtemps ou jusqu'à ce qu'elle reçoive le trafic réseau). La partie suivante de la méthode gère l'envoi. C'est plutôt compliqué, parce que les messages ont des priorités qui leur sont associées.

# Queue handling. Start with the ultimate queue. 
    my $queues = $conn->{fujiko_queues}; 
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) { 
      # Ultimate is special. We run ultimate no matter what. Even if 
      # it sends the bucket negative. 
      --$bucket; 
      $entry->{code}(@{$entry->{args}}); 
    } 
    $queues->[PRIORITY_ULTIMATE] = []; 

C'est la première file d'attente, qui est exécutée, quoi qu'il arrive. Même si notre connexion est tuée pour des inondations. Utilisé pour des choses extrêmement importantes, comme répondre au PING du serveur.Ensuite, le reste des files d'attente:

# Continue to the other queues, in order of priority. 
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) { 
      my $queue = $queues->[$pri]; 
      while (scalar(@$queue)) { 
        if ($bucket < 1) { 
          # continue later. 
          $need_more_time = 1; 
          last QRUN; 
        } else { 
          --$bucket; 
          my $entry = shift @$queue; 
          $entry->{code}(@{$entry->{args}}); 
        } 
      } 
    } 

Enfin, l'état du godet est enregistré en arrière à la structure de données conn $ (en fait un peu plus tard dans la méthode, il calcule d'abord combien de temps il faudra plus de travail) Comme vous pouvez le voir, le code de gestion de seau réel est très petit - environ quatre lignes. Le reste du code est la gestion de file d'attente prioritaire. Le bot a des files d'attente prioritaires de sorte que, par exemple, quelqu'un qui discute avec lui ne peut pas l'empêcher de remplir ses importantes fonctions de coup de pied/interdiction.

+0

Suis-je manque quelque chose ... il semble que cela vous limiter à 1 message toutes les 8 secondes après que vous obtenez dans le premier 5 @ – chills42

+0

chills42: Oui, je l'ai lu la mauvaise question ... voir la deuxième la moitié de la réponse. – derobert

+0

@chills: Si last_send est <8 secondes, vous n'ajoutez aucun jeton au bucket. Si votre compartiment contient des jetons, vous pouvez envoyer le message. sinon vous ne pouvez pas (vous avez déjà envoyé 5 messages dans les 8 dernières secondes) – derobert

1

Une solution consiste à fixer un horodatage à chaque élément de file d'attente et de jeter l'élément après 8 secondes se sont écoulées. Vous pouvez effectuer cette vérification chaque fois que la file d'attente est ajoutée.

Cela ne fonctionne que si vous limitez la taille de la file d'attente à 5 et défaussez tout ajout alors que la file d'attente est pleine.

+0

Ceci ne fournit aucune limitation de débit. Au lieu de cela, il empêche la croissance excessive de la file d'attente si vous avez déjà une limitation de débit. – derobert

+0

Merci. J'ai édité la réponse en conséquence. :) – jheriko

2

Gardez le temps que les cinq dernières lignes ont été envoyées. Tenir les messages en file d'attente jusqu'à ce que le temps le cinquième le plus récent message (si elle existe) est au moins 8 secondes dans le passé (avec last_five comme un tableau de fois):

now = time.time() 
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0: 
    last_five.insert(0, now) 
    send_message(msg) 
if len(last_five) > 5: 
    last_five.pop() 
+0

Vous faites beaucoup plus de travail que le seau à jetons ... – derobert

+0

Pas depuis que vous l'avez révisé, je ne suis pas. – Pesto

+0

Vous stockez cinq horodatages et vous les déplacez à plusieurs reprises dans la mémoire (ou en effectuant des opérations de liste chaînée). Je stocke un compteur entier et un horodateur. Et seulement faire de l'arithmétique et assigner. – derobert

41

Utilisez ce décorateur @RateLimited (ratepersec) avant votre fonction de mise en file d'attente.

Fondamentalement, ce vérifie si 1/sec de taux se sont écoulés depuis la dernière fois et sinon, attend le reste du temps, sinon il n'attend pas. Cela vous limite effectivement à taux/sec. Le décorateur peut être appliqué à n'importe quelle fonction que vous voulez limitée.

Dans votre cas, si vous voulez un maximum de 5 messages par 8 secondes, utilisez @RateLimited (0,625) avant votre fonction sendToQueue.

import time 

def RateLimited(maxPerSecond): 
    minInterval = 1.0/float(maxPerSecond) 
    def decorate(func): 
     lastTimeCalled = [0.0] 
     def rateLimitedFunction(*args,**kargs): 
      elapsed = time.clock() - lastTimeCalled[0] 
      leftToWait = minInterval - elapsed 
      if leftToWait>0: 
       time.sleep(leftToWait) 
      ret = func(*args,**kargs) 
      lastTimeCalled[0] = time.clock() 
      return ret 
     return rateLimitedFunction 
    return decorate 

@RateLimited(2) # 2 per second at most 
def PrintNumber(num): 
    print num 

if __name__ == "__main__": 
    print "This should print 1,2,3... at about 2 per second." 
    for i in range(1,100): 
     PrintNumber(i) 
+0

J'aime l'idée d'utiliser un décorateur pour cela.) Pourquoi faire lastTimeCalled une liste? Aussi, je doute que cela fonctionnera lorsque plusieurs threads appellent la même fonction RateLimited ... – Stephan202

+6

C'est une liste parce que les types simples Comme float sont constantes lorsqu'elles sont capturées par une fermeture.En faisant une liste, la liste est constante, mais son contenu ne l'est pas.Oui, ce n'est pas sûr pour les threads, mais cela peut être facilement corrigé avec des verrous –

+0

'time.clock() 'n'a pas assez de résolution sur mon système, donc j'ai adapté le code et changé pour utiliser' time.time() ' – mtrbean

0

Que diriez-vous ceci:

long check_time = System.currentTimeMillis(); 
int msgs_sent_count = 0; 

private boolean isRateLimited(int msgs_per_sec) { 
    if (System.currentTimeMillis() - check_time > 1000) { 
     check_time = System.currentTimeMillis(); 
     msgs_sent_count = 0; 
    } 

    if (msgs_sent_count > (msgs_per_sec - 1)) { 
     return true; 
    } else { 
     msgs_sent_count++; 
    } 

    return false; 
} 
9

pour bloquer le traitement jusqu'à ce que le message peut être envoyé, ainsi faire la queue d'autres messages, belle solution de antti peut également être modifié comme suit:

rate = 5.0; // unit: messages 
per = 8.0; // unit: seconds 
allowance = rate; // unit: messages 
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds 

when (message_received): 
    current = now(); 
    time_passed = current - last_check; 
    last_check = current; 
    allowance += time_passed * (rate/per); 
    if (allowance > rate): 
    allowance = rate; // throttle 
    if (allowance < 1.0): 
    time.sleep((1-allowance) * (per/rate)) 
    forward_message(); 
    allowance = 0.0; 
    else: 
    forward_message(); 
    allowance -= 1.0; 

juste attend jusqu'à ce qu'il y ait suffisamment d'espace pour envoyer le message. ne pas commencer avec deux fois le taux, allocation peut également initialisé avec 0.

+1

Lorsque vous dormez '(1-allow) * (per/rate)', vous devez ajouter la mme quantit de 'last_check'. – Alp

1

Si quelqu'un est toujours intéressé, j'utilise cette classe callable simple en conjonction avec un stockage de valeur de clé LRU temporisé pour limiter le débit de requêtes par IP. Utilise une deque, mais peut être réécrit pour être utilisé avec une liste à la place.

from collections import deque 
import time 


class RateLimiter: 
    def __init__(self, maxRate=5, timeUnit=1): 
     self.timeUnit = timeUnit 
     self.deque = deque(maxlen=maxRate) 

    def __call__(self): 
     if self.deque.maxlen == len(self.deque): 
      cTime = time.time() 
      if cTime - self.deque[0] > self.timeUnit: 
       self.deque.append(cTime) 
       return False 
      else: 
       return True 
     self.deque.append(time.time()) 
     return False 

r = RateLimiter() 
for i in range(0,100): 
    time.sleep(0.1) 
    print(i, "block" if r() else "pass") 
0

J'avais besoin d'une variation dans Scala. Ici, il est:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) { 

    import Thread.sleep 
    private def now = System.currentTimeMillis/1000.0 
    private val (calls, sec) = callsPerSecond 
    private var allowance = 1.0 
    private var last = now 

    def apply(a: A): B = { 
    synchronized { 
     val t = now 
     val delta_t = t - last 
     last = t 
     allowance += delta_t * (calls/sec) 
     if (allowance > calls) 
     allowance = calls 
     if (allowance < 1d) { 
     sleep(((1 - allowance) * (sec/calls) * 1000d).toLong) 
     } 
     allowance -= 1 
    } 
    f(a) 
    } 

} 

Voici comment il peut être utilisé:

val f = Limiter((5d, 8d), { 
    _: Unit ⇒ 
    println(System.currentTimeMillis) 
}) 
while(true){f(())} 
0

Juste une implémentation python d'un code de réponse acceptée.

import time 

class Object(object): 
    pass 

def get_throttler(rate, per): 
    scope = Object() 
    scope.allowance = rate 
    scope.last_check = time.time() 
    def throttler(fn): 
     current = time.time() 
     time_passed = current - scope.last_check; 
     scope.last_check = current; 
     scope.allowance = scope.allowance + time_passed * (rate/per) 
     if (scope.allowance > rate): 
      scope.allowance = rate 
     if (scope.allowance < 1): 
      pass 
     else: 
      fn() 
      scope.allowance = scope.allowance - 1 
    return throttler