Qu'est-ce que vous avez besoin est de mettre
CELERY_ACKS_LATE = True
ack tardive signifie que les messages de tâche seront reconnus une fois la tâche exécutée, non seulement avant, ce qui est le comportement par défaut. De cette façon, si le travailleur plante le lapin MQ aura toujours le message.
Il est évident qu'il n'y a aucun moyen de récupérer la tâche en cas de plantage total (Rabbit + workers), sauf si vous implémentez une connexion au début et à la fin de la tâche. Personnellement, j'écris une ligne mongodb chaque fois qu'une tâche commence et une autre quand la tâche se termine (indépendamment du résultat), de cette façon je peux savoir quelle tâche a été interrompue en analysant les journaux de mongo.
Vous pouvez le faire facilement en remplaçant les méthodes __call__
et after_return
de la classe de tâche de base de céleri. Après, vous voyez une partie de mon code qui utilise une classe taskLogger en tant que gestionnaire de contexte (avec des points d'entrée et de sortie). La classe taskLogger écrit simplement une ligne contenant les informations sur la tâche dans une instance mongodb.
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
#Inizialize context managers
self.taskLogger = TaskLogger(args, kwargs)
self.taskLogger.__enter__()
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point for context managers
self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)
J'espère que cela pourrait aider