2017-09-12 1 views
0

Lorsque j'utilise le groupe de céleri et les chaînes pour planifier les tâches ci-dessousQuelqu'un peut-il m'expliquer comment fonctionne le groupe de céleri?

(group([group_task]) | sum_task).apply_async() 

les tâches de groupe peuvent être exécutées dans de nombreux travailleurs, après que toutes les tâches de groupe terminées, sum_task commencer à exécuter (peut-être dans l'autre travailleur), donc qui peut me dire comment céleri connu les tâches de groupe sont tous terminés et ensuite commencé le sum_task?

Répondre

1

Vous pouvez spécifier une file d'attente différente pour chaque tâche chaînée et chaque tâche de rappel de groupe/d'accord.

Snippet comme:

@shared_task(name="analyze_atom", queue="atom") 
def analyze_atom(image_urls, targetdir=target_path, studentuid=None): 
    return {} 


@shared_task(name="summary_up", queue="summary") 
def summary_up(rets, studentuid, images): 
    return {} 


chord(analyze_atom.s([image]) for image in images)(summary_up.s(studentuid, images)) 

Et, lorsque les tâches en cours d'exécution, vous pouvez vérifier le contenu de courtier, supposons que vous utilisez rabbitmq en tant que courtier, vous pouvez vérifier la profondeur de file d'attente par plug-in de gestion de rabbitmq, ou extrait d'interface pyrabbit ici:

from pyrabbit.api import Client 
cl = Client('localhost:15672', 'guest', 'guest') 
count = cl.get_queue_depth('/', 'summary') # this guy check queue depth 
cl.get_messages('/','paperanalyzer') # this guy get messages within queue 

Et, vous devriez avoir le backend de résultat, vous pourriez obtenir chaque résultat de tâche par l'identification de tâche.

Je pense aux compétences ci-dessus, il est facile d'inspecter comment céleri tâche continue.

Bonne chance :-)