2016-03-16 4 views
0

J'ai l'MWE suivant à l'aide comm.Scatterv et comm.Gatherv de distribuer un tableau 4D à travers un certain nombre de noyaux (size)Le long de quel axe la fonction Scatterv de mpi4py scinde-t-elle un tableau numpy?

import numpy as np 
from mpi4py import MPI 
import matplotlib.pyplot as plt 

comm = MPI.COMM_WORLD 
size = comm.Get_size() 
rank = comm.Get_rank() 

if rank == 0: 
    test = np.random.rand(411,48,52,40) #Create array of random numbers 
    outputData = np.zeros(np.shape(test)) 
    split = np.array_split(test,size,axis = 0) #Split input array by the number of available cores 

    split_sizes = [] 

    for i in range(0,len(split),1): 
     split_sizes = np.append(split_sizes, len(split[i])) 

    displacements = np.insert(np.cumsum(split_sizes),0,0)[0:-1] 

    plt.imshow(test[0,0,:,:]) 
    plt.show() 

else: 
#Create variables on other cores 
    split_sizes = None 
    displacements = None 
    split = None 
    test = None 
    outputData = None 

#Broadcast variables to other cores 
test = comm.bcast(test, root = 0) 
split = comm.bcast(split, root=0) 
split_sizes = comm.bcast(split_sizes, root = 0) 
displacements = comm.bcast(displacements, root = 0) 

output_chunk = np.zeros(np.shape(split[rank])) #Create array to receive subset of data on each core, where rank specifies the core 
print("Rank %d with output_chunk shape %s" %(rank,output_chunk.shape)) 

comm.Scatterv([test,split_sizes, displacements,MPI.DOUBLE],output_chunk,root=0) #Scatter data from test across cores and receive in output_chunk 

output = output_chunk 

plt.imshow(output_chunk[0,0,:,:]) 
plt.show() 

print("Output shape %s for rank %d" %(output.shape,rank)) 

comm.Barrier() 

comm.Gatherv(output,[outputData,split_sizes,displacements,MPI.DOUBLE], root=0) #Gather output data together 

if rank == 0: 
    print("Final data shape %s" %(outputData.shape,)) 
    plt.imshow(outputData[0,0,:,:]) 
    plt.show() 

Cela crée un tableau 4D de nombres aléatoires et, en principe, devrait diviser entre size noyaux avant recombinant. Je m'attendais à Scatterv à diviser le long de l'axe 0 (longueur 411) en fonction des entiers de départ et des déplacements dans les vecteurs split_sizes et displacements. Cependant, j'obtiens une erreur lors de la recombinaison avec Gatherv (mpi4py.MPI.Exception: MPI_ERR_TRUNCATE: message truncated) et le tracé de output_chunk sur chaque core montre que la plupart des données d'entrée ont été perdues, il semble donc que la scission ne s'est pas produite le long du premier axe. Mes questions sont les suivantes: Pourquoi le fractionnement ne se produit-il pas le long du premier axe, comment puis-je savoir dans quel axe se produit la division, et est-il possible de changer/spécifier quel axe cela se produit?

+0

'comm.Scatterv' ne sait probablement rien sur le tableau, la forme, les dimensions ou les pas de' numpy'. Au mieux, il peut traiter 'test' comme un bloc de mémoire. En fait, il se peut que ce soit uniquement le pointeur de l'objet tableau, et non son tampon de données. Est-ce que ce code fonctionne avec un tableau 1d? Ou 'test.flatten()'? – hpaulj

Répondre

2

comm.Scatterv et comm.Gatherv ne connaissent rien aux dimensions du tableau numpy. Ils voient juste le sendbuf comme un bloc de mémoire. Par conséquent, il est nécessaire de prendre cela en compte lors de la spécification des sendcounts et displacements (voir http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html pour plus de détails). L'hypothèse est également que les données sont disposées en style C (rangée majeure) en mémoire.

Un exemple de matrice 2D est donné ci-dessous. Les parties clés de ce code doivent définir correctement split_sizes_input/split_sizes_output et displacements_input/displacements_output. Le code prend la deuxième dimension de dimension en compte pour spécifier les divisions correctes dans le bloc de mémoire:

split_sizes_input = split_sizes*512 

Pour des dimensions supérieures, cette ligne serait changé:

split_sizes_input = split_sizes*indirect_dimension_sizes 

indirect_dimension_sizes = npts2*npts3*npts4*....*nptsN

et de même pour split_sizes_output.

Le code crée un tableau 2D avec les numéros 1 à 512 incrémentés sur une dimension. Il est facile de voir à partir des tracés si les données ont été divisées et recombinées correctement.

import numpy as np 
from mpi4py import MPI 
import matplotlib.pyplot as plt 

comm = MPI.COMM_WORLD 
size = comm.Get_size() 
rank = comm.Get_rank() 

if rank == 0: 
    test = np.arange(0,512,dtype='float64') 
    test = np.tile(test,[256,1]) #Create 2D input array. Numbers 1 to 512 increment across dimension 2. 
    outputData = np.zeros([256,512]) #Create output array of same size 
    split = np.array_split(test,size,axis = 0) #Split input array by the number of available cores 

    split_sizes = [] 

    for i in range(0,len(split),1): 
     split_sizes = np.append(split_sizes, len(split[i])) 

    split_sizes_input = split_sizes*512 
    displacements_input = np.insert(np.cumsum(split_sizes_input),0,0)[0:-1] 

    split_sizes_output = split_sizes*512 
    displacements_output = np.insert(np.cumsum(split_sizes_output),0,0)[0:-1] 


    print("Input data split into vectors of sizes %s" %split_sizes_input) 
    print("Input data split with displacements of %s" %displacements_input) 

    plt.imshow(test) 
    plt.colorbar() 
    plt.title('Input data') 
    plt.show() 

else: 
#Create variables on other cores 
    split_sizes_input = None 
    displacements_input = None 
    split_sizes_output = None 
    displacements_output = None 
    split = None 
    test = None 
    outputData = None 

split = comm.bcast(split, root=0) #Broadcast split array to other cores 
split_sizes = comm.bcast(split_sizes_input, root = 0) 
displacements = comm.bcast(displacements_input, root = 0) 
split_sizes_output = comm.bcast(split_sizes_output, root = 0) 
displacements_output = comm.bcast(displacements_output, root = 0) 

output_chunk = np.zeros(np.shape(split[rank])) #Create array to receive subset of data on each core, where rank specifies the core 
print("Rank %d with output_chunk shape %s" %(rank,output_chunk.shape)) 
comm.Scatterv([test,split_sizes_input, displacements_input,MPI.DOUBLE],output_chunk,root=0) 

output = np.zeros([len(output_chunk),512]) #Create output array on each core 

for i in range(0,np.shape(output_chunk)[0],1): 
    output[i,0:512] = output_chunk[i] 

plt.imshow(output) 
plt.title("Output shape %s for rank %d" %(output.shape,rank)) 
plt.colorbar() 
plt.show() 

print("Output shape %s for rank %d" %(output.shape,rank)) 

comm.Barrier() 

comm.Gatherv(output,[outputData,split_sizes_output,displacements_output,MPI.DOUBLE], root=0) #Gather output data together 



if rank == 0: 
    outputData = outputData[0:len(test),:] 
    print("Final data shape %s" %(outputData.shape,)) 
    plt.imshow(outputData) 
    plt.colorbar() 
    plt.show() 
    print(outputData)