2017-08-27 5 views
0

Je connais bien la programmation événementielle mais j'ai rencontré ce problème et j'ai mis fin à des solutions possibles. J'ai lu la documentation de Tornado, j'ai essayé avec:Exécution de tâches asynchrones à l'aide du framework Web Tornado

  1. Futures
  2. gen.coroutine
  3. asynchrone
  4. add_timeout

mais je n'ai pas pu résoudre le problème suivant:

  • J'ai un serveur websocket qui vient écouter les nouveaux messages et en fonction du type de message, il appelle une fonction

    WebSocketHandler classe (tornado.websocket.WebSocketHandler) spécifique:

    ... 
    
    def on_message(self, message): 
        if message['type'] is X: 
         self.write(functionA(message['data'])) 
        elif message['type'] is Y: 
         self.write(functionB(message['data'])) 
    ... 
    

Le problème vient quand une fonction coûteuse informatiquement est exécuter, laisser dire fonctionn, il peut prendre jusqu'à 5 minutes pour mettre fin à

def functionA(message): 
    params = extract_params(message) 
    cmd = "computationally_expensive_tool" 
    out = check_output(cmd, shell=True, stderr=STDOUT, cwd=working_dir) 
    ... 
    return json.dumps({ 
         "error": False, 
         "message": "computationally_expensive_tool_execution_terminated", 
         "type": X 
        }) 

Ma question est de savoir comment puis-je exécuter ce functio n d'une manière asynchrone afin que je puisse encore gérer d'autres messages et le résultat de la fonction A quand il sera prêt?

+0

Pourriez-vous donner un exemple de vos fonctions? –

+0

Salut @ notorious.no, merci de votre intérêt. J'ai ajouté quelques détails sur la fonctionA et l'objectif que je veux atteindre. Donc functionA appelle cet outil et quand il finit, je veux pouvoir le notifier au client en lui envoyant un message. – MrGoodKat

Répondre

1

Si functionA est une fonction de blocage qui ne peut pas être fait asynchrone, vous voulez probablement exécuter dans un pool de fil:

executor = concurrent.futures.ThreadPoolExecutor() 

@gen.coroutine 
def on_message(self, message): 
    if message['type'] is X: 
     yield executor.submit(functionA, message['data']) 
    elif message['type'] is Y: 
     functionB(message['data']) 

Ceci va bloquer cette websocket jusqu'à functionA finitions, mais permettre à d'autres connexions de continuer à travailler . Si vous devez continuer à traiter d'autres types de messages à partir de la même connexion pendant que functionA s'exécute, vous avez besoin d'un arrangement plus compliqué, pouvant impliquer un .

+0

Merci pour votre réponse Ben! Oui, je dois servir le même client même si functionA est toujours en cours d'exécution. Existe-t-il un moyen d'exécuter asynchrone functionA et de lui attacher un rappel? pour que je puisse faire quelque chose comme functionA_hadler (resultA): #do quelque chose (comme répondre quand le résultat est prêt) – MrGoodKat

+0

Oui, vous pouvez faire quelque chose comme 'IOLoop.current(). add_future (executor.submit (functionA, message [ 'data']), callback) '. –

+0

Merci beaucoup! Ça marche. C'est définitivement une solution à mon problème. – MrGoodKat