2017-10-16 4 views
11

Je suis en train de paralléliser un embarrassingly parallèle pour la boucle (previously asked here) et se sont installés sur this implementation qui correspondent à mes paramètres:Pourquoi ne vois-je pas accélérer le multitraitement en Python?

with Manager() as proxy_manager: 
     shared_inputs = proxy_manager.list([datasets, train_size_common, feat_sel_size, train_perc, 
              total_test_samples, num_classes, num_features, label_set, 
              method_names, pos_class_index, out_results_dir, exhaustive_search]) 
     partial_func_holdout = partial(holdout_trial_compare_datasets, *shared_inputs) 

     with Pool(processes=num_procs) as pool: 
      cv_results = pool.map(partial_func_holdout, range(num_repetitions)) 

La raison pour laquelle je dois utiliser un proxy object (partagé entre les processus) est le premier élément de la liste proxy partagée datasets qui est une liste de grands objets (chacun environ 200-300 Mo). Cette liste datasets a généralement 5-25 éléments. J'ai généralement besoin d'exécuter ce programme sur un cluster HPC. Voici la question, quand je lance ce programme avec 32 processus et 50 Go de mémoire (num_repetitions = 200, avec des jeux de données étant une liste de 10 objets, chaque 250 Mo), je ne vois pas une accélération même par un facteur de 16 (avec 32 processus parallèles). Je ne comprends pas pourquoi - des indices? Des erreurs évidentes, ou de mauvais choix? Où puis-je améliorer cette mise en œuvre? Des alternatives?

Je suis sûr que cela a été discuté auparavant, et les raisons peuvent être variées et très spécifiques à la mise en œuvre - d'où je vous demande de me fournir vos 2 cents. Merci.

Mise à jour: J'ai fait du profilage avec cProfile pour avoir une meilleure idée - voici quelques informations, triées par temps cumulé.

In [19]: p.sort_stats('cumulative').print_stats(50) 
Mon Oct 16 16:43:59 2017 profiling_log.txt 

     555404 function calls (543552 primitive calls) in 662.201 seconds 

    Ordered by: cumulative time 
    List reduced from 4510 to 50 due to restriction <50> 

    ncalls tottime percall cumtime percall filename:lineno(function) 
    897/1 0.044 0.000 662.202 662.202 {built-in method builtins.exec} 
     1 0.000 0.000 662.202 662.202 test_rhst.py:2(<module>) 
     1 0.001 0.001 661.341 661.341 test_rhst.py:70(test_chance_classifier_binary) 
     1 0.000 0.000 661.336 661.336 /Users/Reddy/dev/neuropredict/neuropredict/rhst.py:677(run) 
     4 0.000 0.000 661.233 165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:533(wait) 
     4 0.000 0.000 661.233 165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:263(wait) 
     23 661.233 28.749 661.233 28.749 {method 'acquire' of '_thread.lock' objects} 
     1 0.000 0.000 661.233 661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:261(map) 
     1 0.000 0.000 661.233 661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:637(get) 
     1 0.000 0.000 661.233 661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:634(wait) 
    866/8 0.004 0.000 0.868 0.108 <frozen importlib._bootstrap>:958(_find_and_load) 
    866/8 0.003 0.000 0.867 0.108 <frozen importlib._bootstrap>:931(_find_and_load_unlocked) 
    720/8 0.003 0.000 0.865 0.108 <frozen importlib._bootstrap>:641(_load_unlocked) 
    596/8 0.002 0.000 0.865 0.108 <frozen importlib._bootstrap_external>:672(exec_module) 
    1017/8 0.001 0.000 0.863 0.108 <frozen importlib._bootstrap>:197(_call_with_frames_removed) 
    522/51 0.001 0.000 0.765 0.015 {built-in method builtins.__import__} 

Les informations de profilage maintenant trié par time

In [20]: p.sort_stats('time').print_stats(20) 
Mon Oct 16 16:43:59 2017 profiling_log.txt 

     555404 function calls (543552 primitive calls) in 662.201 seconds 

    Ordered by: internal time 
    List reduced from 4510 to 20 due to restriction <20> 

    ncalls tottime percall cumtime percall filename:lineno(function) 
     23 661.233 28.749 661.233 28.749 {method 'acquire' of '_thread.lock' objects} 
    115/80 0.177 0.002 0.211 0.003 {built-in method _imp.create_dynamic} 
     595 0.072 0.000 0.072 0.000 {built-in method marshal.loads} 
     1 0.045 0.045 0.045 0.045 {method 'acquire' of '_multiprocessing.SemLock' objects} 
    897/1 0.044 0.000 662.202 662.202 {built-in method builtins.exec} 
     3 0.042 0.014 0.042 0.014 {method 'read' of '_io.BufferedReader' objects} 
2037/1974 0.037 0.000 0.082 0.000 {built-in method builtins.__build_class__} 
     286 0.022 0.000 0.061 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:12(docformat) 
    2886 0.021 0.000 0.021 0.000 {built-in method posix.stat} 
     79 0.016 0.000 0.016 0.000 {built-in method posix.read} 
     597 0.013 0.000 0.021 0.000 <frozen importlib._bootstrap_external>:830(get_data) 
     276 0.011 0.000 0.013 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/sre_compile.py:250(_optimize_charset) 
     108 0.011 0.000 0.038 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/stats/_distn_infrastructure.py:626(_construct_argparser) 
    1225 0.011 0.000 0.050 0.000 <frozen importlib._bootstrap_external>:1233(find_spec) 
    7179 0.009 0.000 0.009 0.000 {method 'splitlines' of 'str' objects} 
     33 0.008 0.000 0.008 0.000 {built-in method posix.waitpid} 
     283 0.008 0.000 0.015 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:128(indentcount_lines) 
     3 0.008 0.003 0.008 0.003 {method 'poll' of 'select.poll' objects} 
    7178 0.008 0.000 0.008 0.000 {method 'expandtabs' of 'str' objects} 
     597 0.007 0.000 0.007 0.000 {method 'read' of '_io.FileIO' objects} 

Plus d'informations profilage triés par percall info: profiling info sorted by percall

Mise à jour 2

Les éléments dans la grande liste datasets je mentionne ed plus tôt ne sont généralement pas aussi gros - ils sont généralement 10-25 Mo chacun. Mais en fonction de la précision en virgule flottante utilisée, du nombre d'échantillons et de fonctionnalités, cela peut facilement atteindre 500 Mo-1 Go par élément également. par conséquent je préférerais une solution qui puisse évoluer.

Mise à jour 3:

Le code à l'intérieur holdout_trial_compare_datasets utilise la méthode GridSearchCV de scikit-learn, qui utilise en interne la bibliothèque JOBLIB si nous fixons n_jobs> 1 (ou chaque fois que nous même y). Cela pourrait conduire à de mauvaises interactions entre le multi-traitement et joblib. Donc en essayant une autre configuration où je ne mets pas du tout n_jobs (qui devrait par défaut pas de parallélisme dans scikit-learn). Vous tiendrons au courant.

+0

avez-vous fait le profilage? – georgexsh

+0

pas encore, comme les paramètres pour lesquels je voudrais le tester (16-32 processus, avec 10-15 jeux de données) exigent que je l'exécute sur un cluster et je ne sais pas comment profiler les programmes python sur la ligne de commande . Je vais regarder dans bientôt. –

+0

mon 2 ¢: si votre grand objet de données ne passe que de parent à enfant, 'Manager' semble trop puissant, vous pouvez le charger dans une variable globale dans le parent, puis il sera partagé avec child après' fork() '. – georgexsh

Répondre

0
{method 'acquire' of '_thread.lock' objects} 

En regardant votre sortie profileur Je dirais que les frais généraux de verrouillage objet partagé/déverrouillage accable les gains de vitesse de multithreading. Refaire le travail de refactorisation pour que le travail soit étendu aux travailleurs qui n'ont pas besoin de se parler autant.

Spécifiquement, si possible, dérivez une réponse par pile de données et ensuite agir sur les résultats accumulés. C'est pourquoi les files d'attente peuvent sembler beaucoup plus rapides: elles impliquent un type de travail qui ne nécessite pas un objet qui doit être «géré» et donc verrouillé/déverrouillé.

Seulement «gérer» les choses qui doivent absolument être partagées entre les processus. Votre liste gérée contient des objets Admirer très compliqués ...

Un paradigme est plus rapide:

allwork = manager.list([a, b,c]) 
theresult = manager.list() 

puis

while mywork: 
    unitofwork = allwork.pop() 
    theresult = myfunction(unitofwork) 
+0

Merci pour les suggestions et la réponse. En fait, le travail dans différents processus n'a pas besoin de se parler tous, c'est-à-dire, une fois qu'ils ont accédé à la liste de données partagées 'shared_inputs', tout ce qui est à l'intérieur de l'unité de travail est indépendant des autres. –

+0

Et je ne suis pas sûr de savoir comment ma suggestion implémentée est différente de vos suggestions. Pourriez-vous essayer d'élaborer davantage, ou peut-être écrire une mise en œuvre plus détaillée qui peut être permutée à la place de ce que je montre au sommet? ce serait génial, merci. –

0

Si vous n'avez pas besoin d'un objet complexe partagé, seul usage une liste des objets les plus simples imaginables. Puis dites aux travailleurs d'acquérir les données complexes qu'ils peuvent traiter dans leur propre petit monde.

Essayez:

allwork = manager.list([datasetid1, datasetid2 ,...]) 
theresult = manager.list() 

while mywork: 
    unitofworkid = allwork.pop() 
    theresult = myfunction(unitofworkid) 

def myfunction(unitofworkid): 
    thework = acquiredataset(unitofworkid) 
    result = holdout_trial_compare_datasets(thework, ...) 

J'espère que cela a du sens. Il ne devrait pas prendre trop de temps pour refactoriser dans cette direction. Et vous devriez voir que {méthode 'acquérir' des objets '_thread.lock'} numéro tomber comme un rocher lorsque vous profilez.

+0

Merci, j'ai besoin d'y penser. Cela conduirait à d'énormes E/S dans ma situation - mais il faut penser si c'est seulement une petite fraction de temps par rapport au calcul parallèle réel. –

2

Sur la base de discussion dans les commentaires, je l'ai fait une mini-expérience, comparé trois versions de mise en œuvre:

  • v1: essentiellement même que votre approche, en fait, comme partial(f1, *shared_inputs) désarchivera proxy_manager.list immédiatement, Manager.List pas impliqué ici, les données transmises au travailleur avec la file d'attente interne de Pool.
  • v2: v2 en cours d'utilisation Manager.List, la fonction de travail recevra un objet ListProxy, il récupère les données partagées via une connexion interne à un processus serveur.
  • v3: données de partage de processus enfant provenant du parent, profitez de l'appel système fork(2).

def f1(*args): 
    for e in args[0]: pow(e, 2) 

def f2(*args): 
    for e in args[0][0]: pow(e, 2) 

def f3(n): 
    for i in datasets: pow(i, 2) 

def v1(np): 
    with mp.Manager() as proxy_manager: 
     shared_inputs = proxy_manager.list([datasets,]) 
     pf = partial(f1, *shared_inputs) 
     with mp.Pool(processes=np) as pool: 
      r = pool.map(pf, range(16)) 

def v2(np): 
    with mp.Manager() as proxy_manager: 
     shared_inputs = proxy_manager.list([datasets,]) 
     pf = partial(f2, shared_inputs) 
     with mp.Pool(processes=np) as pool: 
      r = pool.map(pf, range(16)) 

def v3(np): 
    with mp.Pool(processes=np) as pool: 
     r = pool.map(f3, range(16)) 

datasets = [2.0 for _ in range(10 * 1000 * 1000)] 
for f in (v1, v2, v3): 
    print(f.__code__.co_name) 
    for np in (2, 4, 8, 16): 
     s = time() 
     f(np) 
     print("%s %.2fs" % (np, time()-s)) 

résultats pris sur un 16 noyau E5-2682 VPC, il est évident que les échelles v3 mieux: result

+0

Merci George - m'aide vraiment à me rapprocher de la compréhension du problème. Je suis vraiment curieux à propos de '* shared_inputs' déballage immédiatement - je ne me rendais pas compte que cela arrive .. chose semblable se produirait lorsque nous appliquons' functools.partial' aussi, car il essaie de geler les entrées et la signature, non? Ainsi, avec ou sans déballage, les données doivent avoir été transmises aux processus enfants via le décapage, n'est-ce pas? cela signifie que le seul que nous pouvons éviter est de les utiliser comme variables globales et de les accéder à partir de processus enfants. Des dangers avec cette méthode? Aucun changement d'entrée ne sera effectué dans les processus enfants. –

+0

comme liste déballée '* shared_inputs' avant que le paramètre partial ne prenne effet, il" gèle "les objets que contient la liste (proxy), pas le proxy lui-même, v2 un exemple pour utiliser' Manager.List' avec partial; Oui, les données ont été décapées/décapées en passant; Dans le contexte de votre problème et de votre charge de travail, il n'y a rien à craindre, si vous passez quelque chose associé à des ressources externes comme un objet socket, un descripteur de fichier, vous devez être plus prudent. – georgexsh

+0

merci George pour votre aide - je pense que nous devons mieux établir les goulots d'étranglement .. esp avec des objets volumineux (10 éléments de 20 Mo) et des calculs complexes (quelque chose de plus exigeant que des quadrillages réguliers) Vos commentaires ont été très utiles et j'espère pouvoir le comprendre. –