2017-08-29 4 views
0

j'ai besoin du flux de travail suivant:Céleri - une tâche exécute des tâches N, les attend et traite les résultats

  • ParentTask est exécuté d'abord
  • À un moment donné, il engendre N instances de ChildTask, qui courent en parallèle
  • ParentTask attend ceux à la fin, recueille les résultats, les processus en quelque sorte et se termine

Cela semble être très facile. Malheureusement, appeler Task().delay() (que j'utilise pour appeler des tâches) à partir d'une tâche semble être complètement ignoré. Je suis complètement perdu ici.

Si vous aimez le code approche plus, je l'inclue aussi bien.

from celery.task import Task 
from celery.result import AsyncResult 

class ParentTask(Task): 
    def run(self, *args, **kwargs): 
     # do some stuff 
     ids = [ChildTask().delay().id for _ in range(N)] # this seems to do nothing here 
     results = [AsyncResult(t) for t in ids] 
     while not all([r.ready() for r in results]): # wait for child tasks to finish 
      sleep(.100) 
     # do some stuff again 
     # return results 

class ChildTask(Task): 
    def run(self, *args, **kwargs): 
     # do some child stuff 
     # return child results 

ParentTask().delay() # this delay works fine 

Merci pour toutes les idées!

+0

Vous avez besoin de [Canvas] (http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups) –

+0

Puis-je combiner ces groupes, chaînes, accords, etc. Si oui, pourriez-vous s'il vous plaît écrire un extrait de code pour moi qui montre l'utilisation? – karlosss

Répondre

0

Ok, je l'ai compris. Une approche de travail peut ressembler à ceci (bien sûr, les tâches peuvent faire tout ce qui est nécessaire):

from time import sleep 
from celery.task import Task 
from celery import chain, group 

class PreTask(Task): 
    def run(self, *args, **kwargs): 
     x = 0 
     for i in range(100000): 
      x += 1 
     return x 


class MidTask(Task): 
    def run(self, *args, **kwargs): 
     sleep(5) 
     return 42 


class PostTask(Task): 
    def run(self, *args, **kwargs): 
     return args 


# call it like this 
res = chain(PreTask().s() | group(MidTask().s() for _ in range(5)) | PostTask().s()).apply_async() 

# and get the result for example like this 
while(True): 
    if res.ready(): 
     print(res.get()) 
    sleep(1) 

Espérons que cela aide quelqu'un.