2016-09-20 3 views
0

Je travaille sur un projet qui implique de faire de nombreuses requêtes à un API et pour chaque feedback je prends une décision et je sauvegarde dans le db. J'utilise adbapi pour communiquer avec mysql.Blocs de réacteurs torsadés pour de longues tâches différées

Je reçois la demande en tant que POST contenant une liste d'éléments qui doivent être envoyés à une API distante et enregistrés.

J'ai noté que lors du traitement des éléments dans un bloc d'opérations différées toutes les autres opérations jusqu'à ce qu'une partie soit terminée.

Ce qui suit est un exemple qui montre quelque chose de similaire à ce que je fais.

#!/usr/bin/python2.7 

from twisted.web.server import Site 
from twisted.web.resource import Resource 
from twisted.internet import reactor, defer 
from twisted.web.server import NOT_DONE_YET 

from utils import send_mail, save_in_db 


def get_params(request): 
    params = {} 
    for k, v in request.args.items(): 
     if k and v: 
      params[k] = v[0] 
    return params 


class SendPage(Resource): 

    def render_POST(self, request): 
     params = get_params(request) 
     emails = params['emails'] 
     message = params['message'] 
     self.process_send_mail(message, emails) 
     request.write('Received') 
     request.finish() 
     return NOT_DONE_YET 

    def process_send_mail(self, message, emails): 
     defs = [] 
     for email in emails: 
      d = send_mail(email, message) 
      defs.append(d) 
     d1 = defer.DeferredList(defs) 
     d1.addCallback(self.process_save) 

    def process_save(self, result): 
     defs = [] 
     for r in result: 
      d = save_in_db(r) 
      defs.append(d) 
     d1 = defer.DeferredList(defs) 
     d1.addCallback(self.post_save) 

    def post_save(self, result): 
     print "request was completed" 


root = Resource() 
root.putChild("", SendPage()) 
factory = Site(root) 
reactor.listenTCP(8880, factory) 
reactor.run() 

Dans les exemples ci-dessus, quand j'ai beaucoup d'e-mails dans la liste comme 100 000 quand je fais send_mail il bloque d'autres opérations jusqu'à ce que son fini. Si j'essaye d'envoyer une autre requête pendant que cela se produit, elle bloque jusqu'à ce qu'elle soit terminée.

Ma question est la suivante: y a-t-il un moyen de faire en sorte que les opérations se déroulent simultanément? Puis-je envoyer un mail et de manière simultanée save_in_db? Est-ce que je peux faire cela quand je reçois d'autres demandes et que je les manipule sans devoir attendre l'autre?

Répondre

0

Vous pouvez omettez simplement attendre les résultats ou attendre tous les résultats: l'envoi et l'enregistrement dans la base de données comme ceci:

def process_send_mail(self, message, emails): 
    defs = [] 
    for email in emails: 
     d = send_mail(email, message) 
     defs.append(d) 
     d = save_in_db(email) 
     defs.append(d) 

    d1 = defer.DeferredList(defs) 
    d1.addCallback(self.post_save)  

def post_save(self): 
    print "request was completed" 
+0

Mais ce résultat que vous êtes en boucle dans 'for r in result' n'est pas défini ci-dessus? Je vais devoir obtenir le résultat de send_mail pour pouvoir l'utiliser. J'ai noté des attentes différées pour tous les mails à envoyer. Je voudrais un moyen de traiter chaque courrier et enregistrer en db au lieu d'attendre pour tous. –

+0

Fixe. Je ne sais pas ce qui retourne 'send_mail'. Email utilisé pour le transmettre à la base de données. Je suppose que c'est le paramètre à passer à 'save_in_db'. –

0

Un truc que j'ai mis à profit dans le passé est une combinaison de inlineCallbacks et yield. Fondamentalement, vous pouvez itérer n nombre d'éléments puis yield ou faire une pause à un intervalle donné afin que le réacteur puisse effectuer d'autres tâches. Donc dans votre cas, vous décorer toutes les fonctions qui ont potentiellement bloquer les boucles avec @inlineCallbacks, enumerate la boucle, puis yield/pause à un certain point pour redonner le contrôle au réacteur.

@defer.inlineCallbacks 
def process_send_mail(self, message, emails): 
    defs = [] 
    for i, email in enumerate(emails): # enumerate 
     d = send_mail(email, message) 
     defs.append(d) 
     if i % 1000 == 0: 
      yield # pause every 1000 elements 
    d1 = defer.DeferredList(defs) 
    d1.addCallback(self.process_save) 

Vous devrez modifier la valeur de l'intervalle en fonction de vos besoins, car la valeur dépendra de la rapidité avec laquelle les résultats peuvent être produits. J'espère que cela t'aides.

0

Il y a en fait deux questions; Je vais les aborder séparément. Le premier est: "Existe-t-il un moyen pour que les opérations se déroulent simultanément? Puis-je envoyer sendmail et de manière simultanée" save_in_db "?

La réponse est: oui et non. Vous ne pouvez pas le faire en même temps, car autant que je sache, l'enregistrement des données dans la base de données nécessite un certain résultat de l'envoi du courrier. Mais si vous vouliez dire: puis-je commencer à enregistrer des choses dans la base de données dès que j'obtiens le premier résultat, sans attendre que TOUS les résultats de courrier arrivent avant d'enregistrer des choses dans la BD - oui, vous pouvez le faire; juste combiner deux fonctions de traitement en un seul:

def process_send_mail_and_save(self, message, emails): 
    defs = [] 
    for email in emails: 
     d = send_mail(email, message) 
     # might require tuning for save_in_db parameters if not matching send_mail callback output 
     d.addCallback(save_in_db) 
     defs.append(d) 
    d1 = defer.DeferredList(defs) 
    d1.addCallback(self.post_save) 

2) « je peux le faire que je reçois d'autres demandes et la poignée sans avoir à attendre les uns des autres pour terminer? »

Bien sûr, vous pouvez le faire dans Twisted. Mais vous devez écrire votre code de la bonne manière.Vous ne nous dites pas ce que send_mail ou save_in_db font - je suppose que vous les avez écrits, et je suppose que ces fonctions bloquent et causent la plupart de vos problèmes - peut-être send_mail fait tout le travail SMTP et seulement quand il a fini ça revient? Il devrait revenir différé immédiatement, et le rappel lorsque le travail est terminé:

http://twistedmatrix.com/documents/16.4.0/core/howto/clients.html

Je vous suggère de mettre la journalisation des appels avec des horodateurs dans le send_mail et les fonctions save_in_db - autour du moment où vous les appelez, pas le moment de leur incendies différés. Rappelez-vous: le point entier des reports de Twisted est que les différés sont renvoyés IMMEDIATEMENT sans bloquer, alors que le rappel que vous leur associez se déclenche plus tard, quand quelque chose est exécuté. Si quelque chose bloque n'importe où, Twisted ne peut rien faire - c'est un thread unique, fondamentalement un multitâche coopératif. Mais Twisted ne peut transformer votre code en non-bloquant magiquement - VOUS devez le faire.

Sidenote: la façon dont vous utilisez server.NOT_DONE_YET est inutile. Renvoyez juste "Received" comme une chaîne et oubliez l'objet de demande. Vous utilisez NOT_DONE_YET lorsque vous appelez request.finish() ailleurs, pas immédiatement.

+0

Allan, je ne fais aucun appel bloquant dans mon code. Les send_mail et save_in_db ne font aucun appel bloquant renvoie un différé. Comme je l'ai dit le problème est noté lorsque les demandes sont nombreuses (comme 50k demandes). J'ai modifié mon code de la façon dont vous avez suggéré où je commence à enregistrer dès que j'obtiens la réponse de la fonction send_mail mais j'ai quand même noté que la méthode save ne démarre que lorsque toutes les requêtes send_mail ont été différées, ce qui peut prendre du temps et le réacteur ne fait rien d'autre pendant ce temps. –

+0

Sur votre note à propos de NOT_DONE_YET, quand je ne le renvoie pas (par exemple, renvoyez 'Received' comme vous l'avez suggéré), j'obtiens une exception ('exceptions.RuntimeError: Request.write appelé sur une requête après que Request.finish a été appelé'). Est-ce ainsi que cela devrait fonctionner? –

+0

Oui. Vous ne devriez rien faire avec la demande. Pas d'écriture(), pas de fin() - juste retour "Reçu". –