2013-05-14 1 views
5

J'ai un problème avec fraie asynchrone avec délai d'attente sous-processus en Python 3.asynchrone avec délai d'attente sous-processus

Ce que je veux atteindre: je veux reproduire plusieurs processus de manière asynchrone sans attendre un résultat mais je veux aussi être assuré que chaque processus engendré se terminera dans un délai donné.

J'ai rencontré des problèmes similaires ici: Using module 'subprocess' with timeout et Asynchronous background processes in Python? mais ils ne résolvent pas mon problème.

Mon code ressemble à ceci. J'ai la classe de commandement comme suggéré dans Using module 'subprocess' with timeout:

class Command(object): 
    def __init__(self, cmd): 
    self.cmd = cmd 
    self.process = None 

    def run(self, timeout): 
    def target(): 
     print('Thread started') 
     args = shlex.split(self.cmd) 
     self.process = subprocess.Popen(args, shell=True) 
     self.process.communicate() 
     print('Thread finished') 

    thread = threading.Thread(target=target) 
    thread.start() 

    thread.join(timeout) 
    if thread.is_alive(): 
     print('Terminating process') 
     self.process.terminate() 
     thread.join() 

puis quand je veux reproduire: sous-processus

for system in systems: 
    for service in to_spawn_system_info: 
    command_str = "cd {0} && python proc_ip.py {1} {2} 0 2>>{3}".format(home_dir, 
     service, system, service_log_dir) 
    command = Command(command_str) 
    command.run(timeout=60) 

Quand je lance cette sortie semble attendre que chaque commande à lancer et à la fin. Je reçois

Thread started 
Thread finished 
Thread started 
Thread finished 
Thread started 
Thread finished 
Thread started 
Thread finished 

Donc, ma question est ce que je fais mal? Maintenant, je commence à me demander s'il est possible d'engendrer un processus et de limiter son exécution par un dépassement de temps. Pourquoi ai-je besoin de cela? Le script spawner s'exécutera dans cron. Il sera exécuté toutes les 10 minutes et il doit générer environ 20 sous-processus. Je veux garantir que chaque sous-processus se terminera avant que le script ne soit à nouveau exécuté depuis cron.

Répondre

3

Comme mentionné précédemment, l'appel à process.communicate() fait que votre code attend l'achèvement du sous-processus. Toutefois, si vous supprimez simplement l'appel de communiquez avec(), le thread se fermera immédiatement après la création du processus, provoquant votre thread.L'appel join() pour quitter trop tôt, et vous allez tuer le sous-processus prématurément. Pour faire ce que vous voulez sans attendre vote ou occupé, vous pouvez régler une minuterie qui va tuer le processus (et le fil de coureur) après un délai d'attente si le processus n'a pas encore fini:

class Command(object): 
    def __init__(self, cmd): 
    self.cmd = cmd 
    self.process = None 

    def run(self, timeout): 
    def target(): 
     print('Thread started') 
     # May want/need to skip the shlex.split() when using shell=True 
     # See Popen() constructor docs on 'shell' argument for more detail. 
     args = shlex.split(self.cmd) 
     self.process = subprocess.Popen(args, shell=True) 
     self.timer.start() 
     self.process.wait() 
     self.timer.cancel() 

    def timer_callback(): 
     print('Terminating process (timed out)') 
     self.process.terminate() 

    thread = threading.Thread(target=target) 
    self.timer = threading.Timer(timeout, timer_callback) 
    thread.start() 
+0

Lorsque j'ai essayé cette solution, elle ne met pas fin à mes threads après l'expiration du délai. J'ai réglé le timeout à 1 sec et ajouté time.sleep (1) dans la fonction cible. Aucun thread n'a été arrêté. – sebast26

+0

Hrmm. Le thread doit se terminer lorsque target() se termine. Gardez à l'esprit que, comme indiqué ci-dessus, vous n'obtiendrez pas une impression si le processus se termine normalement sans temporisation. Je vais regarder de plus près, j'ai peut-être oublié quelque chose. – mshildt

+0

Ainsi, si le thread se termine avant la fin du sous-processus, le sous-processus se termine? C'est le contraire de ce que dit unutbu. Il a dit "alors chaque sous-processus que vous engendrerez subsistera même après la fin de vos discussions". J'avais également l'impression qu'un sous-processus allait continuer. – b10hazard

0
from threading import * 
from time import time 
import shlex 
import subprocess 
from random import randint 
class Worker(Thread): 
    def __init__(self, param, cmd, timeout=10): 
     self.cmd = cmd 
     self.timeout = timeout 

     Thread.__init__(self) 
     self.name = param 
    def run(self): 
     startup = time() 
     print(self.name + ' is starting') 

     args = shlex.split(self.cmd) 
     #Shell should be false when given a list (True for strings) 
     process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) 

     while time()-startup <= self.timeout: 
      if process.poll() != None: 
       break 

     process.stdout.close() 
     process.stdin.close() 
     process.stderr.close() 

     print(self.name + ' is dead') 

for i in range(0, 100): 
    x = Worker('Name-'+str(i), 'ping -n ' + str(randint(0,5)) + ' www.google.se') 
    x.start() 

while len(enumerate()) > 1: 
    pass # Wait for the threads to die 

Cela pourrait-il simplifier votre méthode de travail? D'autant plus que vous n'avez pas besoin d'attendre un résultat, cela va juste lancer un objet de classe dans l'espace extérieur effectuant un travail pour vous avec un timeout de c.

A noter également:

  • stdout fermeture Non, stdin et stderr causeront « Pour de nombreux descripteurs de fichiers ouverts » sur presque tous les systèmes
  • Comme indiqué dans une autre réponse, .communicate() attend un processus pour quitter (utilisez .poll() à la place)
1

Utilisez des fils qui commencent et se terminent indépendamment les uns des autres. Cette méthode serait utile si vous connaissiez toutes les commandes que vous vouliez exécuter à l'avance. Voici un exemple ...

from threading import Thread 
import subprocess 
import Queue 
import multiprocessing 

class Command(object): 
    def __init__(self, cmds): 
     self.cmds = cmds 

    def run_cmds(self): 
     cmd_queue = Queue.Queue() 
     for cmd in self.cmds: 
      cmd_queue.put(cmd) 

     available_threads = multiprocessing.cpu_count() 
     for x in range(0,available_threads): 
      t = Thread(target=self.run_cmd,args=(cmd_queue,)) 
      t.setDaemon(True) 
      t.start() 

     cmd_queue.join() 


    def run_cmd(self, cmd_queue): 
     while True: 
      try: cmd = cmd_queue.get() 
      except: break 
      print 'Thread started' 
      process = subprocess.Popen(cmd, shell=True) 
      process.communicate() 
      print 'Thread finished' 
      cmd_queue.task_done() 


# create list of commands you want to run 
cmds = ['cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop'] 
# create class 
c = Command(cmds) 
# run them... 
c.run_cmds() 

Ce imprimerait ....

Thread started 
Thread started 
Thread started 
Thread startedThread finished 

Thread started 
Thread finishedThread finished 

Thread finished 
Thread finished 

Comme vous pouvez le voir sur la sortie des sous-processus début et de fin indépendamment l'un de l'autre et n'attend une autre sous-processus sous-processus à terminer car ils sont tous appelés dans des threads différents. Naturellement, vous pouvez ajouter des délais et tout ce que vous voulez, ce n'est qu'un exemple simple. Cela suppose que vous connaissez toutes les commandes que vous voulez exécuter. Si vous souhaitez ajouter un délai d'attente, consultez la réponse d'epicbrews. Vous pouvez incorporer son exemple de timeout de thread dans celui-ci si vous le souhaitez.

+0

Tout comme j'ai dans mon exemple ? : P Altho je ne l'ai pas décrit aussi propre que vous. – Torxed

+0

En fait, votre exemple avait process.communicate() quand j'ai écrit ma réponse. Sinon, je n'aurais pas répondu. Je vois dans l'historique d'édition que vous l'avez supprimé. – b10hazard

+0

Yepp, mais l'a enlevé dès que je pouvais avant votre poste parce que je viens de coller son code là-bas pour l'obtenir avant que ma connexion se soit brisée (sur un train donc tous les 2 min DC) :) – Torxed

Questions connexes