2011-12-19 4 views
6

J'ai un morceau de code qui utilise Parallel.ForEach, probablement basé sur une ancienne version des extensions Rx ou la bibliothèque parallèle des tâches. J'ai installé une version actuelle des extensions Rx mais je ne trouve pas Parallel.ForEach. Je ne suis pas d'utiliser tout autre trucs de fantaisie de la bibliothèque et que vous voulez juste pour traiter des données en parallèle comme celui-ci:Extensions Rx: Où est Parallel.ForEach?

Parallel.ForEach(records, ProcessRecord); 

Je trouve this question, mais je ne voudrais pas dépendre d'une ancienne version de Rx. Mais je n'ai pas été en mesure de trouver quelque chose de similaire pour Rx, alors quelle est la manière actuelle et la plus simple de le faire en utilisant une version Rx actuelle? Le projet utilise .NET 3.5.

+2

Mettre à niveau vers .NET 4.0? – dtb

+0

duplication possible de [Parallel.ForEach manquant de Reactive Extensions pour .Net 3.5] (http://stackoverflow.com/questions/7398962/parallel-foreach-missing-from-reactive-extensions-for-net-3-5) –

+1

@Hasan: J'ai lié à la question que vous avez mentionnée, donc j'étais évidemment au courant. Mais la réponse propose d'utiliser une ancienne version Rx, que je ne veux pas utiliser. – Achim

Répondre

24

Pas besoin de faire tout cela goosery stupide si vous avez Rx:

records.ToObservable() 
    .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)) 
    .ToList() 
    .First(); 

(Ou, si vous voulez que l'ordre des éléments maintenus à la coût de l'efficacité):

records.ToObservable() 
    .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)) 
    .Concat() 
    .ToList() 
    .First(); 

Ou si vous voulez limiter le nombre d'articles en même temps:

records.ToObservable() 
    .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))) 
    .Merge(5 /* at a time */) 
    .ToList() 
    .First(); 
+0

Comment voulez-vous limiter cela à ne traiter que n éléments à un moment donné? J'essaye ceci avec des centaines d'articles et j'essaye de les faire tout à la fois ce qui conduit à d'énormes problèmes de mémoire dans mon application. Je voudrais être en mesure de limiter à 5 à la fois dire. – Clint

+2

Mis à jour pour inclure la simultanéité limitative –

1

est ici un simple remplacement:

class Parallel 
{ 
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body) 
    { 
     if (source == null) 
     { 
      throw new ArgumentNullException("source"); 
     } 
     if (body == null) 
     { 
      throw new ArgumentNullException("body"); 
     } 
     var items = new List<T>(source); 
     var countdown = new CountdownEvent(items.Count); 
     WaitCallback callback = state => 
     { 
      try 
      { 
       body((T)state); 
      } 
      finally 
      { 
       countdown.Signal(); 
      } 
     }; 
     foreach (var item in items) 
     { 
      ThreadPool.QueueUserWorkItem(callback, item); 
     } 
     countdown.Wait(); 
    } 
} 
+0

Cela semble prometteur, mais comment le code sait-il quand tous les articles ont été traités? Parallel.ForEach bloque jusqu'à ce que tous les éléments aient été traités, donc je n'ai pas à m'en soucier. – Achim

+1

C'est évidemment une simplification de ce que fait Parallel.ForEach. J'ai étendu ma réponse à bloquer jusqu'à ce que tous les éléments ont été traités. – dtb

+2

Il vaut la peine de considérer ce que le compromis est ici par rapport à la mise à niveau vers la version 4.0. Les auteurs de 'Parallel.ForEach' ont mis beaucoup de travail pour obtenir un bon équilibre entre le degré de concurrence et la quantité de temps système. D'autre part, @dtb a donné une bonne réponse qui prend environ 30LoC. Si vous trouvez cette réponse, tout ce dont vous avez besoin est bien, alors les jours heureux. Si vous vous trouvez en train de peaufiner et de peaufiner le code pour qu'il devienne des centaines de LoC, alors vous n'avez plus une bonne version courte, mais une mauvaise version longue où vous réécrivez essentiellement de plus en plus ce que "Parallel" a testé. –

Questions connexes