2017-05-13 4 views
0

J'ai besoin de traiter des cadres à partir d'une webcam et d'envoyer quelques images sélectionnées à un serveur websocket distant. Le serveur répond immédiatement avec un message de confirmation (un peu comme un serveur d'écho). Le traitement des trames est lent et gourmand en ressources processeur, donc je veux le faire en utilisant un pool de threads séparé (producteur) pour utiliser tous les cœurs disponibles. Ainsi, le client (consommateur) reste inactif jusqu'à ce que le pool ait quelque chose à envoyer. Ma mise en œuvre actuelle, voir ci-dessous, fonctionne bien seulement si J'ajoute un petit sommeil à l'intérieur de la boucle de test du producteur. Si je supprime ce délai, je cesse de recevoir une réponse du serveur (à la fois le serveur d'écho et de mon vrai serveur). Même la première réponse est perdue, donc je ne pense pas que ce soit un mécanisme de protection contre les inondations. Qu'est-ce que je fais de mal?Client websocket Tornado perdant les messages de réponse?

import tornado 
from tornado.websocket import websocket_connect 
from tornado import gen, queues 

import time 

class TornadoClient(object): 

    url = None 
    onMessageReceived = None 
    onMessageSent = None 

    ioloop = tornado.ioloop.IOLoop.current() 
    q = queues.Queue() 

    def __init__(self, url, onMessageReceived, onMessageSent): 
     self.url = url 
     self.onMessageReceived = onMessageReceived 
     self.onMessageSent = onMessageSent 

    def enqueueMessage(self, msgData, binary=False): 
     print("TornadoClient.enqueueMessage") 

     self.ioloop.add_callback(self.addToQueue, (msgData, binary)) 

     print("TornadoClient.enqueueMessage done") 

    @gen.coroutine 
    def addToQueue(self, msgTuple): 
     yield self.q.put(msgTuple) 

    @gen.coroutine 
    def main_loop(self): 

     connection = None 
     try: 
      while True: 

       while connection is None: 
       try: 
        print("Connecting...") 
        connection = yield websocket_connect(self.url) 
        print("Connected " + str(connection)) 
       except Exception, e: 
        print("Exception on connection " + str(e)) 
        connection = None 
        print("Retry in a few seconds...") 
        yield gen.Task(self.ioloop.add_timeout, time.time() + 3) 

       try: 
        print("Waiting for data to send...") 
        msgData, binaryVal = yield self.q.get() 
        print("Writing...") 
        sendFuture = connection.write_message(msgData, binary=binaryVal) 
        print("Write scheduled...") 
       finally: 
       self.q.task_done() 

       yield sendFuture 
       self.onMessageSent("Sent ok") 

       print("Write done. Reading...") 
       msg = yield connection.read_message() 
       print("Got msg.") 
       self.onMessageReceived(msg) 

       if msg is None: 
       print("Connection lost") 
       connection = None 

      print("main loop completed") 
     except Exception, e: 
      print("ExceptionExceptionException") 
      print(e) 
      connection = None 

     print("Exit main_loop function") 

    def start(self): 
     self.ioloop.run_sync(self.main_loop) 
     print("Main loop completed") 


######### TEST METHODS ######### 

def sendMessages(client): 
    time.sleep(2) #TEST only: wait for client startup 
    while True: 
     client.enqueueMessage("msgData", binary=False) 
     time.sleep(1) # <--- comment this line to break it 

def testPrintMessage(msg): 
    print("Received: " + str(msg)) 

def testPrintSentMessage(msg): 
    print("Sent: " + msg) 


if __name__=='__main__': 

    from threading import Thread 

    client = TornadoClient("ws://echo.websocket.org", testPrintMessage, testPrintSentMessage) 

    thread = Thread(target = sendMessages, args = (client,)) 
    thread.start() 

    client.start() 

Mon vrai problème

Dans mon programme réel j'utilise une « fenêtre comme » mécanisme pour protéger le consommateur (un serveur autobahn.twisted.websocket): le producteur peut envoyer jusqu'à un maximum de annuler l'acquittement des messages (les cadres de la webcam), puis arrête d'attendre que la moitié de la fenêtre se libère. Le consommateur envoie un message "PROCESSED" en accusant réception d'un ou plusieurs messages (juste un compteur, et non un identifiant). Ce que je vois sur le journal des consommateurs, c'est que les messages sont traités et la réponse est renvoyée, mais ces messages disparaissent quelque part dans le réseau. J'ai peu d'expérience avec asynchio alors je voulais être sûr que je ne manque pas de rendement, d'annotation ou d'autre chose.

C'est le journal de côté des consommateurs:

2017-05-13 18:59:54+0200 [-] TX Frame to tcp4:192.168.0.5:48964 : fin = True, rsv = 0, opcode = 1, mask = -, length = 21, repeat_length = None, chopsize = None, sync = False, payload = {"type": "PROCESSED"} 
2017-05-13 18:59:54+0200 [-] TX Octets to tcp4:192.168.0.5:48964 : sync = False, octets = 81157b2274797065223a202250524f434553534544227d 

Répondre

0

C'est un code propre. Je crois que la raison pour laquelle vous avez besoin de dormir dans votre thread sendMessages est parce que, sinon, il continue à appeler enqueueMessage aussi vite que possible, des millions de fois par seconde. Puisque enqueueMessage fait pas attendre le traitement du message en file d'attente, il continue à appeler IOLoop.add_callback aussi vite que possible, sans donner à la boucle assez d'occasion pour exécuter les rappels.

La boucle peut faire une certaine progression en cours d'exécution sur le thread principal, puisque vous ne le bloquez pas réellement. Mais le thread sendMessages ajoute les callbacks beaucoup plus vite que la boucle ne peut les gérer. Au moment où la boucle a sorti un message de la file d'attente et a commencé à le traiter, des millions de nouveaux rappels sont déjà ajoutés, que la boucle doit exécuter avant de pouvoir passer à l'étape suivante du traitement des messages.

Par conséquent, pour votre code de test, je pense qu'il est correct de dormir entre les appels à enqueueMessage sur le fil.

+0

Ok, je le vois. Cet exemple minimal est un peu extrême :) Mais pourquoi vois-je ce même comportement avec mon vrai code? J'ajouterai quelques informations supplémentaires à ma question. – lorenzo