0

J'utilise une structure maître-esclaves pour mettre en oeuvre un calcul parallèle. Un processus maître unique (0) des données sur les charges, et distribue les morceaux et les instructions pour les processus esclaves (1 - N) qui font le levage de charges lourdes, en utilisant de grands objets ... blablabla. Le problème est l'utilisation de la mémoire, que je surveille en utilisant resource.getrusage(resource.RUSAGE_SELF).ru_maxrss sur chaque processus esclave.mémoire Libération en boucles python processus parallèle

La première tâche utilise environ 6 Go de mémoire, comme prévu, mais lorsque l'esclave reçoit la deuxième tâche, il gonfle jusqu'à un peu plus de 10 Go, comme si la mémoire précédente n'était pas collectée. Ma compréhension était que dès qu'une variable perd ses références (dans le code ci-dessous, lorsque la variable _gwb est réinitialisée) garbage collection devrait nettoyer la maison. Pourquoi cela ne se passe-t-il pas?

Would jeter dans un del _gwb à la fin de chaque aide en boucle?
Qu'en est-il un appel manuel à gc.collect()?
Ou dois-je reproduire subprocess es comme described in this answer?

J'utilise mpi4py sur un cluster géré SLURM.

Le processus maîtreressemble à quelque chose comme:

for jj, tt in enumerate(times): 

    for ii, sim in enumerate(sims): 

     search = True 
     # Find a slave to give this task to 
     while search: 
      # Repackage HDF5 data into dictionary to work with MPI 
      sim_dat = ... # load some data 

      # Look for available slave process 
      data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG) 
      src = stat.Get_source() 

      # Store Results 
      if tag == TAGS.DONE: 
       _store_slave_results(data, ...) 
       num_done += 1 
      elif tag == TAGS.READY: 
       # Distribute tasks 
       comm.send(sim_data, dest=src, tag=TAGS.START) 
       # Stop searching, move to next task 
       search = False 

      cycles += 1 

Et les esclaves:

while True: 
    # Tell Master this process is ready 
    comm.send(None, dest=0, tag=TAGS.READY) 
    # Receive ``task`` ([number, gravPot, ndensStars]) 
    task = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat) 
    tag = stat.Get_tag() 

    if tag == TAGS.START: 
     _gwb = Large_Data_Structure(task) 
     data = _gwb.do_heavy_lifting(task) 
     comm.send(data, dest=0, tag=TAGS.DONE) 
    elif tag == TAGS.EXIT: 
     break 

    cycles += 1 

Edit: D'autres subtilités étranges (dans le cas où ils pourraient être pertinents) :
1) seulement certains processus montrent la mémoire en pleine croissance, r rester à peu près le même;
2) La quantité de mémoire active est différente sur les différents processus esclaves (différents par 100s of MB ... même si elles doivent nécessairement être en cours d'exécution le même code!

Répondre

1

del _gwb devrait faire une grande différence. Avec _gwb = Large_Data_Structure(task) Les nouvelles données sont générées et ensuite assignées à _gwd.Lorsque les anciennes données sont libérées.Un del spécifique va se débarrasser de l'objet tôt.Vous pouvez toujours voir une augmentation de la mémoire pour la deuxième boucle - Python libère l'objet dans son tas, mais il n'y a rien à dire que la prochaine allocation obtiendra exactement le même tas de mémoire

Le garbage collector n'intervient que dans les cas où le comptage régulier des références n'est pas suffisant pour déclencher la libération de la mémoire. En supposant que do_heavy_lifting ne fasse rien de génial, cela ne fera pas de différence.

Vous mentionnez subprocess ... une autre option sur les systèmes Linux est os.fork. Le processus enfant obtient une vue copy-on-write de l'espace adresse parent. Le gros objet est généré dans la mémoire enfant et disparaît à la sortie. Je ne peux pas garantir que cela fonctionnera mais serait une expérience intéressante.

while True: 
    # Tell Master this process is ready 
    comm.send(None, dest=0, tag=TAGS.READY) 
    # Receive ``task`` ([number, gravPot, ndensStars]) 
    task = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat) 
    tag = stat.Get_tag() 

    if tag == TAGS.START: 
     pid = os.fork() 
     if pid: 
      # parent waits for child 
      os.waitpid(pid) 
     else: 
      # child does work, sends results and exits 
      _gwb = Large_Data_Structure(task) 
      data = _gwb.do_heavy_lifting(task) 
      comm.send(data, dest=0, tag=TAGS.DONE) 
      os._exit() 
    elif tag == TAGS.EXIT: 
     break 

    cycles += 1 
+0

Merci! Je vais essayer. Un couple de subtilités étranges (au cas où elles pourraient être pertinentes): 1) seulement certains processus montrent la mémoire en croissance, d'autres restent à peu près les mêmes; 2) La quantité spécifique de mémoire active est * différente * sur les différents processus esclaves ... même si elles doivent nécessairement exécuter le même code! – DilithiumMatrix

+0

Je ne peux pas dire pourquoi c'est le cas. Vos données calculées peuvent être sensibles aux paramètres entrants (par exemple, 'range (count)' est différent selon que count vaut 1 ou 10000000). Juste une supposition. – tdelaney

+0

Hmm, je ne trouve aucun endroit où les tailles de données devraient changer entre les processus. Je l'ai essayé avec 'del _gwb' et il n'y a aucun changement dans le comportement de la mémoire ... L'objet' _gwb' stocke certaines références à des objets externes, par ex. '_gwb = Large_Data_Structure (task, other_obj)' et dans le constructeur de '_gwb':' (self.other = other_obj) '... cela pourrait-il empêcher la collecte de la mémoire? – DilithiumMatrix