Vous pouvez essayer quelque chose comme ceci:
import asyncio
loop = asyncio.get_event_loop()
data_queue = asyncio.Queue(loop=loop)
receiving = True
def forward(data):
queue = process(queue, data)
if (len(queue) > 0):
blocking_send_from_queue()
return True
else:
return False
async def receive():
while receiving:
data = await non_blocking_recv()
await data_queue.put(data)
async def main():
receiver = loop.create_task(receive())
while True:
data = await data_queue.get()
if not await loop.run_in_executor(None, forward, data):
global receiving
receiving = False
data_queue.get_no_wait()
break
await receiver
loop.run_until_complete(main())
Alors que, non_blocking_recv()
et blocking_send_from_queue()
pourrait fonctionner en parallèle sans bloquer les uns les autres, dans le coût de la mise en mémoire tampon data
blocs en mémoire avec data_queue
. Vous pouvez définir la taille de la file d'attente de data_queue
pour contrôler la taille de la mémoire tampon. En outre, process()
et blocking_send_from_queue()
ressemblent à des méthodes qui peuvent être exécutées en parallèle. Cela dépend de l'implémentation réelle, je suppose que queue
est un objet persistant threading.Queue
ici. Ainsi forward
peut être divisé en: (sauter le code d'arrêt)
def do_process(data):
process(queue, data)
return len(queue) > 0
def forward():
while True:
blocking_send_from_queue()
Et main
seront changés à:
async def main():
loop.create_task(receive())
loop.run_in_executor(None, forward)
while True:
data = await data_queue.get()
if not await loop.run_in_executor(None, do_process, data):
break
Idéalement, au plus un non_blocking_recv
, un process
et un blocking_send_from_queue
pourrait fonctionner en parallèle , si GIL est correctement libéré dans blocking_send_from_queue
.
Enfin, si GIL est même sorti en process
, et si l'envoi de l'ordre ne doit être le même que recevoir l'ordre, il est même possible d'utiliser pleinement la puissance de calcul multi-core, en exécutant process
en parallèle: (code partiel)
async def process_worker():
while True:
data = await data_queue.get()
loop.run_in_executor(None, process, queue, data)
async def main():
loop.create_task(receive())
loop.run_in_executor(None, forward)
for _ in range(CPU_CORE_NUM):
loop.create_task(process_worker())
# wait for all tasks here
Cela semble vraiment propice aux conditions de course. – user2357112