2017-09-29 4 views
0

J'ai un problème avec le package multiprocessing.dummy et deepdish pour écrire des fichiers h5 compressés. Ce est ce que je fais:Python multiprocessing.dummy et deepdish ne fonctionnent pas ensemble

import deepdish as dd 
from multiprocessing.dummy import Pool 

def writeThings(args): 
    path, np_array = args 
    dd.io.save(path, {'arr': np_array}, compression='blosc') 

p = Pool(4) 
p.map(writeThings, all_np_arrays_and_paths) 

Tout fonctionne très bien quand je commente la DeepDish commande retirerai. Il semble que dd crée un certain type de fichiers corrompus sur Windows et Python détecte cette erreur, et se bloque juste. Est-ce que quelqu'un sait comment réparer ceci? Merci beaucoup.

+0

Le message d'erreur serait utile. Avez-vous essayé d'encapsuler l'appel de méthode pour enregistrer les données dans un gestionnaire d'exceptions? –

+0

Il n'y a actuellement aucun message d'erreur. Python existe avec un code d'exception 1073741819, et après l'avoir cherché, j'en ai trouvé un qui avait des problèmes avec l'écriture de fichiers h5. – anki

Répondre

0

Pour clarifier les choses, les chemins sont distincts les uns des autres, donc j'écrire dans des fichiers différents. Cependant, cette fonction simple ne fonctionne toujours pas. Cependant, si j'Embed ce dans une classe threading.Thread qui a une serrure et autour de la commande dd.io.save avec lock.acquire et après l'écriture du fichier lock.release tout fonctionne bien.

Voici un extrait de code pour tout le monde:

import threading 

class writeThings(threading.Thread): 
    def __init__(self, args, lock): 
     super().__init__() 
     self.args = args 
     self.lock = lock 

    def run(self): 
     while self.args: 
      path, np_array = self.args.pop() 
      # Give this thread unique writing rights 
      self.lock.acquire() 
      dd.io.save(path, {"arr": np_array}, compression='blosc') 
      self.lock.release() 

lock = threading.Lock() 
n_threads = 4 
threads = [] 

for i in range(n_threads): 
    threads.append(writeThings(args_junk[i],lock)) 

for i in range(n_threads): 
    threads[i].start() 

for i in range(n_threads): 
    threads[i].join()