2017-07-19 2 views
0

Python 2.7 : grequests.map sur un thread séparé obtient une réponse, mais grequests.send ne fonctionne pas

J'ai un programme Python qui crée un processus séparé pour exécuter un HttpServer. Ensuite, j'envoie une requête à ce HttpServer en utilisant soit grequests.map sur un thread séparé, soit grequests.send avec un threading.Pool.

Lorsque j'utilise grequests.map sur un thread séparé, j'obtiens une réponse. Cependant, quand j'utilise grequests.send en utilisant un threading.Pool je n'obtiens pas de réponse.

J'ai collé le code ci-dessous qui démontre le problème que je rencontre. Pourquoi est-ce que grequests.map sur un thread distinct fonctionne, mais grequests.send avec un threading.Pool ne fonctionne pas?

L'utilisation de grequests.send avec threading.Pool produit la sortie suivante :

[07-19 09:43:26] p99437 {/Users/mhoggan/Development/test_rest/test_request_handler.py:48} INFO - Making requst to endpoint=http://localhost:8081/test/
[07-19 09:43:26] p99437 {/Users/mhoggan/Development/test_rest/request_handler.py:64} INFO - <grequests.send was called...> 
[07-19 09:43:26] p99437 {/Users/mhoggan/Development/test_rest/test_request_handler.py:54} INFO - Waiting for response... 
[07-19 09:43:31] p99437 {/Users/mhoggan/Development/test_rest/test_request_handler.py:59} CRITICAL - Failed to make request with error <type 'exceptions.AssertionError'>:. 
[07-19 09:43:31] p99437 {/usr/local/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:212} DEBUG - Starting new HTTP connection (1): localhost 

============================ ====================================================

L'utilisation de grequests.map sur un thread séparé produit la sortie suivante :

[07-19 09:47:20] p99528 {/Users/mhoggan/Development/test_rest/test_request_handler.py:48} INFO - Making requst to endpoint=http://localhost:8081/test/ 
[07-19 09:47:20] p99528 {/usr/local/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:212} DEBUG - Starting new HTTP connection (1): localhost 
127.0.0.1 - - [19/Jul/2017 09:47:20] "GET /test/?token=test HTTP/1.1" 200 - 
[07-19 09:47:20] p99528 {/usr/local/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:400} DEBUG - http://localhost:8081 "GET /test/?token=test HTTP/1.1" 200 None 
[07-19 09:47:20] p99528 {/Users/mhoggan/Development/test_rest/test_request_handler.py:25} INFO - Status code on valid request is 200 content=<html> 
    <head> 
    <title>Data.</title> 
    </head> 
    <body><p>This is a test.</p> 
    <p>You accessed path: /test/</p> 
    <p>With params: {'token': ['test']}</p> 
    </body> 
</html> 

[07-19 09:47:20] p99528 {/Users/mhoggan/Development/test_rest/request_handler.py:62} INFO - <grequests.map was called...> 
[07-19 09:47:20] p99528 {/Users/mhoggan/Development/test_rest/test_request_handler.py:54} INFO - Waiting for response... 

================================ ==================== =================

import grequests 
import logging 
import threading 

from gevent.pool import Pool 


# Configure the root logger at DEBUG level. Every record is prefixed with a
# month-day/time stamp, the process id, and the source path and line number
# of the logging call.
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(pathname)s:'
          '%(lineno)d} %(levelname)s - %(message)s',
        datefmt='%m-%d %H:%M:%S',
        level=logging.DEBUG)
# Module-level logger used by RequestHandler.
logger = logging.getLogger(__name__)


class RequestHandler(object):
    """Send GET requests to a single HTTP endpoint through grequests.

    Responses are delivered to a caller-supplied callback; the request itself
    is executed with ``grequests.map`` on a dedicated worker thread.
    """

    def __init__(self, endpoint_url):
        """Store *endpoint_url*, prefixing ``http://`` when it has no scheme.

        URLs that already start with ``http://`` or ``https://`` are kept
        unchanged.
        """
        if not endpoint_url.startswith(("http://", "https://")):
            endpoint_url = "http://{0}".format(endpoint_url)
        self.endpoint_url = endpoint_url

    @staticmethod
    def request_exception(request, exception):
        """Default error callback: log the failed request and its exception.

        Used by ``request_for_test`` when the caller supplies no
        ``error_callback``.
        """
        logger.error('Request %s failed: %s', request, exception)

    @staticmethod
    def _threaded_map(
        request,
        response_callback,
        error_callback=None,
        timeout=None
    ):
        """Thread target: run *request* to completion with ``grequests.map``.

        ``response_callback`` and ``timeout`` are accepted only to keep the
        signature stable; both are already attached to *request* when it is
        built in ``request_for_test``. ``error_callback`` is passed to
        ``grequests.map`` as its ``exception_handler``.
        """
        grequests.map(
            [request],
            exception_handler=error_callback,
            size=1
        )

    def request_for_test(
        self,
        response_callback,
        error_callback=None,
        timeout=None
    ):
        """Issue a GET to the endpoint on a worker thread.

        Args:
            response_callback: registered as the request's ``response`` hook.
            error_callback: ``callable(request, exception)`` used as the
                ``grequests.map`` exception handler; defaults to
                ``self.request_exception``.
            timeout: passed to ``grequests.get`` as the request timeout.

        The call returns as soon as the worker thread has been started; the
        response is reported through *response_callback*.
        """
        header = {"Content-type": "application/json"}
        payload = {'token': 'test'}

        if not error_callback:
            error_callback = self.request_exception
        request = grequests.get(
            self.endpoint_url,
            headers=header,
            params=payload,
            timeout=timeout,
            hooks={'response': [response_callback]},
        )
        args = (request, response_callback, error_callback, timeout,)
        thread = threading.Thread(target=self._threaded_map, args=args)
        # start(), not run(): run() would execute the target synchronously on
        # the calling thread instead of on the new thread.
        thread.start()
        logger.info('<grequests.map was called...>')
        # NOTE(review): the alternative
        #   grequests.send(request, pool=Pool(2), stream=False)
        # presumably only schedules the request on the gevent pool, so a
        # caller that then blocks on threading.Event.wait() would never see
        # the response hook run -- confirm against the grequests source.

========================== =====================================================

import logging 
import s2sphere 
import threading 
import unittest2 

from request_handler import RequestHandler 
from test_service import (
    TestService, 
    END_POINT_0, 
    HOST_NAME, 
    PORT, 
) 


# Configure the root logger at DEBUG level. Every record is prefixed with a
# month-day/time stamp, the process id, and the source path and line number
# of the logging call.
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(pathname)s:'
          '%(lineno)d} %(levelname)s - %(message)s',
        datefmt='%m-%d %H:%M:%S',
        level=logging.DEBUG)
# Module-level logger used by the callbacks and the script body below.
logger = logging.getLogger(__name__)


def _handle_valid_request(response, **kwargs):
    """Response hook: log the status code and body, then signal completion.

    Sets the module-level ``event`` so that the code waiting on it can
    proceed. Extra keyword arguments are accepted and ignored.
    """
    message = 'Status code on valid request is {0} content={1}'.format(
        response.status_code, response.content)
    logger.info(message)
    event.set()


def _handle_error(request, exception):
    """Error callback: log the exception together with the failed request."""
    template = 'Failing the tests due to request error: {0} -- in request {1}'
    logger.error(template.format(exception, request))

if __name__ == '__main__':
    # Seconds allowed both for the HTTP request and for the response event.
    REQUEST_TIMEOUT = 5
    httpd = TestService()
    httpd.start()
    # Set by _handle_valid_request once a response has been received.
    event = threading.Event()
    endpoint_url = 'http://{0}:{1}{2}'.format(HOST_NAME, PORT, END_POINT_0)

    rh = RequestHandler(endpoint_url=endpoint_url)
    try:
        logger.info('Making requst to endpoint={0}'.format(endpoint_url))
        rh.request_for_test(
            _handle_valid_request,
            error_callback=_handle_error,
            timeout=REQUEST_TIMEOUT
        )
        logger.info('Waiting for response...')
        # Explicit check instead of an `assert` statement: asserts are
        # removed under `python -O`, which would also remove the wait itself.
        if not event.wait(timeout=REQUEST_TIMEOUT):
            raise AssertionError(
                'No response received within {0} seconds'.format(
                    REQUEST_TIMEOUT
                )
            )
    except Exception as e:
        logger.fatal('Failed to make request with error {0}:{1}.'.format(
            type(e),
            e
        ))
    finally:
        # Always shut the server process down, even when the request failed.
        httpd.stop()

============================================== ===========================

import multiprocessing 

from BaseHTTPServer import (
    BaseHTTPRequestHandler, 
    HTTPServer, 
) 
from urlparse import (
    urlparse, 
    parse_qs 
) 


# URL path that Handler.do_GET answers with the test page.
END_POINT_0 = '/test/'
# Host name and port that TestService binds its HTTP server to.
HOST_NAME = 'localhost'
PORT = 8081


class Handler(BaseHTTPRequestHandler):
    """Request handler that serves a small HTML test page on END_POINT_0."""

    def do_GET(self):
        """Answer GET requests.

        A request whose path equals ``END_POINT_0`` receives a 200 response
        with an HTML page echoing the path and the parsed query parameters.
        Any other path receives a 404 error response.
        """
        # Parse the request target once and reuse both components.
        parsed = urlparse(self.path)
        url_path = parsed.path
        url_params = parse_qs(parsed.query)

        if url_path != END_POINT_0:
            # The original condition also compared against END_POINT_1, a
            # name that is never defined, so unknown paths raised NameError.
            self.send_error(404)
            return

        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(
            '<html>\n'
            ' <head>\n'
            ' <title>Data.</title>\n'
            ' </head>\n'
            ' <body><p>This is a test.</p>\n'
            ' <p>You accessed path: %s</p>\n'
            ' <p>With params: %s</p>\n'
            ' </body>\n'
            '</html>\n' % (url_path, url_params)
        )


class TestService(object):
    """Run the test HTTP server in a separate daemon process."""

    def __init__(self, server_class=HTTPServer):
        """Bind a *server_class* instance to ``(HOST_NAME, PORT)``.

        The server class is remembered so that ``start`` can bind a fresh
        server after ``stop`` has closed the previous one.
        """
        self._server_class = server_class
        self.httpd = server_class((HOST_NAME, PORT), Handler)
        self.server_process = None

    def _start(self):
        """Child-process entry point: serve requests until terminated."""
        self.httpd.serve_forever()

    def start(self):
        """(Re)start the server process.

        Any process started earlier is stopped first. A new server socket is
        bound when the previous one was closed by ``stop``.
        """
        self.stop()

        if self.httpd is None:
            self.httpd = self._server_class((HOST_NAME, PORT), Handler)

        process = multiprocessing.Process(target=self._start)
        # The daemon flag must be set on the parent's Process object before
        # start(); setting it from inside the child has no effect here.
        process.daemon = True
        process.start()
        self.server_process = process

    def stop(self):
        """Terminate the server process, if any, and release the socket.

        Does nothing when no process has been started. After a stop,
        ``server_process`` and ``httpd`` are both ``None``.
        """
        if self.server_process is None:
            return

        self.server_process.terminate()
        # Wait for the child to exit so it is not left as a zombie.
        self.server_process.join()
        self.server_process = None

        # Close the parent's copy of the listening socket.
        self.httpd.server_close()
        self.httpd = None

Répondre

0

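La raison est que grequests.send utilise gevent en interne, de telle sorte que le rappel (callback) est exécuté sur le même thread que celui où send a été appelé. Ainsi, bloquer ce thread avec threading.Event empêche le rappel de réponse d'être exécuté tant que l'attente n'est pas terminée. Or, dans le code ci-dessus, l'événement n'est signalé que par ce rappel (event.set() dans _handle_valid_request) : l'attente expire donc au bout du délai, et la requête n'est réellement envoyée qu'ensuite.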