2017-08-29 3 views
0

Je veux faire quelque chose dans multiprocessing, et je veux reporter résultat get, quelque chose comme ceci:Comment faire déférer tordu obtenir le résultat de la fonction?

from multiprocessing import Pool 
from twisted.internet import defer 
import time 

def f(x): 
    time.sleep(0.5) 
    print(x) 
    return x*x 

pool = Pool(processes=4)    # start 4 worker processes 
def get_result(i): 
    res = pool.apply_async(f, (i,)) # do work in process pool 
    return defer.Deferred(res.get()) # now, I want to make process do something else, so it should not be blocked 

def main(): 
    from twisted.internet import reactor 

    @defer.inlineCallbacks 
    def _run(): 
     for i in range(4): 
      yield get_result(i) 

     reactor.stop() 
    reactor.callLater(1, _run) 
    reactor.run() 


if __name__ == '__main__': 
    main() 

Répondre

0

Pool.apply_async() a une callback arg que vous pouvez exploiter pour lancer la chaîne de rappel dans le Deferred. Le catch (qui est absolument crucial à retenir) est que la fonction Pool-callback sera exécutée dans un autre thread! Par conséquent, vous devez appeler reactor.callFromThread lors de l'application du résultat au Deferred afin que la chaîne de rappel se produise dans le même thread que le reactor. Si vous ne le faites pas, les rappels seront exécutés dans un thread différent dans lequel le réacteur n'a aucun contexte. Voici un exemple légèrement modifié:

from functools import partial 
from multiprocessing import Pool 
import threading 
import time 

from twisted.internet import defer, reactor 


def f(x): 
    time.sleep(5) 
    return x*x 

def get_result(pool, i): 
    deferred = defer.Deferred() # create a Deferred that will eventually provide a result 
    _set_result = partial(set_result, deferred=deferred) # pass the Deferred to the apply_async callback 
    pool.apply_async(f, args=(i,), callback=_set_result) # execute in a separate process, supply callback fn 
    return deferred 

def set_result(result, deferred): 
    """ 
    Set the result in the deferred 
    """ 
    print('Thread ID: %d, Setting result %d' % (threading.get_ident(), result)) 
    reactor.callFromThread(deferred.callback, result) # execute the Deferred callback chain from the reactor thread 

def display_result(result): 
    """ 
    Just display the result 
    """ 
    print('Thread ID: %d, Display %d' % (threading.get_ident(), result)) 

def kill_reactor(null): 
    print('Thread ID: %d, Stopping reactor' % threading.get_ident()) 
    reactor.stop() 

def main(): 
    print('Thread ID: %d, Main' % threading.get_ident()) 
    pool = Pool(processes=4) 
    d = get_result(pool, 3) 
    d.addCallback(display_result) 
    d.addCallback(kill_reactor) 

    reactor.run() 

main() 


#---------- OUTPUT ----------# 
# Thread ID: 803872, Main 
# Thread ID: 533632, Setting result 9 
# Thread ID: 803872, Display 9 
# Thread ID: 803872, Stopping reactor 

J'ai imprimé l'identifiant de fil de sorte que vous pouvez voir que set_result() est en effet appelé dans un autre thread (processus?) Et non dans le thread principal, ce fil de réacteur. Ommitant reactor.callFromThread(deferred.callback, result) dans cet exemple provoquera les callbacks à exécuter dans un fil que reactor.stop() ne fonctionnera pas et Twisted vomit vomit (retraçage) partout! Pensez à utiliser à reactor.spawnProcess car cela limitera les erreurs que vous (ou moi) ferions autrement. Et comme toujours, si vous pouvez faire ce que vous voulez, dans un seul fil et omettre multiprocessing ou threading, je vous suggère de le faire à la place.