2009-02-12 16 views
38

Je suis désolé pour une question redondante. Cependant, j'ai trouvé beaucoup de solutions à mon problème mais aucun d'eux n'est très bien expliqué. J'espère que cela sera clair, ici.Attendez que les discussions groupées soient complétées

Le thread principal de mon application C# engendre des 1..n travailleurs en arrière-plan utilisant le ThreadPool. Je souhaite que le fil d'origine se verrouille jusqu'à ce que tous les travailleurs aient terminé. J'ai fait des recherches sur le ManualResetEvent en particulier, mais je ne suis pas clair sur son utilisation.

En pseudo:

foreach(var o in collection) 
{ 
    queue new worker(o); 
} 

while(workers not completed) { continue; } 

Si nécessaire, je vais connaître le nombre de travailleurs qui sont sur le point d'être mis en attente avant la main.

+0

Salut jeter un oeil à poste similaire ici http://stackoverflow.com/questions/358721/be-notified-when-all-background-threadpool-threads-are-finished – Valentin

Répondre

54

essayer. La fonction prend en compte une liste de délégués d'action. Il ajoutera une entrée ThreadPool worker pour chaque élément de la liste. Il attendra que chaque action se termine avant de revenir.

public static void SpawnAndWait(IEnumerable<Action> actions) 
{ 
    var list = actions.ToList(); 
    var handles = new ManualResetEvent[actions.Count()]; 
    for (var i = 0; i < list.Count; i++) 
    { 
     handles[i] = new ManualResetEvent(false); 
     var currentAction = list[i]; 
     var currentHandle = handles[i]; 
     Action wrappedAction =() => { try { currentAction(); } finally { currentHandle.Set(); } }; 
     ThreadPool.QueueUserWorkItem(x => wrappedAction()); 
    } 

    WaitHandle.WaitAll(handles); 
} 
+6

WaitHandle.WaitAll échoue si le nombre de handles est plus grand que le système le permet. Sur mon serveur Win2k3 ce nombre est de 64, donc je reçois une exception quand j'essaie de générer plus de 64 éléments ... –

+1

@Eran, essayez d'écrire un SpawAndWaitHelper qui a essentiellement le code ci-dessus. Utilisez SpawAndWait pour diviser l'énumérable en 64 morceaux de taille et appeler l'aide pour chaque morceau. – JaredPar

+0

ah ... http://stackoverflow.com/questions/1045980/is-there-a-better-way-to-wait-for-queued-threads/1074770#1074770 –

13

D'abord, combien de temps les travailleurs exécutent-ils? Les threads de pool doivent généralement être utilisés pour des tâches de courte durée - s'ils doivent s'exécuter pendant un certain temps, tenez compte des threads manuels.

Relatif au problème; Avez-vous réellement besoin de bloquer le thread principal? Pouvez-vous utiliser un rappel à la place? Si oui, quelque chose comme:

int running = 1; // start at 1 to prevent multiple callbacks if 
      // tasks finish faster than they are started 
Action endOfThread = delegate { 
    if(Interlocked.Decrement(ref running) == 0) { 
     // ****run callback method**** 
    } 
}; 
foreach(var o in collection) 
{ 
    var tmp = o; // avoid "capture" issue 
    Interlocked.Increment(ref running); 
    ThreadPool.QueueUserWorkItem(delegate { 
     DoSomeWork(tmp); // [A] should handle exceptions internally 
     endOfThread(); 
    }); 
} 
endOfThread(); // opposite of "start at 1" 

Ceci est une façon assez légère (sans primitives d'OS) de suivre les travailleurs.

Si vous avez besoin pour bloquer, vous pouvez faire la même chose en utilisant un Monitor (encore une fois, en évitant un objet OS):

object syncLock = new object(); 
    int running = 1; 
    Action endOfThread = delegate { 
     if (Interlocked.Decrement(ref running) == 0) { 
      lock (syncLock) { 
       Monitor.Pulse(syncLock); 
      } 
     } 
    }; 
    lock (syncLock) { 
     foreach (var o in collection) { 
      var tmp = o; // avoid "capture" issue 
      ThreadPool.QueueUserWorkItem(delegate 
      { 
       DoSomeWork(tmp); // [A] should handle exceptions internally 
       endOfThread(); 
      }); 
     } 
     endOfThread(); 
     Monitor.Wait(syncLock); 
    } 
    Console.WriteLine("all done"); 
+2

Votre code attendra indéfiniment si un des délégués jette une exception. – JaredPar

+2

Si l'un de ces délégués lance une exception, je vais perdre tout le processus, donc c'est assez arbitraire ... Je suppose qu'il ne sera pas lancé, mais je vais le rendre explicite ;-p –

+0

les employés traiteront des opérations coûteuses, y compris la lecture et l'écriture de fichiers, et effectueront des sélections SQL et des insertions impliquant des colonnes Binary/Image. Il est peu probable qu'ils vivent assez longtemps pour exiger des threads explicites, mais les performances pourraient être obtenues en les laissant s'exécuter en parallèle. – Kivin

1

Je pense que vous étiez sur la bonne voie avec ManualResetEvent. Cette link a un échantillon de code qui correspond de près à ce que vous essayez de faire. La clé consiste à utiliser WaitHandle.WaitAll et passer un tableau d'événements d'attente. Chaque thread doit définir l'un de ces événements d'attente.

// Simultaneously calculate the terms. 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateBase)); 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateFirstTerm)); 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateSecondTerm)); 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateThirdTerm)); 

    // Wait for all of the terms to be calculated. 
    WaitHandle.WaitAll(autoEvents); 

    // Reset the wait handle for the next calculation. 
    manualEvent.Reset(); 

Edit:

Assurez-vous que dans votre chemin de code thread de travail que vous définissez l'événement (à savoir autoEvents 1 .Set();). Une fois qu'ils sont tous signalés, waitAll reviendra.

void CalculateSecondTerm(object stateInfo) 
{ 
    double preCalc = randomGenerator.NextDouble(); 
    manualEvent.WaitOne(); 
    secondTerm = preCalc * baseNumber * 
     randomGenerator.NextDouble(); 
    autoEvents[1].Set(); 
} 
29

Voici une approche différente - l'encapsulation; de sorte que votre code pourrait être aussi simple que:

Forker p = new Forker(); 
    foreach (var obj in collection) 
    { 
     var tmp = obj; 
     p.Fork(delegate { DoSomeWork(tmp); }); 
    } 
    p.Join(); 

Lorsque la classe Forker est donnée ci-dessous (je me suis ennuyé sur le ;-p train) ... encore une fois, ce qui évite les objets OS, mais enveloppements choses tout à fait nettement (OMI):

using System; 
using System.Threading; 

/// <summary>Event arguments representing the completion of a parallel action.</summary> 
public class ParallelEventArgs : EventArgs 
{ 
    private readonly object state; 
    private readonly Exception exception; 
    internal ParallelEventArgs(object state, Exception exception) 
    { 
     this.state = state; 
     this.exception = exception; 
    } 

    /// <summary>The opaque state object that identifies the action (null otherwise).</summary> 
    public object State { get { return state; } } 

    /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary> 
    public Exception Exception { get { return exception; } } 
} 

/// <summary>Provides a caller-friendly wrapper around parallel actions.</summary> 
public sealed class Forker 
{ 
    int running; 
    private readonly object joinLock = new object(), eventLock = new object(); 

    /// <summary>Raised when all operations have completed.</summary> 
    public event EventHandler AllComplete 
    { 
     add { lock (eventLock) { allComplete += value; } } 
     remove { lock (eventLock) { allComplete -= value; } } 
    } 
    private EventHandler allComplete; 
    /// <summary>Raised when each operation completes.</summary> 
    public event EventHandler<ParallelEventArgs> ItemComplete 
    { 
     add { lock (eventLock) { itemComplete += value; } } 
     remove { lock (eventLock) { itemComplete -= value; } } 
    } 
    private EventHandler<ParallelEventArgs> itemComplete; 

    private void OnItemComplete(object state, Exception exception) 
    { 
     EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock 
     if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception)); 
     if (Interlocked.Decrement(ref running) == 0) 
     { 
      EventHandler allHandler = allComplete; // don't need to lock 
      if (allHandler != null) allHandler(this, EventArgs.Empty); 
      lock (joinLock) 
      { 
       Monitor.PulseAll(joinLock); 
      } 
     } 
    } 

    /// <summary>Adds a callback to invoke when each operation completes.</summary> 
    /// <returns>Current instance (for fluent API).</returns> 
    public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler) 
    { 
     if (handler == null) throw new ArgumentNullException("handler"); 
     ItemComplete += handler; 
     return this; 
    } 

    /// <summary>Adds a callback to invoke when all operations are complete.</summary> 
    /// <returns>Current instance (for fluent API).</returns> 
    public Forker OnAllComplete(EventHandler handler) 
    { 
     if (handler == null) throw new ArgumentNullException("handler"); 
     AllComplete += handler; 
     return this; 
    } 

    /// <summary>Waits for all operations to complete.</summary> 
    public void Join() 
    { 
     Join(-1); 
    } 

    /// <summary>Waits (with timeout) for all operations to complete.</summary> 
    /// <returns>Whether all operations had completed before the timeout.</returns> 
    public bool Join(int millisecondsTimeout) 
    { 
     lock (joinLock) 
     { 
      if (CountRunning() == 0) return true; 
      Thread.SpinWait(1); // try our luck... 
      return (CountRunning() == 0) || 
       Monitor.Wait(joinLock, millisecondsTimeout); 
     } 
    } 

    /// <summary>Indicates the number of incomplete operations.</summary> 
    /// <returns>The number of incomplete operations.</returns> 
    public int CountRunning() 
    { 
     return Interlocked.CompareExchange(ref running, 0, 0); 
    } 

    /// <summary>Enqueues an operation.</summary> 
    /// <param name="action">The operation to perform.</param> 
    /// <returns>The current instance (for fluent API).</returns> 
    public Forker Fork(ThreadStart action) { return Fork(action, null); } 

    /// <summary>Enqueues an operation.</summary> 
    /// <param name="action">The operation to perform.</param> 
    /// <param name="state">An opaque object, allowing the caller to identify operations.</param> 
    /// <returns>The current instance (for fluent API).</returns> 
    public Forker Fork(ThreadStart action, object state) 
    { 
     if (action == null) throw new ArgumentNullException("action"); 
     Interlocked.Increment(ref running); 
     ThreadPool.QueueUserWorkItem(delegate 
     { 
      Exception exception = null; 
      try { action(); } 
      catch (Exception ex) { exception = ex;} 
      OnItemComplete(state, exception); 
     }); 
     return this; 
    } 
} 
+0

(HI MARC! Rappelez-vous cet article ??) Par curiosité, pourquoi var tmp = obj est-il nécessaire? Je l'ai implémenté en passant simplement mon objet et j'ai obtenu des résultats fous. Changer pour utiliser var a fini par marcher. Je ne comprends clairement pas quelque chose! Merci, et voyez si vous vous en souvenez après seulement deux ans :) – DanTheMan

+1

@user La réponse à cela est un peu compliquée, mais en bref, c'est parce que C# ne parvient pas à faire exactement ce que vous vouliez dire sans même vous en rendre compte. Il est généralement très bon de le faire sans ambiguïté à tous les bons endroits, mais pas dans ce cas. –

+4

Vous devez comprendre que le code 'delegate {DoSomeWork (tmp); } '* capture * la variable' tmp'. Chaque appel à ce code capture une variable * différente * chaque fois autour de la boucle, car la portée de 'tmp' est confinée au corps de la boucle. Cependant, la variable 'foreach' est la même variable * à chaque fois autour de la boucle, donc tous les appels à' delegate {DoSomeWork (tmp); } 'capture la même chose. Cela n'a vraiment pas besoin d'être comme ça; limiter la portée de la variable foreach aurait fait beaucoup de code "juste travailler" sans que les gens réalisent même la complexité de la situation. –

1

Utilisation de .NET 4.0 Barrie classe r:

 Barrier sync = new Barrier(1); 

     foreach(var o in collection) 
     { 
      WaitCallback worker = (state) => 
      { 
       // do work 
       sync.SignalAndWait(); 
      }; 

      sync.AddParticipant(); 
      ThreadPool.QueueUserWorkItem(worker, o); 
     } 

     sync.SignalAndWait(); 
+1

Il existe une limite supérieure du nombre de participants pouvant être utilisés. :( –

8

J'utilise la nouvelle bibliothèque de tâches en parallèle dans CTP here:

 Parallel.ForEach(collection, o => 
      { 
       DoSomeWork(o); 
      }); 
+0

Bonne suggestion! Aussi plus facile quand il s'agit de gérer les exceptions.Voir: http://msdn.microsoft.com/en-us/library/dd991486.aspx – Joop

+0

Soyez particulièrement prudent car cela utilise le ThreadPool et ce n'est pas possible pour forcer l'utilisation de threads dédiés (non gérés) Même en utilisant l'option TaskFactory avec LongRunning sous-jacente fournit uniquement un indice au planificateur, mais n'est pas une garantie pour un thread dédié. – eduncan911

3

Voici une solution en utilisant la classe CountdownEvent.

var complete = new CountdownEvent(1); 
foreach (var o in collection) 
{ 
    var capture = o; 
    ThreadPool.QueueUserWorkItem((state) => 
    { 
     try 
     { 
     DoSomething(capture); 
     } 
     finally 
     { 
     complete.Signal(); 
     } 
    }, null); 
} 
complete.Signal(); 
complete.Wait(); 

Bien sûr, si vous avez accès à la classe CountdownEvent alors vous avez tout le TPL de travailler avec. La classe Parallel s'occupe de l'attente pour vous.

Parallel.ForEach(collection, o => 
    { 
    DoSomething(o); 
    }); 
Questions connexes