2011-01-22 3 views
0

alors imaginons un grand document xml (taille de fichier> 100 mb) que nous voulons itérer en utilisant cElementTree.iterparse.divisant et conquérant etree.iterparse en utilisant le multiprocessing

Mais tous ces cœurs que nous a promis Intel seraient utiles, comment les utilisons-nous? voici ce que je veux:

from itertools import islice 
from xml.etree import ElementTree as etree 

tree_iter = etree.iterparse(open("large_file.xml", encoding="utf-8")) 

first = islice(tree_iter, 0, 10000) 
second = islice(tree_iter, 10000) 

parse_first() 
parse_second() 

Il semble y avoir plusieurs problèmes avec cela, pas le moins étant que le itérateur retourné par iterparse() semble résister à trancher.

Est-il possible de diviser la charge de travail de l'analyse d'un document XML volumineux en deux ou quatre tâches distinctes (sans charger le document entier en mémoire? Le but étant alors d'exécuter les tâches sur les processeurs séparés.

Répondre

0

I . pense que vous avez besoin d'une bonne threadpool avec une file d'attente de tâches pour ce que j'ai trouvé (et utiliser) ce très bon (il est à python3, mais ne devrait pas être trop difficile de se convertir à 2.x):

# http://code.activestate.com/recipes/577187-python-thread-pool/ 

from queue import Queue 
from threading import Thread 

class Worker(Thread): 
    def __init__(self, tasks): 
     Thread.__init__(self) 
     self.tasks = tasks 
     self.daemon = True 
     self.start() 

    def run(self): 
     while True: 
      func, args, kargs = self.tasks.get() 
      try: func(*args, **kargs) 
      except Exception as exception: print(exception) 
      self.tasks.task_done() 

class ThreadPool: 
    def __init__(self, num_threads): 
     self.tasks = Queue(num_threads) 
     for _ in range(num_threads): Worker(self.tasks) 

    def add_task(self, func, *args, **kargs): 
     self.tasks.put((func, args, kargs)) 

    def wait_completion(self): 
     self.tasks.join() 

Maintenant, vous pouvez simplement lancer la boucle sur l'iterparse et laisser le threadpool diviser le travail pour vous.L'utiliser est simple comme ceci:

def executetask(arg): 
    print(arg) 

workers = threadpool.ThreadPool(4) # 4 is the number of threads 
for i in range(100): workers.add_task(executetask, i) 

workers.wait_completion() # not needed, only if you need to be certain all work is done before continuing 
+0

donc je devine que j'appelle alors workers.add_task avec une fonction qui analyse chaque élément individuel? pour elem dans etree.parseiter(): workers.add_task (parseElem, elem)? le problème est que, puisque l'analyse est relativement simple, cela n'entraîne aucun gain de performance. ce dont j'ai besoin est de diviser le etree.parseiter() en morceaux maniables: idéalement, sur les 100.000 éléments de l'itération, donnez 25.000 à chaque thread de la piscine. est-ce possible? –

+0

Cela dépend de ce que vous faites, mais je suppose. – orlp