Je pourrais utiliser du pseudo-code, ou mieux, Python. J'essaye d'implémenter une file d'attente de limitation de débit pour un bot Python IRC, et cela fonctionne partiellement, mais si quelqu'un déclenche moins de messages que la limite (par exemple, la limite de débit est de 5 messages par 8 secondes, et le déclencheur suivant est sur les 8 secondes (par exemple, 16 secondes plus tard), le bot envoie le message, mais la file d'attente devient pleine et le bot attend 8 secondes, même si ce n'est pas nécessaire depuis la période de 8 secondes.Qu'est-ce qu'un bon algorithme de limitation de débit?
Répondre
Voici le simplest algorithm, si vous voulez juste déposer des messages quand ils arrivent trop vite (au lieu de les faire la queue, ce qui est logique car la file d'attente peut obtenir arbitrairement grand):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate/per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
Il y a pas datastructures, minuteries, etc. dans cette solution et cela fonctionne proprement :) Pour voir cela, « indemnité » à la vitesse croît 5/8 unités par seconde au plus, soit au maximum cinq unités par huit secondes. Chaque message transféré déduit une unité, donc vous ne pouvez pas envoyer plus de cinq messages toutes les huit secondes.
Notez que rate
doit être un nombre entier, c'est-à-dire sans partie décimale non nulle, sinon l'algorithme ne fonctionnera pas correctement (le taux réel ne sera pas rate/per
). Par exemple. rate=0.5; per=1.0;
ne fonctionne pas car allowance
ne passera jamais à 1.0. Mais rate=1.0; per=2.0;
fonctionne bien.
Un Token Bucket est assez simple à implémenter.
Commencez avec un seau avec 5 jetons.
Toutes les 5/8 secondes: Si le seau a moins de 5 jetons, en ajouter un.
Chaque fois que vous souhaitez envoyer un message: Si le compartiment a un jeton ≥1, retirez un jeton et envoyez le message. Sinon, attendez/déposez le message/peu importe.
(évidemment, dans le code réel, vous utiliseriez un compteur entier au lieu de jetons réels et vous pouvez optimiser le tout 5/8s étape par horodatages stockage)
En relisant la question, si la limite de taux est entièrement réinitialisée toutes les 8 secondes, alors voici une modification:
Commencez avec un horodatage, last_send
, il ya un moment (par exemple, à l'époque). Commencez également avec le même compartiment à 5 jetons.
Frappez la règle toutes les 5/8 secondes.
Chaque fois que vous envoyez un message: Vérifiez d'abord si last_send
≥ 8 secondes. Si oui, remplissez le seau (réglez-le sur 5 jetons). Deuxièmement, s'il y a des jetons dans le compartiment, envoyez le message (sinon, drop/wait/etc.). Troisièmement, définissez last_send
à maintenant.
Cela devrait fonctionner pour ce scénario. J'ai effectivement écrit un bot IRC en utilisant une stratégie comme celle-ci (la première approche). C'est en Perl, pas en Python, mais voici un code à illustrer:
La première partie traite ici de l'ajout de jetons au godet. Vous pouvez voir l'optimisation d'ajouter des jetons en fonction du temps (2 à la dernière ligne), puis la dernière ligne serre-joint le contenu du seau au maximum (message_burst)
my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$ conn est une structure de données qui est passé autour. C'est à l'intérieur d'une méthode qui s'exécute de façon routinière (elle calcule quand la prochaine fois elle aura quelque chose à faire, et dort aussi longtemps ou jusqu'à ce qu'elle reçoive le trafic réseau). La partie suivante de la méthode gère l'envoi. C'est plutôt compliqué, parce que les messages ont des priorités qui leur sont associées.
# Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
# Ultimate is special. We run ultimate no matter what. Even if
# it sends the bucket negative.
--$bucket;
$entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];
C'est la première file d'attente, qui est exécutée, quoi qu'il arrive. Même si notre connexion est tuée pour des inondations. Utilisé pour des choses extrêmement importantes, comme répondre au PING du serveur.Ensuite, le reste des files d'attente:
# Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
my $queue = $queues->[$pri];
while (scalar(@$queue)) {
if ($bucket < 1) {
# continue later.
$need_more_time = 1;
last QRUN;
} else {
--$bucket;
my $entry = shift @$queue;
$entry->{code}(@{$entry->{args}});
}
}
}
Enfin, l'état du godet est enregistré en arrière à la structure de données conn $ (en fait un peu plus tard dans la méthode, il calcule d'abord combien de temps il faudra plus de travail) Comme vous pouvez le voir, le code de gestion de seau réel est très petit - environ quatre lignes. Le reste du code est la gestion de file d'attente prioritaire. Le bot a des files d'attente prioritaires de sorte que, par exemple, quelqu'un qui discute avec lui ne peut pas l'empêcher de remplir ses importantes fonctions de coup de pied/interdiction.
Suis-je manque quelque chose ... il semble que cela vous limiter à 1 message toutes les 8 secondes après que vous obtenez dans le premier 5 @ – chills42
chills42: Oui, je l'ai lu la mauvaise question ... voir la deuxième la moitié de la réponse. – derobert
@chills: Si last_send est <8 secondes, vous n'ajoutez aucun jeton au bucket. Si votre compartiment contient des jetons, vous pouvez envoyer le message. sinon vous ne pouvez pas (vous avez déjà envoyé 5 messages dans les 8 dernières secondes) – derobert
Une solution consiste à fixer un horodatage à chaque élément de file d'attente et de jeter l'élément après 8 secondes se sont écoulées. Vous pouvez effectuer cette vérification chaque fois que la file d'attente est ajoutée.
Cela ne fonctionne que si vous limitez la taille de la file d'attente à 5 et défaussez tout ajout alors que la file d'attente est pleine.
Gardez le temps que les cinq dernières lignes ont été envoyées. Tenir les messages en file d'attente jusqu'à ce que le temps le cinquième le plus récent message (si elle existe) est au moins 8 secondes dans le passé (avec last_five comme un tableau de fois):
now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
last_five.insert(0, now)
send_message(msg)
if len(last_five) > 5:
last_five.pop()
Vous faites beaucoup plus de travail que le seau à jetons ... – derobert
Pas depuis que vous l'avez révisé, je ne suis pas. – Pesto
Vous stockez cinq horodatages et vous les déplacez à plusieurs reprises dans la mémoire (ou en effectuant des opérations de liste chaînée). Je stocke un compteur entier et un horodateur. Et seulement faire de l'arithmétique et assigner. – derobert
Utilisez ce décorateur @RateLimited (ratepersec) avant votre fonction de mise en file d'attente.
Fondamentalement, ce vérifie si 1/sec de taux se sont écoulés depuis la dernière fois et sinon, attend le reste du temps, sinon il n'attend pas. Cela vous limite effectivement à taux/sec. Le décorateur peut être appliqué à n'importe quelle fonction que vous voulez limitée.
Dans votre cas, si vous voulez un maximum de 5 messages par 8 secondes, utilisez @RateLimited (0,625) avant votre fonction sendToQueue.
import time
def RateLimited(maxPerSecond):
minInterval = 1.0/float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
return ret
return rateLimitedFunction
return decorate
@RateLimited(2) # 2 per second at most
def PrintNumber(num):
print num
if __name__ == "__main__":
print "This should print 1,2,3... at about 2 per second."
for i in range(1,100):
PrintNumber(i)
J'aime l'idée d'utiliser un décorateur pour cela.) Pourquoi faire lastTimeCalled une liste? Aussi, je doute que cela fonctionnera lorsque plusieurs threads appellent la même fonction RateLimited ... – Stephan202
C'est une liste parce que les types simples Comme float sont constantes lorsqu'elles sont capturées par une fermeture.En faisant une liste, la liste est constante, mais son contenu ne l'est pas.Oui, ce n'est pas sûr pour les threads, mais cela peut être facilement corrigé avec des verrous –
'time.clock() 'n'a pas assez de résolution sur mon système, donc j'ai adapté le code et changé pour utiliser' time.time() ' – mtrbean
Que diriez-vous ceci:
long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;
private boolean isRateLimited(int msgs_per_sec) {
if (System.currentTimeMillis() - check_time > 1000) {
check_time = System.currentTimeMillis();
msgs_sent_count = 0;
}
if (msgs_sent_count > (msgs_per_sec - 1)) {
return true;
} else {
msgs_sent_count++;
}
return false;
}
pour bloquer le traitement jusqu'à ce que le message peut être envoyé, ainsi faire la queue d'autres messages, belle solution de antti peut également être modifié comme suit:
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate/per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
time.sleep((1-allowance) * (per/rate))
forward_message();
allowance = 0.0;
else:
forward_message();
allowance -= 1.0;
juste attend jusqu'à ce qu'il y ait suffisamment d'espace pour envoyer le message. ne pas commencer avec deux fois le taux, allocation peut également initialisé avec 0.
Lorsque vous dormez '(1-allow) * (per/rate)', vous devez ajouter la mme quantit de 'last_check'. – Alp
Si quelqu'un est toujours intéressé, j'utilise cette classe callable simple en conjonction avec un stockage de valeur de clé LRU temporisé pour limiter le débit de requêtes par IP. Utilise une deque, mais peut être réécrit pour être utilisé avec une liste à la place.
from collections import deque
import time
class RateLimiter:
def __init__(self, maxRate=5, timeUnit=1):
self.timeUnit = timeUnit
self.deque = deque(maxlen=maxRate)
def __call__(self):
if self.deque.maxlen == len(self.deque):
cTime = time.time()
if cTime - self.deque[0] > self.timeUnit:
self.deque.append(cTime)
return False
else:
return True
self.deque.append(time.time())
return False
r = RateLimiter()
for i in range(0,100):
time.sleep(0.1)
print(i, "block" if r() else "pass")
J'avais besoin d'une variation dans Scala. Ici, il est:
case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {
import Thread.sleep
private def now = System.currentTimeMillis/1000.0
private val (calls, sec) = callsPerSecond
private var allowance = 1.0
private var last = now
def apply(a: A): B = {
synchronized {
val t = now
val delta_t = t - last
last = t
allowance += delta_t * (calls/sec)
if (allowance > calls)
allowance = calls
if (allowance < 1d) {
sleep(((1 - allowance) * (sec/calls) * 1000d).toLong)
}
allowance -= 1
}
f(a)
}
}
Voici comment il peut être utilisé:
val f = Limiter((5d, 8d), {
_: Unit ⇒
println(System.currentTimeMillis)
})
while(true){f(())}
Juste une implémentation python d'un code de réponse acceptée.
import time
class Object(object):
pass
def get_throttler(rate, per):
scope = Object()
scope.allowance = rate
scope.last_check = time.time()
def throttler(fn):
current = time.time()
time_passed = current - scope.last_check;
scope.last_check = current;
scope.allowance = scope.allowance + time_passed * (rate/per)
if (scope.allowance > rate):
scope.allowance = rate
if (scope.allowance < 1):
pass
else:
fn()
scope.allowance = scope.allowance - 1
return throttler
- 1. Django: limitation de débit simple
- 2. Quelle est la meilleure façon d'implémenter un algorithme de limitation de débit pour les requêtes Web?
- 3. Comment puis-je implémenter la limitation de débit avec Apache? (demandes par seconde)
- 4. Algorithme pour déterminer le mot de passe faible/bon/fort
- 5. ISR - Débit de données maximal
- 6. Limitation de Sendmail?
- 7. Bon algorithme pour dessiner des polygones solides à 2 dimensions?
- 8. Débit cumulé de plusieurs connexions (Linux)
- 9. Calculer le débit
- 10. Comment la limitation de la bande passante fonctionne-t-elle?
- 11. Restriction ou limitation de taille de fichier en C#
- 12. Débit de débogage des performances de l'application Winsock2 minimale
- 13. Qu'est-ce qu'un bon algorithme de hachage pour ensemencer un prng avec une chaîne?
- 14. Pseudocode de cet algorithme
- 15. algorithme de recherche
- 16. Algorithme de hachage
- 17. Algorithme de pagination intelligent
- 18. Liste algorithme de classement
- 19. Algorithme de cryptographie
- 20. Algorithme de compartimentage
- 21. Algorithme de minuterie efficace
- 22. Algorithme de sous-chaîne
- 23. algorithme de traduction
- 24. Algorithme de distribution équilibrée
- 25. Algorithme de mappage XSLT
- 26. Algorithme de popularité
- 27. Limitation de la portée d'une relation ForeignKey?
- 28. Paramètres de limitation du service WCF
- 29. Limitation du taux de téléchargement C#
- 30. Limitation de l'hémisphère SQL Server 2008 R2
C'est un beau travail, c'est-à-dire. Est-ce le vôtre, ou un algorithme standard? – skaffman
Il convient également de souligner que la dimension et l'échelle de 'time_passed' doivent être les mêmes que 'per', par ex. secondes. – skaffman
Salut Skaffman, merci pour les compliments --- Je l'ai sorti de ma manche mais avec une probabilité de 99,9% quelqu'un a déjà trouvé une solution similaire :) –