2017-05-29 1 views
0

Je veux utiliser la boucle d'événements pour surveiller toute insertion de données dans mon asyncio.Queue (vous pouvez trouver son code source ici https://github.com/python/cpython/blob/3.6/Lib/asyncio/queues.py), mais je rencontre des problèmes. Voici le code suivant:Y at-il un moyen d'activer la boucle d'événements asyncio

import asyncio 
import threading 

async def recv(q): 
    while True: 
     msg = await q.get() 
     print(msg) 

async def checking_task(): 
    while True: 
     await asyncio.sleep(0.1) 

def loop_in_thread(loop,q): 
    asyncio.set_event_loop(loop) 
    asyncio.ensure_future(recv(q)) 
    asyncio.ensure_future(insert(q)) 
    # asyncio.ensure_future(checking_task()) comment this out, and it will work as intended 
    loop.run_forever() 

async def insert(q): 
    print('invoked') 
    await q.put('hello') 

q = asyncio.Queue() 
loop = asyncio.get_event_loop() 
t = threading.Thread(target=loop_in_thread, args=(loop, q,)) 
t.start() 

Le programme a commencé et nous pouvons voir le résultat suivant

invoked 
hello 
-> print(asyncio.Task.all_tasks()) 
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39> 
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E215DCFAC8>()]>>} 

Mais maintenant, si l'on ajoute manuellement les données dans q en utilisant q.put_nowait('test'), nous obtiendrions le résultat suivant :

q.put_nowait('test') # a non-async way to add data into queue 
-> print(asyncio.Task.all_tasks()) 
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39> 
wait_for=<Future finished result=None>>} 

Comme vous pouvez le voir, l'avenir est déjà terminé, mais nous imprimons n'a toujours pas la chaîne nouvellement ajouté 'test' En d'autres termes, msg = await q.get() est toujours en attente même si le futur lié à q.get() est terminé et qu'aucune autre tâche n'est en cours d'exécution. Cela me embrouille parce que dans la documentation officielle (https://docs.python.org/3/library/asyncio-task.html), il est dit

result = attendre future ou le résultat = rendement du futur - suspend le coroutine jusqu'à ce que l'avenir est fait, puis renvoie le résultat de l'avenir

Il semblait que même si l'avenir est fait, nous avons encore besoin d'une sorte de await dans une autre fonction asynchrone pour que la boucle d'événements continue de traiter les tâches.

J'ai trouvé une solution de contournement à ce problème, qui est d'ajouter un checking_task(), et également ajouter cette coroutine dans la boucle d'événements; alors cela fonctionnera comme prévu. Mais l'ajout d'un coroutine check_task() est très coûteux pour le CPU car il ne fait que lancer une boucle while. Je me demande s'il existe un moyen manuel de déclencher cet événement await sans utiliser de fonction asynchrone. Par exemple, quelque chose de magique comme

q.put_nowait('test') 
loop.ok_you_can_start_running_other_pending_tasks() 

Les aides seront grandement appréciées! Merci.

Répondre

0

donc j'ai fini avec l'aide

loop.call_soon_threadsafe(q.put_nowait, 'test') 

et il fonctionnera comme prévu. Après comprendre cela, j'ai cherché des informations sur. Il s'est avéré que ce poste (Scheduling an asyncio coroutine from another thread) a le même problème. Et la réponse de @ kfx fonctionne également, ce qui est

loop.call_soon_threadsafe(loop.create_task, q.put('test')) 

Avis asyncio.Queue.put() est un coroutine mais asyncio.Queue.put_nowait() est une fonction normale.