2010-08-27 9 views
6

Voici ce que je suis en train d'accomplir -python -> Module multitraitement

  1. J'ai environ un million de fichiers que je dois analyser & ajouter le contenu dans un fichier analysable unique.
  2. Puisqu'un seul processus prend des âges, cette option est désactivée.
  3. Ne pas utiliser de threads en Python car il s'agit essentiellement d'exécuter un seul processus (à cause de GIL).
  4. D'où l'utilisation du module multi-traitement. c'est-à-dire engendrant 4 sous-processus pour utiliser toute cette puissance de base brute :)

Jusqu'ici tout va bien, maintenant j'ai besoin d'un objet partagé auquel tous les sous-processus ont accès. J'utilise des files d'attente du module multi-traitement. De plus, tous les sous-processus doivent écrire leur sortie dans un seul fichier. Un endroit potentiel pour utiliser les verrous, je suppose. Avec cette configuration quand je cours, je n'obtiens aucune erreur (ainsi le processus de parent semble bien), il bloque juste. Quand j'appuie sur Ctrl-C, je vois un retraçage (un pour chaque sous-processus). De plus, aucune sortie n'est écrite dans le fichier de sortie. Voici le code (notez que tout fonctionne bien sans multi-processus) -

import os 
import glob 
from multiprocessing import Process, Queue, Pool 

data_file = open('out.txt', 'w+') 

def worker(task_queue): 
    for file in iter(task_queue.get, 'STOP'): 
     data = mine_imdb_page(os.path.join(DATA_DIR, file)) 
     if data: 
      data_file.write(repr(data)+'\n') 
    return 

def main(): 
    task_queue = Queue() 
    for file in glob.glob('*.csv'): 
     task_queue.put(file) 
    task_queue.put('STOP') # so that worker processes know when to stop 

    # this is the block of code that needs correction. 
    if multi_process: 
     # One way to spawn 4 processes 
     # pool = Pool(processes=4) #Start worker processes 
     # res = pool.apply_async(worker, [task_queue, data_file]) 

     # But I chose to do it like this for now. 
     for i in range(4): 
      proc = Process(target=worker, args=[task_queue]) 
      proc.start() 
    else: # single process mode is working fine! 
     worker(task_queue) 
    data_file.close() 
    return 

Qu'est-ce que je fais mal? J'ai également essayé de passer l'open_object ouvert à chacun des processus au moment du frai. Mais pour aucun effet. par exemple, Process(target=worker, args=[task_queue, data_file]). Mais cela n'a rien changé. Je pense que les sous-processus ne sont pas en mesure d'écrire dans le fichier pour une raison quelconque. Soit l'instance du file_object ne se reproduit pas (au moment de spawn) ou quelque autre bizarrerie ... Quelqu'un a une idée?

EXTRA: Aussi est-il un moyen de garder une passe & mysql_connection persistante l'ouvrir à travers les sub_processes? Donc, j'ouvre une connexion mysql dans mon processus parent & la connexion ouverte doit être accessible à tous mes sous-processus. Fondamentalement, c'est l'équivalent d'une mémoire partagée en python. Des idées ici?

+0

Si vous n'écrivez pas dans un fichier mais que vous faites une impression, est-ce que cela fonctionne? (sur Linux, je ferais python script.py> out.dat pour empêcher l'inondation de l'écran). – extraneon

+1

Et je pense que proc.start est non bloquant, donc vous devriez probablement attendre quelque part pour donner au processus la possibilité de faire un peu de travail avant de faire le fichier de données. – extraneon

+0

data_file.close() se fait à la toute fin. Cela devrait-il avoir un effet ici? L'impression fonctionne également bien. Je vois la sortie sur l'écran quand j'utilise l'impression ... Mais je veux utiliser le fichier. Aidez-moi! Aussi est-il possible de garder un mysql_connection persistant et de le passer aux sous-processus? –

Répondre

4

Bien que la discussion avec Eric ait été fructueuse, plus tard, j'ai trouvé une meilleure façon de le faire. Dans le module multitraitement, il y a une méthode appelée 'Pool' qui est parfaite pour mes besoins.

Il optimise lui-même le nombre de cœurs de mon système. c'est-à-dire que autant de processus sont engendrés que le non. des noyaux. Bien sûr, ceci est personnalisable. Alors, voici le code. Pourrait aider quelqu'un plus tard-

from multiprocessing import Pool 

def main(): 
    po = Pool() 
    for file in glob.glob('*.csv'): 
     filepath = os.path.join(DATA_DIR, file) 
     po.apply_async(mine_page, (filepath,), callback=save_data) 
    po.close() 
    po.join() 
    file_ptr.close() 

def mine_page(filepath): 
    #do whatever it is that you want to do in a separate process. 
    return data 

def save_data(data): 
    #data is a object. Store it in a file, mysql or... 
    return 

Toujours en passant par cet énorme module.Vous ne savez pas si save_data() est exécuté par un processus parent ou si cette fonction est utilisée par les processus enfants engendrés. Si c'est l'enfant qui effectue la sauvegarde, cela peut entraîner des problèmes de concurrence dans certaines situations. Si quelqu'un a plus d'expérience dans l'utilisation de ce module, vous apprécierez plus de connaissances ici ...

3

Les docs de multitraitement indiquent plusieurs méthodes de partage d'état entre les processus:

http://docs.python.org/dev/library/multiprocessing.html#sharing-state-between-processes

Je suis sûr que chaque processus obtient un interprète frais et la cible (fonction) et sont chargés dans args elle. Dans ce cas, l'espace de noms global de votre script aurait été lié à votre fonction de travail, donc le fichier data_file serait là. Cependant, je ne suis pas sûr de ce qu'il advient du descripteur de fichier tel qu'il est copié. Avez-vous essayé de passer l'objet fichier comme l'un des arguments?

Une alternative consiste à passer une autre file d'attente qui contiendra les résultats des travailleurs. Les travailleurs put les résultats et le code principal get s les résultats et l'écrit dans le fichier.

+0

Ouais! Je pourrais faire ça. Je pourrais avoir une autre file d'attente qui serait quelque chose comme une out_queue dans laquelle les processus écrire. Puisque le processus parent a accès à ceci, il pourrait continuer à lire cette file d'attente et écrire dans un fichier. Cela pourrait fonctionner !! J'ai également essayé de passer l'objet fichier comme l'un des arguments. Ça n'a pas l'air de marcher. Les threads n'écrivent pas dans le fichier. Éric aussi, aucune idée de comment passer une connexion mysql persistante aux sous-processus? –

+0

@Srikar, espérons que cela aide. En ce qui concerne les connexions mysql, je ne suis pas sûr de cela. Je dirais que vous êtes mieux avec une connexion séparée pour chaque processus. Même si vous pouviez établir une connexion, je ne suis pas sûr de la "sécurité des threads". Si vous deviez vraiment partager un seul, vous devriez probablement faire des trucs bizarres. Là encore, vous pouvez également utiliser le mécanisme de requête/réponse de la connexion dans une file d'attente. Ensuite, le processus principal (ou un processus de gestion mysql séparé) récupère les requêtes de la file d'attente, les exécute et renvoie les résultats ... ou quelque chose comme ça. –