2017-09-04 6 views
1

Je pratiquais le céleri et je veux affecter ma tâche à une file d'attente spécifique mais il ne fonctionne pas comme prévuitinéraires de travail de céleri ne fonctionne pas comme prévu

Mon __init__.py

import os 
import sys 
from celery import Celery 

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) 

sys.path.append(CURRENT_DIR) 

app = Celery() 
app.config_from_object('celery_config') 

Mon celery_config.py

amqp = 'amqp://guest:[email protected]:5672//' 
broker_url = amqp 
result_backend = amqp 

task_routes = ([ 
    ('import_feed', {'queue': 'queue_import_feed'}) 
]) 

Mon tasks.py

from . import app 

@app.task(name='import_feed') 
def import_feed(): 
    pass 

Comment je lance mon travailleur:

celery -A subscriber1.tasks worker -l info 

__init__.py Mon client:

import os 
import sys 
from celery import Celery 

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) 

sys.path.append(CURRENT_DIR) 

app = Celery() 
app.config_from_object('celery_config') 

celery_config.py Mon client:

from kombu.common import Broadcast 

amqp = 'amqp://guest:[email protected]:5672//' 
BROKER_URL = amqp 
CELERY_RESULT_BACKEND = amqp 

Puis, dans la coquille de mon client j'ai essayé:

from publisher import app 
result = app.send_task('import_feed') 

Alors mon travailleur a obtenu la tâche ?! Ce que j'attends ne devrait pas parce que j'ai assigné cela à une file d'attente spécifique. J'ai essayé dans mon client la commande ci-dessous et aucune tâche n'a été reçue par mon travailleur que je pense avoir reçu à la place sur la première

result = app.send_task('import_feed', queue='queue_import_feed') 

On dirait que j'ai mal compris quelque chose dans la partie de routage. Mais ce que je veux vraiment, c'est que la tâche import_feed s'exécute uniquement si la file d'attente queue_import_feed est spécifiée lors de l'envoi d'une tâche

Répondre

1

Vous pouvez modifier la file d'attente par défaut traitée par le worker.

app.send_task('import_feed') envoie la tâche à la file d'attente celery.

app.send_task('import_feed', queue='queue_import_feed') envoie la tâche à queue_import_feed mais votre travail ne traite que les tâches dans la file d'attente celery.

Pour traiter les files d'attente spécifiques, utilisez le -Q commutateur

celery -A subscriber1.tasks worker -l info -Q 'queue_import_feed' 

Modifier

Afin de placer une restriction à send_task telle qu'un travailleur réagit à import_feed tâche que quand il est publié avec une file d'attente , vous devez remplacer send_task sur Celery et également fournir un AMQP personnalisé avec un default_queue défini sur None.

réacteur.py

from celery.app.amqp import AMQP 
from celery import Celery 

class MyCelery(Celery): 
    def send_task(self, name=None, args=None, kwargs=None, **options): 
     if 'queue' in options: 
      return super(MyCelery, self).send_task(name, args, kwargs, **options) 


class MyAMQP(AMQP): 
    default_queue = None 

celery_config.py

from kombu import Exchange, Queue 

... 

task_exchange = Exchange('default', type='direct') 
task_create_missing_queues = False 

task_queues = [ 
    Queue('feed_queue', task_exchange, routing_key='feeds'), 
] 

task_routes = { 
    'import_feed': {'queue': 'feed_queue', 'routing_key': 'feeds'} 
} 

__init__.py

celeree = MyCelery(amqp='reactor.MyAMQP') 
+0

Désolé, mais n'a pas vraiment répondu à la question. Je sais que cela peut être fait de cette façon. Je veux juste faire ma dernière déclaration –

+0

Votre dernière requête demande d'exécuter une tâche spécifique uniquement si une file d'attente est spécifiée pour celle-ci. Vous voulez désactiver [routage automatique] (http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-create-missing-queues) pour que les tâches de ce type ne soient pas mises en file d'attente dans ' celery' queue –

+0

Exactement, ce que je veux c'est exécuter simplement 'celery -A subscriber1.tasks worker -l info' ne spécifiant aucune file d'attente pour exécuter un worker et limitera la tâche' import_feed' pour réagir quand une tâche est publiée avec le file d'attente 'queue_import_feed'. Est-ce que ce n'est pas possible? –