Je ne pense pas que vous avez réellement besoin de descendre et sale avec TPL Tasks
directe pour cela. Pour commencer, je voudrais mettre en place un BlockingCollection
autour d'un ConcurrentQueue
(la valeur par défaut) sans BoundedCapacity
sur le BlockingCollection
pour stocker les ID qui doivent être traitées.
// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();
De là, je voudrais simplement utiliser Parallel::ForEach
sur l'énumération de retour de la BlockingCollection::GetConsumingEnumerable
. Dans l'appel ForEach
vous allez configurer votre ParallelOptions::MaxDegreeOfParallelism
Dans le corps du ForEach
vous allez exécuter votre procédure stockée.
Maintenant, une fois l'exécution de la procédure stockée terminée, vous dites que vous ne voulez pas reconfigurer l'exécution pour au moins deux secondes. Pas de problème, programmez un System.Threading.Timer
avec un rappel qui ajoutera simplement l'ID au BlockingCollection
dans le rappel fourni.
Parallel.ForEach(
idsToProcess.GetConsumingEnumerable(),
new ParallelOptions
{
MaxDegreeOfParallelism = 4 // read this from config
},
(id) =>
{
// ... execute sproc ...
// Need to declare/assign this before the delegate so that we can dispose of it inside
Timer timer = null;
timer = new Timer(
_ =>
{
// Add the id back to the collection so it will be processed again
idsToProcess.Add(id);
// Cleanup the timer
timer.Dispose();
},
null, // no state, id wee need is "captured" in the anonymous delegate
2000, // probably should read this from config
Timeout.Infinite);
}
Enfin, lorsque le processus est en cours d'arrêt que vous appelleriez BlockingCollection::CompleteAdding
afin que le dénombrable en cours de traitement avec blocage d'arrêt et complète et parallèle :: ForEach quittera. S'il s'agissait d'un service Windows par exemple, vous le feriez dans OnStop
.
// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();
Mise à jour
Vous avez soulevé une préoccupation valable dans votre commentaire que vous pourriez être en train de traiter une grande quantité d'ID à un moment donné et craignent qu'il y aurait trop de frais généraux dans une minuterie par ID .Je suis absolument d'accord avec cela. Donc, dans le cas où vous traitez une grande liste d'ID simultanément, je passerais d'une minuterie par ID à une autre file pour conserver les identifiants «sleep» qui sont surveillés par un seul minuteur à intervalle court. D'abord, vous aurez besoin d'un ConcurrentQueue
sur lequel placer les ID qui sont endormis:
ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();
Maintenant, j'utilise deux parties Tuple
ici à titre d'illustration, mais vous pouvez créer un plus fortement typé struct pour cela (ou au moins l'alias avec une instruction using
) pour une meilleure lisibilité. Le tuple a l'id et un DateTime qui représente quand il a été mis dans la file d'attente.
Maintenant, vous aurez également besoin de configurer la minuterie qui surveillera cette file d'attente:
Timer wakeSleepingIdsTimer = new Timer(
_ =>
{
DateTime utcNow = DateTime.UtcNow;
// Pull all items from the sleeping queue that have been there for at least 2 seconds
foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2))
{
// Add this id back to the processing queue
idsToProcess.Enqueue(id);
}
},
null, // no state
Timeout.Infinite, // no due time
100 // wake up every 100ms, probably should read this from config
);
Ensuite, vous simplement changer le Parallel::ForEach
pour effectuer les opérations suivantes au lieu de mettre en place une minuterie pour chacun:
(id) =>
{
// ... execute sproc ...
sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow));
}
programmation réactive –