Pool.apply_async()
a une callback
arg que vous pouvez exploiter pour lancer la chaîne de rappel dans le Deferred
. Le catch (qui est absolument crucial à retenir) est que la fonction Pool-callback sera exécutée dans un autre thread! Par conséquent, vous devez appeler reactor.callFromThread
lors de l'application du résultat au Deferred
afin que la chaîne de rappel se produise dans le même thread que le reactor
. Si vous ne le faites pas, les rappels seront exécutés dans un thread différent dans lequel le réacteur n'a aucun contexte. Voici un exemple légèrement modifié:
from functools import partial
from multiprocessing import Pool
import threading
import time
from twisted.internet import defer, reactor
def f(x):
time.sleep(5)
return x*x
def get_result(pool, i):
deferred = defer.Deferred() # create a Deferred that will eventually provide a result
_set_result = partial(set_result, deferred=deferred) # pass the Deferred to the apply_async callback
pool.apply_async(f, args=(i,), callback=_set_result) # execute in a separate process, supply callback fn
return deferred
def set_result(result, deferred):
"""
Set the result in the deferred
"""
print('Thread ID: %d, Setting result %d' % (threading.get_ident(), result))
reactor.callFromThread(deferred.callback, result) # execute the Deferred callback chain from the reactor thread
def display_result(result):
"""
Just display the result
"""
print('Thread ID: %d, Display %d' % (threading.get_ident(), result))
def kill_reactor(null):
print('Thread ID: %d, Stopping reactor' % threading.get_ident())
reactor.stop()
def main():
print('Thread ID: %d, Main' % threading.get_ident())
pool = Pool(processes=4)
d = get_result(pool, 3)
d.addCallback(display_result)
d.addCallback(kill_reactor)
reactor.run()
main()
#---------- OUTPUT ----------#
# Thread ID: 803872, Main
# Thread ID: 533632, Setting result 9
# Thread ID: 803872, Display 9
# Thread ID: 803872, Stopping reactor
J'ai imprimé l'identifiant de fil de sorte que vous pouvez voir que set_result()
est en effet appelé dans un autre thread (processus?) Et non dans le thread principal, ce fil de réacteur. Ommitant reactor.callFromThread(deferred.callback, result)
dans cet exemple provoquera les callbacks à exécuter dans un fil que reactor.stop()
ne fonctionnera pas et Twisted vomit vomit (retraçage) partout! Pensez à utiliser à reactor.spawnProcess
car cela limitera les erreurs que vous (ou moi) ferions autrement. Et comme toujours, si vous pouvez faire ce que vous voulez, dans un seul fil et omettre multiprocessing
ou threading
, je vous suggère de le faire à la place.