2012-02-20

Generic code for handling Azure Tables concurrency conflicts?

I'm looking at making some updates to Azure Storage Tables, and I want to use the optimistic concurrency mechanism properly. It seems you would need to do something like this:

  1. Load the row to update, possibly retrying on failure
  2. Apply the updates to the row
  3. Save the row, possibly retrying on network errors
    1. If there is a concurrency conflict, reload the data (possibly retrying on failure) and attempt the save again (possibly retrying on failure)
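
To make the steps above concrete, here is a minimal sketch of the loop against a hypothetical in-memory store. Nothing in it comes from the Azure client library: `InMemoryTable`, `VersionedRow`, `ConflictException` and `OptimisticUpdate` are made-up names, and the integer `Version` only stands in for the ETag that Azure Tables compares on a conditional update. Network-error retries are left out so the conflict handling stays visible.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical row type: Version plays the role of the ETag.
public sealed class VersionedRow
{
    public int Version;
    public int Value;
}

// Hypothetical exception standing in for an HTTP 412 (Precondition Failed).
public sealed class ConflictException : Exception { }

// Hypothetical in-memory stand-in for a table with conditional writes.
public sealed class InMemoryTable
{
    private readonly Dictionary<string, VersionedRow> rows = new Dictionary<string, VersionedRow>();
    private readonly object gate = new object();

    public void Insert(string key, int value)
    {
        lock (gate) { rows[key] = new VersionedRow { Version = 1, Value = value }; }
    }

    // Returns a copy so callers cannot change the stored row directly.
    public VersionedRow Load(string key)
    {
        lock (gate)
        {
            VersionedRow stored = rows[key];
            return new VersionedRow { Version = stored.Version, Value = stored.Value };
        }
    }

    // Conditional write: succeeds only if nobody saved since this copy was loaded.
    public void Save(string key, VersionedRow row)
    {
        lock (gate)
        {
            if (rows[key].Version != row.Version) throw new ConflictException();
            int next = row.Version + 1;
            rows[key] = new VersionedRow { Version = next, Value = row.Value };
            row.Version = next; // keep the caller's copy in sync with the store
        }
    }
}

public static class OptimisticUpdate
{
    // Load, mutate, save; on a conflict, reload and re-apply the mutation.
    public static VersionedRow Run(InMemoryTable table, string key, Action<VersionedRow> mutate, int maxAttempts)
    {
        for (int attempt = 1; ; attempt++)
        {
            VersionedRow row = table.Load(key); // step 1: load
            mutate(row);                        // step 2: apply updates
            try
            {
                table.Save(key, row);           // step 3: conditional save
                return row;
            }
            catch (ConflictException)
            {
                // step 3.1: someone else saved first; loop to reload and retry
                if (attempt >= maxAttempts) throw;
            }
        }
    }
}
```

Calling `OptimisticUpdate.Run(table, "k", r => r.Value += 1, 3)` re-runs the mutation on fresh data whenever another writer gets in first, and rethrows the conflict once the attempt limit is reached.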

Is there a generic class or code sample that handles this? I can code it up myself, but I have to imagine someone has already invented this particular wheel.

Answer

If someone has invented this wheel, they aren't talking about it, so I went off and (re)invented it myself. It is intentionally very generic, more of a skeleton than a finished product. It's basically just the algorithm I outlined above. The caller has to supply delegates that do the actual loading, updating, and saving of the data. There is some basic retry logic built in, but I would recommend overriding those functions with something more robust. I believe this will work with tables or BLOBs, and with single entities or batches, though I have only actually tried it with single-entity table updates.

Any comments, suggestions, improvements, etc. would be appreciated.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Data.Services.Client; 
using Microsoft.WindowsAzure.StorageClient; 
using System.Net; 

namespace SepiaLabs.Azure 
{ 
    /// <summary> 
    /// Attempt to write an update to storage while using optimistic concurrency. 
    /// Implements a basic state machine. Data will be fetched (with retries), then mutated, then updated (with retries, and possibly refetching & remutating). 
    /// Clients may pass in a state object with relevant information. eg, a TableServiceContext object. 
    /// </summary> 
    /// <remarks> 
    /// This object natively implements a very basic retry strategy. 
    /// Clients may want to subclass it and override the ShouldRetryRetrieval() and ShouldRetryPersist() functions to implement more advanced retry strategies. 
    /// 
    /// This class intentionally avoids checking if the row is present before updating it. This is so callers may throw custom exceptions, or attempt to insert the row instead ("upsert" style interaction) 
    /// </remarks> 
    /// <typeparam name="RowType">The type of data that will be read and updated. Though it is called RowType for clarity, you could manipulate a collection of rows.</typeparam> 
    /// <typeparam name="StateObjectType">The type of the user-supplied state object</typeparam> 
    public class AzureDataUpdate<RowType, StateObjectType> 
     where RowType : class 
    { 
     /// <summary> 
     /// Function to retrieve the data that will be updated. 
     /// This function will be called at least once. It will also be called any time a concurrency update conflict occurs. 
     /// </summary> 
     public delegate RowType DataRetriever(StateObjectType stateObj); 

     /// <summary> 
     /// Function to apply the desired changes to the data. 
     /// This will be called after each time the DataRetriever function is called. 
     /// If you are using a TableServiceContext with MergeOption.PreserveChanges set, this function can be a no-op after the first call 
     /// </summary> 
     public delegate void DataMutator(RowType data, StateObjectType stateObj); 

     /// <summary> 
     /// Function to persist the modified data. This may be called multiple times. 
     /// </summary> 
     /// <param name="data">The mutated data to save</param> 
     /// <param name="stateObj">The user-supplied state object for this operation</param> 
     public delegate void DataPersister(RowType data, StateObjectType stateObj); 

     public DataRetriever RetrieverFunction { get; set; } 
     public DataMutator MutatorFunction { get; set; } 
     public DataPersister PersisterFunction { get; set; } 

     public AzureDataUpdate() 
     { 
     } 

     public AzureDataUpdate(DataRetriever retrievalFunc, DataMutator mutatorFunc, DataPersister persisterFunc) 
     { 
      this.RetrieverFunction = retrievalFunc; 
      this.MutatorFunction = mutatorFunc; 
      this.PersisterFunction = persisterFunc; 
     } 

     public RowType Execute(StateObjectType userState) 
     { 
      if (RetrieverFunction == null) 
      { 
       throw new InvalidOperationException("Must provide a data retriever function before executing"); 
      } 
      else if (MutatorFunction == null) 
      { 
       throw new InvalidOperationException("Must provide a data mutator function before executing"); 
      } 
      else if (PersisterFunction == null) 
      { 
       throw new InvalidOperationException("Must provide a data persister function before executing"); 
      } 

      //Retrieve and modify data 
      RowType data = this.DoRetrieve(userState); 

      //Call the mutator function. 
      MutatorFunction(data, userState); 

      //persist changes 
      int attemptNumber = 1; 
      while (true) 
      { 
       bool isPreconditionFailedResponse = false; 

       try 
       { 
        PersisterFunction(data, userState); 
        return data; //return the mutated data 
       } 
       catch (DataServiceRequestException dsre) 
       { 
        DataServiceResponse resp = dsre.Response; 

        int statusCode = -1; 
        if (resp.IsBatchResponse) 
        { 
         statusCode = resp.BatchStatusCode; 
        } 
        else if (resp.Any()) 
        { 
         statusCode = resp.First().StatusCode; 
        } 

        isPreconditionFailedResponse = (statusCode == (int)HttpStatusCode.PreconditionFailed); 
        if (!ShouldRetryPersist(attemptNumber, dsre, isPreconditionFailedResponse, userState)) 
        { 
         throw; 
        } 
       } 
       catch (DataServiceClientException dsce) 
       { 
        isPreconditionFailedResponse = (dsce.StatusCode == (int)HttpStatusCode.PreconditionFailed); 
        if (!ShouldRetryPersist(attemptNumber, dsce, isPreconditionFailedResponse, userState)) 
        { 
         throw; 
        } 
       } 
       catch (StorageClientException sce) 
       { 
        isPreconditionFailedResponse = (sce.StatusCode == HttpStatusCode.PreconditionFailed); 
        if (!ShouldRetryPersist(attemptNumber, sce, isPreconditionFailedResponse, userState)) 
        { 
         throw; 
        } 
       } 
       catch (Exception ex) 
       { 
        if (!ShouldRetryPersist(attemptNumber, ex, false, userState)) 
        { 
         throw; 
        } 
       } 

       if (isPreconditionFailedResponse) 
       { 
        //Refetch the data, re-apply the mutator 
        data = DoRetrieve(userState); 
        MutatorFunction(data, userState); 
       } 

       attemptNumber++; 
      } 
     } 

     /// <summary> 
     /// Retrieve the data to be updated, possibly with retries 
     /// </summary> 
     /// <param name="userState">The UserState for this operation</param> 
     private RowType DoRetrieve(StateObjectType userState) 
     { 
      int attemptNumber = 1; 

      while (true) 
      { 
       try 
       { 
        return RetrieverFunction(userState); 
       } 
       catch (Exception ex) 
       { 
        if (!ShouldRetryRetrieval(attemptNumber, ex, userState)) 
        { 
         throw; 
        } 
       } 

       attemptNumber++; 
      } 
     } 

     /// <summary> 
     /// Determine whether a data retrieval should be retried. 
     /// Implements a simplistic, constant wait time strategy. Users may override to provide a more complex implementation. 
     /// </summary> 
     /// <param name="attemptNumber">What number attempt is this. </param> 
     /// <param name="ex">The exception that was caught</param> 
     /// <param name="userState">The user-supplied state object for this operation</param> 
     /// <returns>True to attempt the retrieval again, false to abort the retrieval and fail the update attempt</returns> 
     protected virtual bool ShouldRetryRetrieval(int attemptNumber, Exception ex, StateObjectType userState) 
     { 
      //Simple, basic retry strategy - try 3 times, sleep for 1000msec each time 
      if (attemptNumber < 3) 
      { 
       Thread.Sleep(1000); 
       return true; 
      } 
      else 
      { 
       return false; 
      } 
     } 

     /// <summary> 
     /// Determine whether a data update should be retried. If the <paramref name="isPreconditionFailedResponse"/> param is true, 
     /// then the retrieval and mutation process will be repeated as well. 
     /// Implements a simplistic, constant wait time strategy. Users may override to provide a more complex implementation. 
     /// </summary> 
     /// <param name="attemptNumber">What number attempt is this. </param> 
     /// <param name="ex">The exception that was caught</param> 
     /// <param name="userState">The user-supplied state object for this operation</param> 
     /// <param name="isPreconditionFailedResponse">Indicates whether the exception is a PreconditionFailed response. ie, an optimistic concurrency failure</param> 
     /// <returns>True to attempt the update again, false to abort and fail the update attempt</returns> 
     protected virtual bool ShouldRetryPersist(int attemptNumber, Exception ex, bool isPreconditionFailedResponse, StateObjectType userState) 
     { 
      if (isPreconditionFailedResponse) 
      { 
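       //Note: conflict retries are not capped by attemptNumber here, so under heavy 
       //contention this can loop for a long time. Override to impose a limit. 
       return true; //retry immediately 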
      } 
      else 
      { 
       //For other failures, wait to retry 
       //Simple, basic retry strategy - try 3 times, sleep for 1000msec each time 
       if (attemptNumber < 3) 
       { 
        Thread.Sleep(1000); 
        return true; 
       } 
       else 
       { 
        return false; 
       } 
      } 
     } 
    } 
} 
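
The built-in strategy sleeps a constant 1000 msec between attempts. As one possible "more robust" replacement, here is a small sketch of capped exponential backoff with jitter. `BackoffDelay` and its methods are hypothetical names introduced for this example; they are not part of the class above or of any Azure library, and the base delay, cap, and attempt limit are arbitrary choices.

```csharp
using System;

// Hypothetical helper for computing retry delays.
public static class BackoffDelay
{
    // Delay before retrying after attempt `attemptNumber` (1-based):
    // baseDelay * 2^(attemptNumber - 1), capped at maxDelay.
    public static TimeSpan ForAttempt(int attemptNumber, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attemptNumber < 1) throw new ArgumentOutOfRangeException("attemptNumber");

        // Clamp the exponent so the shift below cannot overflow.
        int exponent = Math.Min(attemptNumber - 1, 30);
        double milliseconds = baseDelay.TotalMilliseconds * (1L << exponent);
        return TimeSpan.FromMilliseconds(Math.Min(milliseconds, maxDelay.TotalMilliseconds));
    }

    // Randomize the wait uniformly between zero and `delay`, so that clients
    // that failed at the same moment do not all retry at the same moment.
    public static TimeSpan WithJitter(TimeSpan delay, Random rng)
    {
        return TimeSpan.FromMilliseconds(rng.NextDouble() * delay.TotalMilliseconds);
    }
}

// A subclass of AzureDataUpdate could then override the retry hook roughly like
// this (a sketch, assuming a Random field named rng on the subclass):
//
//   protected override bool ShouldRetryRetrieval(int attemptNumber, Exception ex, StateObjectType userState)
//   {
//       if (attemptNumber >= 5) return false;
//       TimeSpan delay = BackoffDelay.ForAttempt(attemptNumber, TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(10));
//       Thread.Sleep(BackoffDelay.WithJitter(delay, rng));
//       return true;
//   }
```

The same helper could be used in a `ShouldRetryPersist` override for the non-conflict branch, and a similar attempt limit could be applied to the conflict branch if unbounded retries are a concern.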