j'ai besoin du flux de travail suivant:Céleri - une tâche exécute des tâches N, les attend et traite les résultats
ParentTask
est exécuté d'abord- À un moment donné, il engendre N instances de
ChildTask
, qui courent en parallèle ParentTask
attend ceux à la fin, recueille les résultats, les processus en quelque sorte et se termine
Cela semble être très facile. Malheureusement, appeler Task().delay()
(que j'utilise pour appeler des tâches) à partir d'une tâche semble être complètement ignoré. Je suis complètement perdu ici.
Si vous aimez le code approche plus, je l'inclue aussi bien.
from celery.task import Task
from celery.result import AsyncResult
class ParentTask(Task):
def run(self, *args, **kwargs):
# do some stuff
ids = [ChildTask().delay().id for _ in range(N)] # this seems to do nothing here
results = [AsyncResult(t) for t in ids]
while not all([r.ready() for r in results]): # wait for child tasks to finish
sleep(.100)
# do some stuff again
# return results
class ChildTask(Task):
def run(self, *args, **kwargs):
# do some child stuff
# return child results
ParentTask().delay() # this delay works fine
Merci pour toutes les idées!
Vous avez besoin de [Canvas] (http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups) –
Puis-je combiner ces groupes, chaînes, accords, etc. Si oui, pourriez-vous s'il vous plaît écrire un extrait de code pour moi qui montre l'utilisation? – karlosss