2017-08-19 9 views
2

J'appelle une fonction task(url, param1, param2) qui renvoie le résultat d'un appel d'API à url = url ou le nom url si l'appel API n'a pas fonctionné. Mon task ressemble à quelque chose comme:renvoyer des valeurs à partir d'exceptions dans le multitraitement

def task(url, param1, param2): 
    try: 
     make_api_call(url, param1, param2) 
    except ValueError as e: 
     print("val error")    
     return url 

Maintenant, je veux appliquer task à une liste de 100 urls et commencer à les multiprocessing comme:

import multiprocessing as mp 

def run_tasks(urls, param1, param2): 
    jobs = [] 
    for i in range(len(urls)): 
     process = mp.Process(target=task, args=(urls[i], param1, param2)) 
     jobs.append(process) 

    ## catch error processes 
    error_urls = [] 

    ## start processes 
    for j in jobs: 
     j.start() 

    ## finish processes 
    for j in jobs: 
     j.join() 

De ce qui précède run_tasks, comment pourrais-je retourner une liste de le url s qui m'avait donné un ValueError? J'ai essayé error_urls.append(j.join()), mais cela n'a pas fonctionné.

Répondre

2

Il existe deux méthodes pour obtenir des résultats du processus.

Méthode 1. Utilisez list de Manager. Vous ne devez pas utiliser le verrou pour synchroniser entre les processus.

from multiprocessing import Process, Manager 

def task(url, param1, param2, error_list): 
    try: 
     make_api_call(url, param1, param2) 
    except ValueError as e: 
     print("val error")    
     error_list.append(url) 

def run_tasks(urls, param1, param2): 

    error_list = Manager().list()  
    jobs = [] 

    for i in range(len(urls)): 
     process = Process(target=task, args=(urls[i], param1, param2, error_list)) 
     jobs.append(process) 

    ## start processes 
    for j in jobs: 
     j.start() 

    ## finish processes 
    for j in jobs: 
     j.join() 

Méthode 2. Utilisez ProcessPoolExecutor de concurrent.futures. Cette méthode est facile à comprendre et moins de code.

from concurrent import futures 

def task(url, param1, param2): 
    try: 
     make_api_call(url, param1, param2) 
    except ValueError as e: 
     print("val error")    
     return url 

def runt_tasks(urls, param1, param2): 

    with futures.ProcessPoolExecutor() as executor: 
     result = executor.map(task, urls, [param1] * len(urls), [param2] * len(urls)) 

    error_list = [item for item in result if item is not None] 

Enfin, à partir de la description de la question. C'est un problème sensible à IO. Je vous recommande d'utiliser ThreadPoolExecutor. Lorsque vous effectuez une opération d'E/S, le thread libère le GIL pour laisser les autres threads s'exécuter. Pour un problème sensible au processeur, vous devez utiliser ProcessPoolExecutor. Et asyncio est un autre choix pour faire de la programmation simultanée en Python 3.

1

Essayez la mémoire partagée. utiliser cette multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *args[, lock])

Vous pouvez définir ceci dans dans run_tasks

from multiprocessing import Process, Lock 
from multiprocessing.sharedctypes import Array 
lock = Lock() 
error_urls = Array(c_char_p, [], lock = lock) 

Et

def task(url, param1, param2): 
    try: 
     make_api_call(url, param1, param2) 
    except ValueError as e: 
     print("val error")    
     error_urls.append(url) 

comme la doc de Array():

La même chose que RawArray() sauf en fonction de la valeur du verrou Enveloppe de synchronisation sûre pour les processus m peut être renvoyé au lieu d'un tableau raw ctypes.

Il est donc sécurisé. En savoir plus sur Array() peut se référer this, à propos de ctypes (c_char_p) se référer this