2012-05-14 1 views
3

Je suis en train de tester une application qui devrait compiler beaucoup de projets/fichiers.Parallel.ForEach pas de fil à la fin

J'ai un ConucrrentBag qui devrait être travaillé avec Parallel.

private readonly ConcurrentBag<string> m_files; 

Mon appel pour Parallel est la suivante:

Parallel.ForEach(m_files, new ParallelOptions 
      { 
       MaxDegreeOfParallelism = MaxProcesses, 

      }, currFile => ProcessSingle(currFile.ToString())); 

Le montant de MAXPROCESS est LogicalCpu * 2.

Lorsque je compile 140 projets, Parallel démarrera linéairement moins de threads. Au moins, un seul thread est en cours d'exécution pour les 4 derniers projets. Ce n'est pas bien, mais d'accord.

Maintenant, mon problème:

Quand je compilation sur les projets 14000+ (opération COBOL-SOURCE ;-) et un système vraiment grand) Les derniers modules ne seront pas compilés, parce Parallel.ForEach n » t commencer de nouveaux threads pour cela. Aucun thread de travail n'est actif à ce stade. Mais il y a encore 140 éléments dans le concurrentBag.

Quelqu'un a une idée pour résoudre ce problème?

Modifier: Ce problème se produit uniquement lorsque j'exécute le compilateur. Sans compilateur en cours d'exécution (pour le test plus rapide) Il fonctionne bien ... de

Edit:

Le ConcurrentBag est déjà rempli complètement quand je commence le processus Parallel.ForEach.

Pour plus d'informations, le code SingleProcess:

private void ProcessSingle(string item) 
     { 
      Monitor.Enter(lockingObj); 
      if (m_files.TryTake(out item)) 
      { 
       if (CompilingModules <= 0) 
       { 
        OnQueueStarted(new EventArgs()); 
       } 
       CompilingModules++; 
       Monitor.Exit(lockingObj); 
       OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String)); 

       OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Dequeued, ItemQueueObject.String)); 
       using (CobolCompiler compiler = new CobolCompiler()) 
       { 
        compiler.OutputDataReceived += (sender, e) => OnOutputDataReceived(e); 
        compiler.Compile(item); 
        Thread.Sleep(2000); 
        if (compiler.LinkFailure) 
        { 
         if (ObjWithoutDll.ContainsKey(item)) 
         { 
          if (ObjWithoutDll[item] <= 2) 
          { 
           m_files.Add(item); 
           OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String)); 
           ObjWithoutDll[item]++; 
          } 
          else 
          { 
           OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.LinkError, ItemQueueObject.String)); 
           ObjWithoutDll.Remove(item); 
          } 
         } 
         else 
         { 
          ObjWithoutDll.Add(item, 0); 
          m_files.Add(item); 
          OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String)); 
         } 
        } 
        else 
        { 
         if (compiler.DllExisting) 
         { 
          ObjWithoutDll.Remove(item); 
         } 
         OnQueueItemStateChanged(compiler.DllExisting ? new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String) : new ItemQueueEventArgs(item, null, ItemQueueType.Failed, ItemQueueObject.String)); 
        } 

       } 

       Monitor.Enter(lockingObj); 
       CompiledModules++; 
       if (CompiledModules % 300 == 0) 
       { 
        Thread.Sleep(60000); 
       } 
       CompilingModules--; 
       if (CompilingModules <= 0 && m_files.Count <= 0) 
       { 

        try 
        { 
         Process prReschk = new Process(); 
         FileInfo batch = new FileInfo(@"batches\reschkdlg.cmd"); 
         if (!batch.Exists) 
         { 
          Assembly _assembly = Assembly.GetExecutingAssembly(); 
          StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\reschkdlg.cmd")); 
         } 

         if (!File.Exists(Config.Instance.WorkingDir + @"reschkdlg.exe")) 
         { 
          File.Copy(Config.Instance.VersionExeDirectory + @"reschkdlg.exe", Config.Instance.WorkingDir + @"reschkdlg.exe"); 
         } 

         prReschk.StartInfo.FileName = @"cmd.exe"; 
         prReschk.StartInfo.Arguments = @"/c " + batch.FullName + " " + Config.Instance.Version.Replace(".", "") + " " + @"*" + " " + Config.Instance.WorkingDir; 
         prReschk.StartInfo.CreateNoWindow = true; 
         prReschk.StartInfo.UseShellExecute = false; 
         prReschk.Start(); 
         prReschk.Close(); 
         prReschk.Dispose(); 
        } 
        catch 
        { 
        } 

        OnQueueFinished(new EventArgs()); 
       } 
      } 
      Monitor.Exit(lockingObj); 
     } 

Voici le codesnippet de la classe CobolCompiler:

public void Compile (fichier string) {

 file = file.ToLower(); 

     Process prCompile = new Process(); 
     Dir = Directory.CreateDirectory(c.WorkingDir + random.Next() + "\\"); 

     try 
     { 
      // First clean up the folder 
      CleanUpFolder(true, file); 

      // First set lock and copy all sources 
      Monitor.Enter(lockingObj); 
      if (filesToCopy == null) 
      { 
       CopySource(Dir.FullName); 
      } 
      Monitor.Exit(lockingObj); 

      FileInfo batch = new FileInfo(@"batches\compile.cmd"); 
      if (!batch.Exists) 
      { 
       Assembly _assembly = Assembly.GetExecutingAssembly(); 
       StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\compile.cmd")); 
       _textStreamReader.Dispose(); 
      } 

      prCompile.StartInfo.FileName = @"cmd.exe"; 
      prCompile.StartInfo.Arguments = @"/c " + batch.FullName + " " + c.Version.Replace(".", "") + " " + file.Remove(file.LastIndexOf('.')) + " " + Dir.FullName + " " + Dir.FullName.Remove(Dir.FullName.IndexOf(@"\")); 
      prCompile.StartInfo.CreateNoWindow = true; 
      prCompile.StartInfo.UseShellExecute = false; 
      prCompile.StartInfo.RedirectStandardOutput = true; 
      prCompile.StartInfo.RedirectStandardError = true; 
      prCompile.StartInfo.WorkingDirectory = Assembly.GetExecutingAssembly().Location.Remove(Assembly.GetExecutingAssembly().Location.LastIndexOf("\\") + 1); 
      prCompile.EnableRaisingEvents = true; 
      prCompile.OutputDataReceived += prCompile_OutputDataReceived; 
      prCompile.ErrorDataReceived += prCompile_OutputDataReceived; 
      prCompile.Start(); 
      prCompile.BeginErrorReadLine(); 
      prCompile.BeginOutputReadLine(); 
      prCompile.WaitForExit(); 
      prCompile.Close(); 
      prCompile.Dispose(); 

      CleanUpFolder(false, file); 

      if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".dll") || File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".exe")) 
      { 
       dllExisting = true; 
       linkFailure = false; 
      } 
      else 
      { 
       if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".obj")) 
       { 
        linkFailure = true; 
       } 
       dllExisting = false; 
      } 



     } 
     catch (ThreadAbortException) 
     { 
      if (prCompile != null) 
      { 
       // On Error kill process 
       prCompile.Kill(); 
       prCompile.Dispose(); 
      } 
     } 
     catch (Win32Exception) 
     { 
     } 
     catch (Exception) 
     { 
      dllExisting = false; 
     } 

     while (true) 
     { 
      try 
      { 
       if (Directory.Exists(Dir.FullName)) 
       { 
        Directory.Delete(Dir.FullName, true); 
        break; 
       } 
       else 
       { 
        break; 
       } 
      } 
      catch 
      { 
      } 
     } 


    } 
private void CopySource(string Destination) 
{ 
    filesToCopy = new StringCollection(); 
    foreach (string strFile in Directory.GetFiles(c.WorkingDir)) 
    { 
     string tmpStrFile = strFile.ToLower(); 

     foreach (string Extension in c.Extensions) 
     { 
      if (tmpStrFile.Contains(Extension)) 
      { 
       filesToCopy.Add(tmpStrFile); 
      } 
     } 
    } 

    if (filesToCopy.Count > 0) 
    { 
     foreach (string strFile in filesToCopy) 
     { 
      File.Copy(strFile, Destination + strFile.Remove(0, strFile.LastIndexOf("\\"))); 
     } 
    } 
} 

private void CleanUpFolder(bool PreCleanup, string Filename) 
{ 
    //Copy all files from compilationfolder to working directory 
    if (!PreCleanup) 
    { 
     foreach (string strFile in Directory.GetFiles(Dir.FullName, Filename.Remove(Filename.LastIndexOf(".") + 1) + "*")) 
     { 
      FileInfo fileToMove = new FileInfo(strFile); 

      if (fileToMove.Name.ToLower().Contains(Filename.Remove(Filename.LastIndexOf(".")))) 
      { 
       File.Copy(strFile, c.WorkingDir + fileToMove.Name, true); 
      } 
     } 
    } 

    //Delete useless files 
    foreach (string filename in Directory.GetFiles(Config.Instance.WorkingDir, Filename.Remove(Filename.LastIndexOf("."))+".*")) 
    { 
     bool foundExt = c.Extensions.Contains(filename.Remove(0, filename.LastIndexOf(".") + 1)); 
     if (PreCleanup) 
     { 
      // Only delete files, which are not won't be compiled 
      if(!foundExt) 
      { 
       File.Delete(filename); 
      } 
     } 
     else 
     { 
      if (!Config.Instance.SaveLspFile && filename.Contains(".lsp")) 
      { 
       File.Delete(filename); 
      } 

      if (!Config.Instance.SaveLstFile && filename.Contains(".lst")) 
      { 
       File.Delete(filename); 
      } 
     } 
    } 
} 

public void Dispose() 
{ 
    Dispose(true); 
    GC.SuppressFinalize(this); 
} 

protected virtual void Dispose(bool disposing) 
{ 
    if (!disposed) 
    { 
     if (disposing) 
     { 
      Dir = null; 
     } 
     disposed = true; 
    } 
} 

~CobolCompiler() 
{ 
    Dispose (false); 
} 

Je viens essayé avec dormir deux secondes après chaque processus de compilation. Mais cela ne change rien.

En cours de compilation, la CPU est à 100%. L'application collecte 270 Mo de RAM. Au début, c'est seulement 35 Mo. N'ayez pas peur, je dois copier toutes les sources dans les dossiers temporaires, car le compilateur ne peut pas compiler plusieurs fichiers en même temps dans le même répertoire de travail.

Éditer: J'ai déjà résolu le problème de pas de threads, mais ayant encore des éléments.

Dans ProcessSingle, j'ajoute l'élément que j'ai essayé de recompiler, lorsqu'il n'était pas lié à une DLL.

J'ai donc commencé avec 14000 éléments et j'ai ajouté des éléments (s'ils n'ont pas réussi à lier) à ce concurrentBag pendant le traitement de Parallel.ForEach. Je termine donc 14 000 passages de ForEach et j'ai des modules xxx qui doivent être compilés à nouveau.:-(

Je n'ai pas vu ça. La course de prReschk sans WaitForExit est destiné. En raison de la vérification Ressources pour plus de 14000 articles prend beaucoup de temps et ne doit pas obstruer une nouvelle compilation.

Mais la le problème de moins de threads à la fin du ConcurrentBag existe toujours :(Mais il ne s'agit que d'avis, quand il s'agit d'une grande quantité de cycles.)

+0

Est-ce que 'ProcessSingle()' lance une exception que vous ne gérez pas et annule la totalité de l'exécution? –

+0

Le ComplurrentBag est-il complètement rempli avant le Parallel.ForEach()? Si oui alors nous une liste normale <> '. Si non, cela pourrait être la mauvaise façon de le traiter. –

+0

Aucune exception, toutes ajoutées avant de démarrer ProcessSingle() ... – Demigod

Répondre

2

La méthode Parallel.ForEach utilisera le .Net ThreadPool pour allouer un thread. Le nombre de threads qui seront exécutés en parallèle sera géré par le ThreadPool en fonction de la charge des processeurs du système.Vous avez donc peut-être spécifié MaxDegreeOfParallelism mais ce n'est que le maximum, le T hreadPool peut décider d'allouer moins de threads que ce maximum. Selon la preuve que vous avez donnée dans votre question, il me semble que le processus de compilation utilise les ressources du système et ne nettoie pas par la suite. Cela expliquerait pourquoi les 140 compilations se soldent par une diminution progressive du nombre de threads alloués - le ThreadPool n'alloue pas de nouveaux threads car il pense que le CPU est lourdement chargé.

Je regarderais de plus près comment le processus de compilation est terminé. La méthode ProcessSingle retourne-t-elle avant la fin de la compilation? Y a-t-il une fuite de mémoire dans le processus de compilation?

A titre d'expérience, je serais intéressé de savoir si elle se comportait différemment si vous avez ajouté la ligne suivante après avoir appelé ProcessSingle:

System.Threading.Thread.Sleep(2000); 

se met en pause le fil pendant deux secondes avant de passer le contrôle de retour à la ThreadPool pour allouer la tâche suivante. Si cela améliore le comportement de votre application, cela suggère fortement que ma théorie est correcte.

+0

J'ai essayé .. Pas de changement :(Tout d'abord tout va bien ... mais à la fin, il minimise le nombre de threads ... Quand je compile environ 14000+ il ne restera plus qu'un fil sur le dernier 160 et non fil après 140 à gauche ...:/Et quand je compile 140 modules, ça enlèvera les threads sur les 6 derniers modules ... vraiment bizarre ... – Demigod

+0

Demigod, merci de poster le reste du code, ça explique plus clairement ta situation. Je n'ai pas d'environnement de compilation COBOL disponible pour moi donc je ne peux pas tester votre code, mais en le lisant, j'ai remarqué deux problèmes qui peuvent expliquer le comportement: – Kev

+0

1. prReschk.Start(); - Cela devrait être suivi d'un appel WaitForExit. Je revérifier tous vos autres appels Process.Start pour vous assurer que vous attendez leur sortie. 2. Vous semblez avoir un mécanisme confus pour lire le contenu du ConcurrentBag. La méthode Parallel.ForEach parcourt la collection et transmet chaque membre à l'expression Lambda. Mais dans ProcessSingle vous commencez par prendre un autre membre de ConcurrentBag avec 'm_files.TryTake (out item)'. Enlevez ce TryTake, cela vous causera certainement quelques problèmes, cela peut expliquer le problème que vous avez. – Kev

0

Si CopySource lance alors vous avez un verrou non libéré lockingObj et aucune autre progression ne peut être effectuée. utilisez lock (lockingObj) qui utilise un bloc finally pour libérer le verrou.

Questions connexes