2013-05-14 1 views
3

J'ai un réacteur torsadé à l'écoute des données entrantes. J'ai un second réacteur effectuant des requêtes http à certains intervalles de temps envoyant les résultats au premier réacteur. Les deux fonctionnent bien. Je voudrais maintenant l'assembler pour fonctionner dans un réacteur, mais je ne sais pas comment y parvenir. Quelque chose comme - effectuer des requêtes http toutes les 60 secondes. d'une manière asynchrone de l'intérieur du premier réacteur "principal" d'écoute.Twisted/exécuter des requêtes HTTP asynchrones

Ce que j'ai en ce moment est:

# main reactor listening for incoming data forever 
... 
reactor.listenTCP(8123, TCPEventReceiverFactory()) 

Le réacteur http utilise twisted.internet.defer.DeferredSemaphore() pour effectuer plusieurs contrôles http:

# create semaphore to manage the deferreds 
semaphore = twisted.internet.defer.DeferredSemaphore(2) 

# create a list with all urls to check 
dl = list() 
# append deferreds to list 
for url in self._urls: 
    # returns deferred 
    dl.append(semaphore.run(self._getPage, url)) 

# get a DefferedList 
dl = twisted.internet.defer.DeferredList(dl) 
# add some callbacks for error handling 
dl.addCallbacks(lambda x: reactor.stop(), self._handleError) 

# start the reactor  
reactor.run() 

Comment puis-je ajouter les chronométrés chèques http à la "principale" réacteur, de sorte qu'ils sont effectués de manière asynchrone? Comment fonctionne exactement DeferredSemaphore?

Quelqu'un peut-il m'aider avec ceci?

[Ceci est une sorte de système de surveillance léger traitant les résultats de contrôle http. Je suis nouveau à la programmation Twisted et asynchrone. Je suis sur Xubuntu 12.04 exécutant Python 2.7]

+0

à quoi sert même le sémaphore différé? – SingleNegationElimination

+0

C'est pour limiter les demandes effectuées simultanément. – user937284

Répondre

4

vous n'avez pas besoin de plusieurs réacteurs. Effectuez simplement toutes les différentes actions en utilisant le même réacteur. Si vous appelez le reactor.stop(), vous faites probablement quelque chose de mal, alors laissez-nous vous débarrasser de cela, et reliez le tout en une seule fonction (que nous pourrions utiliser comme rappel); comme il fait un travail asynchrone, il devrait aussi renvoyer un différé, nous allons utiliser le DeferredList que vous utilisez déjà.

def thing_that_does_http(): 
    # create semaphore to manage the deferreds 
    semaphore = twisted.internet.defer.DeferredSemaphore(2) 

    # create a list with all urls to check 
    dl = DeferredList() 
    # append deferreds to list 
    for url in self._urls: 
     # returns deferred 
     dl.append(semaphore.run(self._getPage, url)) 

    # get a DefferedList 
    dl = twisted.internet.defer.DeferredList(dl) 
    # add some callbacks for error handling 
    dl.addErrback(self._handleError) 
    return dl 

La façon naturelle de « réaliser x dans certains intervalles de temps » est avec boucle appel. Avec cette fonction de rappel que nous ne avons pas besoin de faire beaucoup

reactor.listenTCP(8123, TCPEventReceiverFactory()) 
loop_http = twisted.intertnet.task.LoopingCall(thing_that_does_http) 
# run once per minute, starting now. 
loop_http.start(60) 

Le réacteur LoopingCall et getPage utilisera pour leurs propres fins est twisted.internet.reactor, si vous utilisez un autre réacteur, par exemple si vous êtes En effectuant des tests unitaires, vous devrez remplacer cette valeur par défaut.

Dans le cas de LoopingCall, il est tout à fait simple, après la construction, (mais avant appelant sa méthode start()), définissez son clock attribut:

from twisted.internet.task import Clock 
fake_reactor = Clock() 
loop_http.clock = fake_reactor 
fake_reactor.advance(120) # move time forward two minutes... 

Malheureusement, la situation getPage() est moins agréable. Vous ne pouvez utiliser aucun autre réacteur avec cette interface; Vous aurez besoin d'utiliser le plus récent, plus brillant t.w.c.Agent. À bien des égards, Agent est supérieur, mais ce n'est pas tout à fait aussi pratique quand vous voulez juste le corps de réponse brut comme une chaîne. En plus de nécessiter un réacteur explicite transmis à son constructeur, il s'agit plus d'un contrôle fin sur le cycle requête/réponse que de la commodité fournie par getPage. En tant que tel, il est principalement mis en œuvre en termes de Producer s et Protocol s.Dans le cas de la première, nous pouvons passer une aide de commodité, FileBodyProducer pour envoyer des corps de demande avec un minimum d'agitation; Dans ce dernier cas, nous aurons besoin d'un protocole simple pour mettre en mémoire tampon tous les morceaux de données jusqu'à ce que nous ayons tout compris.

est ici un morceau de code qui pourrait remplacer getPage, avec à peu près la même interface, mais en prenant une instance de Agent comme premier argument

from cStringIO import StringIO 
from twisted.internet.defer import Deferred 
from twisted.internet.protocol import Protocol 
from twisted.web.client import ResponseDone 
from twisted.web.client import FileBodyProducer 


class GetPageProtocol(Protocol): 
    def __init__(self): 
     self.deferred = Deferred() 
     self.data = [] 

    def dataReceived(self, data): 
     self.data.append(data) 

    def connectionLost(self, reason): 
     reason.trap(ResponseDone) 
     data = ''.join(self.data) 
     del self.data 
     self.deferred.callback(data) 


def agentGetPage(agent, url, 
       method="GET", 
       headers=None, 
       postdata=None): 
    if postdata is not None: 
     bodyProducer = FileBodyProducer(StringIO(postdata)) 
    else: 
     bodyProducer = None 

    def _getPageResponded(response): 
     if response.length != 0: 
      proto = GetPageProtocol() 
      response.deliverBody(proto) 
      return proto.deferred 
     else: 
      return None 

    d = agent.request(method, url, headers, bodyProducer) 
    d.addCallback(_getPageResponded) 
    return d 

qui, dans un test unitaire, ressemblerait un peu comme:

from twisted.test.proto_helpers import MemoryReactor 
from twisted.web.client import Agent 
fake_reactor = MemoryReactor() 
agent = Agent(fake_reactor) 
d = agentGetPage(agent, "http://example.com") 

assert fake_reactor.tcpClients # or some such, exercise the code by manipulating the reactor 

Edit: Je voulais d'abord de passer ce pour donner ectomorph, moins d'être confus au sujet; mais alors c'est aussi une bonne idée de tambouriner dans la bonne manipulation des réacteurs tôt, et d'éviter des douleurs inutiles plus tard.

+1

Merci! Génial, c'est exactement ce que je cherchais. 'twisted.internet.reactor' est le réacteur que j'utilise. – user937284

+2

Bonne réponse, mais pas tout à fait correcte :). 'LoopingCall' utilisera' self.clock', qui est juste initialisé par 'twisted.internet.reactor' * par défaut *. La possibilité de le changer est importante, surtout pour les tests. (Malheureusement, 'getPage' est effectivement codé en dur, c'est pourquoi nous recommandons' twisted.web.client.Agent' maintenant.) – Glyph

+2

@Glyph: Mis à jour: Je pense que ça couvre moins bien les réacteurs. . – SingleNegationElimination

Questions connexes