Je suis en train d'ajouter deux coroutines à asyncio boucle et obtenir une erreur:python3 Asyncio Création d'une connexion secondaire d'une valeur initiale d'un
RuntimeError: This event loop is already running
Mon objectif est de communiquer à un serveur (que je n'ai pas le contrôle de). Ce serveur attend une connexion initiale du client. Le serveur a ensuite fourni un port au client sur cette connexion. Le client doit utiliser ce port pour créer une deuxième connexion. Cette deuxième connexion est utilisée par le serveur pour envoyer des messages non sollicités au client. La première connexion reste disponible pour d'autres communications bidirectionnelles.
Pour recréer ce scénario, j'ai un code qui reproduit l'erreur:
class Connection():
def __init__(self, ip, port, ioloop):
self.ip = ip
self.port = port
self.ioloop = ioloop
self.reader, self.writer = None, None
self.protocol = None
self.fileno = None
async def __aenter__(self):
# Applicable when doing 'with Connection(...'
log.info("Entering and Creating Connection")
self.reader, self.writer = (
await asyncio.open_connection(self.ip, self.port, loop=self.ioloop)
)
self.protocol = self.writer.transport.get_protocol()
self.fileno = self.writer.transport.get_extra_info('socket').fileno()
log.info(f"Created connection {self}")
return self
async def __aexit__(self, *args):
# Applicable when doing 'with Connection(...'
log.info(f"Exiting and Destroying Connection {self}")
if self.writer:
self.writer.close()
def __await__(self):
# Applicable when doing 'await Connection(...'
return self.__aenter__().__await__()
def __repr__(self):
return f"[Connection {self.ip}:{self.port}, {self.protocol}, fd={self.fileno}]"
async def send_recv_message(self, message):
log.debug(f"send: '{message}'")
self.writer.write(message.encode())
await self.writer.drain()
log.debug("awaiting data...")
data = await self.reader.read(9999)
data = data.decode()
log.debug(f"recv: '{data}'")
return data
class ServerConnection(Connection):
async def setup_connection(self):
event_port = 8889 # Assume this came from the server
print("In setup connection")
event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop)
self.ioloop.run_until_complete(event_connection.recv_message())
class EventConnection(Connection):
async def recv_message(self):
log.debug("awaiting recv-only data...")
data = await self.reader.read(9999)
data = data.decode()
log.debug(f"recv only: '{data}'")
return data
async def main(loop):
client1 = await ServerConnection('127.0.0.1', 8888, loop)
await client1.setup_connection()
await client1.send_recv_message("Hello1")
await client1.send_recv_message("Hello2")
await asyncio.sleep(5)
if __name__ == '__main__':
#logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()
ioloop = asyncio.get_event_loop()
print('starting loop')
ioloop.run_until_complete(main(ioloop))
print('completed loop')
ioloop.close()
L'erreur se produit dans la méthode ServerConnection.setup_connection() où run_until_complete est appelé. Je fais probablement quelque chose de mal en raison d'un manque de compréhension asyncio. Fondamentalement, comment puis-je configurer une connexion secondaire qui recevra des notifications d'événements (non sollicités) lors de la configuration de la première connexion?
Merci.
Followup
Puisque le code est très similaire (quelques modifications pour ajouter plus de fonctionnalités), j'espère que ce n'est pas mauvaise étiquette à followup au poste d'origine que l'erreur résultante est toujours le même.
Le nouveau problème est que lorsqu'il reçoit le message non sollicité (qui est reçu par EventConnection), la méthode recv_message appelle process_data. Je voudrais que process_data soit un futur pour que recv_message se termine (ioloop devrait s'arrêter). Le paramètre Ensure_Future le récupère et continue à s'exécuter pour utiliser ServerConnection pour effectuer une requête/réponse au serveur. Avant de faire cela, il doit aller à un code utilisateur (représenté par external_command() et de qui je préférerais cacher les choses asynchrones). Cela le rendrait synchrone à nouveau. Par conséquent, une fois qu'ils ont fait ce dont ils ont besoin, ils doivent appeler execute_command sur ServerConnection, qui relance alors la boucle. Le problème est que mon utilisation de Ensure_Future n'a pas fonctionné car il semble que la boucle ne s'est pas arrêtée. Par conséquent, lorsque l'exécution du code atteint execute_command qui exécute run_until_complete, une exception avec l'erreur "Cette boucle d'événement est déjà en cours d'exécution" se produit.
J'ai deux questions:
Comment puis-je faire en sorte que le ioloop peut arrêter après process_data est placé dans ensure_future, puis être en mesure de l'exécuter à nouveau à execute_command?
Une fois recv_message a reçu quelque chose, comment pouvons-nous faire en sorte que il peut recevoir plus de données non sollicitées? Est-il suffisant/sûr d'utiliser simplement Ensure_Future pour se rappeler?
Voici l'exemple de code qui simule ce problème.
client1 = None
class ServerConnection(Connection):
connection_type = 'Server Connection'
async def setup_connection(self):
event_port = 8889 # Assume this came from the server
print("In setup connection")
event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop)
asyncio.ensure_future(event_connection.recv_message())
async def _execute_command(self, data):
return await self.send_recv_message(data)
def execute_command(self, data):
response_str = self.ioloop.run_until_complete(self._execute_command(data))
print(f"exec cmd response_str: {response_str}")
def external_command(self, data):
self.execute_command(data)
class EventConnection(Connection):
connection_type = 'Event Connection'
async def recv_message(self):
global client1
log.debug("awaiting recv-only data...")
data = await self.reader.read(9999)
data = data.decode()
log.debug(f"recv-only: '{data}'")
asyncio.ensure_future(self.process_data(data))
asyncio.ensure_future(self.recv_message())
async def process_data(self, data):
global client1
await client1.external_command(data)
async def main(ioloop):
global client1
client1 = await ServerConnection('127.0.0.1', 8888, ioloop)
await client1.setup_connection()
print(f"after connection setup loop running is {ioloop.is_running()}")
await client1.send_recv_message("Hello1")
print(f"after Hello1 loop running is {ioloop.is_running()}")
await client1.send_recv_message("Hello2")
print(f"after Hello2 loop running is {ioloop.is_running()}")
while True:
print(f"inside while loop running is {ioloop.is_running()}")
t = 10
print(f"asyncio sleep {t} sec")
await asyncio.sleep(t)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()
ioloop = asyncio.get_event_loop()
print('starting loop')
ioloop.run_until_complete(main(ioloop))
print('completed loop')
ioloop.close()
Vous souhaitez créer une connexion 2 en même temps? Pourquoi ne pas utiliser 'ayncio.gather'? Avec cette méthode, vous pouvez lancer 2 actions asynchrones ... –
asyncio.gather ne semble pas s'appliquer à mon cas pour 2 raisons. Premièrement, il est censé rassembler les résultats dans l'ordre indiqué et l'autre, il semble vouloir regrouper tous les futurs dans une liste. Dans mon cas, je voudrais lui passer un avenir maintenant et le deuxième avenir (la deuxième connexion) plus tard, après avoir reçu le port du premier avenir. – bhairav