2017-09-21 2 views
0

Je le module parallel_executor.py suivant que j'utiliser pour exécuter plusieurs processus simultanément,Log sortie de processus dans le fichier journal séparé

import time 
from multiprocessing import Process 

class ParallelExecutor(object): 
    def __init__(self, pool_size=10): 
    self._pool_size = pool_size 
    self._processes = [] 
    self._results = [] 

def add_task(self, target, args=None, kwargs=None): 
    args = [] if not args else args 
    kwargs = {} if not kwargs else kwargs 
    index = len(self._processes) 
    process_args = (index, target, args, kwargs) 
    process = Process(target=self._executor, args=process_args) 
    self._processes.append(process) 
    result = {'result': None, 'end_time': 0, 'completed': False} 
    self._results.append(result) 
    return index 

def run(self, block=True): 
    if not block: 
     for process in self._processes: 
     process.start() 
     return None 
    else: 
     counter = 0 
     processes = [] 
     for process in self._processes: 
     processes.append(process) 
     process.start() 
     if counter >= self._pool_size: 

      # Wait for completion and reset counters. 
      for i in range(len(processes)): 
      processes[i].join() 
      processes = [] 
      counter = 0 
      continue 
     counter += 1 

     # Wait for the left over processes to complete. 
     if len(processes) > 0: 
     for i in range(len(processes)): 
      processes[i].join() 
     return self._results 



def _executor(self, index, target, args, kwargs): 
    try: 
     self._results[index]['result'] = target(*args, **kwargs) 
     self._results[index]['end_time'] = int(round((time.time()))) 
     self._results[index]['completed'] = True 
    except Exception as exc: 
     self._results[index]['exception'] = exc 
     self._results[index]['completed'] = True 
     raise 

Et je l'utilise comme suit (example.py):

from framework.lib.parallel_executor import ParallelExecutor 
import time 
import os 

def foo(x): 
    for i in range(3): 
     print x 
     time.sleep(0.5) 

    return 123 

def main(): 
    runner = ParallelExecutor() 
    runner.add_task(foo, ["This"]) 
    runner.add_task(foo, ["is"]) 
    runner.add_task(foo, ["a"]) 
    runner.add_task(foo, ["test"]) 

    runner.run() 
    runner.wait_for_executor_to_finish() 
    for i in runner.get_results(): 
     print i 

main() 

Ma question est comment j'imprime l'ID de processus avec chaque instruction de 'foo' qui est imprimée à la sortie en n'apportant des modifications qu'au module parallel_executor.py et ne touchant pas le fichier example.py, de sorte que plus tard je pourrais effectuer un 'grep' sur les sorties d'un processus particulier.

Répondre

0

Vous ne pouvez pas le faire sans modifier l'exemple du tout, mais vous pouvez réaliser ce que vous voulez avec une toute petite modification. En utilisant les fonctionnalités Python logging, vous pouvez définir le message de journal par défaut en vous assurant que chaque ligne de log respecte votre standard.

Dans le parallel_executor.py ajouter ce qui suit:

import logging 

log_format = "%(process)d: %(message)s" 
logging.basicConfig(level=logging.INFO, format=log_format) 

Dans l'exemple remplacer la ligne:

print x 

avec:

logging.info(x) 

Et vous verrez vos messages apparaissant comme:

34321: message content here