2017-05-31 1 views
0

Je suis en train d'ajouter deux coroutines à asyncio boucle et obtenir une erreur:python3 Asyncio Création d'une connexion secondaire d'une valeur initiale d'un

RuntimeError: This event loop is already running 

Mon objectif est de communiquer à un serveur (que je n'ai pas le contrôle de). Ce serveur attend une connexion initiale du client. Le serveur a ensuite fourni un port au client sur cette connexion. Le client doit utiliser ce port pour créer une deuxième connexion. Cette deuxième connexion est utilisée par le serveur pour envoyer des messages non sollicités au client. La première connexion reste disponible pour d'autres communications bidirectionnelles.

Pour recréer ce scénario, j'ai un code qui reproduit l'erreur:

class Connection(): 
    def __init__(self, ip, port, ioloop): 
     self.ip = ip 
     self.port = port 
     self.ioloop = ioloop 
     self.reader, self.writer = None, None 
     self.protocol = None 
     self.fileno = None 

    async def __aenter__(self): 
     # Applicable when doing 'with Connection(...' 
     log.info("Entering and Creating Connection") 
     self.reader, self.writer = (
      await asyncio.open_connection(self.ip, self.port, loop=self.ioloop) 
     ) 
     self.protocol = self.writer.transport.get_protocol() 
     self.fileno = self.writer.transport.get_extra_info('socket').fileno() 

     log.info(f"Created connection {self}") 
     return self 

    async def __aexit__(self, *args): 
     # Applicable when doing 'with Connection(...' 
     log.info(f"Exiting and Destroying Connection {self}") 
     if self.writer: 
      self.writer.close() 

    def __await__(self): 
     # Applicable when doing 'await Connection(...' 
     return self.__aenter__().__await__() 

    def __repr__(self): 
     return f"[Connection {self.ip}:{self.port}, {self.protocol}, fd={self.fileno}]" 

    async def send_recv_message(self, message): 
     log.debug(f"send: '{message}'") 
     self.writer.write(message.encode()) 
     await self.writer.drain() 

     log.debug("awaiting data...") 
     data = await self.reader.read(9999) 
     data = data.decode() 
     log.debug(f"recv: '{data}'") 
     return data 


class ServerConnection(Connection): 
    async def setup_connection(self): 
     event_port = 8889 # Assume this came from the server 
     print("In setup connection") 
     event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop) 
     self.ioloop.run_until_complete(event_connection.recv_message()) 

class EventConnection(Connection): 
    async def recv_message(self): 
     log.debug("awaiting recv-only data...") 
     data = await self.reader.read(9999) 
     data = data.decode() 
     log.debug(f"recv only: '{data}'") 
     return data 


async def main(loop): 
    client1 = await ServerConnection('127.0.0.1', 8888, loop) 
    await client1.setup_connection() 
    await client1.send_recv_message("Hello1") 
    await client1.send_recv_message("Hello2") 
    await asyncio.sleep(5) 

if __name__ == '__main__': 
    #logging.basicConfig(level=logging.INFO) 
    logging.basicConfig(level=logging.DEBUG) 
    log = logging.getLogger() 
    ioloop = asyncio.get_event_loop() 
    print('starting loop') 
    ioloop.run_until_complete(main(ioloop)) 
    print('completed loop') 
    ioloop.close() 

L'erreur se produit dans la méthode ServerConnection.setup_connection() où run_until_complete est appelé. Je fais probablement quelque chose de mal en raison d'un manque de compréhension asyncio. Fondamentalement, comment puis-je configurer une connexion secondaire qui recevra des notifications d'événements (non sollicités) lors de la configuration de la première connexion?

Merci.

Followup

Puisque le code est très similaire (quelques modifications pour ajouter plus de fonctionnalités), j'espère que ce n'est pas mauvaise étiquette à followup au poste d'origine que l'erreur résultante est toujours le même.

Le nouveau problème est que lorsqu'il reçoit le message non sollicité (qui est reçu par EventConnection), la méthode recv_message appelle process_data. Je voudrais que process_data soit un futur pour que recv_message se termine (ioloop devrait s'arrêter). Le paramètre Ensure_Future le récupère et continue à s'exécuter pour utiliser ServerConnection pour effectuer une requête/réponse au serveur. Avant de faire cela, il doit aller à un code utilisateur (représenté par external_command() et de qui je préférerais cacher les choses asynchrones). Cela le rendrait synchrone à nouveau. Par conséquent, une fois qu'ils ont fait ce dont ils ont besoin, ils doivent appeler execute_command sur ServerConnection, qui relance alors la boucle. Le problème est que mon utilisation de Ensure_Future n'a pas fonctionné car il semble que la boucle ne s'est pas arrêtée. Par conséquent, lorsque l'exécution du code atteint execute_command qui exécute run_until_complete, une exception avec l'erreur "Cette boucle d'événement est déjà en cours d'exécution" se produit.

J'ai deux questions:

  1. Comment puis-je faire en sorte que le ioloop peut arrêter après process_data est placé dans ensure_future, puis être en mesure de l'exécuter à nouveau à execute_command?

  2. Une fois recv_message a reçu quelque chose, comment pouvons-nous faire en sorte que il peut recevoir plus de données non sollicitées? Est-il suffisant/sûr d'utiliser simplement Ensure_Future pour se rappeler?

Voici l'exemple de code qui simule ce problème.

client1 = None 

class ServerConnection(Connection): 
    connection_type = 'Server Connection' 
    async def setup_connection(self): 
     event_port = 8889 # Assume this came from the server 
     print("In setup connection") 
     event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop) 
     asyncio.ensure_future(event_connection.recv_message()) 

    async def _execute_command(self, data): 
     return await self.send_recv_message(data) 

    def execute_command(self, data): 
     response_str = self.ioloop.run_until_complete(self._execute_command(data)) 
     print(f"exec cmd response_str: {response_str}") 

    def external_command(self, data): 
     self.execute_command(data) 


class EventConnection(Connection): 
    connection_type = 'Event Connection' 
    async def recv_message(self): 
     global client1 
     log.debug("awaiting recv-only data...") 
     data = await self.reader.read(9999) 
     data = data.decode() 
     log.debug(f"recv-only: '{data}'") 
     asyncio.ensure_future(self.process_data(data)) 
     asyncio.ensure_future(self.recv_message()) 

    async def process_data(self, data): 
     global client1 
     await client1.external_command(data) 


async def main(ioloop): 
    global client1 
    client1 = await ServerConnection('127.0.0.1', 8888, ioloop) 
    await client1.setup_connection() 
    print(f"after connection setup loop running is {ioloop.is_running()}") 
    await client1.send_recv_message("Hello1") 
    print(f"after Hello1 loop running is {ioloop.is_running()}") 
    await client1.send_recv_message("Hello2") 
    print(f"after Hello2 loop running is {ioloop.is_running()}") 
    while True: 
     print(f"inside while loop running is {ioloop.is_running()}") 
     t = 10 
     print(f"asyncio sleep {t} sec") 
     await asyncio.sleep(t) 


if __name__ == '__main__': 
    logging.basicConfig(level=logging.DEBUG) 
    log = logging.getLogger() 
    ioloop = asyncio.get_event_loop() 
    print('starting loop') 
    ioloop.run_until_complete(main(ioloop)) 
    print('completed loop') 
    ioloop.close() 
+0

Vous souhaitez créer une connexion 2 en même temps? Pourquoi ne pas utiliser 'ayncio.gather'? Avec cette méthode, vous pouvez lancer 2 actions asynchrones ... –

+0

asyncio.gather ne semble pas s'appliquer à mon cas pour 2 raisons. Premièrement, il est censé rassembler les résultats dans l'ordre indiqué et l'autre, il semble vouloir regrouper tous les futurs dans une liste. Dans mon cas, je voudrais lui passer un avenir maintenant et le deuxième avenir (la deuxième connexion) plus tard, après avoir reçu le port du premier avenir. – bhairav

Répondre

0

Essayez de remplacer:

self.ioloop.run_until_complete 

Avec

await 
+0

Je l'avais essayé, mais il serait suspendu en attendant la méthode recv_message EventConnection. En fait, je l'ai remplacé par asyncio.ensure_future et cela semblait avoir fait l'affaire. Cependant, j'ai encore une autre couche de complexité pour laquelle j'ai rencontré un problème similaire (j'ai construit ce cadre lentement pour ce dont j'ai besoin). Va l'ajouter comme une mise à jour à la question d'origine. – bhairav