2009-06-16 3 views
8

Contexte:Classe Python pour fusionner les fichiers triés, comment cela peut-il être amélioré?

Je nettoie grand (ne peut pas être tenue en mémoire) des fichiers délimités par des tabulations. Comme je nettoie le fichier d'entrée, je crée une liste en mémoire; quand il arrive à 1 000 000 entrées (environ 1 Go en mémoire) je le trier (en utilisant la clé par défaut ci-dessous) et écrire la liste dans un fichier. Cette classe sert à regrouper les fichiers triés. Cela fonctionne sur les fichiers que j'ai rencontrés jusqu'ici. Mon cas le plus important, jusqu'à présent, est la fusion de 66 fichiers triés.

Questions:

  1. Y at-il des trous dans ma logique (où est-il fragile)?
  2. Ai-je implémenté l'algorithme de fusion-tri correctement?
  3. Y a-t-il des améliorations évidentes qui pourraient être faites?

Exemple de données:

Ceci est une abstraction d'une ligne dans un de ces fichiers:

'hash_of_SomeStringId\tSome String Id\t\t\twww.somelink.com\t\tOtherData\t\n'

Les plats à emporter est que j'utilise 'SomeStringId'.lower().replace(' ', '') comme ma clé de tri.

Code d'origine:

class SortedFileMerger(): 
    """ A one-time use object that merges any number of smaller sorted 
     files into one large sorted file. 

     ARGS: 
      paths - list of paths to sorted files 
      output_path - string path to desired output file 
      dedup - (boolean) remove lines with duplicate keys, default = True 
      key - use to override sort key, default = "line.split('\t')[1].lower().replace(' ', '')" 
        will be prepended by "lambda line: ". This should be the same 
        key that was used to sort the files being merged! 
    """ 
    def __init__(self, paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"): 
     self.key = eval("lambda line: %s" % key) 
     self.dedup = dedup 
     self.handles = [open(path, 'r') for path in paths] 
     # holds one line from each file 
     self.lines = [file_handle.readline() for file_handle in self.handles] 
     self.output_file = open(output_path, 'w') 
     self.lines_written = 0 
     self._mergeSortedFiles() #call the main method 

    def __del__(self): 
     """ Clean-up file handles. 
     """ 
     for handle in self.handles: 
      if not handle.closed: 
       handle.close() 
     if self.output_file and (not self.output_file.closed): 
      self.output_file.close() 

    def _mergeSortedFiles(self): 
     """ Merge the small sorted files to 'self.output_file'. This can 
      and should only be called once. 
      Called from __init__(). 
     """ 
     previous_comparable = '' 
     min_line = self._getNextMin() 
     while min_line: 
      index = self.lines.index(min_line) 
      comparable = self.key(min_line) 
      if not self.dedup:      
       #not removing duplicates 
       self._writeLine(index) 
      elif comparable != previous_comparable: 
       #removing duplicates and this isn't one 
       self._writeLine(index) 
      else:         
       #removing duplicates and this is one 
       self._readNextLine(index) 
      previous_comparable = comparable 
      min_line = self._getNextMin() 
     #finished merging 
     self.output_file.close() 

    def _getNextMin(self): 
     """ Returns the next "smallest" line in sorted order. 
      Returns None when there are no more values to get. 
     """ 
     while '' in self.lines: 
      index = self.lines.index('') 
      if self._isLastLine(index): 
       # file.readline() is returning '' because 
       # it has reached the end of a file. 
       self._closeFile(index) 
      else: 
       # an empty line got mixed in 
       self._readNextLine(index) 
     if len(self.lines) == 0: 
      return None 
     return min(self.lines, key=self.key) 

    def _writeLine(self, index): 
     """ Write line to output file and update self.lines 
     """ 
     self.output_file.write(self.lines[index]) 
     self.lines_written += 1 
     self._readNextLine(index) 

    def _readNextLine(self, index): 
     """ Read the next line from handles[index] into lines[index] 
     """ 
     self.lines[index] = self.handles[index].readline() 

    def _closeFile(self, index): 
     """ If there are no more lines to get in a file, it 
      needs to be closed and removed from 'self.handles'. 
      It's entry in 'self.lines' also need to be removed. 
     """ 
     handle = self.handles.pop(index) 
     if not handle.closed: 
      handle.close() 
     # remove entry from self.lines to preserve order 
     _ = self.lines.pop(index) 

    def _isLastLine(self, index): 
     """ Check that handles[index] is at the eof. 
     """ 
     handle = self.handles[index]    
     if handle.tell() == os.path.getsize(handle.name): 
      return True 
     return False 

Edit: mise en œuvre des suggestions de Brian je suis venu avec la solution suivante:

Deuxième édition: Mise à jour la suggestion du code par John Machin :

def decorated_file(f, key): 
    """ Yields an easily sortable tuple. 
    """ 
    for line in f: 
     yield (key(line), line) 

def standard_keyfunc(line): 
    """ The standard key function in my application. 
    """ 
    return line.split('\t', 2)[1].replace(' ', '').lower() 

def mergeSortedFiles(paths, output_path, dedup=True, keyfunc=standard_keyfunc): 
    """ Does the same thing SortedFileMerger class does. 
    """ 
    files = map(open, paths) #open defaults to mode='r' 
    output_file = open(output_path, 'w') 
    lines_written = 0 
    previous_comparable = '' 
    for line in heapq26.merge(*[decorated_file(f, keyfunc) for f in files]): 
     comparable = line[0] 
     if previous_comparable != comparable: 
      output_file.write(line[1]) 
      lines_written += 1 
     previous_comparable = comparable 
    return lines_written 

rugueux test

En utilisant les mêmes fichiers d'entrée (2,2 Go de données):

  • classe SortedFileMerger a 51 minutes (3068.4 secondes)
  • solution de Brian a pris 40 minutes (2408.5 secondes)
  • Après avoir ajouté les suggestions de John Machin, le code de la solution a pris 36 minutes (2214,0 secondes)
+0

decorated_file équivaut à ((clé (ligne), ligne) pour la ligne en f) –

+0

@gnibbler, Will qu'accélérer le processus ou tout simplement se débarrasser de la fonction? – tgray

Répondre

16

Notez que dans python2.6, heapq a une nouvelle fonction merge qui fera cela pour vous.

Pour gérer la fonction clé personnalisée, vous pouvez simplement envelopper le fichier iterator avec quelque chose qui décore afin qu'elle compare basée sur la clé, et dépouiller après coup:

def decorated_file(f, key): 
    for line in f: 
     yield (key(line), line) 

filenames = ['file1.txt','file2.txt','file3.txt'] 
files = map(open, filenames) 
outfile = open('merged.txt') 

for line in heapq.merge(*[decorated_file(f, keyfunc) for f in files]): 
    outfile.write(line[1]) 

[Modifier] Même dans les versions antérieures de python, il vaut probablement la peine de prendre l'implémentation de la fusion à partir du module heapq ultérieur. C'est un python pur, et il est non modifié dans python2.5, et comme il utilise un tas pour obtenir le minimum suivant, il devrait être très efficace lors de la fusion d'un grand nombre de fichiers. Vous devriez pouvoir copier simplement le heapq.py d'une installation de python2.6, le copier dans votre source en tant que "heapq26.py" et employer "from heapq26 import merge" - il n'y a aucune caractéristique spécifique 2.6 utilisée dedans. Alternativement, vous pouvez simplement copier la fonction de fusion (réécrire les appels heappop etc. pour référencer le module heapq python2.5).

+0

En fait, j'utilise toujours python 2.5. – tgray

+0

Ceci est une excellente réponse, j'ai cherché Google pendant des semaines et ne pouvait pas trouver cela. – tgray

2

< < Cette « réponse » est un commentaire sur le code résultant du questionneur original >>

Suggestion: en utilisant eval() est ummmm et ce que vous faites limite l'appelant à l'aide lambda - extraction de la clé peut nécessiter plus qu'un one-liner, et en tout cas n'avez-vous pas besoin de la même fonction pour l'étape de tri préliminaire?

donc remplacer:

def mergeSortedFiles(paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"): 
    keyfunc = eval("lambda line: %s" % key) 

avec ceci:

def my_keyfunc(line): 
    return line.split('\t', 2)[1].replace(' ', '').lower() 
    # minor tweaks may speed it up a little 

def mergeSortedFiles(paths, output_path, keyfunc, dedup=True):  
+0

Merci, l'eval() m'a paru bizarre aussi, mais je ne connaissais pas l'alternative. J'avais obtenu la méthode de cette recette: http://code.activestate.com/recipes/576755/ – tgray

+0

Cette recette fournit le gimmick d'eval() seulement comme caractéristique facultative pour ceux qui sont assez courageux pour taper la source de leur fonction d'extraction principale dans la ligne de commande quand ils exécutent un tri multi-Go :-) Vous remarquerez que c'était proprement séparé; les deux fonctions fusion et tri prennent une fonction pour l'argument clé, pas une chaîne. –

Questions connexes