3

J'essaye d'implémenter un client Websocket de base en utilisant asyncio et websockets avec Python 3.5.2.Rendre le callback websocket asynchrone avec asyncio

Fondamentalement, je veux connect_to_dealer être un appel de blocage, mais attendez le message websocket sur un fil différent. Après avoir lu quelques docs (j'ai très peu exp avec Python), j'ai conclu que asyncio.ensure_future() passant un coroutine (listen_for_message) était le chemin à parcourir.

Maintenant, je peux courir listen_for_message sur un thread différent, mais à l'intérieur du coroutine je ne peux pas sembler utiliser await ou tout autre mécanisme pour faire les appels synchrone. Si je le fais, l'exécution attend éternellement (elle se bloque) même pour un simple sleep.

Je voudrais savoir ce que je fais mal.

async def listen_for_message(self, future, websocket): 
    while (True): 
     try: 
      await asyncio.sleep(1) # It hangs here 
      print('Listening for a message...') 
      message = await websocket.recv() # If I remove the sleep, hangs here 
      print("< {}".format(message)) 
      future.set_result(message) 
      future.done() 
     except websockets.ConnectionClosed as cc: 
      print('Connection closed') 
     except Exception as e: 
      print('Something happened') 

def handle_connect_message(self, future): 
    # We must first remove the websocket-specific payload because we're only interested in the connect protocol msg 
    print(future.result) 

async def connect_to_dealer(self): 
    print('connect to dealer') 
    websocket = await websockets.connect('wss://mywebsocket')) 
    hello_message = await websocket.recv() 
    print("< {}".format(hello_message)) 
    # We need to parse the connection ID out of the message 
    connection_id = hello_message['connectionId'] 
    print('Got connection id {}'.format(connection_id)) 
    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(user_id='username', connection_id=connection_id), headers=headers) 
    if sub_response.status_code == 200: 
     print('Now we\'re observing traffic') 
    else: 
     print('Oops request failed with {code}'.format(code=sub_response.status_code)) 
    # Now we need to handle messages but continue with the regular execution 
    try: 
     future = asyncio.get_event_loop().create_future() 
     future.add_done_callback(self.handle_connect_message) 
     asyncio.ensure_future(self.listen_for_message(future, websocket)) 
    except Exception as e: 
     print(e) 

Répondre

2

Y a-t-il une raison particulière pour laquelle vous devez travailler avec des contrats à terme explicites?

Avec asyncio, vous pouvez utiliser une combinaison de coroutines et Tasks pour atteindre la plupart des objectifs. Les tâches sont essentiellement des coroutines enveloppées qui se déroulent en arrière-plan, indépendamment des autres codes asynchrones, de sorte que vous n'avez pas besoin de gérer explicitement leur flux ou de les jongler avec d'autres bits de code.

Je ne suis pas tout à fait sûr de votre objectif final, mais peut-être l'approche élaborée ci-dessous vous donne quelque chose à travailler avec:

import asyncio 

async def listen_for_message(): 

    while True: 

     await asyncio.sleep(0) 

     try: 
      print('Listening for a message...') 
      message = await websocket.recv() 

      print("< {}".format(message)) 

     except websockets.ConnectionClosed as cc: 
      print('Connection closed') 

     except Exception as e: 
      print('Something happened') 


async def connect_to_dealer(): 

    print('connect to dealer') 
    websocket = await websockets.connect('wss://mywebsocket') 

    hello_message = await websocket.recv() 
    print("< {}".format(hello_message)) 

    # We need to parse the connection ID out of the message 
    connection_id = hello_message['connectionId'] 
    print('Got connection id {}'.format(connection_id)) 

    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(
     user_id='username', connection_id=connection_id), headers=headers) 

    if sub_response.status_code == 200: 
     print('Now we\'re observing traffic') 
    else: 
     print('Oops request failed with {code}'.format(code=sub_response.status_code)) 


async def my_app(): 

    # this will block until connect_to_dealer() returns 
    websocket = await connect_to_dealer() 

    # start listen_for_message() in its own task wrapper, so doing it continues in the background 
    asyncio.ensure_future(listen_for_message(websocket)) 

    # you can continue with other code here that can now coexist with listen_for_message() 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(my_app()) 
    loop.run_forever() 
+1

Salut shongolo, je n'ai pas besoin de travailler avec des contrats à terme explicites, mais je l'ai lu sur les docs et juste utilisé. J'ai résolu mon problème grâce à vos suggestions, je pense que la clé n'était pas d'appeler 'asyncio.ensure_future (listen_for_message (websocket))' depuis la coroutine 'connect_to_dealer()'. – mdelolmo

+2

Heureux qu'il a déclenché une solution. Il est correct d'appeler 'ensure_future()' d'une autre coroutine mais quelque part dans votre code, vous devez piloter toutes les coroutines en utilisant 'loop.run_until_complete()' et, si nécessaire (par exemple avec des tâches) 'loop.run_forever()' – shongololo