2011-03-01 3 views
2

Je suis en train de construire un serveur qui stocke les données clé/valeur sur Redis en utilisant Twisted Python. Le serveur reçoit un dictionnaire JSON via HTTP, qui est converti en dictionnaire Python et placé dans un tampon. Chaque fois que de nouvelles données sont stockées, le serveur planifie une tâche qui extrait un dictionnaire du tampon et écrit chaque tuple dans une instance Redis, en utilisant un client txredis.Encore un autre problème producteur/consommateur dans Twisted Python

class Datastore(Resource): 

isLeaf = True 

def __init__(self): 
    self.clientCreator = protocol.ClientCreator(reactor, Redis) 
    d = self.clientCreator.connectTCP(...) 
    d.addCallback(self.setRedis) 
    self.redis = None 
    self.buffer = deque() 


def render_POST(self, request): 
    try: 
     task_id = request.requestHeaders.getRawHeaders('x-task-id')[0] 
    except IndexError: 
     request.setResponseCode(503) 
     return '<html><body>Error reading task_id</body></html>' 

    data = json.loads(request.content.read()) 
    self.buffer.append((task_id, data)) 
    reactor.callLater(0, self.write_on_redis) 
    return ' ' 

@defer.inlineCallbacks 
def write_on_redis(self): 
    try: 
     task_id, dic = self.buffer.pop() 
     log.msg('Buffer: %s' % len(self.buffer)) 
    except IndexError: 
     log.msg('buffer empty') 
     defer.returnValue(1) 

    m = yield self.redis.sismember('DONE', task_id) 
    # Simple check 
    if m == '1': 
     log.msg('%s already stored' % task_id) 
    else: 
     log.msg('%s unpacking' % task_id) 
     s = yield self.redis.sadd('DONE', task_id) 

     d = defer.Deferred() 
     for k, v in dic.iteritems(): 
      k = k.encode() 
      d.addCallback(self.redis.push, k, v) 

     d.callback(None) 

Fondamentalement, je suis face à un problème producteur/consommateur entre deux connexions différentes, mais je ne suis pas sûr que la mise en œuvre actuelle fonctionne bien dans le Twisted paradygm. J'ai lu la petite documentation sur les interfaces producteur/consommateur dans Twisted, mais je ne suis pas sûr de pouvoir les utiliser dans mon cas. Les critiques sont les bienvenues: j'essaie de comprendre la programmation événementielle, après tant d'années de concurrence entre les threads.

Répondre

2

Les API de production et de consommateur dans Twisted, IProducer et IConsumer, concernent le contrôle de flux. Vous ne semblez avoir aucun contrôle de flux ici, vous relayez simplement les messages d'un protocole à un autre.

Puisqu'il n'y a pas de contrôle de flux, le tampon n'est qu'une complexité supplémentaire. Vous pouvez vous en débarrasser en passant simplement les données directement à la méthode write_on_redis. De cette façon write_on_redis n'a pas besoin de gérer le cas de tampon vide, vous n'avez pas besoin de l'attribut supplémentaire sur la ressource, et vous pouvez même vous débarrasser du callLater (bien que vous puissiez également le faire même si vous gardez le tampon).

Je ne sais pas si cela répond à votre question, cependant. Quant à savoir si cette approche « fonctionne bien », voici les choses que je remarque simplement en lisant le code:

  • Si les données arrivent plus vite que Redis accepte, votre liste d'emplois pourrait éventuellement devenir arbitrairement grand, causant à court de mémoire. C'est ce que le contrôle de flux pourrait aider.
  • Sans traitement d'erreur autour de l'appel sismember ou de l'appel sadd, vous risquez de perdre des tâches si l'une d'entre elles échoue car vous les avez déjà expulsées du tampon de travail.
  • Faire un push comme rappel sur Deferredd signifie également que toute poussée échouée empêchera le reste des données d'être poussé. Il passe également le résultat du Deferred retourné par push (je suppose qu'il renvoie un Deferred) comme premier argument de l'appel suivant, donc à moins que push ne tienne plus ou moins compte de son premier argument, vous ne pousserez pas les bonnes données à redis.

Si vous souhaitez implémenter le contrôle de flux, vous devez avoir votre serveur HTTP vérifier la longueur de self.buffer et éventuellement rejeter la nouvelle tâche - pas ajouter à self.buffer et retourner un code d'erreur au client. Vous n'utiliserez toujours pas IConsumer et IProducer, mais c'est un peu similaire.