J'ai un problème de multi-traitement en python. Dans le code ci-dessous, j'appelle 7 workers (multiprocessing.Process) et un résultat threading.Thread. Avant et après le traitement des données (extraction des métadonnées de fichiers), je lance:Multiprocessing Python et trop de fichiers ouverts
lsof | grep ' <user> ' | grep 'python3'
Et je reçois des poignées ouvertes comme:
python3 17291 ivo DEL REG 0,20 5288943 /dev/shm/ZMcs2H
python3 17291 ivo DEL REG 0,20 5288942 /dev/shm/3iMR4q
python3 17291 ivo DEL REG 0,20 5288941 /dev/shm/XPYh79
et lors de l'exécution multitraitement à plusieurs reprises en boucle (traitement certains messages continus) Je reçois
OSError: [Errno 24] Too many open files
Y a-t-il quelque chose qui ne va pas avec le traitement multi-processeurs?
def worker_process_results(meta_queue, res_dict):
while True:
try:
(path, meta) = meta_queue.get()
res_dict[path] = meta
finally:
meta_queue.task_done()
def multiprocess_get_metadata(paths, thread_count = 7):
""" Scan files for metadata (multiprocessing). """
file_queue = multiprocessing.JoinableQueue()
meta_queue = multiprocessing.JoinableQueue()
res_dict = dict()
# result thread
meta_thread = threading.Thread(target = lambda: worker_process_results(meta_queue, res_dict))
meta_thread.daemon = True
meta_thread.start()
workers = []
for _ in range(0, min(thread_count, len(paths))):
worker = MetaDataWorker(file_queue, meta_queue)
worker.daemon = True
worker.start()
workers.append(worker)
for path in paths:
file_queue.put(path)
file_queue.join()
meta_queue.join()
for x in workers:
x.terminate()
return res_dict
class MetaDataWorker(multiprocessing.Process):
''' Use library to get meta data from file. '''
def __init__(self, file_queue, meta_queue):
''' Constructor. '''
super().__init__()
self.file_queue = file_queue
self.meta_queue = meta_queue
def run(self):
""" Run. """
while True:
try:
path = self.file_queue.get()
meta = getmetadata(path)
meta = None
self.meta_queue.put((path, meta))
except Exception as err:
print("Thread end.")
print("{0}".format(err))
finally:
self.file_queue.task_done()
Vous parlez de « 7 travailleurs », mais le code ci-dessus crée un sous-processus pour chaque chemin passé – jsbueno
non -> uniquement lorsque j'ai trop peu de chemins -> pour _ dans la plage (0, min (thread_count, len (chemins))) – user3691223