2017-08-15 6 views
-3

j'ai un DataTable avec des lignes 1k et j'utilise Parallel.ForEach pour parcourir les lignes les itère méthode suivants à travers les lignes, chaque ligne considérée comme MailMessage et initialise les paramètres de MailMessage puis l'enregistre sur le disque comme * .eml fichier plus tard, une file d'attente SMTP reprendra les fichiers EML et les envoyerParallel.ForEach l'accès à une ligne dans un DataTable plus d'une fois

public static bool GenerateValidEmlFiles(DataTable valids) 
    { 
     wroteToDb = false; 
     // init. cmp id from the table 
     CmpId = int.Parse(valids.Rows[0][CampaignId].ToString()); 

     Parallel.ForEach(valids.AsEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = 1 }, (currentRow) => 
     {     
      CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); 
      CancellationToken token = tokenSource.Token; 

      // create new task "thread" for every row in the DataTable 
      Task task = Task.Factory.StartNew(() => 
      { 
       try 
       { 


        if (token.IsCancellationRequested) 
         token.ThrowIfCancellationRequested(); 
        string id = currentRow[CampaignRecipientId].ToString(); 
        writeEvent(int.Parse(id), "Recipient row is being processed"); 

        // MailBee.Mime MailMessage 
        using (MailMessage msg = new MailMessage()) 
        { 
         // init. parameters 
         msg.From.AsString = currentRow[EmailFrom].ToString(); 
         msg.To.AsString = currentRow[EmailTo].ToString(); 
         msg.ReplyTo.AsString = currentRow[EmailReplyTo].ToString(); 
         msg.Subject = "=?UTF-8?B?" + Convert.ToBase64String(Encoding.UTF8.GetBytes(currentRow[EmailSubject].ToString())) + "?="; 
         msg.BodyHtmlText = HTMLTags.Replace("ReplaceBody", currentRow[EmailBody].ToString()); 
         //assing X-TWC id number into the header. 
         msg.Headers.Add(ConfigurationManager.AppSettings["TWCHeader"].ToString(), id, false); 
         //if there is an attachment add it to the Message 
         if ((currentRow[EmailAttachment] as object != null) && !string.IsNullOrEmpty(currentRow[EmailAttachmentName].ToString())) 
         { 
          byte[] filedata = (byte[])currentRow[EmailAttachment]; 
          msg.Attachments.Add(filedata, currentRow[EmailAttachmentName].ToString(), "", "", null, NewAttachmentOptions.None, MailTransferEncoding.None); 
         } 

         msg.EncodeAllHeaders(Encoding.UTF8, MailBee.Mime.HeaderEncodingOptions.Base64); 
         //save message as *.Eml to be sent by the SMTP Queue       
         msg.SaveMessage(@"E:\WEBS\Ready\" + id + "_" + msg.To.AsString + ".eml"); 
         writeEvent(int.Parse(id), "EML file written to Disk"); 

         //adding ID to a list to write the whole list back to the DB in a single DB call. 
         if (!IDs.Contains(id)) 
          IDs.Add(id); 

        }       

       } 
       catch (Exception ex) 
       { 
        using (StreamWriter sw = new StreamWriter(AppDomain.CurrentDomain.BaseDirectory + "\\INNER.txt", true)) 
        { 
         sw.WriteLine(DateTime.Now.ToString() + ": " + ex.Source.ToString().Trim() + ", " + ex.Message.ToString().Trim() + ex.StackTrace); 
         sw.Flush(); 
         sw.Close(); 
        } 
       } 

      }, token); 
      task.Wait(); 
     }); 

     if (!wroteToDb) 
     { 
      WriteEmlEvents(); 
      //set lock flag to true 
      wroteToDb = true; 
      return true; 
     } 
     return false; 
    } 

le problème est que les premières lignes sont en cours traité deux fois ou accédé par le FroEach deux fois! J'ai essayé de vérifier les doublons dans le DataTable et je n'y ai trouvé aucun doublon. Recommandez-vous d'utiliser DataReader au lieu de DataTable?

J'ai essayé la réponse suggérée, et j'ai eu cette exception: Pour votre information, il ne se produit que lorsque je retire MaxDegreeOfParallelism et quand je l'ai mis à 1, il fonctionne très bien

8/15/2017 3:37:05 PM: MailBee.NET.45, IOException occurred. InnerException message follows: The process cannot access the file 'E:\WEBS\TerranovaQueue Files\Ready\[email protected]' because it is being used by another process. at a.n.b(String A_0, Byte[] A_1, Int32 A_2, Int32 A_3, Byte[] A_4) 
    at MailBee.Mime.MailMessage.SaveMessage(String filename) 

System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: Collection was modified; enumeration operation might not execute. 
    at System.Data.RBTree`1.RBTreeEnumerator.MoveNext() 
    at System.Linq.Enumerable.<CastIterator>d__94`1.MoveNext() 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk_Buffered(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerator.GrabNextChunk(Int32 requestedChunkSize) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionEnumerator_Abstract`2.MoveNext() 
    at System.Threading.Tasks.Parallel.<>c__DisplayClass42_0`2.<PartitionerForEachWorker>b__1() 
    at System.Threading.Tasks.Task.InnerInvoke() 
    at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask) 
    at System.Threading.Tasks.Task.<>c__DisplayClass176_0.<ExecuteSelfReplicating>b__0(Object) 
    --- End of inner exception stack trace --- 
    at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions) 
    at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken) 
    at System.Threading.Tasks.Task.Wait() 
    at System.Threading.Tasks.Parallel.PartitionerForEachWorker[TSource,TLocal](Partitioner`1 source, ParallelOptions parallelOptions, Action`1 simpleBody, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally) 
    at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IEnumerable`1 source, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally) 
    at System.Threading.Tasks.Parallel.ForEach[TSource](IEnumerable`1 source, Action`1 body) 
    at EmailValidatorLibrary.EmailGenerator.GenerateValidEmlFiles(DataTable valids) in C:\Users\basel.abdo\Source\Terranova\Terranova-Preprod\TerranovaService\EmailValidatorLibrary\EmailGenerator.cs:line 50 
    at TerranovaService.TerranovaService.StartProcess() in C:\Users\basel.abdo\Source\Terranova\Terranova-Preprod\TerranovaService\TerranovaService\TerranovaService.cs:line 133 
---> (Inner Exception #0) System.InvalidOperationException: Collection was modified; enumeration operation might not execute. 
    at System.Data.RBTree`1.RBTreeEnumerator.MoveNext() 
    at System.Linq.Enumerable.<CastIterator>d__94`1.MoveNext() 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk_Buffered(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerator.GrabNextChunk(Int32 requestedChunkSize) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionEnumerator_Abstract`2.MoveNext() 
    at System.Threading.Tasks.Parallel.<>c__DisplayClass42_0`2.<PartitionerForEachWorker>b__1() 
    at System.Threading.Tasks.Task.InnerInvoke() 
    at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask) 
    at System.Threading.Tasks.Task.<>c__DisplayClass176_0.<ExecuteSelfReplicating>b__0(Object)<--- 

---> (Inner Exception #1) System.IO.IOException: The process cannot access the file 'E:\WEBS\TWC Mail Services\INNER.txt' because it is being used by another process. 
    at System.IO.__Error.WinIOError(Int32 errorCode, String maybeFullPath) 
    at System.IO.FileStream.Init(String path, FileMode mode, FileAccess access, Int32 rights, Boolean useRights, FileShare share, Int32 bufferSize, FileOptions options, SECURITY_ATTRIBUTES secAttrs, String msgPath, Boolean bFromProxy, Boolean useLongPath, Boolean checkHost) 
    at System.IO.FileStream..ctor(String path, FileMode mode, FileAccess access, FileShare share, Int32 bufferSize, FileOptions options, String msgPath, Boolean bFromProxy, Boolean useLongPath, Boolean checkHost) 
    at System.IO.StreamWriter.CreateFile(String path, Boolean append, Boolean checkHost) 
    at System.IO.StreamWriter..ctor(String path, Boolean append, Encoding encoding, Int32 bufferSize, Boolean checkHost) 
    at System.IO.StreamWriter..ctor(String path, Boolean append) 
    at EmailValidatorLibrary.EmailGenerator.<>c.<GenerateValidEmlFiles>b__19_0(DataRow currentRow) in C:\Users\basel.abdo\Source\Terranova\Terranova-Preprod\TerranovaService\EmailValidatorLibrary\EmailGenerator.cs:line 107 
    at System.Threading.Tasks.Parallel.<>c__DisplayClass42_0`2.<PartitionerForEachWorker>b__1() 
    at System.Threading.Tasks.Task.InnerInvoke() 
    at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask) 
    at System.Threading.Tasks.Task.<>c__DisplayClass176_0.<ExecuteSelfReplicating>b__0(Object)<--- 
mscorlib 

======== =============================================== MISE À JOUR Le foreach accède toujours au même rangée plus d'une fois! J'ai ajouté un enregistreur avec timestmap pour voir ce qui se passe réellement log

+0

Rien n'indique que quelque chose ne va pas. Quel est le code 'Doing work', nous avons besoin de plus d'informations. –

+1

Vous avez omis la partie importante. Quel code accède réellement au DataTable et fonctionne-t-il? De plus, si vous utilisez 'Parallel.ForEach', vous n'avez probablement pas besoin de vous occuper de tout cela avec' Task'. –

+0

@ JakubDąbek J'ai mis à jour le code dans la question –

Répondre

0

Ce que vous rencontrez est un problème classique avec un closure. La variable currentRow participe à une fermeture, ce qui signifie (entre autres choses) que sa valeur peut changer avant que le Task ait une chance de s'exécuter. Vous pouvez éliminer le problème en supprimant le Task. Exécutez simplement le code directement dans l'expression lambda que vous passez à Parallel.ForEach (et augmentez son degré maximum de parallélisme). Il est déjà en cours d'exécution sur un thread distinct (en raison de la Parallel.ForEach) donc il n'y a absolument aucun intérêt à l'emballer dans un Task aussi.

public static bool GenerateValidEmlFiles(DataTable valids) 
{ 
    wroteToDb = false; 
    // init. cmp id from the table 
    CmpId = int.Parse(valids.Rows[0][CampaignId].ToString()); 

    Parallel.ForEach(valids.AsEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = 10 }, (currentRow) => 
    {     
     string id = currentRow[CampaignRecipientId].ToString(); 
     writeEvent(int.Parse(id), "Recipient row is being processed"); 

     // MailBee.Mime MailMessage 
     using (MailMessage msg = new MailMessage()) 
     { 
      //etc. etc. 

P.S. Assurez-vous que IDs est un type qui prend en charge la simultanéité (par exemple, un ConcurrentBag<int>) et assurez-vous que writeEvent est thread-safe.

+0

' currentRow' est un paramètre pour un lambda, ce n'est pas dans une fermeture –

+0

Il est à la fois. C'est un paramètre à un lambda par rapport à 'ForEach'. C'est dans une fermeture par rapport à la 'Task'. Voilà pourquoi je dis se débarrasser de la «tâche» .. –

+1

Le fait est que le code OP est pratiquement synchrone car il attend de toute façon la tâche –