2010-09-06 9 views
0

comment bien ce code python? besoin de critique) il y a une erreur dans ce code, parfois le script imprime "TOUT ATTENDRE - PEUT FINIR!" et de geler (plus d'actions sont Happend ..) mais je ne peux pas trouver la raison pour laquelle cela se produit?critiquer ce code python (crawler avec threadpool)

site crawler avec threadpool:

import sys 
from urllib import urlopen 
from BeautifulSoup import BeautifulSoup, SoupStrainer 
import re 
from Queue import Queue, Empty 
from threading import Thread 

W_WAIT = 1 
W_WORK = 0 

class Worker(Thread): 
    """Thread executing tasks from a given tasks queue""" 
    def __init__(self, pool, tasks): 
     Thread.__init__(self) 
     self.tasks = tasks 
     self.daemon = True 
     self.start() 
     self.pool = pool 
     self.state = None 

    def is_wait(self): 
     return self.state == W_WAIT 


    def run(self): 
     while True: 
      #if all workers wait - time to exsit 
      print "CHECK WAIT: !!! ",self.pool.is_all_wait() 
      if self.pool.is_all_wait(): 
       print "ALL WAIT - CAN FINISH!" 
       return 
      try: 
       func, args, kargs = self.tasks.get(timeout=3) 
      except Empty: 
       print "task wait timeout" 
       continue 

      self.state = W_WORK 
      print "START !!! in thread %s" % str(self) 
      #print args 

      try: func(*args, **kargs) 
      except Exception, e: print e 
      print "!!! STOP in thread %s", str(self) 
      self.tasks.task_done() 
      self.state = W_WAIT 
      #threads can fast empty it! 
      #if self.tasks.qsize() == 0: 
      # print "QUIT!!!!!!" 
      # break 

class ThreadPool: 
    """Pool of threads consuming tasks from a queue""" 
    def __init__(self, num_threads): 
     #self.tasks = Queue(num_threads) 
     self.tasks = Queue() 
     self.workers = [] 
     for _ in range(num_threads): 
      self.workers.append(Worker(self,self.tasks)) 


    def add_task(self, func, *args, **kargs): 
     """Add a task to the queue""" 
     self.tasks.put((func, args, kargs)) 

    def wait_completion(self): 
     """Wait for completion of all the tasks in the queue""" 
     self.tasks.join() 

    def is_all_wait(self): 
     for w in self.workers: 
      if not w.is_wait(): 
       return False 
     return True 

visited = set() 
queue = Queue() 
external_links_set = set() 
internal_links_set = set() 
external_links = 0 

def process(pool,host,url): 

    try: 

     content = urlopen(url).read() 
    except UnicodeDecodeError: 
     return 


    for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')): 
     try: 
      href = link['href'] 
     except KeyError: 
      continue 


     if not href.startswith('http://'): 
      href = 'http://%s%s' % (host, href) 
     if not href.startswith('http://%s%s' % (host, '/')): 
      continue 

     internal_links_set.add(href) 


     if href not in visited: 
      visited.add(href) 
      pool.add_task(process,pool,host,href) 

     else: 
      pass 

def start(host,charset): 
    pool = ThreadPool(20) 
    pool.add_task(process,pool,host,'http://%s/' % (host)) 
    pool.wait_completion() 

start('evgenm.com','utf8') 

Thanx aide! Je fais une nouvelle implémentation: Que pouvez-vous dire à propos de ce code # 2? ==================================== ESSAYEZ # 2 ============ ===========================

import sys 
    from urllib import urlopen 
    from BeautifulSoup import BeautifulSoup, SoupStrainer 
    import re 
    from Queue import Queue, Empty 
    from threading import Thread 


    W_STOP = 1 

class Worker(Thread): 
    """Thread executing tasks from a given tasks queue""" 
    def __init__(self, pool, tasks): 
     Thread.__init__(self) 
     self.tasks = tasks 
     self.daemon = True 
     self.pool = pool 
     self.state = None 
     self.start() 



    def stop(self): 
     self.state = W_STOP 

    def run(self): 
     while True: 
      if self.state == W_STOP: 
       print "\ncalled stop" 
       break 
      try: 
       func, args, kargs = self.tasks.get(timeout=3) 
      except Empty: 
       continue 
      print "\n***START*** %s" % str(self) 
      try: 
       func(*args, **kargs) 
      except Exception, e: 
       print e 
      print "\n***STOP*** %s", str(self) 
      self.tasks.task_done() 



class ThreadPool: 
    """Pool of threads consuming tasks from a queue""" 
    def __init__(self, num_threads): 
     #self.tasks = Queue(num_threads) 
     self.tasks = Queue() 
     self.workers = [] 
     for _ in range(num_threads): 
      self.workers.append(Worker(self,self.tasks)) 


    def add_task(self, func, *args, **kargs): 
     """Add a task to the queue""" 
     self.tasks.put((func, args, kargs)) 

    def wait_completion(self): 
     """Wait for completion of all the tasks in the queue""" 
     self.tasks.join() 

    def stop_threads(self): 
     for w in self.workers: 
      w.stop() 

    def wait_stop(self): 
     self.wait_completion() 
     self.stop_threads() 



    visited = set() 
    queue = Queue() 
    external_links_set = set() 
    internal_links_set = set() 
    external_links = 0 

    def process(pool,host,url): 

     try: 

      content = urlopen(url).read() 
     except UnicodeDecodeError: 
      return 


     for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')): 
      try: 
       href = link['href'] 
      except KeyError: 
       continue 


      if not href.startswith('http://'): 
       href = 'http://%s%s' % (host, href) 
      if not href.startswith('http://%s%s' % (host, '/')): 
       continue 

      internal_links_set.add(href) 


      if href not in visited: 
       visited.add(href) 
       pool.add_task(process,pool,host,href) 

      else: 
       pass 

    def start(host,charset): 
     pool = ThreadPool(20) 
     pool.add_task(process,pool,host,'http://%s/' % (host)) 
     pool.wait_stop() 

    start('evgenm.com','utf8') 
+0

Critiquer? ... ça alors, quelle balle molle. Je suppose que je vais résister à l'envie d'être drôle, mais ... – bmargulies

+0

bien sûr vous avez raison) de votre point de vue) – Evg

Répondre

1

Vous partagez l'état entre threads (c'est-à-dire, dans is_all_wait) sans synchronisation. De plus, le fait que tous les threads "attendent" n'est pas un indicateur fiable que la file d'attente est vide (par exemple, ils pourraient tous être en train d'obtenir une tâche). Je suspecte que, de temps en temps, les discussions se terminent avant que la file soit vraiment vide. Si cela arrive assez souvent, il vous restera des tâches dans la file d'attente mais pas de threads pour les exécuter. Donc queue.join() attendra pour toujours.

Mon recomendation est:

  1. Débarrassez-vous de is_all_wait - ce n'est pas un indicateur fiable
  2. Débarrassez-vous de la tâche state - ce n'est pas vraiment nécessaire
  3. Fiez queue.join pour vous faire savoir lorsque tout est traité

Si vous devez supprimer les threads (par exemple, cela fait partie d'un programme plus long et plus long), alors Faites-le après le queue.join().

+0

merci pour votre aide, je fais la mise en œuvre # 2, ce que vous en pensez? – Evg

+0

Oui, cela semble raisonnable. Dans l'ensemble, le code semble assez propre et facile à suivre. –

0

Je connais python de base, mais le filetage en python est pas inutile? J'ai vu des tonnes d'articles critiquant l'interprète de verrouillage global pour cela.

+0

Comme vous le dites, "enfiler en python n'est pas inutile" ;-). Le GIL est spécifique à CPython. Jython, par exemple, utilise la synchronisation Java pour sécuriser le thread d'interprétation. En outre, le GIL a un impact moins sévère sur les performances des tâches liées à l'E/S que sur les tâches liées au processeur. Enfin, le code dans les extensions C peut libérer le GIL lorsqu'il n'a pas besoin d'accéder aux structures de données Python. Ce n'est donc pas un marché aussi important que ce qui a été fait pour la plupart des utilisations. –