2011-06-14 23 views
40

J'utilise celerybeat pour lancer une tâche principale qui débute d'un certain nombre de tâches secondaires. J'ai déjà écrit les deux tâches.Céleri tâche qui exécute plus de tâches

Y at-il un moyen de le faire facilement? Est-ce que Celery permet d'exécuter des tâches depuis des tâches?

Mon exemple:

@task 
def compute(users=None): 
    if users is None: 
     users = User.objects.all() 

    tasks = [] 
    for user in users: 
     tasks.append(compute_for_user.subtask((user.id,))) 

    job = TaskSet(tasks) 
    job.apply_async() # raises a IOError: Socket closed 

@task 
def compute_for_user(user_id): 
    #do some stuff 

compute est appelée à partir celerybeat, mais provoque une IOError quand il tente d'exécuter apply_async. Des idées?

+0

http://celeryproject.org/docs/userguide/tasksets.html – bdd

+0

peut être un taskset lancé à partir d'une tâche? –

+6

Les tâches et les ensembles de tâches peuvent être appliqués à partir d'une tâche, mais vous ne devriez jamais attendre leurs résultats (voir http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks) – asksol

Répondre

23

Pour répondre à vos questions d'ouverture: Depuis la version 2.0, Celery fournit un moyen facile de démarrer des tâches à partir d'autres tâches. Ce que vous appelez des «tâches secondaires» sont ce qu'il appelle des «sous-tâches». Voir la documentation pour Sets of tasks, Subtasks and Callbacks, que @Paperino a eu la gentillesse de lier.

Pour la version 3.0, Celery a été modifié en utilisant groups pour ce type de comportement et d'autres.

Votre code indique que vous connaissez déjà cette interface. Votre vraie question semble être: «Pourquoi est-ce que je reçois un 'Socket Closed' IOError quand j'essaie d'exécuter mon ensemble de sous-tâches? Je ne pense pas que quiconque puisse répondre à cette question, car vous n'avez pas fourni suffisamment d'informations sur votre programme. Votre extrait ne peut pas être exécuté tel quel, nous ne pouvons donc pas examiner le problème que vous vous posez. S'il vous plaît poster le stacktrace fourni avec le IOError, et avec un peu de chance, quelqu'un qui peut vous aider avec votre crasher viendra.

6

Vous pouvez utiliser quelque chose comme ça (Support à 3,0)

g = group(compute_for_user.s(user.id) for user in users) 
g.apply_async() 
+0

Donc, dans votre implémentation, la méthode "compute (users = None):" n'est pas du tout nécessaire, oui? –

+1

Cela n'est pas recommandé si vous souhaitez attendre la fin des sous-tâches et récupérer le résultat. – Siddharth

Questions connexes