2017-07-17 3 views
1

Débutant ici Python et sympy. J'ai une question simple.quelle est la sémantique du multitraitement python. rejoindre avec timeout. Pourquoi ça pend

Si je démarre un worker en utilisant le multitraitement, et lui donne un petit délai d'attente disons 3 secondes, dans l'appel join. Que se passe-t-il si le travailleur est occupé lui-même, peut-être attendre que quelque chose se termine? que se passe-t-il lorsque le délai expire et que le travailleur attend lui-même sympy?

J'ai trouvé que si le travailleur appelle sympy sur un long calcul, la jointure se bloque en attente de l'opérateur, beaucoup plus longtemps que la période de délai.

Voici une MWE

from multiprocessing import Process, Queue 
from sympy import * 
x = symbols('x') 

def worker(integrand,que): 
    que.put(integrate(integrand, x)) 

if __name__ == '__main__':  
    que = Queue() 
    result = "still waiting " 

    #this first call works OK since it is easy integral 
    #action_process = Process(target=worker,args=(cos(x),que)) 

    #this one hangs inside sympy since hard integral 
    p = Process(target=worker,args=(1/(x**3+cos(x)),que)) 

    # start the process and wait for max of 3 seconds. 
    p.start() 
    p.join(timeout=3) 
    result=que.get()  
    print("result from worker is " + str(result)) 
    p.terminate() 

Question: Est-il possible de forcer le travailleur de mettre fin au délai d'attente? Y at-il une autre option à utiliser pour faire cela? Est-ce que je fais quelque chose de mal dans ce qui précède?

S'il n'est pas possible de forcer timeout à rejoindre, alors que signifient join() après n secondes, si le worker ne peut pas rejoindre? La seule raison pour laquelle j'essaye ce qui précède est de trouver un moyen de temporiser sur un long calcul en sympy, puisque je suis sur windows et les timers et les alarmes ne fonctionnent pas sur windows, donc j'ai pensé essayer le multiprocessing avec timeout, mais ne semble pas faire ce que je veux et par conséquent cela ne servira à rien ce que je veux.

ps. Si vous exécutez ce qui précède, le travailleur restera à l'intérieur sympy pendant une longue période, peut être 10 minutes ou plus, et pourrait devoir le tuer manuellement.

Ce qui précède est sur windows. Mais j'ai aussi essayé Linux, et ça se bloque aussi.

Anacode 4.3.1, Python 3.6

Mise à jour

Merci à allusion, en réponse, cela semble maintenant ci-dessous pour travailler

from multiprocessing import Process, Queue 
from sympy import * 
x = symbols('x') 

def worker(integrand,que): 
    que.put(integrate(integrand, x)) 

if __name__ == '__main__':  
    que = Queue() 
    result = "timed out " 

    #this one hangs inside sympy since hard integral 
    p = Process(target=worker,args=(1/(x**3+cos(x)),que)) 

    # start the process and wait for max of 3 seconds. 
    p.start() 
    p.join(timeout=3) 

    try: 
     result=que.get(block=False)  
    except: 
     print("timed out on que.get()") 
     pass 

    print("result from worker is " + str(result)) 
    p.terminate() 

Je dois encore tester plus à faire Je suis sûr que le processus de travail() sera tué par la fin, car je ne veux pas avoir de travailleurs zombies.

+0

Etes-vous sûr que cette fonction a une intégrale élémentaire? – Anis

+0

@Anis Celui-ci ne le fait pas. Mais c'est vraiment hors sujet. Je devais juste définir un délai d'attente sur les appels Sympy. – Nasser

Répondre

1

La raison pour laquelle votre code n'atteint jamais le print "à temps" est qu'il ne dépasse pas le result=que.get(). Vous avez spécifié un délai d'attente pour p.join qui fait reprendre le processus appelant après le délai spécifié. Toutefois, que.get bloque également et attend donc qu'un élément ait été placé dans la file d'attente. Vous pouvez également spécifier un délai d'expiration ici: que.get(timeout=...) ou simplement désactiver le blocage: que.get(block=False). Dans tous les cas, si aucun élément n'est disponible dans la file d'attente, il déclenchera une exception queue.Empty.

Toutefois, la documentation contient severalwarnings pour terminer un processus qui contient une référence (partagée) à une file d'attente. Les Programming Guidelines semblent contenir une solution dans ce cas et préviennent également d'éventuels blocages (voir "Rejoindre des processus qui utilisent des files d'attente").

+0

merci. J'ai ajouté du code mis à jour en utilisant 'que.get (block = False)' il semble fonctionner jusqu'à présent. Mais je ne suis pas sûr si j'ai encore besoin de vérifier quelque chose d'autre ou non. – Nasser