2013-02-03 11 views
22

J'utilise multiprocessing.Pool() pour paralléliser certains calculs lourds.multitraitement avec de grandes données

La fonction cible renvoie beaucoup de données (une liste énorme). Je manque de RAM.

Sans multiprocessing, je changerais simplement la fonction cible en un générateur, par yield les éléments résultants l'un après l'autre, comme ils sont calculés. Je comprends que le multitraitement ne supporte pas les générateurs - il attend la sortie entière et la renvoie immédiatement, n'est-ce pas? Pas de rendement. Est-il possible de faire en sorte que les travailleurs Pool produisent des données dès qu'ils sont disponibles, sans construire le tableau de résultats complet dans la RAM?

Exemple simple:

def target_fnc(arg): 
    result = [] 
    for i in xrange(1000000): 
     result.append('dvsdbdfbngd') # <== would like to just use yield! 
    return result 

def process_args(some_args): 
    pool = Pool(16) 
    for result in pool.imap_unordered(target_fnc, some_args): 
     for element in result: 
      yield element 

Ceci est Python 2.7.

Répondre

15

Cela ressemble à un cas d'utilisation idéal pour une file d'attente: http://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes

simplement vos résultats se nourrissent dans la file d'attente des travailleurs mis en commun et les ingèrent dans le maître. Notez que vous pouvez toujours rencontrer des problèmes de pression de mémoire, sauf si vous vidangez la file d'attente presque aussi vite que les travailleurs la peuplent. Vous pouvez limiter la taille de la file d'attente (le nombre maximal d'objets qui rentrent dans la file d'attente) auquel cas les travailleurs regroupés bloqueront les instructions queue.put jusqu'à ce que de la place soit disponible dans la file d'attente. Cela mettrait un plafond sur l'utilisation de la mémoire. Mais si vous faites cela, il est peut-être temps de reconsidérer si vous avez besoin de la mise en commun du tout et/ou s'il est judicieux d'utiliser moins de travailleurs.

+1

La file d'attente transmet les données décapées. So data -> pickle-> unpickle-> nouvelle copie des données. Cela ralentira le programme et utilisera beaucoup plus de RAM supplémentaire. On devrait envisager d'utiliser la mémoire partagée à la place. – Wang

3

Si vos tâches peuvent renvoyer des données dans des blocs ... peuvent-elles être divisées en tâches plus petites, chacune d'entre elles retournant un seul bloc? Évidemment, ce n'est pas toujours possible. Quand ce n'est pas le cas, vous devez utiliser un autre mécanisme (comme un Queue, comme le suggère Loren Abrams). Mais quand il est, c'est probablement une meilleure solution pour d'autres raisons, ainsi que la résolution de ce problème.

Avec votre exemple, c'est certainement faisable. Par exemple:

def target_fnc(arg, low, high): 
    result = [] 
    for i in xrange(low, high): 
     result.append('dvsdbdfbngd') # <== would like to just use yield! 
    return result 

def process_args(some_args): 
    pool = Pool(16) 
    pool_args = [] 
    for low in in range(0, 1000000, 10000): 
     pool_args.extend(args + [low, low+10000] for args in some_args) 
    for result in pool.imap_unordered(target_fnc, pool_args): 
     for element in result: 
      yield element 

(Vous pouvez bien sûr remplacer la boucle avec une compréhension imbriquée ou un zip et flatten, si vous préférez.)

Donc, si some_args est [1, 2, 3], vous obtenez 300 tâches [[1, 0, 10000], [2, 0, 10000], [3, 0, 10000], [1, 10000, 20000], …], dont chacune ne renvoie que 10000 éléments au lieu de 1000000.

3

De votre description, il semble que vous ne souhaitiez pas vraiment traiter les données telles qu'elles se présentent, par exemple en évitant de passer un million d'éléments. list retour.

Il y a une façon plus simple de faire cela: Il suffit de mettre les données dans un fichier. Par exemple:

def target_fnc(arg): 
    fd, path = tempfile.mkstemp(text=True) 
    with os.fdopen(fd) as f: 
     for i in xrange(1000000): 
      f.write('dvsdbdfbngd\n') 
    return path 

def process_args(some_args): 
    pool = Pool(16) 
    for result in pool.imap_unordered(target_fnc, some_args): 
     with open(result) as f: 
      for element in f: 
       yield element 

Il est évident que si vos résultats peuvent contenir des sauts de ligne, ou ne sont pas des chaînes, etc., vous aurez envie d'utiliser un fichier csv, un numpy, etc. au lieu d'un simple fichier texte, mais l'idée est la même.Cela étant dit, même si cela est plus simple, il y a généralement des avantages à traiter les données à la fois, donc la division de vos tâches ou l'utilisation d'un Queue (comme les deux autres réponses le suggèrent) peut être meilleure. les inconvénients (respectivement, besoin d'un moyen de casser les tâches, ou d'être en mesure de consommer les données aussi vite qu'ils sont produits) ne sont pas disjoncteurs.