J'ai un plan dans mon application flacon.Est-il possible d'appeler une tâche de céleri à partir d'un plan?
J'initalized le céleri dans le fichier main.py:
def make_celery(app):
celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app.config.update(
DEBUG = True,
MAIL_SERVER='smtp.gmail.com',
MAIL_PORT=465,
MAIL_USE_TLS = False,
MAIL_USE_SSL= True,
MAIL_USERNAME = email,
MAIL_PASSWORD = password,
CELERY_BROKER_URL='redis://localhost:6379/0',
CELERY_RESULT_BACKEND='redis://localhost:6379/0'
)
mail = Mail(app)
celery = make_celery(app)
J'ai aussi un plan que je me suis inscrit avec l'application dans mon fichier main.py.
Edit:
mes tâches:
@celery_tasks(name="call_api1")
def call_api1:
api = requests.post("blah blah")
return api
@celery.task(name="call_api2")
def call_api2():
api2 = requests.post("api2 call")
return api2
dès maintenant, j'appelle ces tâches dans mon fichier main.py.
Cependant, je voudrais les utiliser dans mon plan (que je suis inscrit sur mon main.py)
J'ai défini les tâches de céleri dans main.py et je voudrais savoir si je peux appeler la tâches de céleri de mon plan.
Cordialement, Galeej
oui. Quelque chose de ce genre. Je souhaite envoyer un appel à des API tierces de manière asynchrone et les diffuser sur mon frontal. Donc la tâche elle-même est très simple (j'ai écrit des cours qui feront le travail/les appels nécessaires). Le plan est là pour rendre ma vie plus facile :) – galeej
@galeej pouvez-vous également fournir le code de ces plans? –