2016-12-11 2 views
1

J'ai cherché la barrière de python mais il y a très peu de problèmes liés. Je suis toujours confus au sujet de barrier.wait(), même mon code fonctionne."Barrière de filetage Python" Pourquoi ce code fonctionne-t-il et existe-t-il un meilleur moyen?

J'utilise la barrière python pour implémenter une telle fonction: Un thread principal et n sous-threads. Dans chaque tour, le thread principal attend que tous les sous-threads terminent leur travail en cours, puis tous les threads passent au tour suivant jusqu'à certaines conditions que nous rencontrons. Ainsi, j'ai trouvé que la barrière est propre à implémenter cette fonction, voici mon code pour le thread principal.

def superstep(self): 
    workers = [] 
    barrier = threading.Barrier(self.num_workers+1) 
    for vertex in self.vertices: 
     worker = Worker(vertex, barrier) 
     workers.append(worker) 
     worker.start() 

    while self.flag: 
     barrier.wait() 
     self.distributeMessages() 
     self.count += 1 
     print ("superstep: ", self.count) 
     self.flag = self.isTerminated() 

    for worker in workers: 
     worker.flag = False 

    for worker in workers: 
     worker.join() 
  1. la première boucle « pour » crée n threads nommés travailleurs et stockés dans une liste de travailleurs. La boucle 'while' est le thread principal qui attend les autres sous-threads, et se casse lorsque self.flag est False.
  2. la deuxième boucle 'for' utilisée pour définir le flag sur False dans chaque worker (sous-threads), en leur disant de quitter la boucle.

Voici ma classe Worker.

class Worker(threading.Thread): 
    def __init__(self, vertex, barrier): 
     threading.Thread.__init__(self) 
     self.vertex = vertex 
     self.flag = True 
     self.barrier = barrier 

    def run(self): 
     while self.flag: 
      self.barrier.wait() 
      do something 

Le code fonctionne bien, tous les threads peuvent rejoindre(). Mais comme je l'ai regardé python barrier, tous les threads seront publiés simultanément lorsque tous les threads appellent wait(). Que se passe-t-il si les threads principaux se détachent de sa boucle while et que tous les autres threads l'attendent, dans ce cas, la seconde boucle for est inutile et les sous-threads ne rejoindront jamais().

Alors, comment ce code fonctionne, existe-t-il un autre moyen de quitter la barrière au lieu d'élever BrokenBarrierError? En outre, si j'ajoute du code dans la deuxième boucle 'for', j'imprime des informations ou autre chose, la procédure est bloquée. Je suppose qu'il doit y avoir des sous-threads qui sont en attente() et n'ont aucune chance de vérifier l'indicateur, donc ils ne peuvent pas quitter run() des threads.

+0

Vous pourriez peut-être appeler 'barrier.abort()' après la seconde pour, libérer les travailleurs en attente. – Gribouillis

+0

@Gribouillis Merci pour votre réponse. barrier.abort() déclenche un BrokenBarrierError et cela empêche mon code de s'exécuter, donc je me demande s'il existe un meilleur moyen. – fancyqlx

+0

Vous pourriez attraper le BrokenBarrierError dans les threads de travail – Gribouillis

Répondre

1

Si vous ne souhaitez pas utiliser abort, vous pouvez avoir deux appels à Barrier.wait dans chaque thread. Cela casserait l'opération en deux parties. Dans la première partie, les threads de travail feraient leur travail et le thread principal mettrait à jour le statut de l'indicateur. Ensuite, sur la deuxième partie, chaque thread vérifie l'état du drapeau et quitte la boucle si nécessaire.

Au niveau du code, il ressemblerait à quelque chose comme ceci:

# Main 
def superstep(self): 
    workers = [] 
    barrier = threading.Barrier(self.num_workers+1) 
    for vertex in self.vertices: 
     worker = Worker(vertex, barrier) 
     workers.append(worker) 
     worker.start() 

    while self.flag: 
     barrier.wait() 
     self.distributeMessages() 
     self.count += 1 
     print ("superstep: ", self.count) 
     self.flag = self.isTerminated() 
     for worker in workers: 
      worker.flag = self.flag 
     barrier.wait() 

    for worker in workers: 
     worker.join() 

# Worker 
def run(self): 
    while self.flag: 
     self.barrier.wait() 
     # do something 
     self.barrier.wait() 
+0

Merci, cette solution fonctionne bien et j'en apprends beaucoup. – fancyqlx

1

Vous pouvez appeler

self.barrier.abort() 

pour libérer les attente des travailleurs après la deuxième boucle, et attraper BrokenBarrierError dans run() méthode du travailleur .

+0

Merci encore, la réponse que j'ai acceptée est également très belle. – fancyqlx

+0

Très bien, très bien. – Gribouillis