2011-04-14 4 views
1

J'ai besoin de découper les gros fichiers CSV en plusieurs insertions db différentes en utilisant SqlBulkCopy. J'ai l'intention de le faire via 2 tâches distinctes, 1 pour le traitement par lots du fichier CSV et un autre pour l'insertion dans la base de données. À titre d'exemple voici ce que je suis la chose:Producteur/Consommateur parallèle avec tolérance aux pannes?

public class UberTask 
{ 
    private readonly BlockingCollection<Tuple<string,int>> _store = new BlockingCollection<Tuple<string, int>>(); 

    public void PerformTask() 
    { 
     var notifier = new UINotifier(); 
     Task.Factory.StartNew(() => 
            { 
             for (int i =0; i < 10; i++) 
             { 
              string description = string.Format("Scenario {0}", i); 

              notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Reading '{0}' from file", description))); 

              // represents reading the CSV file. 
              Thread.Sleep(500); 
              notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Enqueuing '{0}'", description))); 
              _store.Add(new Tuple<string, int>(description, i)); 
             } 
             _store.CompleteAdding(); 
            }); 

     var consumer = Task.Factory.StartNew(() => 
               { 
                foreach (var item in _store.GetConsumingEnumerable()) 
                { 
                 var poppedItem = item; 
                 notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Sending '{0}' to the database", poppedItem.Item1))); 
                 // represents sending stuff to the database. 
                 Thread.Sleep(1000); 
                } 
               }); 
     consumer.Wait(); 
     Console.WriteLine("complete"); 
    } 
} 

Est-ce un bon moyen d'appairer 2 ensembles de tâches connexes? Ce que le code ci-dessus ne gère pas (ce qui doit):

  • Si la tâche qui représente les défauts de lecture CSV, l'autre tâche doit cesser
  • (même s'il y a encore des éléments à _store.) Si la tâche qui représente le db insère des erreurs, l'autre processus peut simplement arrêter le traitement.
  • Si l'une des tâches appariées est défectueuse, j'aurai besoin d'effectuer une action pour annuler les mises à jour db (je ne m'inquiète pas de la façon dont je vais annuler), mais plutôt de savoir comment une des tâches jumelées, je dois donc faire du rangement ".

Une aide sur ce qui précède serait grandement appréciée!

Répondre

2

Vous pouvez utiliser des jetons de gestion des exceptions et d'annulation pour ce faire. Lorsqu'un stade de pipeline détecte une erreur, il l'attrape et définit le jeton. Cela annulera les autres étapes. Le bloc finally s'assure que l'appel à CompleteAdding() est effectué. Ceci est important car l'étape de pipeline de réception peut être bloquée dans une attente sur la collection et ne traitera pas l'annulation tant qu'elle n'aura pas été débloquée.

Vous souhaitez également déplacer des objets non traités dans votre collection ou, dans votre cas, nettoyer vos connexions DB lorsque l'étape de pipeline est terminée (dans la phase finale) et/ou lorsque l'ensemble du pipeline est arrêté.

Voici un exemple d'une étape de pipeline qui le fait:

static void LoadPipelinedImages(IEnumerable<string> fileNames, 
            string sourceDir, 
            BlockingCollection<ImageInfo> original, 
            CancellationTokenSource cts) 
    { 
     // ... 
     var token = cts.Token; 
     ImageInfo info = null; 
     try 
     { 
      foreach (var fileName in fileNames) 
      { 
       if (token.IsCancellationRequested) 
        break; 
       info = LoadImage(fileName, ...); 
       original.Add(info, token); 
       info = null; 
      }     
     } 
     catch (Exception e) 
     { 
      // in case of exception, signal shutdown to other pipeline tasks 
      cts.Cancel(); 
      if (!(e is OperationCanceledException)) 
       throw; 
     } 
     finally 
     { 
      original.CompleteAdding(); 
      if (info != null) info.Dispose(); 
     } 
    } 

Le code global de pipeline ressemble à ceci. Il prend également en charge l'annulation du pipeline en externe (à partir de l'interface utilisateur) en définissant le jeton d'annulation.

static void RunPipelined(IEnumerable<string> fileNames, 
          string sourceDir, 
          int queueLength, 
          Action<ImageInfo> displayFn, 
          CancellationTokenSource cts) 
    { 
     // Data pipes 
     var originalImages = new BlockingCollection<ImageInfo>(queueLength); 
     var thumbnailImages = new BlockingCollection<ImageInfo>(queueLength); 
     var filteredImages = new BlockingCollection<ImageInfo>(queueLength); 
     try 
     { 
      var f = new TaskFactory(TaskCreationOptions.LongRunning, 
            TaskContinuationOptions.None); 
      // ... 

      // Start pipelined tasks 
      var loadTask = f.StartNew(() => 
        LoadPipelinedImages(fileNames, sourceDir, 
             originalImages, cts)); 

      var scaleTask = f.StartNew(() => 
        ScalePipelinedImages(originalImages, 
             thumbnailImages, cts)); 

      var filterTask = f.StartNew(() => 
        FilterPipelinedImages(thumbnailImages, 
             filteredImages, cts)); 

      var displayTask = f.StartNew(() => 
        DisplayPipelinedImages(filteredImages.GetConsumingEnumerable(), 
         ... cts)); 

      Task.WaitAll(loadTask, scaleTask, filterTask, displayTask); 
     } 
     finally 
     { 
      // in case of exception or cancellation, there might be bitmaps 
      // that need to be disposed. 
      DisposeImagesInQueue(originalImages); 
      DisposeImagesInQueue(thumbnailImages); 
      DisposeImagesInQueue(filteredImages);     
     } 
    } 

Pour un échantillon complet voir l'exemple Pipeline dans le téléchargement ici:

http://parallelpatterns.codeplex.com/releases/view/50473

Discuté ici:

http://msdn.microsoft.com/en-us/library/ff963548.aspx

+0

Merci pour la réponse détaillée Ade! – primalgeek

Questions connexes