J'essaye d'implémenter un client Websocket de base en utilisant asyncio et websockets avec Python 3.5.2.Rendre le callback websocket asynchrone avec asyncio
Fondamentalement, je veux connect_to_dealer
être un appel de blocage, mais attendez le message websocket sur un fil différent. Après avoir lu quelques docs (j'ai très peu exp avec Python), j'ai conclu que asyncio.ensure_future()
passant un coroutine (listen_for_message
) était le chemin à parcourir.
Maintenant, je peux courir listen_for_message
sur un thread différent, mais à l'intérieur du coroutine je ne peux pas sembler utiliser await
ou tout autre mécanisme pour faire les appels synchrone. Si je le fais, l'exécution attend éternellement (elle se bloque) même pour un simple sleep
.
Je voudrais savoir ce que je fais mal.
async def listen_for_message(self, future, websocket):
while (True):
try:
await asyncio.sleep(1) # It hangs here
print('Listening for a message...')
message = await websocket.recv() # If I remove the sleep, hangs here
print("< {}".format(message))
future.set_result(message)
future.done()
except websockets.ConnectionClosed as cc:
print('Connection closed')
except Exception as e:
print('Something happened')
def handle_connect_message(self, future):
# We must first remove the websocket-specific payload because we're only interested in the connect protocol msg
print(future.result)
async def connect_to_dealer(self):
print('connect to dealer')
websocket = await websockets.connect('wss://mywebsocket'))
hello_message = await websocket.recv()
print("< {}".format(hello_message))
# We need to parse the connection ID out of the message
connection_id = hello_message['connectionId']
print('Got connection id {}'.format(connection_id))
sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(user_id='username', connection_id=connection_id), headers=headers)
if sub_response.status_code == 200:
print('Now we\'re observing traffic')
else:
print('Oops request failed with {code}'.format(code=sub_response.status_code))
# Now we need to handle messages but continue with the regular execution
try:
future = asyncio.get_event_loop().create_future()
future.add_done_callback(self.handle_connect_message)
asyncio.ensure_future(self.listen_for_message(future, websocket))
except Exception as e:
print(e)
Salut shongolo, je n'ai pas besoin de travailler avec des contrats à terme explicites, mais je l'ai lu sur les docs et juste utilisé. J'ai résolu mon problème grâce à vos suggestions, je pense que la clé n'était pas d'appeler 'asyncio.ensure_future (listen_for_message (websocket))' depuis la coroutine 'connect_to_dealer()'. – mdelolmo
Heureux qu'il a déclenché une solution. Il est correct d'appeler 'ensure_future()' d'une autre coroutine mais quelque part dans votre code, vous devez piloter toutes les coroutines en utilisant 'loop.run_until_complete()' et, si nécessaire (par exemple avec des tâches) 'loop.run_forever()' – shongololo