2010-01-15 11 views
4

J'ai besoin de lire de très gros fichiers texte (100+ Mb), traiter chaque ligne avec regex et stocker les données dans une structure. Ma structure hérite de defaultdict, elle a une méthode read (self) qui lit le fichier self.file_name.lire plusieurs fichiers en utilisant le multi-traitement

Regardez cet exemple très simple (mais pas réel), je ne suis pas en utilisant regex, mais je suis lignes: séparation


import multiprocessing 
from collections import defaultdict 

def SingleContainer(): 
    return list() 

class Container(defaultdict): 
    """ 
    this class store odd line in self["odd"] and even line in self["even"]. 
    It is stupid, but it's only an example. In the real case the class 
    has additional methods that do computation on readen data. 
    """ 
    def __init__(self,file_name): 
     if type(file_name) != str: 
      raise AttributeError, "%s is not a string" % file_name 
     defaultdict.__init__(self,SingleContainer) 
     self.file_name = file_name 
     self.readen_lines = 0 
    def read(self): 
     f = open(self.file_name) 
     print "start reading file %s" % self.file_name 
     for line in f: 
      self.readen_lines += 1 
      values = line.split() 
      key = {0: "even", 1: "odd"}[self.readen_lines %2] 
      self[key].append(values) 
     print "readen %d lines from file %s" % (self.readen_lines, self.file_name) 

def do(file_name): 
    container = Container(file_name) 
    container.read() 
    return container.items() 

if __name__ == "__main__": 
    file_names = ["r1_200909.log", "r1_200910.log"] 
    pool = multiprocessing.Pool(len(file_names)) 
    result = pool.map(do,file_names) 
    pool.close() 
    pool.join() 
    print "Finish"  

A la fin, je dois rejoindre tous les résultats dans un seul conteneur . Il est important que l'ordre des lignes soit préservé. Mon approche est trop lente lors du retour des valeurs. Meilleure solution? J'utilise python 2.6 sous Linux

Répondre

0

Le multitraitement est plus adapté aux processus orientés CPU ou mémoire car le temps de recherche des disques rotatifs tue les performances lors de la commutation entre les fichiers. Chargez vos fichiers journaux dans un lecteur flash rapide ou une sorte de disque mémoire (physique ou virtuel), ou abandonnez le multitraitement.

+0

mon problème est limité par le processeur et non lié à l'E/S. Dans cet exemple, je divise des lignes, mais dans le cas réel je travaille avec une regex complexe et longue et le temps IO (recherche, ...) est beaucoup moins que le temps cpu –

0

Vous créez une piscine avec autant de travailleurs que de fichiers. Cela peut être trop. Habituellement, je vise à avoir le nombre de travailleurs autour du même nombre de cœurs.

Le fait est que votre dernière étape sera un processus unique fusionnant tous les résultats ensemble. Il n'y a pas à éviter cela, étant donné la description de votre problème. C'est ce qu'on appelle une synchronisation de barrière: toutes les tâches doivent atteindre le même point avant de pouvoir continuer.

Vous devez probablement exécuter ce programme plusieurs fois ou en boucle, en transmettant une valeur différente à multiprocessing.Pool() à chaque fois, en commençant à 1 et en passant au nombre de cœurs. Temps chaque exécution, et voir quel compte de travailleur fait le mieux. Le résultat dépendra de l'intensité du processeur (et non de votre disque) de votre tâche. Je ne serais pas surpris si 2 étaient mieux si votre tâche est d'environ la moitié du processeur et demi de disque, même sur une machine à 8 cœurs.

+0

Oui je l'ai déjà fait. Mon choix n'était pas un choix aléatoire. J'ai essayé de mesurer le temps sans la ligne de retour, et le meilleur choix était quand le nombre de processus est égal au nombre de dossier même si le nombre de processus est plus grand que le nombre de noyaux. –

+0

Ensuite, je ne vois pas comment vous pouvez faire mieux. Le tueur est: "Il est important que l'ordre des lignes soit préservé." Cela ne peut être fait qu'une seule entrée à la fois, même si vous avez prétraité chaque fichier indépendamment. Votre alternative serait que chaque travailleur génère un fichier avec un suffixe, et que tout ce qui lit ces fichiers les lise dans l'ordre, ainsi la fusion est éliminée. –

4

Vous rencontrez probablement deux problèmes.

L'un d'entre eux a été mentionné: vous lisez plusieurs fichiers à la fois. Ces lectures finiront par être entrelacées, ce qui causera le débordement du disque. Vous voulez lire des fichiers entiers à la fois, puis seulement multithread le calcul sur les données. Deuxièmement, vous êtes confronté à la surcharge du module multitraitement de Python. Il n'utilise pas de threads, mais lance plusieurs processus et sérialise les résultats via un canal. C'est très lent pour les données en masse - en fait, il semble être plus lent que le travail que vous faites dans le thread (au moins dans l'exemple). C'est le problème du monde réel causé par le GIL.

Si je ne modifie() pour retourner Aucun au lieu de container.items() pour désactiver les données supplémentaires copie, cet exemple est plus rapide qu'un seul fil, tant que les fichiers sont déjà mises en cache:

deux fils: 0.36elapsed 168% CPU

Un fil (remplacer pool.map sur la carte): 0: 00.52elapsed 98% CPU

Malheureusement, le problème GIL est fondamental et ne peut être travaillé autour de à l'intérieur de Python.

+0

Oui, c'est le problème: renvoyer des données. J'utilise le multitraitement et non le multithreading à cause de GIL. Mais je veux optimiser mon programme en utilisant tout le coeur de mon cpu! Si je mesure le temps à partir de "Démarrer la lecture du fichier" et "Readeng% d lignes" (en ignorant le temps de retour) la version multitraitement est 2 fois plus rapide que la version du processus unique (j'ai 2 core). Maintenant: qu'en est-il de la mémoire partagée? J'ai regardé la classe multiprocess.Manager, mais je veux partager une structure plus complexe que dict. –

+0

Je n'ai pas utilisé Manager, mais on dirait qu'il manipule les données par procuration, donc je pense que c'est encore plus lent. Vous pouvez utiliser la mémoire partagée pour partager des blocs de mémoire simples, mais pas les types Python natifs. Vous pouvez rechercher d'autres optimisations, mais sans aucun code, je ne peux faire aucune suggestion. –

+0

peut-être une solution peut être: réécrire la fonction de lecture en C++ et utiliser le multithreading réel avec C++? Avec cette approche puis-je contourner le problème de partager des données entre les processus (pipe)? –

Questions connexes