2017-08-24 4 views
0

TL; DR: - les processus consommateurs finition mais ne pas rejoindre, aucune erreur sont élevés et le script est exécuté à l'infini, coincé dans les limbes sur le join statment? Je vise à accélérer un processus de récupération de données, mais je ne sais pas combien de «tâches» (des données à récupérer) il pourrait y avoir. J'ai donc fait une version modifiée de la méthode de la pilule empoisonnée afin que la tâche reconnaisse quand elle ne récupère plus l'information, et déclenche la déclaration de la pilule empoisonnée if.multitraitement - les processus ne vont pas rejoindre?

J'ai posté une preuve , qui est un exemple de travail de ma méthode de pilule empoisonnée, et un script complet , qui, comme son nom l'indique est le script complet. (Les deux devraient être en mesure de fonctionner comme il est)

preuve:

import multiprocessing 


class Task: 
    def __init__(self, number): 
     self.number = number 

    def __call__(self): 
     """Find officer and company data and combine and save it""" 

     try: 
      # 'gather some data!' 
      self.result = self.number*2 
      print(self.number) 
      # 'fake' finding no data 
      if self.result >= 8: 
       raise NameError 
     except NameError: 
      # become poison pill once latest is done 
      self.result = None 

    def output(self): 
     return self.result 


class Consumer(multiprocessing.Process): 
    """Handle process and re-queue complete tasks""" 
    def __init__(self, waiting_queue, complete_queue): 
     multiprocessing.Process.__init__(self) 
     self.waiting_queue = waiting_queue 
     self.complete_queue = complete_queue 

    def run(self): 
     """process tasks until queue is empty""" 
     proc_name = self.name 
     while True: 
      current_task = self.waiting_queue.get() 
      current_task() 
      if current_task.output() is None: 
       print('{}: Exiting, poison pill reached'.format(proc_name)) 
       self.waiting_queue.task_done() 
       break 
      self.waiting_queue.task_done() 
      self.complete_queue.put(current_task) 
     print('{}: complete'.format(proc_name)) 


class Shepard: 
    """Handle life cycle of Consumers, Queues and Tasks""" 
    def __init__(self): 
     pass 

    def __call__(self, start_point): 

     # initialize queues 
     todo = multiprocessing.JoinableQueue() 
     finished = multiprocessing.JoinableQueue() 

     # start consumers 
     num_consumers = multiprocessing.cpu_count() * 2 
     consumers = [Consumer(todo, finished) for i in range(num_consumers)] 
     for q in consumers: 
      q.start() 

     # decide on (max) end limit (make much longer than suspected amount of data to be gathered 
     start = int(start_point) 
     max_record_range = 100 
     end = start + max_record_range 

     # Enqueue jobs 
     for i in range(start, end): 
      todo.put(Task(i)) 
     print('Processes joining') 
     # wait for processes to join 
     for p in consumers: 
      p.join() 
     print('Processes joined') 

     # process results - UNFINISHED 
     pass 

     # return results - UNFINISHED 
     return 'results!' 


if __name__ == '__main__': 

    # load start points: 
    start_points = {'cat1': 1, 'cat2': 3, 'cat3': 4} 


    master = Shepard() 
    cat1 = master(start_points['cat1']) 
    print('cat1 done') 
    cat2 = master(start_points['cat2']) 
    print('cat2 done') 
    cat3 = master(start_points['cat3']) 

Voici donc le script complet :

import time 
import requests 
import sys 
import json 
import pandas as pd 
import multiprocessing 
import queue 


class CompaniesHouseRequest: 
    """Retreive information from Companies House""" 
    def __init__(self, company, catagory_url=''): 
     """Example URL: '/officers'""" 
     self.company = str(company) 
     self.catagory_url = str(catagory_url) 

    def retrieve(self, key='Rn7RLDV9Tw9v4ShDCotjDtJFBgp1Lr4d-9GRYZMo'): 
     """retrieve data from Companies House""" 
     call = 'https://api.companieshouse.gov.uk/company/' + self.company + self.catagory_url 
     retrieve_complete = False 
     while retrieve_complete is False: 
      resp = requests.get(call, auth=requests.auth.HTTPBasicAuth(key, '')) 
      code = resp.status_code 
      if code == 404: 
       print(resp.status_code) 
       raise NameError('Company not found') 
      elif code == 200: 
       try: 
        self.data = json.loads(resp.content.decode('UTF8')) 
        retrieve_complete = True 
       except json.decoder.JSONDecodeError: 
        print('Decode Error in Officers!') 
      else: 
       print("Error:", sys.exc_info()[0]) 
       print('Retrying') 
       time.sleep(5) 
     return self.data 


class Company: 
    """Retrieve and hold company details""" 
    def __init__(self, company_number): 
     self.company_number = company_number 

    def __call__(self): 
     """Create request and process data""" 
     # make request 
     req = CompaniesHouseRequest(self.company_number) 
     data = req.retrieve() 
     # extract data 
     try: 
      line = [self.company_number, 
        data['company_name'], 
        data['registered_office_address'].get('premises', ''), 
        data['registered_office_address'].get('address_line_1', ''), 
        data['registered_office_address'].get('address_line_2', ''), 
        data['registered_office_address'].get('country', ''), 
        data['registered_office_address'].get('locality', ''), 
        data['registered_office_address'].get('postal_code', ''), 
        data['registered_office_address'].get('region', '')] 
     except KeyError: 
      line = ['' for i in range(0, 9)] 
     # save as pandas dataframe 
     return pd.DataFrame([line], columns=['company_number', 'company_name', 'company_address_premises', 
              'company_address_line_1', 'company_address_line_2', 
              'company_address_country', 'company_address_locality', 
              'company_address_postcode', 'company_address_region']) 


def name_splitter(name): 
    split = name.split(', ') 
    if len(split) > 2: 
     return [split[2], split[1], split[0]] 
    else: 
     return ['', split[1], split[0]] 


class Officers: 
    """Retrieve and hold officers details""" 
    def __init__(self, company_number): 
     self.company_number = company_number 

    def __call__(self): 
     """Create request and process data""" 
     # make request 
     req = CompaniesHouseRequest(self.company_number, '/officers') 
     data = req.retrieve() 
     # extract data 
     for officer in data['items']: 
      if officer['officer_role'] == 'director': 
       name = name_splitter(officer['name']) 
       line = [name[0], 
         name[1], 
         name[2], 
         officer.get('occupation'), 
         officer.get('country_of_residence'), 
         officer.get('nationality'), 
         officer.get('appointed_on', ''), 
         officer['address'].get('premises', ''), 
         officer['address'].get('address_line_1', ''), 
         officer['address'].get('address_line_2', ''), 
         officer['address'].get('country', ''), 
         officer['address'].get('locality', ''), 
         officer['address'].get('postal_code', ''), 
         officer['address'].get('region', '')] 
       break 
     director_count = sum(map(lambda x: x['officer_role'] == 'director', data['items'])) 
     if director_count > 1: 
      line += [True] 
     elif director_count == 1: 
      line += [False] 
     else: 
      line = ['no directors'] * 3 + [''] * 12 
     return pd.DataFrame([line], columns=['title', 'first_name', 'surname', 'occupation', 'country_of_residence', 
              'nationality', 'appointed_on', 
              'address_premises', 'address_line_1', 'address_line_2', 
              'address_country', 'address_locality', 'address_postcode', 
              'address_region', 'multi_director']) 


class Task: 
    def __init__(self, prefix, company_number): 
     self.prefix = prefix 
     self.company_number = company_number 

    def __call__(self): 
     """Find officer and company data and combine and save it""" 
     comp_id = self.prefix + str(self.company_number) 
     print(comp_id) 
     try: 
      # initialise company class 
      comp = Company(comp_id) 
      # initialise officer class 
      off = Officers(comp_id) 
      # retrieve and concatonate 
      self.result = pd.concat([comp(), off()], axis=1) 

     except NameError: 
      # become poison pill once latest is done 
      self.result = None 

    def output(self): 
     return self.result 


class Consumer(multiprocessing.Process): 
    """Handle process and re-queue complete tasks""" 
    def __init__(self, waiting_queue, complete_queue): 
     multiprocessing.Process.__init__(self) 
     self.waiting_queue = waiting_queue 
     self.complete_queue = complete_queue 

    def run(self): 
     """process tasks until queue is empty""" 
     proc_name = self.name 
     while True: 
      current_task = self.waiting_queue.get() 
      current_task() 
      if current_task.output() is None: 
       print('{}: Exiting, poison pill reached'.format(proc_name)) 
       self.waiting_queue.task_done() 
       break 
      self.waiting_queue.task_done() 
      self.complete_queue.put(current_task) 
     print('{}: complete'.format(proc_name)) 


class Shepard: 
    """Handle life of Consumers, Queues and Tasks""" 
    def __init__(self): 
     pass 

    def __call__(self, prefix, start_point): 

     # initialize queues 
     todo = multiprocessing.JoinableQueue() 
     finished = multiprocessing.JoinableQueue() 

     # start consumers 
     num_consumers = multiprocessing.cpu_count() * 2 
     consumers = [Consumer(todo, finished) for i in range(num_consumers)] 
     for q in consumers: 
      q.start() 

     # decide on (max) end limit 
     start = int(start_point) 
     max_record_range = 1000 
     end = start + max_record_range 

     # Enqueue jobs 
     for i in range(start, end): 
      todo.put(Task(prefix, i)) 
     print('Processes joining') 

     # wait for processes to join 
     for p in consumers: 
      p.join() 
     print('Processes joined') 

     # process results - UNFINISHED 
     pass 

     # return results - UNFINISHED 
     return 'results!' 


if __name__ == '__main__': 
    # paths to data 
    data_directory = r'C:\Users\hdewinton\OneDrive - Advanced Payment Solutions\Python\Corporate DM\data' 
    base = r'\base' 

    # load start points: 
    init = {"England": 10926071, "Scotland": 574309, "Ireland": 647561} 

    # gather data for each catagory 
    master = Shepard() 
    ireland = master('NI', init['Ireland']) 
    scotland = master('SC', init['Scotland']) 
    england = master('', init['England']) 
+0

Quelle est l'erreur que vous obtenez? –

+0

Il n'y a pas d'erreur, les consommateurs courent jusqu'à leur dernière déclaration d'impression, mais ne parviennent pas à rejoindre –

Répondre

0

TL; DR - la conséquence (se coincer dans les limbes alors que les consommateurs ne parviennent pas à se joindre) peut être corrigé en changeant ceci:

finished = multiprocessing.JoinableQueue() 

à ceci:

mananger = multiprocessing.Manager() 
finished = mananger.Queue() 

Détails - « Quand un objet est mis sur une file d'attente, l'objet est décapé et un thread d'arrière débusque plus tard, les données marinées à une conduite sous-jacente. Cela a quelques conséquences qui sont un peu surprenantes, mais qui ne devraient pas causer de difficultés pratiques - si elles vous dérangent, alors vous pouvez utiliser une file d'attente créée avec un gestionnaire. "Du documentation

, déclenche l'une des précitées précisions si un certain nombre de tâches y sont ajoutées.En dessous de la limite il n'y a pas de problèmes et au-dessus de la limite conséquence se produit.Cela ne se produit pas dans le mannequin parce que la deuxième file d'attente, La limite dépend de la taille et de la complexité des objets Task, donc je reconnais que cela a quelque chose à voir avec le vidage des données décapées apparaissant seulement après qu'un certain volume de données a été atteint - le volume de données déclenche cette conséquence

Additif - Une autre erreur apparaît également une fois que la solution a été mise en oeuvre: une erreur de tuyau se produit en tant que consommateurs de la file d'attente todo sont terminées avant que la file d'attente est vide en laissant le tuyau dans l'objet file d'attente sans objet de connexion pour envoyer des données. Cela déclenche un WinError 232. Ne vous inquiétez pas, l'erreur de tuyau peut être résolue en vidant la file d'attente avant de quitter les consommateurs. simplement ajouter à la méthode consommateurs d'exécution de classe:

while not self.waiting_queue.empty(): 
      try: 
       self.waiting_queue.get(timeout=0.001) 
      except: 
       pass 
     self.waiting_queue.close() 

cela supprime tous les éléments de la file d'attente, assurez-vous que son après la boucle principale while et l'erreur de conduite ne doit pas se produire parce que les consommateurs videront la volonté file d'attente avant se terminant.