2015-10-19 1 views
3

J'écris en python une classe de planificateur personnalisé pour celerybeat basée sur le projet celerybeat-mongo qui fonctionne avec mongodb.Comment tester un planificateur personnalisé avec celerybeat?

En fait, j'essaie de faire en sorte que cela fonctionne avec couchbase au lieu de mongodb. j'ai écrit une classe ScheduleEntry et une classe de planificateur ainsi, je reçois la liste des ordonnanceurs d'un document Couchbase et l'analyse syntaxique en objets ScheduleEntry, etc ...

Mais quand je le lance comme dit dans ce link, rien ne semble arriver

celery -A <my.task.file> beat -S <my.scheduler.CouchBaseScheduler> 

Je suis assez nouveau pour le céleri, je l'ai déjà couru quelques travailleurs des tâches, mais je ne sais pas tout à fait exactement comment fonctionne le planificateur. Celerybeat démarre bien, je sais seulement qu'il lit mes plannings correctement depuis la base de données mais aucune tâche ne semble être appelée malgré le fait que j'ai indiqué les tâches dans mon fichier tasks.py.

Suis-je dans la bonne direction? La ligne de commande ci-dessus est-elle correcte? Comment pourrais-je le déboguer puisque ma seule façon de l'exécuter est à partir de la ligne de commande (en utilisant un sous-processus pour le démarrer à partir d'un script et le déboguer serait sale).

Edit: ajouter quelques détails à ce sujet:

Tout d'abord j'ai écrit une tâche de base dans un fichier tasks.py:

import celery 
import os 
from datetime import datetime 
from celery.utils.log import get_logger 

def log_task_info(task_name, process_index, init_date): 
    # logger.warn(task_name + ': ' + str(process_index) + ':' + str(init_date) + ' : ' + str(os.getpid()) + ':' + 
    #    str(datetime.now())) 
    get_logger(__name__).warning(task_name + ': ' + str(process_index) + 
           ':' + str(init_date) + ' : ' + str(os.getpid()) + ':' + str(datetime.now())) 

@celery.task(name='tasks.heartbeat') 
def heartbeat(): 
    log_task_info('heartbeat', os.getpid(), datetime.now()) 
    return "Hello!" 

Je subclassed les classes Scheduler et SchedulerEntry.

class CouchBaseScheduler(Scheduler): 

    UPDATE_INTERVAL = datetime.timedelta(seconds=5) 

    Entry = CouchBaseScheduleEntry 

    host = "192.168.59.103" 
    port = "8091" 
    bucket = "celery" 
    doc_string = "scheduler_list" 
    password = "1234" 
    scheduleCount = 0 

    def __init__(self, *args, **kwargs): 
     if hasattr(current_app.conf, "CELERY_COUCHBASE_SCHEDULER_BUCKET"): 
      bucket_str = current_app.conf.CELERY_COUCHBASE_SCHEDULER_BUCKET 
     else: 
      bucket_str = "celery" 
     if hasattr(current_app.conf, "CELERY_COUCHBASE_SCHEDULER_URL"): 
      cnx_string = "{}/{}".format(current_app.conf.CELERY_COUCHBASE_SCHEDULER_URL, bucket_str) 
     else: 
      cnx_string = "couchbase://{}:{}/{}".format(self.host, self.port, self.bucket) 

     try: 
      self.bucket = Bucket(cnx_string, password=self.password, quiet=True) 
      self.couchcel = CouchBaseCelery(self.bucket, self.doc_string) 
      get_logger(__name__).info("backend scheduler using %s", cnx_string) 
      self._schedule = {} 
      self._last_updated = None 
      Scheduler.__init__(self, *args, **kwargs) 
      self.max_interval = (kwargs.get('max_interval') 
          or self.app.conf.CELERYBEAT_MAX_LOOP_INTERVAL or 5) 
     except AuthError: 
      get_logger(__name__).error("Couchbase connection %s failed : Auth failed!", cnx_string) 
     except CouchbaseError as cbe: 
      get_logger(__name__).debug("Couchbase connection %s failed : %s", cnx_string, type(cbe)) 


    def setup_schedule(self): 
     pass 

    def requires_update(self): 
     if not self._last_updated: 
      return True 
     return self._last_updated + self.UPDATE_INTERVAL < datetime.datetime.now() 

    def get_from_database(self): 
     self.sync() 
     try: 
      get_logger(__name__).info("Getting scheduler list from couchbase.") 
      couch_scheduler_list = self.couchcel.get_scheduler_list() 
      return couch_scheduler_list 
     except Exception as e: 
      get_logger(__name__).error("Could not get scheduler list from couchbase: {}".format(e)) 

    @property 
    def schedule(self): 
     # self.scheduleCount += 1 
     # get_logger(__name__).info("Scheduling {}".format(self.scheduleCount)) 
     if self.requires_update(): 
      get_logger(__name__).info("Schedule {} requires update".format(self.scheduleCount)) 
      self._schedule = self.get_from_database() 
      self._last_updated = datetime.datetime.now() 
     return self.schedule 

    def sync(self): 
     for entry in self._schedule.values(): 
      entry.save(self.couchcel) 

et

class CouchBaseScheduleEntry(ScheduleEntry): 

    def __init__(self, taskid, task): 
     self._task = task 

     self.app = current_app._get_current_object() 
     self._id = taskid 
     get_logger(__name__).info("Task id: {} processing".format(self._id)) 
     try: 
      if all(k in self._task for k in ('name', 'task', 'enabled')): 
       self.name = self._task['name'] 
       self.task = self._task['task'] 
      else: 
       raise Exception("Field name, task or enabled are mandatory!") 

      self.args = self._task['args'] 
      self.kwargs = self._task['kwargs'] 
      self.options = self._task['options'] 

      if 'interval' in self._task and 'crontab' in self._task: 
       raise Exception("Cannot define both interval and crontab schedule") 
      if 'interval' in self._task: 
       interval = self._task['interval'] 
       if interval['period'] in PERIODS: 
        self.schedule = self._interval_schedule(interval['period'], interval['every']) 
        get_logger(__name__).info("Task contains interval") 
       else: 
        raise Exception("The value of an interval must be {}".format(PERIODS)) 
      elif 'crontab' in self._task: 
       crontab = self._task['crontab'] 
       self.schedule = self._crontab_schedule(crontab) 
       get_logger(__name__).info("Task contains crontab") 
      else: 
       raise Exception("You must define interval or crontab schedule") 

      if self._task['total_run_count'] is None: 
       self._task['total_run_count'] = 0 
      self.total_run_count = self._task['total_run_count'] 
      get_logger(__name__).info("Task total run count: {}".format(self.total_run_count)) 

      if not self._task['last_run_at']: 
       self._task['last_run_at'] = self._default_now() 
      else: 
       self._task['last_run_at'] = datetime.datetime.strptime(self._task['last_run_at'], DATEFORMAT) 
      self.last_run_at = self._task['last_run_at'] 
      get_logger(__name__).info("Task last run at: {}".format(self.last_run_at)) 
     except KeyError as ke: 
      print('Key not valid: {}'.format(ke)) 

    def _default_now(self): 
     return self.app.now() 

    def next(self): 
     self._task['last_run_at'] = self.app.now() 
     self._task['total_run_count'] += 1 
     self._task['run_immediately'] = False 
     get_logger(__name__).info("NEXT!") 
     return self.__class__(self._task) 

    __next__ = next 

    def is_due(self): 
     if not self._task['enabled']: 
      return False, 5.0 # 5 secs delay for reenable 
     if self._task['run_immediately']: 
      # figure out when the schedule would run next anyway 
      _, n = self.schedule.is_due(self.last_run_at) 
      return True, n 
     return self.schedule.is_due(self.last_run_at) 

    def _crontab_schedule(self, crontab): 
     return celery.schedules.schedule(minute=crontab['minute'], 
             hour=crontab['hour'], 
             day_of_week=crontab['day_of_week'], 
             day_of_month=crontab['day_of_month'], 
             month_of_year=crontab['month_of_year']) 

    def _interval_schedule(self, period, every): 
     return celery.schedules.schedule(datetime.timedelta(**{period: every})) 


    def __repr__(self): 
     return '<CouchBaseScheduleEntry ({0} {1}(*{2}, **{3}) {{4}})>'.format(
      self.name, self.task, self.args, 
      self.kwargs, self.schedule 
     ) 

    def reserve(self, entry): 
     new_entry = Scheduler.reserve(self, entry) 
     return new_entry 

    @property 
    def getid(self): 
     return self._id 

    @property 
    def gettaskdict(self): 
     return self._task 

    def tojson(self): 
     return json.dumps(self.tocouchdict()) 

    def save(self, couchcel): 
     get_logger(__name__).info("Saving task {} in couchbase".format(self._id)) 
     if self.total_run_count > self._task['total_run_count']: 
      self._task['total_run_count'] = self.total_run_count 
     get_logger(__name__).error("{}, {}".format(self.last_run_at, self._task['last_run_at'])) 
     try: 
      if self.last_run_at and self._task['last_run_at'] \ 
        and self.last_run_at > self._task['last_run_at']: 
       self._task['last_run_at'] = self.last_run_at 

     except TypeError: 
      if self.last_run_at and self._task['last_run_at'] \ 
        and self.last_run_at > datetime.datetime.strptime(self._task['last_run_at'], DATEFORMAT): 
       self._task['last_run_at'] = self.last_run_at 
     self._task['run_immediately']= False 
     couchcel.save_scheduler(self) 

L'objet couchcel est utilisé pour l'accès base de données, l'objet ScheduleEntry les données en provenance parse du document Couchbase.

Meilleures salutations

+0

Vous n'êtes pas familier avec le céleri-beat. Cependant, les employés de céleri émettent un journal dans/var/log/céleri. J'ai essayé de regarder là-bas? Augmenter le niveau de journalisation? – FuzzyAmi

+0

Je travaille sous OSX, vérifié dans le dossier racine pour les journaux mais rien. – onizukaek

Répondre

0

La documentation sur le Celery website semble être un peu trompeur. Si vous regardez here, vous pouvez voir que l'option de ligne de commande -S définit la base de données d'état, pas le planificateur pour le travailleur.

Essayez de lancer avec l'option --scheduler à la place:

celery -A <my.task.file> beat --scheduler <my.scheduler.CouchBaseScheduler>