2012-07-12 1 views
5

Comment puis-je accéder au résultat d'une tâche Céleri dans mon processus d'application Django principal? Ou, comment puis-je publier sur une connexion de socket existante à partir d'un processus distinct?django-socketio avec Celery: envoyer au socket une fois la tâche asynchrone terminée dans un processus séparé

J'ai une application dans laquelle les utilisateurs reçoivent des scores. Lorsqu'un score est enregistré, des calculs sont effectués (progression vers les objectifs, etc.) et, sur la base de ces calculs, les notifications sont envoyées aux utilisateurs intéressés. Les calculs peuvent prendre 30s +, donc pour éviter une interface utilisateur léthargique, ces opérations sont effectuées dans un processus d'arrière-plan via une tâche Celery, invoquée par le signal post_save de mon modèle Score.

Idéalement, le signal post_save sur mon modèle Nofication publierait un message aux clients abonnés (j'utilise django-socketio, un wrapper pour gevent-socketio). Cela semble simple ...

  1. Créer un score
  2. faire des calculs sur la nouvelle instance de Score dans un processus d'arrière-plan
  3. Sur la base de ces calculs, créer une notification
  4. sur la notification sauver, saisir la instance et publier à ses clients abonnés via une connexion socket

Cependant après avoir essayé ce qui suit, je ne suis pas sûr que cela est possible:

  • passant SocketIOServer l'instance de gevent à la méthode de rappel invoquée par la tâche, mais cela nécessite un décapage l'objet passé, ce qui est impossible

  • stocker le session_id (différent du id_session de Django) de la douille dans memchache et récupérer cela dans le processus de tâche Celery. En utilisant Redis pubsub, les méthodes appelées par les signaux post_save sur les modèles créés dans un processus d'arrière-plan pourraient simplement publier sur un canal Redis, mais l'écoute du canal de discussion dans le processus d'application principal (qui a accès à la connexion socket) bloque le reste de l'application.

  • J'ai également essayé de générer de nouveaux threads pour chaque client Redis, qui sont créés pour chaque abonné socket. Pour autant que je peux dire cela nécessite une nouvelle ponte gevent.greenlets.Greenlet et gevent ne peut pas être utilisé dans plusieurs threads

Certes, cela est un problème résolu. Qu'est-ce que je rate?

+1

Pouvez-vous montrer du code?Je reçois les exigences, mais votre tentative de mise en œuvre n'est pas claire. À quoi ressemblent votre tasks.py et views.py (en supposant que vous utilisez cette structure de fichier)? –

+0

lorsque vos gestionnaires de socketio démarrent sur le serveur, que font-ils? s'abonnent-ils à une file d'attente redis pub-sub? Si oui, utilisez-vous le monkeypatch redis-py gevent? quelque part dans votre ap vous devez mettre cette ligne: 'import redis, gevent; redis.connection.socket = gevent.socket' https://github.com/andymccurdy/redis-py/pull/199. La chose à comprendre est que les greenlets de gevent comme les gestionnaires de socketio bloqueront toujours le processus si vous ne prenez pas soin d'utiliser les sockets/filedescriptors patchés par gevent. – Thomas

Répondre

0

Vous avez déjà django-socketio, l'écriture d'un pub/sous avec Redis serait dommage :)

côté client:

var socket = new io.Socket(); 
socket.connect(); 
socket.on('connect', function() { 
    socket.subscribe({{ score_update_channel }}); 
}); 
côté serveur

:

from django_socketio import broadcast_channel 
def user_score_update(user): 
    return 'score_updates_user_%s' % user.pk 

channel = user_score_update(user) 
broadcast_channel(score_result_data, channel) 

Vous avez besoin exécuter l'émission sur le processus django-socketio; si vous l'exécutez à partir d'un processus différent (céleri), cela ne fonctionnera pas (les canaux sont référencés en mémoire par le processus django-socketio); Vous pouvez résoudre ceci en l'enveloppant dans une vue et que le céleri appellera (faisant une vraie demande de HTTP) quand la tâche est complète.

Questions connexes