2011-03-17 4 views
14

Existe-t-il un moyen de déterminer si une tâche est perdue et de la réessayer?Réessayer des tâches perdues ou échouées (Celery, Django et RabbitMQ)

Je pense que la raison de la perte peut être un bug de distributeur ou un incident de thread de travail. J'ai planifié de les réessayer mais je ne suis pas sûr de savoir comment déterminer quelles tâches doivent être réessayées?

Et comment faire ce processus automatiquement? Puis-je utiliser mon propre planificateur personnalisé qui va créer de nouvelles tâches?

Éditer: J'ai trouvé dans la documentation que RabbitMQ ne perd jamais de tâches, mais que se passe-t-il lorsque le thread de travail plante au milieu de l'exécution de la tâche?

Répondre

26

Qu'est-ce que vous avez besoin est de mettre

CELERY_ACKS_LATE = True

ack tardive signifie que les messages de tâche seront reconnus une fois la tâche exécutée, non seulement avant, ce qui est le comportement par défaut. De cette façon, si le travailleur plante le lapin MQ aura toujours le message.

Il est évident qu'il n'y a aucun moyen de récupérer la tâche en cas de plantage total (Rabbit + workers), sauf si vous implémentez une connexion au début et à la fin de la tâche. Personnellement, j'écris une ligne mongodb chaque fois qu'une tâche commence et une autre quand la tâche se termine (indépendamment du résultat), de cette façon je peux savoir quelle tâche a été interrompue en analysant les journaux de mongo.

Vous pouvez le faire facilement en remplaçant les méthodes __call__ et after_return de la classe de tâche de base de céleri. Après, vous voyez une partie de mon code qui utilise une classe taskLogger en tant que gestionnaire de contexte (avec des points d'entrée et de sortie). La classe taskLogger écrit simplement une ligne contenant les informations sur la tâche dans une instance mongodb.

def __call__(self, *args, **kwargs): 
    """In celery task this function call the run method, here you can 
    set some environment variable before the run of the task""" 

    #Inizialize context managers  

    self.taskLogger = TaskLogger(args, kwargs) 
    self.taskLogger.__enter__() 

    return self.run(*args, **kwargs) 

def after_return(self, status, retval, task_id, args, kwargs, einfo): 
    #exit point for context managers 
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo) 

J'espère que cela pourrait aider

Questions connexes