2016-07-15 3 views
3

Je dois appeler une tâche de céleri pour chaque requête GRPC et renvoyer le résultat. Dans l'implémentation GRPC par défaut, chaque requête est traitée dans un thread distinct d'un pool de threads. Dans mon cas, le serveur est supposé traiter environ 400 demandes en mode batch par seconde. Ainsi, une requête peut devoir attendre 1 seconde pour le résultat dû au traitement par lots, ce qui signifie que la taille du pool de threads doit être supérieure à 400 pour éviter le blocage.Comment implémenter un serveur async grpc python?

Est-ce que cela peut être fait de manière asynchrone? Merci beaucoup.

class EventReporting(ss_pb2.BetaEventReportingServicer, ss_pb2.BetaDeviceMgtServicer): 
    def ReportEvent(self, request, context): 
    res = tasks.add.delay(1,2) 
    result = res.get() ->here i have to block 
    return ss_pb2.GeneralReply(message='Hello, %s!' % result.message) 

Répondre

3

Il peut être fait de manière asynchrone si votre appel à res.get peut être fait de manière asynchrone (si elle est définie avec le mot-clé async).

While grpc.server says it requires a futures.ThreadPoolExecutor, it will actually work with any futures.Executor that calls the behaviors submitted to it on some thread other than the one on which they were passed. Si vous deviez passer à grpc.server un futures.Executor implémenté par vous qui n'utilisait qu'un seul thread pour effectuer quatre cents appels simultanés (ou plus) vers EventReporting.ReportEvent, votre serveur devrait éviter le type de blocage que vous décrivez.

1

À mon avis est bonne mise en œuvre simple serveur async grpc, même comme http basé sur aiohttp.

import asyncio 
from concurrent import futures 
import functools 
import inspect 
import threading 

from grpc import _server 

def _loop_mgr(loop: asyncio.AbstractEventLoop): 

    asyncio.set_event_loop(loop) 
    loop.run_forever() 

    # If we reach here, the loop was stopped. 
    # We should gather any remaining tasks and finish them. 
    pending = asyncio.Task.all_tasks(loop=loop) 
    if pending: 
     loop.run_until_complete(asyncio.gather(*pending)) 


class AsyncioExecutor(futures.Executor): 

    def __init__(self, *, loop=None): 

     super().__init__() 
     self._shutdown = False 
     self._loop = loop or asyncio.get_event_loop() 
     self._thread = threading.Thread(target=_loop_mgr, args=(self._loop,), 
             daemon=True) 
     self._thread.start() 

    def submit(self, fn, *args, **kwargs): 

     if self._shutdown: 
      raise RuntimeError('Cannot schedule new futures after shutdown') 

     if not self._loop.is_running(): 
      raise RuntimeError("Loop must be started before any function can " 
           "be submitted") 

     if inspect.iscoroutinefunction(fn): 
      coro = fn(*args, **kwargs) 
      return asyncio.run_coroutine_threadsafe(coro, self._loop) 

     else: 
      func = functools.partial(fn, *args, **kwargs) 
      return self._loop.run_in_executor(None, func) 

    def shutdown(self, wait=True): 
     self._loop.stop() 
     self._shutdown = True 
     if wait: 
      self._thread.join() 


# --------------------------------------------------------------------------- # 


async def _call_behavior(rpc_event, state, behavior, argument, request_deserializer): 
    context = _server._Context(rpc_event, state, request_deserializer) 
    try: 
     return await behavior(argument, context), True 
    except Exception as e: # pylint: disable=broad-except 
     with state.condition: 
      if e not in state.rpc_errors: 
       details = 'Exception calling application: {}'.format(e) 
       _server.logging.exception(details) 
       _server._abort(state, rpc_event.operation_call, 
         _server.cygrpc.StatusCode.unknown, _server._common.encode(details)) 
     return None, False 

async def _take_response_from_response_iterator(rpc_event, state, response_iterator): 
    try: 
     return await response_iterator.__anext__(), True 
    except StopAsyncIteration: 
     return None, True 
    except Exception as e: # pylint: disable=broad-except 
     with state.condition: 
      if e not in state.rpc_errors: 
       details = 'Exception iterating responses: {}'.format(e) 
       _server.logging.exception(details) 
       _server._abort(state, rpc_event.operation_call, 
         _server.cygrpc.StatusCode.unknown, _server._common.encode(details)) 
     return None, False 

async def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk, 
            request_deserializer, response_serializer): 
    argument = argument_thunk() 
    if argument is not None: 
     response, proceed = await _call_behavior(rpc_event, state, behavior, 
               argument, request_deserializer) 
     if proceed: 
      serialized_response = _server._serialize_response(
       rpc_event, state, response, response_serializer) 
      if serialized_response is not None: 
       _server._status(rpc_event, state, serialized_response) 

async def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk, 
            request_deserializer, response_serializer): 
    argument = argument_thunk() 
    if argument is not None: 
     # Notice this calls the normal `_call_behavior` not the awaitable version. 
     response_iterator, proceed = _server._call_behavior(
      rpc_event, state, behavior, argument, request_deserializer) 
     if proceed: 
      while True: 
       response, proceed = await _take_response_from_response_iterator(
        rpc_event, state, response_iterator) 
       if proceed: 
        if response is None: 
         _server._status(rpc_event, state, None) 
         break 
        else: 
         serialized_response = _server._serialize_response(
          rpc_event, state, response, response_serializer) 
         print(response) 
         if serialized_response is not None: 
          print("Serialized Correctly") 
          proceed = _server._send_response(rpc_event, state, 
                serialized_response) 
          if not proceed: 
           break 
         else: 
          break 
       else: 
        break 

_server._unary_response_in_pool = _unary_response_in_pool 
_server._stream_response_in_pool = _stream_response_in_pool 


if __name__ == '__main__': 
    server = grpc.server(AsyncioExecutor()) 
    # Add Servicer and Start Server Here 

lien original:
https://gist.github.com/seglberg/0b4487b57b4fd425c56ad72aba9971be

+0

Je fixe un commentaire. S'il vous plaît supprimer l'évaluation négative, parce que je pense que c'est une bonne mise en œuvre – Vetos