2017-09-22 6 views
0

J'utilise Céleri avec une application Flask, et voici ma config:Les files d'attente multiples de céleri ne fonctionnent pas correctement. Toutes les tâches sont envoyées à la file par défaut

app.config['CELERY_TASK_QUEUES'] = (
    Queue('fast', Exchange('fast'), routing_key='fast'), 
    Queue('default', Exchange('default'), routing_key='default'), 
    Queue('processing', Exchange('processing'), routing_key='processing'), 
) 

app.config['CELERY_TASK_ROUTES'] = { 
    'app.tasks.extract_text': {'queue': 'processing', 'routing_key': 'processing'}, 
    ... 

    'app.tasks.vt_notifications': {'queue': 'default', 'routing_key': 'default'}, 
    ... 

    'app.tasks.update_files_from_search': {'queue': 'fast', 'routing_key': 'fast'}, 
    ... 
} 


app.config['CELERY_DEFAULT_QUEUE'] = 'default' 
app.config['CELERY_DEFAULT_EXCHANGE'] = 'default' 
app.config['CELERY_DEFAULT_ROUTING_KEY'] = 'default' 

J'ai fini avec l'exécution des instances de céleri comme ceci:

celery -A app.tasks.celery worker -Q 'processing' --concurrency 1 -l debug -n processing 
celery -A app.tasks.celery worker -Q 'fast' --concurrency 1 -l debug -n fast 
celery -A app.tasks.celery worker -Q 'default' --concurrency 1 -l debug -n default 

Donc, le problème est que toutes les tâches sont envoyées à la file d'attente 'par défaut'. Toute aide est grandement appréciée. Merci!

+0

version céleri? – ItayB

Répondre

3

Si vous utilisez le céleri> 4, je recommande quelques petites choses: Tout d'abord, essayez d'ajouter name à votre tâche (pour vous assurer que vous utilisez le bon nom dans CELERY_TASK_ROUTES Par exemple:.

@app.task(name='extract_text']) 
    def extract_text(..): 
     pass 

deuxième , essayez de changer le CELERY_TASK_ROUTES à:.

CELERY_ROUTES = { 
    'extract_text': { 
     'exchange': 'processing', 
     'exchange_type': 'direct', 
     'routing_key': 'processing' 
    } 
} 

(au lieu de queue - essayez d'ajouter exchange et exchange_type)

Dernière chose, vous n'avez pas à l'utiliser, juste pour le débogage, vous pouvez explicitement la tâche de la route lors du déclenchement:

(extract_text.signature(args=(...), queue='processing')).delay() 

EDIT:

êtes-vous que vous utilisez config comme requis? voici un exemple:

celery_app = Celery() 
celeryconfig = {} 
celeryconfig['BROKER_URL'] = 'amqp://' 
celeryconfig['CELERY_RESULT_BACKEND'] = 'redis://localhost' 
celeryconfig['CELERY_QUEUES'] = (
    Queue('fast', Exchange('fast'), routing_key='fast'), 
    Queue('default', Exchange('default'), routing_key='default'), 
    Queue('processing', Exchange('processing'), routing_key='processing'), 
) 
celeryconfig['CELERY_ROUTES'] = { 
    'extract_text': { 
     'exchange': 'processing', 
     'exchange_type': 'direct', 
     'routing_key': 'processing' 
    } 
} 

celery_app.config_from_object(celeryconfig) 
+2

Si je route explicitement la tâche lors du déclenchement, cela fonctionne parfaitement. On dirait que la variable CELERY_TASK_ROUTES est simplement ignorée –

+0

alors, pouvez-vous accepter la réponse s'il vous plaît? :-) – ItayB

+1

avez-vous essayé 'CELERY_ROUTES' à la place? peut-être que vous n'utilisez pas la configuration au besoin, je vais mettre à jour ma réponse dans un second – ItayB