2017-06-02 1 views
0

J'ai une liste de fichiers Excel 2010 (xlsx) qui doivent être recherchés pour une valeur spécifique. Comme xslx est un format binaire, cela ne peut pas être fait avec des éditeurs de texte normaux. Donc, je fais ce qui suit pour chaque fichierpython multiprocessing worker la consommation de mémoire augmente indéfiniment

  1. obtenir le nom de fichier
  2. ouvert pandas géants
  3. convertir dataframe à numpy tableau
  4. tableau de contrôle pour la valeur

Cela appelle à multitraitement comme n'est pas lié à I/O. Le temps est nécessaire pour les choses pandas et la conversion de tableau. J'ai donc mis en place une version multi-traitement de mon script (voir ci-dessous):

Le problème est la consommation de mémoire de chaque processus de travail. Il s'accumule continuellement en peaking dans 2GB par travailleur bien que chaque fichier xlsx soit simplement 100kb. Je ne vois pas pourquoi la mémoire n'est pas libérée avant qu'un nouveau fichier soit traité. De cette façon, je manque de mémoire avant que ma liste de fichiers ne soit traitée.

Le problème ne semble pas être la file d'attente, mais les pandas.

Voici mon code. Il peut être testé avec tous les fichiers xlsx que vous avez sur votre système.

import pandas as pd 
import multiprocessing as mp 
import glob 

path = r'c:\temp' 
fileFilter = 'serial.xlsx' 
searchString = '804.486' 


def searchFile(tasks, results, searchString): 
    """Iterates over files in tasks and searches in file for the 
    occurence of 'searchString'. 

    Args: 
    ----- 
    tasks: queue of strings 
     Files to look in 
    results: queue of strings 
     Files where the searchString was found 
    searchString: str 
     the string to be searched 
    """ 
    # for files in the queue 
    for task in iter(tasks.get, 'STOP'): 
     # read the filestructre into memory 
     xfile = pd.ExcelFile(task) 
     # iterate all sheets 
     for sheet in xfile.sheet_names[:3]: 
      # read the sheet 
      data = pd.read_excel(xfile, sheet) 
      # check if searchString is in numpy representation of dataframe 
      if searchString in data.values.astype(str): 
       # put filename in results queue 
       results.put(task) 
       break 
     xfile.close() 

if __name__ == "__main__": 
    # get all files matching the filter that are in the root path 
    print('gathering files') 
    files = glob.glob(path + '\**\{}'.format(fileFilter), recursive=True) 

    # setup of queues and variables 
    n_proc = 2 
    tasks = mp.Queue() 
    results = mp.Queue() 

    print('Start processing') 
    # setup processes and start them 
    procs = [mp.Process(target=searchFile, 
         args=(tasks, results, searchString)) 
      for x in range(n_proc)] 
    for p in procs: 
     p.daemon = True 
     p.start() 

    # populate queue 
    for file in files: 
     tasks.put(file) 

    for proc in procs: 
     tasks.put('STOP') 

    for p in procs: 
     p.join() 

    # print results 
    for result in range(results.qsize()): 
     print(results.get()) 

    print('Done') 
+0

En remarque - les fichiers '.xlsx' sont des archives compressées en zip avec du contenu XML à l'intérieur. Si la seule chose dont vous avez besoin est de rechercher une valeur, vous pouvez décompresser et rechercher le flux sans aucun module supplémentaire. –

+0

Ce n'est pas la seule chose à faire ... Puis-je les décompresser avec le module zip interne python? – RaJa

+0

J'ai essayé de décompresser les fichiers xlsx et il génère de nombreux documents xml: pour le style, pour chaque feuille de calcul, les thèmes, ... trouver le document xml avec les valeurs est plutôt difficile. – RaJa

Répondre

0

Semble problème dans gc, qui ne peut pas rassembler des cadres dans le contexte de pandas fonction que vous ne laissez jamais. Vous pouvez utiliser multiprocessing.Pool.map qui peut gérer la file d'attente pour vous. La fonction de travail sera appelée pour chaque article et laissez gc faire le travail. Vous pouvez également utiliser maxtasksperchild Param de constructeur de pool pour limiter le nombre d'éléments traités par le worker.

import glob 
import multiprocessing 


def searchFile(task, searchString): 
    xfile = pd.ExcelFile(task) 
    ... 
    if found: 
     return task 


if __name__ == '__main__': 
    files = glob.glob(path + '\**\{}'.format(fileFilter), recursive=True) 
    searchString = '804.486' 

    pool = multiprocessing.Pool(2, maxtasksperchild=10) 

    args = ((fname, searchString) for fname in files) 
    matchedFiles = filter(None, pool.map(searchFile, args)) 
    pool.close() 
+0

Essayé cela ... Ne pas résoudre le problème, car le problème n'est pas le code, mais l'un des fichiers. Vérifié avec un ensemble différent de fichiers et cela fonctionne sans problème. Merci quand même. – RaJa