2017-08-28 7 views
0

Je viens d'avoir une situation où une application multithread que j'ai écrite était suspendue à un moment où elle interagissait avec une file d'attente, qui était peuplée par une bibliothèque, dans un autre thread.En Python, quelle est la façon la plus simple de lire une file d'attente alors qu'elle n'est pas vide, et d'appeler toujours task_done?

Dans la forme la plus simple, je faisais juste

while not q.empty(): 
    row = q.get() 
    # do something with row 

La raison pour laquelle j'avais que, et pas plus (en particulier en laissant de côté task_done) était que je n'étais pas connu avec le multi-threading, ou Queue s en python, et c'est ce qui était dans l'exemple de code.

Dans ma situation, à un certain point après les heures de fonctionnement, l'application pendait, parfois à q.empty(), parfois à q.get(), et parfois après q.get().

Quand je lis sur les autres qui avaient eu des problèmes similaires, il était alors seulement que j'ai appris task_done, et quand j'ajouté que mon code, tout simplement comme,

while not q.empty(): 
    row = q.get() 
    # do something with row 
    q.task_done() 

Ces blocages que je vivais cessé. Je fais quelques opérations de mise en file d'attente comme celle-ci dans toute l'application, et je veux un moyen standard de gérer en cours d'exécution le q while not q.empty(), obtenir une ligne avec q.get(), et en appelant q.task_done().

+0

Assurez-vous d'utiliser la file d'attente synchronisée. get() utilisera les paramètres par défaut (block = True, timeout = None), donc il semblerait que vous ayez une impasse ailleurs. https://docs.python.org/3/library/queue.html#queue.Queue.get – Kamran

+0

Et que devrait être le put() correspondant? Je n'ai pas accès à ça, parce que c'est dans la bibliothèque (bien que je puisse soit demander une mise à jour, soit passer ma propre sous-classe de file d'attente, et faire ce que je veux), mais c'est q.put (data), et la file d'attente que j'ai passée a été définie sur maxsize = 0 (infini), donc je ne sais pas vraiment d'où l'impasse proviendrait. – seaders

Répondre

0

La méthode préférée que j'ai trouvé pour cela, que je suis maintenant à l'aide est une combinaison d'un context manager pour gérer la combinaison en get et task_done comme,

class read_from_q: 
    def __init__(self, q, block=False, timeout=None): 
     """ 
     :param Queue.Queue q: 
     :param bool block: 
     :param timeout: 
     """ 
     self.q = q 
     self.block = block 
     self.timeout = timeout 

    def __enter__(self): 
     return self.q.get(self.block, self.timeout) 

    def __exit__(self, _type, _value, _traceback): 
     self.q.task_done() 

avec un generator function pour gérer la not empty() vérifier et yield la ligne de nouveau à vous le souhaitez,

def queue_rows(q, block=False, timeout=None): 
    """ 
    :param Queue.Queue q: 
    :param bool block: 
    :param int timeout: 
    """ 
    while not q.empty(): 
     with read_from_q(q, block, timeout) as row: 
      yield row 

Alors maintenant, au lieu d'écrire,

while not q.empty(): 
    row = q.get() 
    # do something with row 
    q.task_done() 

J'utilise,

for row in queue_rows(q): 
    # do something with row and fogedaboutit 

que je considère personnellement plus élégant, et dans l'ensemble plus sûr (parce que je ne vais pas oublier d'appeler task_done comme ce fut le cas avant).