2016-03-26 1 views
1

Contexte: J'ai un tableau que j'ai dispersé dans mes moteurs (4 moteurs à ce moment), je veux appliquer une fonction à chaque point du tableau pour un nombre arbitraire d'itérations et rassemblez le tableau résultant des moteurs et effectuez une analyse dessus.Exécution lente iparallel avec scatter/gather

Par exemple I ont le réseau de points de données, qui sont dispersés et que le nombre d'itérations sur chaque point de données:

data_points = range(16) 
iterations = 10 
dview.scatter('points', data_points) 

j'ai une fonction fournie par l'utilisateur en tant que tel, qui est poussé vers les moteurs:

def user_supplied_function(point): 
    return randint(0, point) 

dview.push(dict(function_one = user_supplied_function)) 

Une liste pour mes résultats et l'exécution en parallèle:

result_list = [] 
for i in range(iterations): 
    %px engine_result = [function_one(j) for j in points] 
    result_list.append(dview.gather('engine_result')) 

Problème: Cela fonctionne, et j'obtiens le résultat que je veux des moteurs, cependant, à mesure que le nombre d'itérations augmente, la boucle prend de plus en plus de temps à s'exécuter. Au point où 1000 itérations sur 50 points prend plus de 15 secondes à compléter. Alors qu'une version séquentielle de cette tâche prend moins d'une seconde.

Une idée de ce qui pourrait causer cela? Serait-ce le surcoût du message passant de gather()? Si oui, quelqu'un peut-il suggérer des solutions?

Répondre

0

Compris. C'était l'overhead de gather() et .append() après tout. La solution la plus simple est de gather() après que les moteurs ont fini leur travail, plutôt que de le faire à chaque itération.

Solution

%autopx 
engine_result = [] 
for i in xrange(iterations): 
    engine_result += [[function_one(j) for j in points]] 
%autopx 
result_list = list(dview.gather('engine_result')) 

Ceci, cependant, obtient les résultats dans une liste mal formatée des listes où les résultats de chaque moteur sont placés à côté de l'autre au lieu de commander par numéro d'itération. Les commandes suivantes distribuent les listes et aplatissent les sous-listes pour chaque itération.

gathered_list = [None] * iterations 
gathered_list = [[result_list[j * iterations + i] for j in xrange(len(result_list)/iterations)] for i in xrange(iterations)] 
gathered_list = [reduce(lambda x, y: x.extend(y) or x, z) for z in gathered_list]