2017-06-18 3 views
1

Je suis nouveau à céleri. J'ai une tâche de céleri qui doit être exécutée lorsqu'une condition est remplie. Sinon, réessayez après quelques minutes. À partir du code ci-dessous, je suis bloqué sur la façon de réessayer la même tâche dans d'autres conditions? Apprécier ton aide.comment exécuter des tâches de céleri conditionnellement python

@app.task(bind=True,soft_time_limit=4 * 3600) 
def task_message_queue(id, name=None, tid=None, src=None, dest=None, queue="MessageQueue"): 
    ThreadLocalStore().set_data({"id": id, "tid": tid, "name": name,"src": src, "dest":dest}) 
    num_files = os.popen("find %s -type f | wc -l" % dest).read().strip().split('\n')[0] 
    if num_files < 20: 
    #Move files from src to destination 
    else: 
    #wait for 2 minutes and retry the task 

Répondre

1

Vous devez appeler retry pour faire le céleri relancez la tâche et vous pouvez régler le compte à rebours pour le céleri attendra beaucoup de temps et essayer de nouveau la tâche. Ci-dessous est le code emprunté de celery docs officiel. Modifier le @task décorateur en fonction de vos besoins et aussi self.retry

from celery.task import task 
@app.task(bind=True,soft_time_limit=4 * 3600) 
def task_message_queue(self, id, name=None, tid=None, src=None, dest=None, queue="MessageQueue"): 
    ThreadLocalStore().set_data({"id": id, "tid": tid, "name": name,"src": src, "dest":dest}) 
    num_files = os.popen("find %s -type f | wc -l" % dest).read().strip().split('\n')[0] 
    try: 
     if num_files < 20: 
      #Move files from src to destination 
     else: 
      raise SOME_EXCEPTION 
      #wait for 2 minutes and retry the task 
    except SOME_EXCEPTION as exc: 
     self.retry(exc=exc, countdown=TIME_TO_WAIT_BEFORE_RETRY) 
+0

Merci Arpit. Donc, j'ai ajouté une "augmentation" dans d'autres conditions. et self.retry à l'intérieur de l'exception. Je vais le tester. – user2406718

+0

Eh bien, en fait, vous n'avez pas besoin d'une condition de relèvement, si cela fonctionne ici –

+0

si je ne soulève pas l'exception, alors "exc" n'est pas défini. J'ai aussi eu un problème même lorsque je soulève exception, parce que exc ne obtient pas la tâche réelle à exécuter – user2406718

2

Au lieu de compter sur retry vous pourriez également déclencher simplement à nouveau la tâche.

from celery.task import task 
@app.task(bind=True,soft_time_limit=4 * 3600) 
def task_message_queue(self, id, name=None, tid=None, src=None, dest=None, queue="MessageQueue"): 
    ThreadLocalStore().set_data({"id": id, "tid": tid, "name": name,"src": src, "dest":dest}) 
    num_files = os.popen("find %s -type f | wc -l" % dest).read().strip().split('\n')[0] 

    if num_files < 20: 
     #Move files from src to destination 
    else: 
     # Trigger the task again in 120 seconds. 
     task_message_queue.apply_async(countdown=120)