2017-09-15 7 views
0

Tâches de céleri exécutées avec succès sans files d'attentetâches de céleri file ne fonctionnant pas avec rabbitmq

configuration.

BROKER_URL = "amqp://user:[email protected]:5672/test" 
# Celery Data Format 
CELERY_ACCEPT_CONTENT = ['application/json'] 
CELERY_TASK_SERIALIZER = 'json' 

CELERYD_TASK_SOFT_TIME_LIMIT = 60 
CELERY_IGNORE_RESULT = True 

@app.task 
def test(a,b,c): 
    print("doing something here...") 
commande

celery worker -A proj -E -l INFO 

Le travailleur de configuration ci-dessus est en cours d'exécution avec succès.

J'ai introduit la file d'attente pour les tâches de céleri.

ajouté configuration avec la configuration précédente

from kombu.entity import Exchange, Queue 

CELERY_QUEUES = (
    Queue('high', Exchange('high'), routing_key='high'), 
    Queue('normal', Exchange('normal'), routing_key='normal'), 
    Queue('low', Exchange('low'), routing_key='low'), 
) 

CELERY_DEFAULT_QUEUE = 'normal' 
CELERY_DEFAULT_EXCHANGE = 'normal' 
CELERY_DEFAULT_ROUTING_KEY = 'normal' 

CELERY_ROUTES = { 
    'myapp.tasks.test': {'queue': 'high'}, 
} 

commande

celery worker -A proj -E -l INFO -n worker.high -Q high 

appel

test.delay(1, 2, 3) 

Lorsque j'exécute avec le travailleur de la file d'attente ne fonctionne pas. Ai-je manqué une configuration?

+0

Toute information dans les journaux de lapin et de céleri? – lapinkoira

+0

Je n'ai pas ajouté de bûches de céleri. Je vais l'ajouter. Ai-je besoin de configurer les noms de file d'attente sur rabbitmq pour le céleri? –

+0

Vous n'avez rien à faire sur lapin, assurez-vous simplement que le lapin fonctionne et écoute sur ce port et l'interface, si vous êtes sur une machine Linux et que vous avez accès à un shell, exécutez netstat -putan | grep 5672 – lapinkoira

Répondre

1

changement CELERY_ROUTES à CELERY_TASK_ROUTES- changé dans la version 4

0

Tout d'abord, assurez-vous que la connexion établie dans les deux lapins & journaux élevé des travailleurs.

Ensuite, essayez de changer votre CELERY_ROUTES à:

CELERY_ROUTES = { 
    'myapp.tasks.test': { 
     'exchange': 'high', 
     'exchange_type': 'high', 
     'routing_key': 'high' 
    } 
} 

ou appelez la tâche avec queue, par exemple:

test_task = test.signature(args=(1, 2, 3), queue='high', immutable=True) 
test_task.apply_async()