2011-06-10 3 views
6

Je travaille actuellement sur un projet, où nous avons le défi de traiter des items en parallèle. Jusqu'à présent, pas une grosse affaire;) Maintenant au problème. Nous avons une liste d'ID, où nous périodiquement (toutes les 2 secondes) quoi appeler un StoredProcedure pour chaque ID. Les 2 secondes doivent être vérifiées individuellement pour chaque élément, car elles sont ajoutées et supprimées pendant l'exécution. En outre, nous voulons configurer le degré maximal de parallélisme, car la base de données ne doit pas être inondée de 300 threads simultanément. Un élément en cours de traitement ne doit pas être replanifié pour le traitement tant qu'il n'a pas terminé l'exécution précédente. La raison est que nous voulons empêcher la mise en file d'attente de beaucoup d'éléments, en cas de retards sur la base de données. À l'heure actuelle, nous utilisons un composant auto-développé, qui comporte un thread principal, qui vérifie périodiquement quels éléments doivent être planifiés pour le traitement. Une fois qu'il a la liste, il les supprime sur un pool de threads basé sur IOCP personnalisé, puis utilise les waithandles pour attendre les éléments en cours de traitement. Ensuite, l'itération suivante commence. IOCP en raison du travail-voler il fournit. Je voudrais remplacer cette implémentation personnalisée par une version TPL/.NET 4, et j'aimerais savoir comment vous la résoudriez (idéalement simple et bien lisible/maintenable). Je connais cet article: http://msdn.microsoft.com/en-us/library/ee789351.aspx, mais cela limite simplement la quantité de threads utilisés. Feuilles travaillent vol, exécutant périodiquement les articles ....Question d'architecture TPL

Idéalement, il deviendra un composant générique, qui peut être utilisé pour certaines des tâches qui doivent être effectuées périodiquement pour une liste d'éléments.

tout accueil d'entrée, tia Martin

+0

programmation réactive –

Répondre

9

Je ne pense pas que vous avez réellement besoin de descendre et sale avec TPL Tasks directe pour cela. Pour commencer, je voudrais mettre en place un BlockingCollection autour d'un ConcurrentQueue (la valeur par défaut) sans BoundedCapacity sur le BlockingCollection pour stocker les ID qui doivent être traitées.

// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service) 
BlockingCollection<string> idsToProcess = new BlockingCollection<string>(); 

De là, je voudrais simplement utiliser Parallel::ForEach sur l'énumération de retour de la BlockingCollection::GetConsumingEnumerable. Dans l'appel ForEach vous allez configurer votre ParallelOptions::MaxDegreeOfParallelism Dans le corps du ForEach vous allez exécuter votre procédure stockée.

Maintenant, une fois l'exécution de la procédure stockée terminée, vous dites que vous ne voulez pas reconfigurer l'exécution pour au moins deux secondes. Pas de problème, programmez un System.Threading.Timer avec un rappel qui ajoutera simplement l'ID au BlockingCollection dans le rappel fourni.

Parallel.ForEach(
    idsToProcess.GetConsumingEnumerable(), 
    new ParallelOptions 
    { 
     MaxDegreeOfParallelism = 4 // read this from config 
    }, 
    (id) => 
    { 
     // ... execute sproc ... 

     // Need to declare/assign this before the delegate so that we can dispose of it inside 
     Timer timer = null; 

     timer = new Timer(
      _ => 
      { 
       // Add the id back to the collection so it will be processed again 
       idsToProcess.Add(id); 

       // Cleanup the timer 
       timer.Dispose(); 
      }, 
      null, // no state, id wee need is "captured" in the anonymous delegate 
      2000, // probably should read this from config 
      Timeout.Infinite); 
    } 

Enfin, lorsque le processus est en cours d'arrêt que vous appelleriez BlockingCollection::CompleteAdding afin que le dénombrable en cours de traitement avec blocage d'arrêt et complète et parallèle :: ForEach quittera. S'il s'agissait d'un service Windows par exemple, vous le feriez dans OnStop.

// When ready to shutdown you just signal you're done adding 
idsToProcess.CompleteAdding(); 

Mise à jour

Vous avez soulevé une préoccupation valable dans votre commentaire que vous pourriez être en train de traiter une grande quantité d'ID à un moment donné et craignent qu'il y aurait trop de frais généraux dans une minuterie par ID .Je suis absolument d'accord avec cela. Donc, dans le cas où vous traitez une grande liste d'ID simultanément, je passerais d'une minuterie par ID à une autre file pour conserver les identifiants «sleep» qui sont surveillés par un seul minuteur à intervalle court. D'abord, vous aurez besoin d'un ConcurrentQueue sur lequel placer les ID qui sont endormis:

ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>(); 

Maintenant, j'utilise deux parties Tuple ici à titre d'illustration, mais vous pouvez créer un plus fortement typé struct pour cela (ou au moins l'alias avec une instruction using) pour une meilleure lisibilité. Le tuple a l'id et un DateTime qui représente quand il a été mis dans la file d'attente.

Maintenant, vous aurez également besoin de configurer la minuterie qui surveillera cette file d'attente:

Timer wakeSleepingIdsTimer = new Timer(
    _ => 
    { 
     DateTime utcNow = DateTime.UtcNow; 

     // Pull all items from the sleeping queue that have been there for at least 2 seconds 
     foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2)) 
     { 
      // Add this id back to the processing queue 
      idsToProcess.Enqueue(id); 
     } 
    }, 
    null, // no state 
    Timeout.Infinite, // no due time 
    100 // wake up every 100ms, probably should read this from config 
); 

Ensuite, vous simplement changer le Parallel::ForEach pour effectuer les opérations suivantes au lieu de mettre en place une minuterie pour chacun:

(id) => 
{ 
     // ... execute sproc ... 

     sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow)); 
} 
+0

bonne idée, mais vous ne pensez pas que cela va créer un peu d'un problème de ressources? Je veux dire si j'ai par exemple 500 éléments dans la liste, je suis un peu préoccupé par la grande quantité de minuteurs en cours d'exécution ... –

+0

J'ai pensé à cela, mais vous n'avez pas donné de limites claires, alors j'attendais une réponse pour savoir si cela répondait à votre Besoins. Vous pouvez facilement y remédier avec une autre file d'attente et une seule minuterie qui surveille la file d'attente pour les entrées qui sont dues et les replace dans la file d'attente principale. Va ajouter des détails à ma réponse. –

1

Ceci est assez similaire à l'approche que vous avez déjà utilisée dans votre question, mais c'est le cas avec les tâches TPL. Une tâche s'ajoute simplement à une liste de choses à programmer quand c'est fait.

L'utilisation de verrouillage sur une simple liste est assez laid dans cet exemple, voudrait probablement une meilleure collection de tenir la liste des choses à planifier

// Fill the idsToSchedule 
for (int id = 0; id < 5; id++) 
{ 
    idsToSchedule.Add(Tuple.Create(DateTime.MinValue, id)); 
} 

// LongRunning will tell TPL to create a new thread to run this on 
Task.Factory.StartNew(SchedulingLoop, TaskCreationOptions.LongRunning); 

Cela commence le SchedulingLoop, qui effectue en fait la vérifier si ses deux été secondes depuis quelque chose a couru

// Tuple of the last time an id was processed and the id of the thing to schedule 
static List<Tuple<DateTime, int>> idsToSchedule = new List<Tuple<DateTime, int>>(); 
static int currentlyProcessing = 0; 
const int ProcessingLimit = 3; 

// An event loop that performs the scheduling 
public static void SchedulingLoop() 
{ 
    while (true) 
    { 
     lock (idsToSchedule) 
     { 
      DateTime currentTime = DateTime.Now; 
      for (int index = idsToSchedule.Count - 1; index >= 0; index--) 
      { 
       var scheduleItem = idsToSchedule[index]; 
       var timeSincePreviousRun = (currentTime - scheduleItem.Item1).TotalSeconds; 

       // start it executing in a background task 
       if (timeSincePreviousRun > 2 && currentlyProcessing < ProcessingLimit) 
       { 
        Interlocked.Increment(ref currentlyProcessing); 

        Console.WriteLine("Scheduling {0} after {1} seconds", scheduleItem.Item2, timeSincePreviousRun); 

        // Schedule this task to be processed 
        Task.Factory.StartNew(() => 
         { 
          Console.WriteLine("Executing {0}", scheduleItem.Item2); 

          // simulate the time taken to call this procedure 
          Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(0, 5000) + 500); 

          lock (idsToSchedule) 
          { 
           idsToSchedule.Add(Tuple.Create(DateTime.Now, scheduleItem.Item2)); 
          } 

          Console.WriteLine("Done Executing {0}", scheduleItem.Item2); 
          Interlocked.Decrement(ref currentlyProcessing); 
         }); 

        // remove this from the list of things to schedule 
        idsToSchedule.RemoveAt(index); 
       } 
      } 
     } 

     Thread.Sleep(100); 
    } 
}