2017-10-10 15 views
0

Fondamentalement, j'essaie de générer des événements côté serveur dans un thread séparé. J'ai un celery.task qui devrait émettre des événements, mais son code n'est jamais exécuté.Comment émettre des événements à partir d'une tâche de céleri

import json 
import time 

from celery import Celery 

from flask import Flask 
from flask import jsonify 
from flask import render_template 
from flask import request 

from flask_socketio import SocketIO 

broker_url = "redis://localhost:6379/1" 

celery = Celery(broker=broker_url) 
app = Flask(__name__) 
socketio = SocketIO(app, message_queue=broker_url) 


@celery.task 
def countdown(n): 
    print("countdown", n) 
    for i in range(n+1): 
     time.sleep(1) 
     socketio.emit(
      "countdown", 
      {"remaining": n - i}, 
      namespace="/test/" 
     ) 


@app.route("/") 
def index(): 
    return render_template("index.html") 


@app.route("/start_countdown/", methods=["POST"]) 
def start_countdown(): 
    data = json.loads(request.data.decode()) 
    countdown.delay([int(data["time"])]) 
    return jsonify(time_to_wait=data["time"]) 


if __name__ == '__main__': 
    socketio.run(debug=True) 

Les vues répondent bien, mais la tâche est silencieuse et je ne comprends pas, pourquoi?

UPD

j'avais réarrangé mon code comme here. La structure du dossier est complètement identique et les fichiers sont identiques. Dans le dossier app/main j'ai un fichier supplémentaire tasks.py.

import time 

from celery import Celery 
from flask_socketio import emit 

from app import socketio 
from config import broker_url 


celery = Celery(broker=broker_url) 

@celery.task 
def countdown(n): 
    print(n) 

    for i in range(n+1): 
     time.sleep(1) 
     print("Socket", socketio) 
     print("Server", socketio.server) 
     socketio.emit(
      "countdown", 
      {"remaining": n - i}, 
      namespace="/test/" 
     ) 

Je commence le travail de céleri avec la commande celery -A app.main.tasks worker. Et lorsque le code de la tâche countdown est exécutée, elle échoue à cette exception:

[2017-10-12 19:04:07,797: WARNING/ForkPoolWorker-1] 13 
[2017-10-12 19:04:08,799: WARNING/ForkPoolWorker-1] <flask_socketio.SocketIO object at 0x7f07d2a0fc50> 
[2017-10-12 19:04:08,803: ERROR/ForkPoolWorker-1] Task app.main.tasks.countdown[68ae2e43-6ab7-4d52-8b3a-a9aaff46c489] raised unexpected: AttributeError("'NoneType' object has no attribute 'emit'",) 
Traceback (most recent call last): 
    File "/path/to/venv/lib/python3.4/site-packages/celery/app/trace.py", line 374, in trace_task 
    R = retval = fun(*args, **kwargs) 
    File "/path/to/venv/lib/python3.4/site-packages/celery/app/trace.py", line 629, in __protected_call__ 
    return self.run(*args, **kwargs) 
    File "/path/to/tasks.py", line 24, in countdown 
    namespace="/test/" 
    File "/path/to/venv/lib/python3.4/site-packages/flask_socketio/__init__.py", line 357, in emit 
    self.server.emit(event, *args, namespace=namespace, room=room, 
AttributeError: 'NoneType' object has no attribute 'emit' 

socketio.server dans ma tâche est None pour une raison quelconque, alors que dans app/main/events.py fichier, il est objet approprié. On dirait que dans ma tâche socketio l'objet n'est pas complètement initialisé, probablement, car dans le processus d'exécution du céleri, le flux est différent, mais je ne sais pas comment le réparer.

+0

Comment vous connectez-vous à cette application à partir du client? – Miguel

+0

@Miguel, voici index.html et magic.js https://gist.github.com/montreal91/8a9ba9033df1469ccd51e1c2e7a3ac9f. Je peux les ajouter à la poste, si nécessaire. 'start_countdown' est appelé, mais la tâche d'arrière-plan ne démarre pas. – Montreal

+0

Exécutez-vous au moins un processus de travail Celery en plus de votre serveur principal? – Miguel

Répondre

0

Votre tâche céleri est-elle dans un module tasks.py distinct? par exemple:

from celery import Celery 

celery = Celery('tasks', broker=broker_url) 
socketio = SocketIO(app, message_queue=broker_url) 

@celery.task 
def countdown(n): 
    print("countdown", n) 
    for i in range(n+1): 
     time.sleep(1) 
     socketio.emit(
      "countdown", 
      {"remaining": n - i}, 
      namespace="/test/" 
     ) 

Ensuite, vous pouvez commencer à ce travailleur à l'invite:

celery -A tasks worker --loglevel=info 

vous devez alors exécuter votre code principal et ajouter:

from tasks import countdown 

et appelez votre fonction de compte à rebours qui sera exécuter dans un processus séparé