2016-04-28 2 views
1

J'utilise le module multiprocessing pour faire la recherche d'URL parallèle. Mon code est comme:Python: vérifier le nombre d'éléments traités lors de l'utilisation multiprocessing.map

pat = re.compile("(?P<url>https?://[^\s]+)") 
def resolve_url(text): 
    missing = 0 
    bad = 0 
    url = 'before' 
    long_url = 'after' 
    match = pat.search(text) ## a text looks like "I am at here. http:.....(a URL)" 
    if not match: 
     missing = 1 
    else: 
     url = match.group("url") 
     try: 
      long_url = urllib2.urlopen(url).url 
     except: 
      bad = 1 
    return (url, long_url, missing, bad) 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(100) 
    resolved_urls = pool.map(resolve_url, checkin5) ## checkin5 is a list of texts 

La question est, ma liste checkin5 contient environ 600 000 éléments et ce travail parallèle prend vraiment le temps. Je veux vérifier dans le processus combien d'éléments ont été résolus. Si, dans une simple boucle, je peux le faire comme:

resolved_urls = [] 
now = time.time() 
for i, element in enumerate(checkin5): 
    resolved_urls.append(resolve_url(element)) 
    if i%1000 == 0: 
     print("from %d to %d: %2.5f seconds" %(i-1000, i, time.time()-now)) 
     now = time.time() 

Mais maintenant, je dois augmenter l'efficacité, donc multiprocessus est nécessaire, mais je ne sais pas comment inspecter le processus dans ce cas, tout idée?

Par ailleurs, pour vérifier si la méthode ci-dessus fonctionne également dans ce cas, j'ai essayé un code de jouet:

import multiprocessing 
import time 

def cal(x): 
    res = x*x 
    return res 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(4) 

    t0 = time.time() 
    result_list = pool.map(cal,range(1000000)) 
    print(time.time()-t0) 

    t0 = time.time() 
    for i, result in enumerate(pool.map(cal, range(1000000))): 
     if i%100000 == 0: 
      print("%d elements have been calculated, %2.5f" %(i, time.time()-t0)) 
      t0 = time.time() 

Et les résultats sont les suivants:

0.465271949768 
0 elements have been calculated, 0.45459 
100000 elements have been calculated, 0.02211 
200000 elements have been calculated, 0.02142 
300000 elements have been calculated, 0.02118 
400000 elements have been calculated, 0.01068 
500000 elements have been calculated, 0.01038 
600000 elements have been calculated, 0.01391 
700000 elements have been calculated, 0.01174 
800000 elements have been calculated, 0.01098 
900000 elements have been calculated, 0.01319 

A partir du résultat, je pense que la méthode pour un seul processus ne fonctionne pas ici. Il semble que le pool.map sera appelé en premier et après le calcul est terminé et on obtient la liste complète, le enumerate commence .... Ai-je raison?

+0

Énumérer 'pool.map (resolve_url, checkin5)' et ajouter à la liste de sortie? –

+0

@YakymPirozhenko Je me demandais si l'enumerate est en temps réel ou en fait après que tous les éléments ont été traités ... et il est difficile de vérifier ... – gladys0313

+0

@YakymPirozhenko je pense que tous les éléments après énumère ont été traités, un coup d'oeil à ma question éditée, j'ajoute un essai simple – gladys0313

Répondre

2

Vous devriez être en mesure de le faire avec Pool.imap ou Pool.imap_unordered selon que vous vous souciez ou non de l'ordre des résultats. Ils sont tous les deux non-bloquants ...

resolved_urls = [] 
pool = multiprocessing.Pool(100) 
res = pool.imap(resolve_url, checkin5) 

for x in res: 
    resolved_urls.append(x) 
    print 'finished one' 
    # ... whatever counting/tracking code you want here 
+0

Salut, merci, YakymPirozhenko me rappelle aussi, mais dans ce cas, le processus prend plus .... dans mon exemple , calcule le sqrt pour 100000 éléments, le temps d'exécution passe de 0.35s (pool.map) à ~ 13s (pool.imap) ...hmm ... – gladys0313

+0

Ce n'est pas vraiment surprenant car pour une fonction simple comme le quadrillage, la quantité de travail nécessaire pour résoudre les conditions de course l'emporte sur les avantages de la parallélisation. Ma conjecture est qu'avec le traitement de requête HTTP 'imap' sera presque aussi rapide que' map'. Je posterai plus de détails ci-dessous. –

1

D'abord, je crois que @ danf1024 a la réponse. Cela permet de résoudre le problème de ralentissement lors du passage de pool.map à pool.imap.

Voici une petite expérience:

from multiprocessing import Pool 


def square(x): 
    return x * x 


N = 10 ** 4 
l = list(range(N)) 


def test_map(n=N): 
    list(Pool().map(square, l)) 

# In [3]: %timeit -n10 q.test_map() 
# 10 loops, best of 3: 14.2 ms per loop 


def test_imap(n=N): 
    list(Pool().imap(square, l)) 

# In [4]: %timeit -n10 q.test_imap() 
# 10 loops, best of 3: 232 ms per loop 


def test_imap1(n=N): 
    list(Pool(processes=1).imap(square, l)) 

# In [5]: %timeit -n10 q.test_imap1() 
# 10 loops, best of 3: 191 ms per loop 


def test_map_naive(n=N): 
    # cast map to list in python3 
    list(map(square, l)) 

# In [6]: %timeit -n10 q.test_map_naive() 
# 10 loops, best of 3: 1.2 ms per loop 

Parce que équerrage est une opération pas cher par rapport à, disons, le téléchargement et l'analyse d'une page Web, parallélisation aura des gains si chaque processeur peut traiter de gros morceaux sans interruption de l'entrée . Ce n'est pas le cas avec imap, qui fonctionne très mal sur mes 4 cœurs. Amusant, restreindre le nombre de processus à 1 rend imap plus rapide car les conditions de course sont supprimées.

Cependant, lorsque vous passez à des opérations plus coûteuses, la différence entre imap et map devient de moins en moins importante.

+0

wow, incroyable, merci beaucoup! J'ai cliqué sur la réponse de dan parce que sa réponse est juste pour cette question --- je pense que ce sera plus simple et utile si d'autres personnes partagent aussi mon problème. Mais votre explication et votre aide sont super pour moi. Je vous remercie! – gladys0313

+0

Je suis content que vous trouviez cela utile. Aussi, compte tenu de votre tâche, vous voudrez peut-être consulter la bibliothèque de l'événement: http://eventlet.net/ –