2010-11-01 9 views
4

J'ai besoin d'exécuter un ensemble de fonctions lourdes de manière asynchrone en même temps et de remplir les résultats dans une liste. Voici le code de pseudo pour cela:Comment exécuter un ensemble de fonctions en parallèle et attendre les résultats à la fin?

List<TResult> results = new List<TResults>(); 
List<Func<T, TResult>> tasks = PopulateTasks(); 

foreach(var task in tasks) 
{ 
    // Run Logic in question 
    1. Run each task asynchronously/parallely 
    2. Put the results in the results list upon each task completion 
} 

Console.WriteLine("All tasks completed and results populated"); 

J'ai besoin de la logique à l'intérieur du foreach bock. Pouvez-vous m'aider?

J'ai une certaine contrainte: La solution doit être compatible .Net 3.5 (non .Net 4, mais .net 4 solution alternative serait appréciée pour mon but de la connaissance)

Merci à l'avance.

+0

Voir aussi http://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach (ma réponse est à http://stackoverflow.com/a/25877042/67824) –

Répondre

4

Une simple 3.5 mise en œuvre pourrait ressembler à ceci

List<TResult> results = new List<TResults>(); 
List<Func<T, TResult>> tasks = PopulateTasks(); 

ManualResetEvent waitHandle = new ManualResetEvent(false); 
void RunTasks() 
{ 
    int i = 0; 
    foreach(var task in tasks) 
    { 
     int captured = i++; 
     ThreadPool.QueueUserWorkItem(state => RunTask(task, captured)) 
    } 

    waitHandle.WaitOne(); 

    Console.WriteLine("All tasks completed and results populated"); 
} 

private int counter; 
private readonly object listLock = new object(); 
void RunTask(Func<T, TResult> task, int index) 
{ 
    var res = task(...); //You haven't specified where the parameter comes from 
    lock (listLock) 
    { 
     results[index] = res; 
    } 
    if (InterLocked.Increment(ref counter) == tasks.Count) 
     waitHandle.Set(); 
} 
4
List<Func<T, TResult>> tasks = PopulateTasks(); 
TResult[] results = new TResult[tasks.Length]; 
Parallel.For(0, tasks.Count, i => 
    { 
     results[i] = tasks[i](); 
    }); 

TPL for 3.5 apparently exists.

0

la manière traditionnelle est d'utiliser un Sempahore. Initialisez le sémaphore avec le nombre de threads que vous utilisez, puis lancez un thread par tâche et attendez l'objet sémaphore. Lorsque chaque thread se termine, il doit incrémenter le sémaphore. Lorsque le nombre de sémaphores atteint 0, le thread principal en attente continue.

1
public static IList<IAsyncResult> RunAsync<T>(IEnumerable<Func<T>> tasks) 
    { 
     List<IAsyncResult> asyncContext = new List<IAsyncResult>(); 
     foreach (var task in tasks) 
     { 
      asyncContext.Add(task.BeginInvoke(null, null)); 
     } 
     return asyncContext; 
    } 

    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Func<T>> tasks, IEnumerable<IAsyncResult> asyncContext) 
    { 
     IEnumerator<IAsyncResult> iterator = asyncContext.GetEnumerator(); 
     foreach (var task in tasks) 
     { 
      iterator.MoveNext(); 
      yield return task.EndInvoke(iterator.Current); 
     } 
    } 

    public static void Main() 
    { 
     var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList(); 

     var asyncContext = RunAsync(tasks); 
     var results = WaitForAll(tasks, asyncContext); 
     foreach (var result in results) 
     { 
      Console.WriteLine(result); 
     } 
    } 

    public static int ComputeValue() 
    { 
     Thread.Sleep(1000); 
     return Guid.NewGuid().ToByteArray().Sum(a => (int)a); 
    } 
1

Une autre variante serait avec une petite mise en œuvre du futur modèle:

public class Future<T> 
    { 
     public Future(Func<T> task) 
     { 
      Task = task; 
      _asyncContext = task.BeginInvoke(null, null); 
     } 

     private IAsyncResult _asyncContext; 

     public Func<T> Task { get; private set; } 
     public T Result 
     { 
      get 
      { 
       return Task.EndInvoke(_asyncContext); 
      } 
     } 

     public bool IsCompleted 
     { 
      get { return _asyncContext.IsCompleted; } 
     } 
    } 

    public static IList<Future<T>> RunAsync<T>(IEnumerable<Func<T>> tasks) 
    { 
     List<Future<T>> asyncContext = new List<Future<T>>(); 
     foreach (var task in tasks) 
     { 
      asyncContext.Add(new Future<T>(task)); 
     } 
     return asyncContext; 
    } 

    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Future<T>> futures) 
    { 
     foreach (var future in futures) 
     { 
      yield return future.Result; 
     } 
    } 

    public static void Main() 
    { 
     var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList(); 

     var futures = RunAsync(tasks); 
     var results = WaitForAll(futures); 
     foreach (var result in results) 
     { 
      Console.WriteLine(result); 
     } 
    } 

    public static int ComputeValue() 
    { 
     Thread.Sleep(1000); 
     return Guid.NewGuid().ToByteArray().Sum(a => (int)a); 
    } 
0

Est-ce que votre traitement dans les cas de travailleurs séparés, chacun sur leur propre fil. Utilisez un rappel pour renvoyer les résultats et signaler au processus appelant que le thread est terminé. Utilisez un dictionnaire pour suivre les threads en cours d'exécution. Si vous avez beaucoup de threads, vous devez charger une file d'attente et lancer de nouveaux threads quand les anciens seront terminés. Dans cet exemple, tous les threads sont créés avant d'être lancés pour empêcher une condition de concurrence dans laquelle le nombre de threads en cours d'exécution tombe à zéro avant le lancement des threads finaux.

Dictionary<int, Thread> activeThreads = new Dictionary<int, Thread>(); 
    void LaunchWorkers() 
    { 
     foreach (var task in tasks) 
     { 
      Worker worker = new Worker(task, new WorkerDoneDelegate(ProcessResult)); 
      Thread thread = new Thread(worker.Done); 
      thread.IsBackground = true; 
      activeThreads.Add(thread.ManagedThreadId, thread); 
     } 
     lock (activeThreads) 
     { 
      activeThreads.Values.ToList().ForEach(n => n.Start()); 
     } 
    } 

    void ProcessResult(int threadId, TResult result) 
    { 
     lock (results) 
     { 
      results.Add(result); 
     } 
     lock (activeThreads) 
     { 
      activeThreads.Remove(threadId); 
      // done when activeThreads.Count == 0 
     } 
    } 
} 

public delegate void WorkerDoneDelegate(object results); 
class Worker 
{ 
    public WorkerDoneDelegate Done; 
    public void Work(Task task, WorkerDoneDelegate Done) 
    { 
     // process task 
     Done(Thread.CurrentThread.ManagedThreadId, result); 
    } 
} 
Questions connexes