2016-03-18 1 views
4

Comment puis-je modifier le code suivant (adapté de http://materials.jeremybejarano.com/MPIwithPython/pointToPoint.html) de sorte que chaque instance comm.Send soit reçue par root = 0 et que la sortie soit imprimée. Pour l'instant, seule la première commande d'envoi est reçue.Recevoir plusieurs commandes d'envoi en utilisant mpi4py

#passRandomDraw.py 
import numpy 
from mpi4py import MPI 
from mpi4py.MPI import ANY_SOURCE 
import numpy as np 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 

if rank == 0: 
    randNum = numpy.zeros(1) 
    print "Process before receiving random numbers" 


else: 
    for i in range(0,np.random.randint(1,10),1): 
     randNum = numpy.zeros(1) 
     randNum = numpy.random.random_sample(1) 
     print "Process", rank, "iteration", i, "drew the number", randNum[0] 
     comm.Send(randNum, dest=0) 


if rank == 0: 
    comm.Recv(randNum, ANY_SOURCE) 
    print "Process", rank, "received the number", randNum[0] 
+0

Juste pour clarifier/confirmer. Ai-je raison de supposer que vous n'avez aucune chance de savoir d'avance combien de messages chaque rang enverra, même sur le rang lui-même? – Zulan

+0

Idéalement, je voudrais envoyer des messages de chaque rang sans avoir à connaître le nombre de messages que le rang enverra. Si ce n'est pas possible, il serait possible de calculer dans le rang combien de messages seront envoyés, mais cela serait différent pour chaque rang. – 218

Répondre

3

Si vous ne connaissez pas le nombre de messages que vous allez envoyer, vous devez introduire un message indiquant la fin des messages. Vous pouvez l'utiliser de manière générique en utilisant des tags spéciaux. Pour éviter de fournir un tampon désadaptation pour le message de terminaison, vous pouvez utiliser probe vérifier quel type de message vient à

tag_data = 42 
tag_end = 23 

if rank == 0: 
    randNum = numpy.zeros(1) 
    print "Process before receiving random numbers" 
else: 
    for i in range(0,np.random.randint(1,10),1): 
     randNum = numpy.zeros(1) 
     randNum = numpy.random.random_sample(1) 
     print "Process", rank, "iteration", i, "drew the number", randNum[0] 
     comm.Send(randNum, dest=0, tag=tag_data) 
    # send the termination message. Using the lower-case interface is simpler 
    comm.send(None, dest=0, tag=tag_end) 

if rank == 0: 
    # For debugging it might be better to use a list of still active procsses 
    remaining = comm.Get_size() - 1 
    while remaining > 0: 
     s = MPI.Status() 
     comm.Probe(status=s) 
     # make sure we post the right kind of message 
     if s.tag == tag_data: 
      comm.Recv(randNum, s.source, tag=tag_data) 
      print "Process ", s.source, " received the number", randNum[0] 
     elif s.tag == tag_end: 
      # don't need the result here 
      print "Process ", rank, " is done" 
      comm.recv(source=s.source, tag=tag_end) 
      remaining -= 1 

Il existe de nombreuses variantes de cela. Par exemple, si vous savez qu'un message est le dernier message, vous pouvez fusionner le message de fin.

+0

Pour le moment, cela échoue pour moi sur la ligne 'comm.probe (status = s)' avec l'erreur 'comm.probe (status = s) AttributeError: 'mpi4py.MPI.Intracomm' objet n'a pas d'attribut 'probe'' . En outre, y a-t-il une signification aux numéros d'étiquettes que vous avez choisis, ou est-il juste nécessaire d'identifier 'tag_end' séparément à' tag_data' et n'importe quel nombre le ferait? Ou certains numéros sont-ils réservés à des processus particuliers? – 218

+0

Peut-être 'probe' a été introduit dans la version 2.0.0 de mpi4py. Juste substituer avec le «Sonde» majuscule, ne devrait faire aucune différence. Les valeurs des balises sont complètement arbitraires, elles doivent simplement être distinctes. Les valeurs réelles dans mes exemples sont des références à [42] (https://en.wikipedia.org/wiki/42_%28number%29#Hitchhiker.27s_Guide_to_the_Galaxy) et [23] (https://en.wikipedia.org/ wiki/23_% 28film% 29), même si je n'ai pas réellement vu ce dernier. – Zulan

+0

Il semble que 'comm.probe' ne soit venu que dans mpi4py version 2.0.0. J'ai une installation 'python3' utilisant ceci et le code original que vous avez posté fonctionne (évidemment avec des instructions' print' modifiées). Toutefois, avec l'ancienne version de mpi4py 'comm.Probe' provoque le blocage du processus. Toujours dans la ligne 'print" Process ", rank," is done "' cela imprime toujours 'rank = 0'. Vraisemblablement remplacer 'rank' par' s.source' affichera le 'rank' qui vient de se terminer et qui a envoyé' tag_end'? – 218

1

Si chaque processus connaît le nombre de messages à envoyer, les étapes suivantes peuvent être conçues pour résoudre le problème:

1) Réduire le nombre de message à envoyer au processus racine. Chaque processus envoie à la racine le nombre de messages qu'il enverra plus tard. Cette opération est appelée une réduction et elle peut être réalisée par la fonction comm.reduce(...)

2) Recevoir tous les messages sur le processus 0.

Voici un code basé sur la vôtre, qui devrait faire l'affaire. Il peut être couru par mpirun -np 4 python main.py

#passRandomDraw.py 
import numpy 
from mpi4py import MPI 
from mpi4py.MPI import ANY_SOURCE 
import numpy as np 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 
size = comm.Get_size() 

#just in case, if numpy.random is seed with 
np.random.seed(np.random.randint(np.iinfo(np.uint32).min,np.iinfo(np.uint32).max)+rank) 

if rank == 0: 
    randNum = numpy.zeros(1) 
    print "Process before receiving random numbers" 
    nb=np.empty((1,),dtype=int) 
    nb0=np.zeros((1,),dtype=int) 
    comm.Reduce([nb0, MPI.INT],[nb, MPI.INT],op=MPI.SUM, root=0) #sums the total number of random number from every process on rank 0, in nb. 
    #print "rank"+str(rank)+" nb "+str(nb) 
else: 
    nb=np.empty((1,),dtype=int) 
    nb[0]=np.random.randint(1,10) 
    #print "rank"+str(rank)+" nb "+str(nb) 
    comm.Reduce([nb, MPI.INT],None,op=MPI.SUM, root=0) 
    for i in range(0,nb[0],1): 
     randNum = numpy.zeros(1) 
     randNum = numpy.random.random_sample(1) 
     print "Process", rank, "iteration", i, "drew the number", randNum[0] 
     comm.Send(randNum, dest=0) 



if rank == 0: 
    for i in range(nb[0]): #receives nb message, each one with its int. 
     comm.Recv(randNum, ANY_SOURCE) 
     print "Process", rank, "received the number", randNum[0] 

Selon le documentation of numpy.random() le générateur de nombres pseudo-aléatoires Mersenne Twister est initialement ensemencé par un nombre extrait de /dev/urandom (ou l'analogue Windows) si elle est disponible ou de graines de l'horloge autrement. Ainsi, dans le dernier cas, tous les processus peuvent recevoir la même graine et générer les mêmes nombres aléatoires. Pour éviter cela, j'ai ajouté la ligne suivante:

np.random.seed(np.random.randint(np.iinfo(np.uint32).min,np.iinfo(np.uint32).max)+rank)