2016-11-09 1 views
0

Ce que j'essaie d'atteindre Ecrivez un planificateur qui utilise une base de données pour planifier des tâches similaires à différents moments.Planification d'une tâche à plusieurs temps (avec différents paramètres) en utilisant céleri beat mais tâche exécutée une seule fois (avec des paramètres aléatoires)

Pour le même j'utilise battement de céleri, l'extrait de code ci-dessous donnerait une idée

try: 
    reader = MongoReader() 
except: 
    raise 
try: 
    tasks = reader.get_scheduled_tasks() 
except: 
    raise 
celerybeat_schedule = dict() 
for task in tasks: 
    celerybeat_schedule[task["task_id"]] =dict() 
    celerybeat_schedule[task["task_id"]]["task"] = task["task_name"] 
    celerybeat_schedule[task["task_id"]]["args"] = (task,) 
    celerybeat_schedule[task["task_id"]]["schedule"] = get_task_schedule(task) 

app.conf.update(BROKER_URL=rabbit_mq_endpoint, CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], CELERYBEAT_SCHEDULE=celerybeat_schedule) 

de sorte que ces trois étapes - lecture toutes les tâches de datastore - la création d'un dictionnaire, planificateur de céleri qui est peuplé de toutes les tâches ayant des propriétés, nom_de_la_tâche (méthode qui fonctionnerait), les paramètres (données à passer à la méthode), le calendrier (magasins quand exécuter) - mettre à jour ce avec des configurations de céleri

scénario attendu donné toute l'ENTR s exécutent le même nom de la tâche de céleri qui écrit seulement, ont même programme à exécuter toutes les 5 min, ayant des paramètres spécifiant que pour imprimer, permet de dire db a

task name  , parameter , schedule 
regular_print , Hi  , {"minutes" : 5} 
regular_print , Hello  , {"minutes" : 5} 
regular_print , Bye  , {"minutes" : 5} 

Je pense, ceux-ci d'impression pour imprimer toutes les 5 minutes d'imprimer les trois

Qu'est-ce qui se passe un seul de Salut, Bonjour, impressions Bye (possible au hasard, sûrement pas dans l'ordre)

S'il vous plaît aider, Merci beaucoup à l'avance :)

Répondre

0

W comme capable de résoudre cela en utilisant la version 4 de céleri. Échantillon similaire à ce qui a fonctionné pour moi .. peut également trouver dans la documentation par le céleri pour la version 4

#taking address and user-pass from environment(you can mention direct values) 
    ex_host_queue = os.environ["EX_HOST_QUEUE"] 
    ex_port_queue = os.environ["EX_PORT_QUEUE"] 
    ex_user_queue = os.environ["EX_USERID_QUEUE"] 
    ex_pass_queue = os.environ["EX_PASSWORD_QUEUE"] 
    broker= "amqp://"+ex_user_queue+":"+ex_pass_queue+"@"+ex_host_queue+":"+ex_port_queue+"//" 

    #celery initialization 
    app = Celery(__name__,backend=broker, broker=broker) 
    app.conf.task_default_queue = 'scheduler_queue' 
    app.conf.update(
     task_serializer='json', 
     accept_content=['json'], # Ignore other content 
     result_serializer='json' 
    ) 
task = {"task_id":1,"a":10,"b":20} 
##method to update scheduler 
def add_scheduled_task(task): 
    print("scheduling task") 
    del task["_id"] 
    print("adding task_id") 
    name = task["task_name"] 
    app.add_periodic_task(timedelta(minutes=1),add.s(task), name = task["task_id"])  

@app.task(name='scheduler_task') 
def scheduler_task(data): 
    print(str(data["a"]+data["b"]))