2009-08-25 4 views
8

J'utilise SQLAlchemy avec un backend Postgres pour effectuer une insertion ou une mise à jour en masse. Pour essayer d'améliorer les performances, j'essaie de valider uniquement une fois tous les mille lignes environ:Comment effectuer efficacement une insertion ou une mise à jour en masse avec SQLAlchemy?

trans = engine.begin() 
    for i, rec in enumerate(records): 
    if i % 1000 == 0: 
     trans.commit() 
     trans = engine.begin() 
    try: 
     inserter.execute(...) 
    except sa.exceptions.SQLError: 
     my_table.update(...).execute() 
trans.commit() 

Cependant, cela ne fonctionne pas. Il semble que lorsque l'INSERT échoue, il laisse les choses dans un état étrange qui empêche le UPDATE de se produire. Recule-t-il automatiquement la transaction? Si oui, cela peut-il être arrêté? Je ne veux pas que toute ma transaction soit annulée en cas de problème, c'est pourquoi j'essaie d'attraper l'exception en premier lieu. Le message d'erreur que je reçois, BTW, est "sqlalchemy.exc.InternalError: (InternalError) transaction en cours est abandonnée, les commandes ignorées jusqu'à la fin du bloc de transaction", et cela se produit sur la mise à jour().) appel.

Répondre

5

Vous rencontrez un comportement bizarre propre à Postgresql: si une erreur survient dans une transaction, elle force la totalité de la transaction à être annulée. Je considère que c'est un bug de conception Postgres; il faut un peu de contorsion SQL pour contourner dans certains cas.

Une solution de contournement consiste à effectuer d'abord la mise à jour. Détecter s'il a réellement modifié une ligne en regardant cursor.rowcount; s'il n'a pas modifié les lignes, il n'existe pas, tout comme l'INSERT. (Ce sera plus rapide si vous mettez à jour plus fréquemment que vous insérez, bien sûr.)

Une autre solution consiste à utiliser les points de sauvegarde:

SAVEPOINT a; 
INSERT INTO ....; 
-- on error: 
ROLLBACK TO SAVEPOINT a; 
UPDATE ...; 
-- on success: 
RELEASE SAVEPOINT a; 

Cela a un sérieux problème pour le code de la qualité de la production: vous devez détecter l'erreur avec précision. Vraisemblablement, vous vous attendez à un contrôle de contrainte unique, mais vous risquez de rencontrer quelque chose d'inattendu, et il peut être presque impossible de distinguer de manière fiable l'erreur attendue de l'erreur inattendue. Si la condition d'erreur est incorrecte, cela entraînera des problèmes obscurs où rien ne sera mis à jour ou inséré et aucune erreur ne sera détectée. Soyez très prudent avec ça. Vous pouvez affiner le cas d'erreur en regardant le code d'erreur de Postgresql pour vous assurer que c'est le type d'erreur que vous attendez, mais le problème potentiel est toujours là. Enfin, si vous voulez vraiment effectuer l'insertion ou la mise à jour par lots, vous voulez en faire beaucoup en quelques commandes, pas un seul par commande. Cela nécessite un SQL plus complexe: SELECT imbriqué dans un INSERT, filtrant les bons éléments à insérer et à mettre à jour.

+1

"Si une erreur survient dans une transaction, elle force l'ensemble de la transaction à être annulée Je considère qu'il s'agit d'un bug de conception Postgres." - N'est-ce pas le point des transactions? De [Wikipedia] (http: //en.wikipedia.org/wiki/Database_transaction): "Les transactions fournissent une proposition" tout ou rien ", indiquant que chaque unité de travail exécutée dans une base de données doit être complète ou n'avoir aucun effet." – spiffytech

+0

@Spiffytech Bonne réponse. Cela m'a vraiment fait LOL. –

4

Cette erreur provient de PostgreSQL. PostgreSQL ne vous permet pas d'exécuter des commandes dans la même transaction si une commande crée une erreur. Pour résoudre ce problème, vous pouvez utiliser des transactions imbriquées (implémentées à l'aide de points de sauvegarde SQL) via conn.begin_nested(). Heres quelque chose qui pourrait fonctionner. J'ai fait en sorte que le code utilise des connexions explicites, factorise la partie de segmentation et fasse en sorte que le code utilise le gestionnaire de contexte pour gérer correctement les transactions.

from itertools import chain, islice 
def chunked(seq, chunksize): 
    """Yields items from an iterator in chunks.""" 
    it = iter(seq) 
    while True: 
     yield chain([it.next()], islice(it, chunksize-1)) 

conn = engine.commit() 
for chunk in chunked(records, 1000): 
    with conn.begin(): 
     for rec in chunk: 
      try: 
       with conn.begin_nested(): 
        conn.execute(inserter, ...) 
      except sa.exceptions.SQLError: 
       conn.execute(my_table.update(...)) 

Cela ne fonctionnera toujours pas avec des performances stellaires en raison de la surcharge des transactions imbriquées. Si vous voulez de meilleures performances, essayez de détecter les lignes qui créeront des erreurs au préalable avec une requête select et utilisez le support d'executemany (execute peut prendre une liste de dicts si toutes les insertions utilisent les mêmes colonnes). Si vous devez gérer des mises à jour simultanées, vous devrez toujours gérer les erreurs en réessayant ou en retenant les insertions une par une.

Questions connexes