6

J'ai un list de awaitables que je veux passer au asyncio.AbstractEventLoop mais j'ai besoin de limiter les requêtes à une API tierce.Limitation des fonctions asynchrones en Python Asyncio

Je voudrais éviter quelque chose qui attend pour passer le future à la boucle parce qu'en attendant je bloque mon attente de boucle. Quelles options ai-je? Semaphores et ThreadPools limitera le nombre sont en cours d'exécution en même temps, mais ce n'est pas mon problème. J'ai besoin de limiter mes demandes à 100/sec, mais le temps qu'il faut pour compléter la requête n'a pas d'importance.

Ceci est un exemple de travail très concis (non) à l'aide de la bibliothèque standard, qui illustre le problème. Ceci est supposé ralentir à 100/sec mais étrangle à 116.651/sec. Quelle est la meilleure façon d'accélérer la planification d'une demande asynchrone asyncio?

Code de travail:

import asyncio 
from threading import Lock 

class PTBNL: 

    def __init__(self): 
     self._req_id_seq = 0 
     self._futures = {} 
     self._results = {} 
     self.token_bucket = TokenBucket() 
     self.token_bucket.set_rate(100) 

    def run(self, *awaitables): 

     loop = asyncio.get_event_loop() 

     if not awaitables: 
      loop.run_forever() 
     elif len(awaitables) == 1: 
      return loop.run_until_complete(*awaitables) 
     else: 
      future = asyncio.gather(*awaitables) 
      return loop.run_until_complete(future) 

    def sleep(self, secs) -> True: 

     self.run(asyncio.sleep(secs)) 
     return True 

    def get_req_id(self) -> int: 

     new_id = self._req_id_seq 
     self._req_id_seq += 1 
     return new_id 

    def start_req(self, key): 

     loop = asyncio.get_event_loop() 
     future = loop.create_future() 
     self._futures[key] = future 
     return future 

    def end_req(self, key, result=None): 

     future = self._futures.pop(key, None) 
     if future: 
      if result is None: 
       result = self._results.pop(key, []) 
      if not future.done(): 
       future.set_result(result) 

    def req_data(self, req_id, obj): 
     # Do Some Work Here 
     self.req_data_end(req_id) 
     pass 

    def req_data_end(self, req_id): 
     print(req_id, " has ended") 
     self.end_req(req_id) 

    async def req_data_async(self, obj): 

     req_id = self.get_req_id() 
     future = self.start_req(req_id) 

     self.req_data(req_id, obj) 

     await future 
     return future.result() 

    async def req_data_batch_async(self, contracts): 

     futures = [] 
     FLAG = False 

     for contract in contracts: 
      req_id = self.get_req_id() 
      future = self.start_req(req_id) 
      futures.append(future) 

      nap = self.token_bucket.consume(1) 

      if FLAG is False: 
       FLAG = True 
       start = asyncio.get_event_loop().time() 

      asyncio.get_event_loop().call_later(nap, self.req_data, req_id, contract) 

     await asyncio.gather(*futures) 
     elapsed = asyncio.get_event_loop().time() - start 

     return futures, len(contracts)/elapsed 

class TokenBucket: 

    def __init__(self): 
     self.tokens = 0 
     self.rate = 0 
     self.last = asyncio.get_event_loop().time() 
     self.lock = Lock() 

    def set_rate(self, rate): 
     with self.lock: 
      self.rate = rate 
      self.tokens = self.rate 

    def consume(self, tokens): 
     with self.lock: 
      if not self.rate: 
       return 0 

      now = asyncio.get_event_loop().time() 
      lapse = now - self.last 
      self.last = now 
      self.tokens += lapse * self.rate 

      if self.tokens > self.rate: 
       self.tokens = self.rate 

      self.tokens -= tokens 

      if self.tokens >= 0: 
       return 0 
      else: 
       return -self.tokens/self.rate 


if __name__ == '__main__': 

    asyncio.get_event_loop().set_debug(True) 
    app = PTBNL() 

    objs = [obj for obj in range(500)] 

    l,t = app.run(app.req_data_batch_async(objs)) 

    print(l) 
    print(t) 

Edit: J'ai ajouté un exemple simple de TrottleTestApp ici en utilisant sémaphores, mais ne peut pas accélérer l'exécution:

import asyncio 
import time 


class ThrottleTestApp: 

    def __init__(self): 
     self._req_id_seq = 0 
     self._futures = {} 
     self._results = {} 
     self.sem = asyncio.Semaphore() 

    async def allow_requests(self, sem): 
     """Permit 100 requests per second; call 
      loop.create_task(allow_requests()) 
     at the beginning of the program to start this routine. That call returns 
     a task handle that can be canceled to end this routine. 

     asyncio.Semaphore doesn't give us a great way to get at the value other 
     than accessing sem._value. We do that here, but creating a wrapper that 
     adds a current_value method would make this cleaner""" 

     while True: 
      while sem._value < 100: sem.release() 
      await asyncio.sleep(1) # Or spread more evenly 
            # with a shorter sleep and 
            # increasing the value less 

    async def do_request(self, req_id, obj): 
     await self.sem.acquire() 

     # this is the work for the request 
     self.req_data(req_id, obj) 

    def run(self, *awaitables): 

     loop = asyncio.get_event_loop() 

     if not awaitables: 
      loop.run_forever() 
     elif len(awaitables) == 1: 
      return loop.run_until_complete(*awaitables) 
     else: 
      future = asyncio.gather(*awaitables) 
      return loop.run_until_complete(future) 

    def sleep(self, secs: [float]=0.02) -> True: 

     self.run(asyncio.sleep(secs)) 
     return True 

    def get_req_id(self) -> int: 

     new_id = self._req_id_seq 
     self._req_id_seq += 1 
     return new_id 

    def start_req(self, key): 

     loop = asyncio.get_event_loop() 
     future = loop.create_future() 
     self._futures[key] = future 
     return future 

    def end_req(self, key, result=None): 

     future = self._futures.pop(key, None) 
     if future: 
      if result is None: 
       result = self._results.pop(key, []) 
      if not future.done(): 
       future.set_result(result) 

    def req_data(self, req_id, obj): 
     # This is the method that "does" something 
     self.req_data_end(req_id) 
     pass 

    def req_data_end(self, req_id): 

     print(req_id, " has ended") 
     self.end_req(req_id) 

    async def req_data_batch_async(self, objs): 

     futures = [] 
     FLAG = False 

     for obj in objs: 
      req_id = self.get_req_id() 
      future = self.start_req(req_id) 
      futures.append(future) 

      if FLAG is False: 
       FLAG = True 
       start = time.time() 

      self.do_request(req_id, obj) 

     await asyncio.gather(*futures) 
     elapsed = time.time() - start 
     print("Roughly %s per second" % (len(objs)/elapsed)) 

     return futures 


if __name__ == '__main__': 

    asyncio.get_event_loop().set_debug(True) 
    app = ThrottleTestApp() 

    objs = [obj for obj in range(10000)] 

    app.run(app.req_data_batch_async(objs)) 
+0

Êtes-vous essayer de limiter le nombre de requêtes en cours par seconde, ou les requêtes démarrées dans une seconde particulière. Par exemple, si vous lancez 100 demandes qui durent chacune 3 secondes, pouvez-vous commencer 200 demandes supplémentaires dans les 2 secondes suivantes? –

+0

@AaronSchif Peu importe quand ils sont lancés, juste au-dessus d'une fenêtre de 1 seconde, pas plus de 100 n'ont été lancés. – Jared

Répondre

14

Vous pouvez le faire en la mise en œuvre du leaky bucket algorithm:

import asyncio 
import time 

class AsyncLeakyBucket(object): 
    """A leaky bucket rate limiter. 

    Allows up to max_rate/time_period acquisitions before blocking. 

    time_period is measured in seconds; the default is 60. 

    """ 
    def __init__(self, max_rate: float, time_period: float = 60) -> None: 
     self._max_level = max_rate 
     self._rate_per_sec = max_rate/time_period 
     self._level = 0.0 
     self._last_check = 0.0 

    def _leak(self) -> None: 
     """Drip out capacity from the bucket.""" 
     if self._level: 
      # drip out enough level for the elapsed time since 
      # we last checked 
      elapsed = time.time() - self._last_check 
      decrement = elapsed * self._rate_per_sec 
      self._level = max(self._level - decrement, 0) 
     self._last_check = time.time() 

    def has_capacity(self, amount: float = 1) -> bool: 
     """Check if there is enough space remaining in the bucket""" 
     self._leak() 
     return self._level + amount <= self._max_level 

    async def acquire(self, amount: float = 1) -> None: 
     """Acquire space in the bucket. 

     If the bucket is full, block until there is space. 

     """ 
     if amount > self._max_level: 
      raise ValueError("Can't acquire more than the bucket capacity") 

     while not self.has_capacity(amount): 
      # wait for the next drip to have left the bucket 
      await asyncio.sleep(1/self._rate_per_sec) 

     self._level += amount 

    async def __aenter__(self) -> None: 
     await self.acquire() 
     return None 

    async def __aexit__(self, exc_type, exc, tb) -> None: 
     pass 

Notez que nous plafond fuite l'acity du seau de façon opportuniste, il n'y a pas besoin d'exécuter une boucle async séparée juste pour abaisser le niveau; à la place, la capacité est filtrée lors du test de capacité restante suffisante.

Vous pouvez l'utiliser comme un gestionnaire de contexte; en essayant d'acquérir le seau quand il est des blocs pleins jusqu'à une capacité suffisante a été libéré à nouveau:

bucket = AsyncLeakyBucket(100) 

# ... 

async with bucket: 
    # only reached once the bucket is no longer full 

ou vous pouvez appeler acquire() directement:

await bucket.acquire() # blocks until there is space in the bucket 

ou vous pouvez simplement vérifier s'il y a d'abord l'espace:

if bucket.has_capacity(): 
    # reject a request due to rate limiting 

Notez que vous pouvez compter certaines demandes comme « plus lourd » ou « léger » en augmentant ou en diminuant le montant que vous « goutte à goutte » dans le seau:

await bucket.acquire(10) 
if bucket.has_capacity(0.5): 

Démo:

>>> import asyncio, time 
>>> bucket = AsyncLeakyBucket(5, 10) 
>>> async def task(): 
...  async with bucket: 
...   print('Drip!', time.time() - ref) 
... 
>>> ref = time.time() 
>>> tasks = [task() for _ in range(15)] 
>>> loop = asyncio.get_event_loop() 
>>> loop.run_until_complete(asyncio.wait(tasks)) 
Drip! 0.0016927719116210938 
Drip! 0.0017199516296386719 
Drip! 0.00173187255859375 
Drip! 0.0017418861389160156 
Drip! 0.001750946044921875 
Drip! 2.003826856613159 
Drip! 4.007770776748657 
Drip! 6.011734962463379 
Drip! 8.016689777374268 
Drip! 10.019418716430664 
Drip! 12.0219247341156 
Drip! 14.026055812835693 
Drip! 16.028339862823486 
Drip! 18.03285503387451 
Drip! 20.037498712539673 

Le godet est rempli rapidement au début, ce qui provoque le reste des tâches à répartir plus uniformément; toutes les 2 secondes, une capacité suffisante est libérée pour qu'une autre tâche soit gérée.

La mise en œuvre ci-dessus est un peu simpliste dans la mesure où nous supposons que l'attente 1/self._rate_per_sec est une bonne stratégie pour acquérir de la capacité. Avec plusieurs tâches en attente de capacité, il n'y a aucune garantie qu'elles acquerront de la capacité dans le même ordre qu'elles l'ont demandé; vous pouvez utiliser un asyncio.Event() instance avec un message d'erreur à la place, et _leak informer ceux qui attendent de la capacité via cet événement lorsque (une certaine) capacité a été libérée. Un Event utilise une file d'attente pour notifier les éléments en attente dans l'ordre.

+0

Bookmarked dans mon dossier "Greatest algorithms";) – glenfant

0

Une autre solution - en utilisant sémaphores bornées - par un collègue de travail, mentor et ami, est le suivant:

import asyncio 


class AsyncLeakyBucket(object): 

    def __init__(self, max_tasks: float, time_period: float = 60, loop: asyncio.events=None): 
     self._delay_time = time_period/max_tasks 
     self._sem = asyncio.BoundedSemaphore(max_tasks) 
     self._loop = loop or asyncio.get_event_loop() 
     self._loop.create_task(self._leak_sem()) 

    async def _leak_sem(self): 
     """ 
     Background task that leaks semaphore releases based on the desired rate of tasks per time_period 
     """ 
     while True: 
      await asyncio.sleep(self._delay_time) 
      try: 
       self._sem.release() 
      except ValueError: 
       pass 

    async def __aenter__(self) -> None: 
     await self._sem.acquire() 

    async def __aexit__(self, exc_type, exc, tb) -> None: 
     pass 

peut toujours être utilisé avec le même code async with bucket comme dans @ la réponse de Martijn