J'ai un programme python qui crée un processus séparé pour un HttpServer à exécuter. Ensuite, je fais une demande à ce HttpServer en utilisant grequests.map sur un fil séparé ou grequests.send avec un threading.Pool.Python 2.7 qrequests.map sur un thread séparé obtient une réponse mais grequests.send ne fonctionne pas
Lorsque j'utilise grequests.map
sur un thread séparé, j'obtiens une réponse. Cependant, quand j'utilise grequests.send
en utilisant un threading.Pool
je n'obtiens pas de réponse.
J'ai collé le code ci-dessous qui démontre le problème que je rencontre. Pourquoi est-ce que grequests.map
sur un thread distinct fonctionne, mais grequests.send
avec un threading.Pool
ne fonctionne pas?
En utilisant grequest.send
avec threading.Pool
résultats dans la sortie suivante:
[07-19 09:43:26] p99437
{/Users/mhoggan/Development/test_rest/test_request_handler.py:48} INFO - Making requst to endpoint=http://localhost:8081/test/
[07-19 09:43:26] p99437 {/Users/mhoggan/Development/test_rest/request_handler.py:64} INFO - <grequests.send was called...>
[07-19 09:43:26] p99437 {/Users/mhoggan/Development/test_rest/test_request_handler.py:54} INFO - Waiting for response...
[07-19 09:43:31] p99437 {/Users/mhoggan/Development/test_rest/test_request_handler.py:59} CRITICAL - Failed to make request with error <type 'exceptions.AssertionError'>:.
[07-19 09:43:31] p99437 {/usr/local/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:212} DEBUG - Starting new HTTP connection (1): localhost
============================ ====================================================
Utilisation grequests.map
sur un résultat de fil séparés dans la sortie suivante:
[07-19 09:47:20] p99528 {/Users/mhoggan/Development/test_rest/test_request_handler.py:48} INFO - Making requst to endpoint=http://localhost:8081/test/
[07-19 09:47:20] p99528 {/usr/local/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:212} DEBUG - Starting new HTTP connection (1): localhost
127.0.0.1 - - [19/Jul/2017 09:47:20] "GET /test/?token=test HTTP/1.1" 200 -
[07-19 09:47:20] p99528 {/usr/local/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:400} DEBUG - http://localhost:8081 "GET /test/?token=test HTTP/1.1" 200 None
[07-19 09:47:20] p99528 {/Users/mhoggan/Development/test_rest/test_request_handler.py:25} INFO - Status code on valid request is 200 content=<html>
<head>
<title>Data.</title>
</head>
<body><p>This is a test.</p>
<p>You accessed path: /test/</p>
<p>With params: {'token': ['test']}</p>
</body>
</html>
[07-19 09:47:20] p99528 {/Users/mhoggan/Development/test_rest/request_handler.py:62} INFO - <grequests.map was called...>
[07-19 09:47:20] p99528 {/Users/mhoggan/Development/test_rest/test_request_handler.py:54} INFO - Waiting for response...
================================ ==================== =================
import grequests
import logging
import threading
from gevent.pool import Pool
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(pathname)s:'
'%(lineno)d} %(levelname)s - %(message)s',
datefmt='%m-%d %H:%M:%S',
level=logging.DEBUG)
logger = logging.getLogger(__name__)
class RequestHandler(object):
def __init__(self, endpoint_url):
self.endpoint_url = endpoint_url
if not self.endpoint_url.startswith("http://"):
self.endpoint_url = "http://{0}".format(self.endpoint_url)
@staticmethod
def _threaded_map(
request,
response_callback,
error_callback=None,
timeout=None
):
grequests.map(
[request],
exception_handler=error_callback,
size=1
)
def request_for_test(
self,
response_callback,
error_callback=None,
timeout=None
):
header = {"Content-type": "application/json"}
payload = {'token': 'test'}
if not error_callback:
error_callback = self.request_exception
request = grequests.get(
self.endpoint_url,
headers=header,
params=payload,
timeout=timeout,
hooks={'response': [response_callback]},
)
args = (request, response_callback, error_callback, timeout,)
thread = threading.Thread(target=self._threaded_map, args=args)
thread.run()
logger.info('<grequests.map was called...>')
#grequests.send(request, pool=Pool(2), stream=False)
#logger.info('<grequests.send was called...>')
========================== =====================================================
import logging
import s2sphere
import threading
import unittest2
from request_handler import RequestHandler
from test_service import (
TestService,
END_POINT_0,
HOST_NAME,
PORT,
)
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(pathname)s:'
'%(lineno)d} %(levelname)s - %(message)s',
datefmt='%m-%d %H:%M:%S',
level=logging.DEBUG)
logger = logging.getLogger(__name__)
def _handle_valid_request(response, **kwargs):
logger.info('Status code on valid request is {0} content={1}'.format(
response.status_code,
response.content
)
)
event.set()
def _handle_error(request, exception):
logger.error('Failing the tests due to request error: '
'{0} -- in request {1}'.format(exception, request))
if __name__ == '__main__':
REQUEST_TIMEOUT = 5 # In seconds
httpd = TestService()
httpd.start()
event = threading.Event()
endpoint_url = 'http://{0}:{1}{2}'.format(
HOST_NAME,
PORT,
END_POINT_0
)
rh = RequestHandler(endpoint_url=endpoint_url)
try:
logger.info('Making requst to endpoint={0}'.format(endpoint_url))
rh.request_for_test(
_handle_valid_request,
error_callback=_handle_error,
timeout=REQUEST_TIMEOUT
)
logger.info('Waiting for response...'.format(endpoint_url))
assert(event.wait(timeout=REQUEST_TIMEOUT))
except Exception as e:
logger.fatal('Failed to make request with error {0}:{1}.'.format(
type(e),
e
)
)
finally:
httpd.stop()
============================================== ===========================
import multiprocessing
from BaseHTTPServer import (
BaseHTTPRequestHandler,
HTTPServer,
)
from urlparse import (
urlparse,
parse_qs
)
END_POINT_0 = '/test/'
HOST_NAME = 'localhost'
PORT = 8081
class Handler(BaseHTTPRequestHandler):
def do_GET(self):
url_path = urlparse(self.path).path
url_query = urlparse(self.path).query
url_params= parse_qs(url_query)
if url_path == END_POINT_0 or url_path == END_POINT_1:
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(
'<html>\n'
' <head>\n'
' <title>Data.</title>\n'
' </head>\n'
' <body><p>This is a test.</p>\n'
' <p>You accessed path: %s</p>\n'
' <p>With params: %s</p>\n'
' </body>\n'
'</html>\n' % (url_path, url_params)
)
class TestService(object):
def __init__(self, server_class=HTTPServer):
self.httpd = server_class((HOST_NAME, PORT), Handler)
self.server_process = None
def _start(self):
self.server_process.daemon = True
self.httpd.serve_forever()
def start(self):
self.stop()
if not self.server_process or not self.server_process.is_alive():
self.server_process = multiprocessing.Process(
target=self._start
)
self.server_process.start()
def stop(self):
if self.server_process:
self.httpd.server_close()
self.server_process.terminate()