2017-08-31 5 views
0

Je rencontre un comportement très étrange avec le genre suivant de flux de céleri:groupe toile Céleri des chaînes qui passent trop d'arguments à des tâches constitutives

workflow = group(
    chain(task1.s(), task2.s()), 
    chain(task3.s(), task4.s()), 
) 

Ceci est dans le contexte de django.

Quand j'appelle le flux de travail comme suit:

workflow.apply_async((n,))

... pour toute valeur entière de n, la première tâche dans chaque chaîne (task1 et task3) échouera avec un TypeError comme ce qui suit (prises de celery events):

args: [9, 8, 7, 5, 4, 3] 
    kwargs: {} 
    retries: 0 
    exception: TypeError('task1() takes exactly 1 argument (6 given)',) 
    state: FAILURE 

les arguments après la première sont toujours des arguments que le flux de travail a déjà été appelé avec. Donc, dans cet exemple, j'ai appelé workflow.apply_async((9,)), à cette occasion, et les autres nombres sont des valeurs qui ont été transmises lors d'occasions précédentes. À chaque fois, les arguments erronés passés à task1 et task3 seront les mêmes.

Je suis tenté de publier ceci comme rapport de bogue au céleri, mais je ne suis pas encore certain que l'erreur ne m'appartienne pas d'une manière ou d'une autre.

Ce que j'ai exclu:

  • Je passe certainement les arguments que je pense que je passe à workflow.apply_async. J'ai séparément construit et enregistré le tuple que je passe, pour m'assurer de cela.
  • Il ne s'agit pas de passer une liste (c'est-à-dire mutable) à apply_async plutôt qu'un tuple. Je passe définitivement un tuple (c'est-à-dire immuable).

La seule chose que modérément inhabituel au sujet de mon installation, bien que je ne vois pas comment il est connecté, il task1 que et task3 sont configurés avec des files d'attente différentes.

Répondre

0

avait rencontré un problème similaire quand je travaillais avec task.chunks de céleri()

Je l'ai résolu en ayant la liste des éléments contenus dans un seul tuple. Par exemple,

supposer que la tâche log_i() est un shared_task qui enregistre essentiellement variable i, et je souhaite me connecter une liste de tous les i s par Chunking Je ferais -

# log_i shared Task 
@shared_task(bind=True) 
def log_i(self, i): 
    logger.info(i) 
    return i 

Et

# some calling site 
# ... 
very_long_list = [{"i1": i, "i2": i+i, "i3": i**2} for i in range(1000)] 
res = log_i.chunks(zip(very_long_list), 10)() 
print(res.get()) 

# ... 

note à moi que faire quelque chose comme -

# ... 
res = log_i.chunks(very_long_list, 10)() 
# ... 

échouera avec l'erreur dont vous parlez lorsque les éléments de la liste ne sont pas iterables. Le zipping déplace l'élément tel quel dans un nouveau tuple et vous pouvez le saisir dans un seul argument dans la tâche log_i.