2017-08-05 2 views
0
while True: 
     if (len(queue) > 0): 
      blocking_send_from_queue() 
      there_is_something_to_send = False 
     else: 
      break 
     data = non_blocking_recv() 
     queue = process(queue, data) 

Ceci est le code d'un client, qui est censé traiter les requêtes. Pour compléter une requête, il peut être nécessaire de créer plus de sous-requêtes en fonction des données reçues et de les envoyer au serveur, puis de traiter les résultats des sous-requêtes, et ainsi de suite. Puis-je créer des résultats plus efficaces ou étendus en utilisant l'asyncio?Que peut faire l'asyncio de python pour améliorer cette fonctionnalité?

+0

Cela semble vraiment propice aux conditions de course. – user2357112

Répondre

0

Vous pouvez essayer quelque chose comme ceci:

import asyncio 

loop = asyncio.get_event_loop() 
data_queue = asyncio.Queue(loop=loop) 
receiving = True 

def forward(data): 
    queue = process(queue, data) 
    if (len(queue) > 0): 
     blocking_send_from_queue() 
     return True 
    else: 
     return False 

async def receive(): 
    while receiving: 
     data = await non_blocking_recv() 
     await data_queue.put(data) 

async def main(): 
    receiver = loop.create_task(receive()) 
    while True: 
     data = await data_queue.get() 
     if not await loop.run_in_executor(None, forward, data): 
      global receiving 
      receiving = False 
      data_queue.get_no_wait() 
      break 
    await receiver 

loop.run_until_complete(main()) 

Alors que, non_blocking_recv() et blocking_send_from_queue() pourrait fonctionner en parallèle sans bloquer les uns les autres, dans le coût de la mise en mémoire tampon data blocs en mémoire avec data_queue. Vous pouvez définir la taille de la file d'attente de data_queue pour contrôler la taille de la mémoire tampon. En outre, process() et blocking_send_from_queue() ressemblent à des méthodes qui peuvent être exécutées en parallèle. Cela dépend de l'implémentation réelle, je suppose que queue est un objet persistant threading.Queue ici. Ainsi forward peut être divisé en: (sauter le code d'arrêt)

def do_process(data): 
    process(queue, data) 
    return len(queue) > 0 

def forward(): 
    while True: 
     blocking_send_from_queue() 

Et main seront changés à:

async def main(): 
    loop.create_task(receive()) 
    loop.run_in_executor(None, forward) 
    while True: 
     data = await data_queue.get() 
     if not await loop.run_in_executor(None, do_process, data): 
      break 

Idéalement, au plus un non_blocking_recv, un process et un blocking_send_from_queue pourrait fonctionner en parallèle , si GIL est correctement libéré dans blocking_send_from_queue.

Enfin, si GIL est même sorti en process, et si l'envoi de l'ordre ne doit être le même que recevoir l'ordre, il est même possible d'utiliser pleinement la puissance de calcul multi-core, en exécutant process en parallèle: (code partiel)

async def process_worker(): 
    while True: 
     data = await data_queue.get() 
     loop.run_in_executor(None, process, queue, data) 

async def main(): 
    loop.create_task(receive()) 
    loop.run_in_executor(None, forward) 
    for _ in range(CPU_CORE_NUM): 
     loop.create_task(process_worker()) 
    # wait for all tasks here