Vous pouvez spécifier une file d'attente différente pour chaque tâche chaînée et chaque tâche de rappel de groupe/d'accord.
Snippet comme:
@shared_task(name="analyze_atom", queue="atom")
def analyze_atom(image_urls, targetdir=target_path, studentuid=None):
return {}
@shared_task(name="summary_up", queue="summary")
def summary_up(rets, studentuid, images):
return {}
chord(analyze_atom.s([image]) for image in images)(summary_up.s(studentuid, images))
Et, lorsque les tâches en cours d'exécution, vous pouvez vérifier le contenu de courtier, supposons que vous utilisez rabbitmq en tant que courtier, vous pouvez vérifier la profondeur de file d'attente par plug-in de gestion de rabbitmq, ou extrait d'interface pyrabbit ici:
from pyrabbit.api import Client
cl = Client('localhost:15672', 'guest', 'guest')
count = cl.get_queue_depth('/', 'summary') # this guy check queue depth
cl.get_messages('/','paperanalyzer') # this guy get messages within queue
Et, vous devriez avoir le backend de résultat, vous pourriez obtenir chaque résultat de tâche par l'identification de tâche.
Je pense aux compétences ci-dessus, il est facile d'inspecter comment céleri tâche continue.
Bonne chance :-)