2010-04-07 1 views
7

I recently asked about trying to optimise a Python loop for a scientific application, et reçu an excellent, smart way of recoding it within NumPy which reduced execution time by a factor of around 100 pour moi!Réécriture d'une boucle for en NumPy pur pour réduire le temps d'exécution

Toutefois, le calcul de la valeur B est en fait imbriqué dans quelques autres boucles, car il est évalué sur une grille de positions régulière. Existe-t-il une réécriture NumPy intelligente pour raser cette procédure? Je soupçonne que le gain de performance pour cette partie serait moins marqué, et les inconvénients seraient probablement qu'il ne serait pas possible de rapporter à l'utilisateur sur la progression du calcul, que les résultats n'ont pas pu être écrits à le fichier de sortie jusqu'à la fin du calcul, et éventuellement que cela en une étape énorme aurait des implications de mémoire? Est-il possible de contourner l'un de ceux-ci?

import numpy as np 
import time 

def reshape_vector(v): 
    b = np.empty((3,1)) 
    for i in range(3): 
     b[i][0] = v[i] 
    return b 

def unit_vectors(r): 
    return r/np.sqrt((r*r).sum(0)) 

def calculate_dipole(mu, r_i, mom_i): 
    relative = mu - r_i 
    r_unit = unit_vectors(relative) 
    A = 1e-7 

    num = A*(3*np.sum(mom_i*r_unit, 0)*r_unit - mom_i) 
    den = np.sqrt(np.sum(relative*relative, 0))**3 
    B = np.sum(num/den, 1) 
    return B 

N = 20000 # number of dipoles 
r_i = np.random.random((3,N)) # positions of dipoles 
mom_i = np.random.random((3,N)) # moments of dipoles 
a = np.random.random((3,3)) # three basis vectors for this crystal 
n = [10,10,10] # points at which to evaluate sum 
gamma_mu = 135.5 # a constant 

t_start = time.clock() 
for i in range(n[0]): 
    r_frac_x = np.float(i)/np.float(n[0]) 
    r_test_x = r_frac_x * a[0] 
    for j in range(n[1]): 
     r_frac_y = np.float(j)/np.float(n[1]) 
     r_test_y = r_frac_y * a[1] 
     for k in range(n[2]): 
      r_frac_z = np.float(k)/np.float(n[2]) 
      r_test = r_test_x +r_test_y + r_frac_z * a[2] 
      r_test_fast = reshape_vector(r_test) 
      B = calculate_dipole(r_test_fast, r_i, mom_i) 
      omega = gamma_mu*np.sqrt(np.dot(B,B)) 
      # write r_test, B and omega to a file 
    frac_done = np.float(i+1)/(n[0]+1) 
    t_elapsed = (time.clock()-t_start) 
    t_remain = (1-frac_done)*t_elapsed/frac_done 
    print frac_done*100,'% done in',t_elapsed/60.,'minutes...approximately',t_remain/60.,'minutes remaining' 

Répondre

2

Une chose évidente que vous pouvez faire est de remplacer la ligne

r_test_fast = reshape_vector(r_test) 

avec

r_test_fast = r_test.reshape((3,1)) 

ne sera probablement pas faire une grande différence dans la performance, mais en tout cas il est logique utiliser les builtins numpy au lieu de réinventer la roue. De manière générale, comme vous l'avez probablement déjà remarqué, l'optimisation de numpy consiste à exprimer l'algorithme à l'aide d'opérations entières numpy ou au moins avec des tranches au lieu d'itérer sur chaque élément du code python. Ce qui tend à empêcher ce type de "vectorisation" est ce que l'on appelle des dépendances portées en boucle, c'est-à-dire des boucles où chaque itération dépend du résultat d'une itération précédente. En regardant brièvement votre code, vous n'avez pas une telle chose, et il devrait être possible de vectoriser votre code très bien.

EDIT: Une solution

Je n'ai pas vérifié cela est correct, mais devrait vous donner une idée de la façon de l'aborder. Pour commencer, prenez le cartesian() function, which we'll use. Alors

 

def calculate_dipole_vect(mus, r_i, mom_i): 
    # Treat each mu sequentially 
    Bs = [] 
    omega = [] 
    for mu in mus: 
     rel = mu - r_i 
     r_norm = np.sqrt((rel * rel).sum(1)) 
     r_unit = rel/r_norm[:, np.newaxis] 
     A = 1e-7 

     num = A*(3*np.sum(mom_i * r_unit, 0)*r_unit - mom_i) 
     den = r_norm ** 3 
     B = np.sum(num/den[:, np.newaxis], 0) 
     Bs.append(B) 
     omega.append(gamma_mu * np.sqrt(np.dot(B, B))) 
    return Bs, omega 


# Transpose to get more "natural" ordering with row-major numpy 
r_i = r_i.T 
mom_i = mom_i.T 

t_start = time.clock() 
r_frac = cartesian((np.arange(n[0])/float(n[0]), 
        np.arange(n[1])/float(n[1]), 
        np.arange(n[2])/float(n[2]))) 
r_test = np.dot(r_frac, a) 
B, omega = calculate_dipole_vect(r_test, r_i, mom_i) 

print 'Total time for vectorized: %f s' % (time.clock() - t_start) 
 

Eh bien, dans mes tests, c'est en fait légèrement plus lent que l'approche en boucle à partir de laquelle je suis parti. Le fait est que, dans la version originale de la question, il était déjà vectorisé avec des opérations de tableaux entiers sur des tableaux de forme (20000, 3), donc toute autre vectorisation n'apporte pas vraiment d'avantage supplémentaire. En fait, cela peut aggraver les performances, comme ci-dessus, peut-être en raison de grandes baies temporaires.

+0

Je pense que la suggestion de Justin de profiler était probablement sage, mais merci beaucoup pour cela ... bien que je ne sois pas sûr de l'utiliser, je pense qu'essayer de comprendre cet exemple est probablement une très bonne façon d'apprendre. :) – Statto

2

Si vous codez votre code profile, vous verrez que 99% du temps d'exécution est dans calculate_dipole, donc la réduction du temps pour cette boucle ne donnera pas vraiment une réduction notable du temps d'exécution. Vous devez toujours vous concentrer sur calculate_dipole si vous voulez accélérer le processus. J'ai essayé mon code Cython pour calculate_dipole à ce sujet et j'ai obtenu une réduction d'environ un facteur de 2 dans le temps global. Il pourrait y avoir d'autres façons d'améliorer le code Cython.