2017-05-05 1 views
2

J'essaie de 2 façons d'arrêter une boucle infinie de courir:asyncio CancelledError et KeyboardInterrupt

  • supervisor_1: tâche est annulée programatically
  • supervisor_2: tâche est arrêté avec Ctrl + C

Alors que supervisor_2 ne jette aucune erreur au cas d'interruption, je ne peux pas superviseur_1 d'obtenir Task was destroyed but it is pending!. Une idée pourquoi?

Voici le code:

import asyncio 
import aioredis 
from functools import partial 



class Listener: 
    def __init__(self, redis_conn): 
     self.redis_conn = redis_conn 

    async def forever(self, loop_name): 
     counter = 0 
     try: 
      while True: 
       print('{}: {}'.format(loop_name, counter)) 
       counter += 1 
       await asyncio.sleep(1) 
     except asyncio.CancelledError: 
      print('Task Cancelled') 
      self.redis_conn.close() 
      await self.redis_conn.wait_closed() 


async def supervisor_1(redis_conn): 
    redis_conn = await redis_conn 

    l = Listener(redis_conn) 

    task = asyncio.ensure_future(
     asyncio.gather(l.forever('loop_1'), 
         l.forever('loop_2'))) 
    await asyncio.sleep(2) 
    task.cancel() 


async def supervisor_2(redis_conn): 
    redis_conn = await redis_conn 

    l = Listener(redis_conn) 
    await asyncio.gather(l.forever('loop_1'), 
         l.forever('loop_2')) 


if __name__ == '__main__': 
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1) 

    loop = asyncio.get_event_loop() 
    run = partial(supervisor_2, redis_conn=redis_conn) 
    task = asyncio.ensure_future(run()) 
    try: 
     loop.run_until_complete(task) 
    except KeyboardInterrupt: 
     print('Interruped !') 
     task.cancel() 
     loop.run_forever() 
    finally: 
     loop.close() 

@update:

Merci à @Gerasimov, voici une version qui résoudre le problème, mais en quelque sorte soulèvent encore des erreurs de temps en temps sur KeyboardInterrupt :

async def supervisor(redis_conn): 
    redis_conn = await redis_conn 

    l = Listener(redis_conn) 

    task = asyncio.ensure_future(
     asyncio.gather(l.forever('loop_1'), 
         l.forever('loop_2')) 
    ) 
    await asyncio.sleep(10) 
    task.cancel() 
    with suppress(asyncio.CancelledError): 
     await task 

async def kill_tasks(): 
    pending = asyncio.Task.all_tasks() 
    for task in pending: 
     task.cancel() 
     with suppress(asyncio.CancelledError): 
      await task 

et

if __name__ == '__main__': 
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1) 

    loop = asyncio.get_event_loop() 
    run = partial(supervisor, redis_conn=redis_conn) 
    task = asyncio.ensure_future(run()) 
    try: 
     loop.run_until_complete(task) 
    except KeyboardInterrupt: 
     print('Interruped !') 
     loop.run_until_complete(kill_tasks()) 
    finally: 
     loop.close() 

Répondre

2

task.cancel() lui-même ne termine pas la tâche: il dit simplement à la tâche que CancelledError devrait être soulevée à l'intérieur et revient immédiatement. Vous devriez l'appeler et attendre que la tâche soit effectivement annulée (alors que cela va augmenter CancelledError).

Vous ne devez pas non plus supprimer la tâche interne CancelledError.

Lisez this answer où j'ai essayé de montrer différentes façons de travailler avec des tâches. Par exemple pour annuler une tâche et l'attendre annulé vous pouvez faire:

from contextlib import suppress 


task = ... # remember, task doesn't suppress CancelledError itself 

task.cancel() # returns immediately, we should await task raised CancelledError. 

with suppress(asyncio.CancelledError): 
    await task # or loop.run_until_complete(task) if it happens after event loop stopped 

# Now when we awaited for CancelledError and handled it, 
# task is finally over and we can close event loop without warning. 
+0

merci pour le lien. J'ai mis à jour ma réponse avec ce que j'ai compris d'un correctif. Mais toujours obtenir des erreurs (mais pas toujours comme avant) – Orelus

+1

@Orelus, même erreur qu'avant? Essayez de déplacer 'loop.run_until_complete (kill_tasks())' dans le bloc finally, juste avant 'loop.close()'. Cela devrait probablement résoudre le problème. Je ne suis pas sûr de ce que fait votre coroutine 'run()', mais la situation peut probablement arriver, quand elle est terminée, mais certaines tâches ne le sont pas: dans ce cas, à la fermeture de la boucle d'événements, vous obtiendrez un avertissement même si KeyboardInterrupt . –