J'ai besoin de traiter des cadres à partir d'une webcam et d'envoyer quelques images sélectionnées à un serveur websocket distant. Le serveur répond immédiatement avec un message de confirmation (un peu comme un serveur d'écho). Le traitement des trames est lent et gourmand en ressources processeur, donc je veux le faire en utilisant un pool de threads séparé (producteur) pour utiliser tous les cœurs disponibles. Ainsi, le client (consommateur) reste inactif jusqu'à ce que le pool ait quelque chose à envoyer. Ma mise en œuvre actuelle, voir ci-dessous, fonctionne bien seulement si J'ajoute un petit sommeil à l'intérieur de la boucle de test du producteur. Si je supprime ce délai, je cesse de recevoir une réponse du serveur (à la fois le serveur d'écho et de mon vrai serveur). Même la première réponse est perdue, donc je ne pense pas que ce soit un mécanisme de protection contre les inondations. Qu'est-ce que je fais de mal?Client websocket Tornado perdant les messages de réponse?
import tornado
from tornado.websocket import websocket_connect
from tornado import gen, queues
import time
class TornadoClient(object):
url = None
onMessageReceived = None
onMessageSent = None
ioloop = tornado.ioloop.IOLoop.current()
q = queues.Queue()
def __init__(self, url, onMessageReceived, onMessageSent):
self.url = url
self.onMessageReceived = onMessageReceived
self.onMessageSent = onMessageSent
def enqueueMessage(self, msgData, binary=False):
print("TornadoClient.enqueueMessage")
self.ioloop.add_callback(self.addToQueue, (msgData, binary))
print("TornadoClient.enqueueMessage done")
@gen.coroutine
def addToQueue(self, msgTuple):
yield self.q.put(msgTuple)
@gen.coroutine
def main_loop(self):
connection = None
try:
while True:
while connection is None:
try:
print("Connecting...")
connection = yield websocket_connect(self.url)
print("Connected " + str(connection))
except Exception, e:
print("Exception on connection " + str(e))
connection = None
print("Retry in a few seconds...")
yield gen.Task(self.ioloop.add_timeout, time.time() + 3)
try:
print("Waiting for data to send...")
msgData, binaryVal = yield self.q.get()
print("Writing...")
sendFuture = connection.write_message(msgData, binary=binaryVal)
print("Write scheduled...")
finally:
self.q.task_done()
yield sendFuture
self.onMessageSent("Sent ok")
print("Write done. Reading...")
msg = yield connection.read_message()
print("Got msg.")
self.onMessageReceived(msg)
if msg is None:
print("Connection lost")
connection = None
print("main loop completed")
except Exception, e:
print("ExceptionExceptionException")
print(e)
connection = None
print("Exit main_loop function")
def start(self):
self.ioloop.run_sync(self.main_loop)
print("Main loop completed")
######### TEST METHODS #########
def sendMessages(client):
time.sleep(2) #TEST only: wait for client startup
while True:
client.enqueueMessage("msgData", binary=False)
time.sleep(1) # <--- comment this line to break it
def testPrintMessage(msg):
print("Received: " + str(msg))
def testPrintSentMessage(msg):
print("Sent: " + msg)
if __name__=='__main__':
from threading import Thread
client = TornadoClient("ws://echo.websocket.org", testPrintMessage, testPrintSentMessage)
thread = Thread(target = sendMessages, args = (client,))
thread.start()
client.start()
Mon vrai problème
Dans mon programme réel j'utilise une « fenêtre comme » mécanisme pour protéger le consommateur (un serveur autobahn.twisted.websocket): le producteur peut envoyer jusqu'à un maximum de annuler l'acquittement des messages (les cadres de la webcam), puis arrête d'attendre que la moitié de la fenêtre se libère. Le consommateur envoie un message "PROCESSED" en accusant réception d'un ou plusieurs messages (juste un compteur, et non un identifiant). Ce que je vois sur le journal des consommateurs, c'est que les messages sont traités et la réponse est renvoyée, mais ces messages disparaissent quelque part dans le réseau. J'ai peu d'expérience avec asynchio alors je voulais être sûr que je ne manque pas de rendement, d'annotation ou d'autre chose.
C'est le journal de côté des consommateurs:
2017-05-13 18:59:54+0200 [-] TX Frame to tcp4:192.168.0.5:48964 : fin = True, rsv = 0, opcode = 1, mask = -, length = 21, repeat_length = None, chopsize = None, sync = False, payload = {"type": "PROCESSED"}
2017-05-13 18:59:54+0200 [-] TX Octets to tcp4:192.168.0.5:48964 : sync = False, octets = 81157b2274797065223a202250524f434553534544227d
Ok, je le vois. Cet exemple minimal est un peu extrême :) Mais pourquoi vois-je ce même comportement avec mon vrai code? J'ajouterai quelques informations supplémentaires à ma question. – lorenzo