1

J'ai un objet compteur au processus racine, je veux le disperser à tous les processus dans un groupe mais la fonction de dispersion donne une erreur (j'ai aussi essayé avec Scatter() mais pas de chance) . J'utilise mpi4py pour le traitement parallèle.Objet de compteur de dispersion pour tous les processus du groupe

Traceback (most recent call last): 
File "tsetscatter.py", line 13, in <module> 
total_counter = comm.scatter(total_counter, root=0) 
File "MPI/Comm.pyx", line 1286, in mpi4py.MPI.Comm.scatter 
(src/mpi4py.MPI.c:109079) 
File "MPI/msgpickle.pxi", line 707, in mpi4py.MPI.PyMPI_scatter 
(src/mpi4py.MPI.c:48114) 
File "MPI/msgpickle.pxi", line 161, in mpi4py.MPI.Pickle.dumpv 
(src/mpi4py.MPI.c:41605) 
ValueError: expecting 8 items, got 5 

Le code source est:

from mpi4py import MPI 
from collections import Counter 

if __name__ == "__main__": 
comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 
size = comm.Get_size() 
total_counter = [] 
if rank == 0: 
    lst = [('key1', 2), ('key2', 6), ('key3', 9), ('key4', 4), ('key5', 1)] 
    total_counter = Counter(dict(lst)) 
print total_counter 
total_counter = comm.scatter(total_counter, root=0) 
print total_counter 

Toute aide sur la façon dont cela peut être réalisé est très apprécié.

Répondre

0

j'ai pu disperser en créant des blocs de données (nombre de morceaux = no. De processus)

if rank == 0: 
lst = [('key1', 2), ('key2', 6), ('key3', 9), ('key4', 4), ('key5', 1)] 
total_counter = Counter(dict(lst)) 
chunks = [[]for _ in range(size)] 
for i, chunk in enumerate(total_counter): 
    chunks[i % size].append({chunk: total_counter.get(chunk)}) 
else: 
total_counter = None 
chunks = None 
total_counter = comm.scatter(chunks, root=0) 
print rank, ": ", total_counter 

Il fonctionne comme prévu maintenant.