2013-08-21 4 views
2

J'essaye de construire une application de pyramide. J'ai commencé avec l'échafaudage SQLAlchemy. Je me heurte à un problème et je me demande quel est le meilleur moyen d'y remédier. Dans l'une de mes vues, j'ai besoin de sélectionner beaucoup de lignes à partir de deux tables non liées. Je dois m'assurer qu'aucune ligne n'a été insérée dans la deuxième table entre le moment où j'ai sélectionné les lignes de la première table et le moment où j'ai sélectionné les lignes de la deuxième table. J'ai trois modèles, Node, Test et Tasking. Les deux Nodes et Tests ont un peu de métadonnées. Compte tenu d'une liste de Nodes et une liste de Tests, une liste globale de Taskings peut être créée. Par exemple, nous pourrions avoir trois Nodes, a, b et c et deux Tests "nous avons besoin d'un nœud pour faire la tâche P" et "nous avons besoin de deux nœuds pour faire la tâche Q".Comment gérer l'intégrité de plusieurs tables sur plusieurs sélections dans SQLAlchemy dans Pyramid?

À partir de ces informations, trois Tasks doivent être créés. Par exemple:

  1. "Noeud a devrait faire la tâche P"
  2. "Noeud b doit faire la tâche Q"
  3. "Noeud c devrait faire la tâche Q"

Maintenant, je suis essayer de fournir une API REST pour cela. La grande majorité des clients demandera la liste Tasks, ce qui doit être rapide. Cependant, parfois un client peut ajouter un Node ou un Test. Lorsque cela se produit, j'ai besoin de régénérer la liste entière de Tasks.

Voici un exemple grossier:

@view_config(route_name='list_taskings') 
def list_taskings(request): 
    return DBSession.Query(Tasking).all() 

@view_config(route_name='add_node') 
def add_node(request): 
    DBSession.add(Node()) 
    _update_taskings() 

@view_config(route_name='add_test') 
def add_test(request): 
    DBSession.add(Test()) 
    _update_taskings() 

def _update_taskings(): 
    nodes = DBSession.query(Node).all() 
    tests = DBSession.query(Test).all() 

    # Process... 

    Tasking.query.delete() 
    for t in taskings: 
     DBSession.add(t) 

J'utilise la valeur par défaut Pyramide échafaudage SQLAlchemy. Ainsi, chaque requête démarre automatiquement une transaction. Donc, si _update_tasking est appelée à partir d'une requête (disons add_node), alors le nouveau nœud sera ajouté au DBSession local, et l'interrogation de tous les Nodes et Tests dans _update_tasking retournera ce nouvel élément. En outre, la suppression de tous les Taskings existants et l'ajout de ceux nouvellement calculés est également sans danger.

J'ai deux problèmes:

  1. Ce qui se passe si une nouvelle ligne est ajoutée dans la table Tests entre le moment où je reçois ma liste de nodes et ma liste de tests à _update_taskings? Dans mon système de production réel, ces sélections sont proches les unes des autres, mais pas les unes à côté des autres. Il y a la possibilité d'une condition de course. Comment est-ce que je m'assure que deux demandes qui mettront à jour le Taskings ne se remplacent pas l'un l'autre? Par exemple, imaginez si notre système existant en avait un Node et un Test. Deux demandes arrivent en même temps, l'une pour ajouter un Node et l'autre pour ajouter un Test.Même si le problème n ° 1 n'était pas un problème et que je savais que la paire de sélections de chaque requête représentait "une seule instance de temps dans la base de données", il y a toujours le problème d'une requête écrasant l'autre. Si la première demande se termine d'abord avec maintenant deux Nodes et un Test, la deuxième demande sera toujours à sélectionner les anciennes données (potentiellement) et générer une liste de Taskings avec un Node et deux Tests.

Alors, quelle est la meilleure façon de gérer cela? J'utilise SQLite pour le développement et PostgreSQL en production, mais je voudrais une solution agnostique de base de données. Je ne suis pas inquiet pour les autres applications accédant à cette base de données. Mon API REST sera le seul mécanisme d'accès. Devrais-je mettre un verrou autour des demandes qui modifient la base de données (en ajoutant un Node ou un Test)? Dois-je verrouiller la base de données en quelque sorte?

Merci pour toute aide!

Répondre

5

L'utilisation du niveau d'isolation de transaction serializable doit empêcher ces deux problèmes. Si une transaction modifie les données qui peuvent affecter les résultats des lectures précédentes dans une autre transaction, il existe un conflit de sérialisation. Une seule transaction gagne, toutes les autres sont annulées par la base de données à redémarrer par le client. SQLite fait cela en verrouillant toute la base de données, PostgreSQL utilise un mécanisme beaucoup plus complexe (voir docs pour plus de détails). Malheureusement, il n'y a pas de moyen sqlalchemic portable pour attraper l'anomalie de sérialisation et réessayer. Vous devez écrire du code spécifique à la base de données pour pouvoir le distinguer des autres erreurs de manière fiable.

J'ai mis en place un programme d'échantillonnage avec deux threads modifier simultanément les données (une reproduction très basique de votre système), en cours d'exécution dans des conflits et une nouvelle tentative:

https://gist.github.com/khayrov/6291557

Avec middleware de transaction Pyramid et Zope gestionnaire de transactions en cours d'utilisation qui serait encore plus facile. Après avoir détecté l'erreur de sérialisation, au lieu de réessayer manuellement, relancez TransientError et le middleware réessaie la totalité de la requête jusqu'à tm.attempts (dans le config paster) fois.

from transaction.interfaces import TransientError 

class SerializationConflictError(TransientError): 
    def __init__(self, orig): 
     self.orig = orig 

Vous pouvez même écrire votre propre middleware assis ci-dessous pyramid_tm dans la pile qui va attraper les erreurs de sérialisation et de les traduire pour erreurs transitoires en toute transparence.

def retry_serializable_tween_factory(handler, registry): 

    def retry_tween(request): 
     try: 
      return handler(request) 
     except DBAPIError, e: 
      orig = e.orig 
      if getattr(orig, 'pgcode', None) == '40001': 
       raise SerializationConflictError(e) 
      elif isinstance(orig, sqlite3.DatabaseError) and \ 
       orig.args == ('database is locked',): 
       raise SerializationConflictError(e) 
      else: 
       raise 

    return retry_tween 
+0

Merci, c'est exactement ce dont j'avais besoin. Bien que ce ne soit pas 100% joli, sachant que le verrouillage se passe au niveau de la base de données et qu'il est intelligent sur ce qui est considéré comme une erreur de sérialisation. J'ai un problème cependant. J'utilise l'échafaudage Pyramid qui utilise pyramid_tm. De plus, ma session SQLAlchemy est une scoped_session. Je ne peux pas .commit() (bien que je pense que .flush() fasse la même chose dans ce cas particulier), et si .rollback() alors je ne peux pas réessayer la transaction: "ResourceClosedError: Cette transaction est fermée" . Je vais google autour d'un peu. Merci! – jmacdonagh

+0

J'ai mis à jour la réponse pour couvrir le cas du middleware de transaction. – rkhayrov

+0

J'ai posté mon précédent message "pas 100% joli" avant la mise à jour impressionnante que vous avez faite. L'idée tween est parfaite. Ça découpe bien tout. Parfait! Y a-t-il une raison pour laquelle vous déclenchez une SerializationConflictError plutôt qu'une simple TransientError? – jmacdonagh

Questions connexes