2017-10-17 10 views
2

J'ai un "périphérique" python qui s'exécute dans un conteneur docker. Il est connecté à un routeur Crossbar, recevant les messages d'événement autobahn/WAMP sur les canaux abonnés.Traiter les messages de l'autobahn Abonnements asynchrones, non bloquants

Lorsqu'un événement particulier est publié, mon appareil appelle une méthode qui se termine en quelques secondes. Maintenant, je veux qu'il saute ou manipule tous les messages du même événement qui sont reçus, tandis que la méthode est toujours en cours d'exécution. J'ai essayé d'accomplir cela en utilisant le décorateur @inlinecallback de Twisted et en plaçant un "self.busy" -flag sur le périphérique.

Mais il ne revient pas avec un différé immédiatement, mais il se comporte comme une méthode de blocage normale, de sorte que les messages entrants sont traités les uns après les autres.

Voici mon code:

from autobahn.twisted.wamp import ApplicationSession 
from twisted.internet.defer import inlineCallbacks 

class Pixel(ApplicationSession): 

@inlineCallbacks 
def onJoin(self, details): 
    yield self.subscribe(self.handler_no_access, 'com.event.no_access') 

@inlineCallbacks 
def handler_no_access(self, direction): 
    entries = len(self.handlers['no_access'][direction]) 

    if entries == 0: 
     self.handlers['no_access'][direction].append(direction) 
     result = yield self._handler_no_access() 
     return result 

    else: 
     yield print('handler_no_access: entries not 0: ', self.handlers['no_access']) 

@inlineCallbacks 
def _handler_no_access(self): 
    for direction in self.handlers['no_access']: 

     for message in self.handlers['no_access'][direction]: 
      yield self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5) 
      self.handlers['no_access'][direction] = [] 

Je l'ai déjà pris le chemin hacky avec le dictionnaire self.handler, par la voie.

EDIT

la méthode de blocage est:

yield self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5) 

Il contrôle une Neopixel au GPIO d'un Raspberry Pi, laissant clignoter sur et en dehors pendant 1 seconde. Toute autre appels à la méthode

def handler_no_access(self, direction) 

alors que le _timed_switch n'a pas fini, doivent être sautées, de sorte qu'ils ne se cumulent pas.

SOLUTION

@inlineCallbacks 
def handler_no_access(self, direction): 
    direction = str(direction) 

    if self.busy[direction] is False: 

     self.busy[direction] = True 

     # non-blocking now 
     yield deferToThread(self._handler_no_access, direction) 

    else: 
     yield print('handler_no_access: direction {} busy '.format(direction)) 

def _handler_no_access(self, direction): 

    # this takes 1s to execute 
    self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5) 

    self.busy[direction] = False 

Répondre

0

inlineCallbacks ne code fait pas bloquer dans le code non-bloquant. C'est juste une API alternative pour utiliser Deferreds. Les différés ne sont qu'un moyen de gérer les rappels.

Vous devez réécrire votre code de blocage pour qu'il ne bloque pas d'une autre manière. Vous n'avez pas dit quelle partie de votre code bloque, ni ce qu'il bloque, donc il est très difficile de suggérer comment vous pourriez faire cela. Les deux seuls outils généraux permettant de rendre le code de blocage non bloquant sont les threads et les processus. Ainsi, vous pouvez exécuter la fonction dans un thread ou un processus distinct. La fonction peut ou peut ne pas fonctionner dans un tel contexte d'exécution (encore une fois, aucun moyen de savoir sans savoir exactement ce qu'il fait).

+0

Eh bien, merci pour le heads-up, apparemment j'avais une mauvaise compréhension de la mécanique différée dans Twisted. – stk

+0

[link] (http://twistedmatrix.com/documents/current/core/howto/gendefer.html#what-deferreds-don-t-do-make-your-code-asynchronous) J'essaie de utilisez deferToThread (f) maintenant et je ferai rapport, si cela résout mon problème. – stk