0

J'ai un problème de multi-traitement en python. Dans le code ci-dessous, j'appelle 7 workers (multiprocessing.Process) et un résultat threading.Thread. Avant et après le traitement des données (extraction des métadonnées de fichiers), je lance:Multiprocessing Python et trop de fichiers ouverts

lsof | grep ' <user> ' | grep 'python3' 

Et je reçois des poignées ouvertes comme:

python3 17291    ivo DEL  REG    0,20    5288943 /dev/shm/ZMcs2H 
python3 17291    ivo DEL  REG    0,20    5288942 /dev/shm/3iMR4q 
python3 17291    ivo DEL  REG    0,20    5288941 /dev/shm/XPYh79 

et lors de l'exécution multitraitement à plusieurs reprises en boucle (traitement certains messages continus) Je reçois

OSError: [Errno 24] Too many open files 

Y a-t-il quelque chose qui ne va pas avec le traitement multi-processeurs?

def worker_process_results(meta_queue, res_dict): 
    while True: 
     try: 
      (path, meta) = meta_queue.get() 
      res_dict[path] = meta 
     finally: 
      meta_queue.task_done() 

def multiprocess_get_metadata(paths, thread_count = 7): 
    """ Scan files for metadata (multiprocessing). """ 
    file_queue = multiprocessing.JoinableQueue() 
    meta_queue = multiprocessing.JoinableQueue() 

    res_dict = dict() 
    # result thread  
    meta_thread = threading.Thread(target = lambda: worker_process_results(meta_queue, res_dict)) 
    meta_thread.daemon = True 
    meta_thread.start() 

    workers = [] 

    for _ in range(0, min(thread_count, len(paths))): 
     worker = MetaDataWorker(file_queue, meta_queue) 
     worker.daemon = True 
     worker.start()   
     workers.append(worker) 

    for path in paths: 
     file_queue.put(path) 

    file_queue.join() 
    meta_queue.join() 

    for x in workers: 
     x.terminate() 

    return res_dict 

class MetaDataWorker(multiprocessing.Process): 
    ''' Use library to get meta data from file. ''' 

    def __init__(self, file_queue, meta_queue): 
     ''' Constructor. ''' 
     super().__init__() 

     self.file_queue = file_queue 
     self.meta_queue = meta_queue 

    def run(self): 
     """ Run. """ 

     while True: 
      try: 
       path = self.file_queue.get() 
       meta = getmetadata(path) 
       meta = None 
       self.meta_queue.put((path, meta)) 
      except Exception as err: 
       print("Thread end.") 
       print("{0}".format(err)) 
      finally: 
       self.file_queue.task_done() 
+0

Vous parlez de « 7 travailleurs », mais le code ci-dessus crée un sous-processus pour chaque chemin passé – jsbueno

+0

non -> uniquement lorsque j'ai trop peu de chemins -> pour _ dans la plage (0, min (thread_count, len (chemins))) – user3691223

Répondre

0

déjà résolu, je devais envoyer des signaux de fin pour les travailleurs et les fils de résultat pour arrêter la boucle sans fin