Créer un pipeline de travail, la meilleure façon que je sais comment créer un pipeline qui utilise les deux parties du code qui doit être unique filetés et des pièces qui doivent être multi-thread est d'utiliser TPL Dataflow
public static class Example
{
private class Dto
{
public Dto(string filePath, byte[] data)
{
FilePath = filePath;
Data = data;
}
public string FilePath { get; }
public byte[] Data { get; }
}
public static async Task ProcessFiles(string path)
{
var getFilesBlock = new TransformBlock<string, Dto>(filePath => new Dto(filePath, File.ReadAllBytes(filePath))); //Only lets one thread do this at a time.
var hashFilesBlock = new TransformBlock<Dto, Dto>(dto => HashFile(dto),
new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = Environment.ProcessorCount, //We can multi-thread this part.
BoundedCapacity = 50}); //Only allow 50 byte[]'s to be waiting in the queue. It will unblock getFilesBlock once there is room.
var writeToDatabaseBlock = new ActionBlock<Dto>(WriteToDatabase,
new ExecutionDataflowBlockOptions {BoundedCapacity = 50});//MaxDegreeOfParallelism defaults to 1 so we don't need to specifiy it.
//Link the blocks together.
getFilesBlock.LinkTo(hashFilesBlock, new DataflowLinkOptions {PropagateCompletion = true});
hashFilesBlock.LinkTo(writeToDatabaseBlock, new DataflowLinkOptions {PropagateCompletion = true});
//Queue the work for the first block.
foreach (var filePath in Directory.EnumerateFiles(path))
{
await getFilesBlock.SendAsync(filePath).ConfigureAwait(false);
}
//Tell the first block we are done adding files.
getFilesBlock.Complete();
//Wait for the last block to finish processing its last item.
await writeToDatabaseBlock.Completion.ConfigureAwait(false);
}
private static Dto HashFile(Dto dto)
{
using (var md5 = System.Security.Cryptography.MD5.Create())
{
return new Dto(dto.FilePath, md5.ComputeHash(dto.Data));
}
}
private static async Task WriteToDatabase(Dto arg)
{
//Write to the database here.
}
}
Cela crée un pipeline avec 3 segments.
Un fil simple qui lit les fichiers du disque dur en mémoire et les enregistre sous la forme byte[]
. Un second qui peut utiliser jusqu'à Enviorement.ProcessorCount
threads pour hacher les fichiers, il ne permettra que 50 éléments d'être assis dans sa file d'attente entrante, lorsque le premier bloc essaie de l'ajouter cessera de traiter de nouveaux éléments jusqu'au prochain bloc est prêt à accepter de nouveaux articles.
Et un troisième qui est monothread et ajoute les données à la base de données, il n'autorise que 50 éléments dans sa file d'attente entrante à la fois.
En raison des deux 50 limites, il y aura au plus 100 byte[]
dans la mémoire (50 dans hashFilesBlock
file d'attente, 50 dans la file d'attente writeToDatabaseBlock
, les articles en cours de traitement pris en compte pour le
limite BoundedCapacity
. Update: pour le plaisir, j'ai écrit une version qui rapporte aussi des progrès, elle n'a pas encore été testée et utilise les fonctionnalités C# 7.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public static class Example
{
private class Dto
{
public Dto(string filePath, byte[] data)
{
FilePath = filePath;
Data = data;
}
public string FilePath { get; }
public byte[] Data { get; }
}
public static async Task ProcessFiles(string path, IProgress<ProgressReport> progress)
{
int totalFilesFound = 0;
int totalFilesRead = 0;
int totalFilesHashed = 0;
int totalFilesUploaded = 0;
DateTime lastReported = DateTime.UtcNow;
void ReportProgress()
{
if (DateTime.UtcNow - lastReported < TimeSpan.FromSeconds(1)) //Try to fire only once a second, but this code is not perfect so you may get a few rapid fire.
{
return;
}
lastReported = DateTime.UtcNow;
var report = new ProgressReport(totalFilesFound, totalFilesRead, totalFilesHashed, totalFilesUploaded);
progress.Report(report);
}
var getFilesBlock = new TransformBlock<string, Dto>(filePath =>
{
var dto = new Dto(filePath, File.ReadAllBytes(filePath));
totalFilesRead++; //safe because single threaded.
return dto;
});
var hashFilesBlock = new TransformBlock<Dto, Dto>(inDto =>
{
using (var md5 = System.Security.Cryptography.MD5.Create())
{
var outDto = new Dto(inDto.FilePath, md5.ComputeHash(inDto.Data));
Interlocked.Increment(ref totalFilesHashed); //Need the interlocked due to multithreaded.
ReportProgress();
return outDto;
}
},
new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = Environment.ProcessorCount, BoundedCapacity = 50});
var writeToDatabaseBlock = new ActionBlock<Dto>(arg =>
{
//Write to database here.
totalFilesUploaded++;
ReportProgress();
},
new ExecutionDataflowBlockOptions {BoundedCapacity = 50});
getFilesBlock.LinkTo(hashFilesBlock, new DataflowLinkOptions {PropagateCompletion = true});
hashFilesBlock.LinkTo(writeToDatabaseBlock, new DataflowLinkOptions {PropagateCompletion = true});
foreach (var filePath in Directory.EnumerateFiles(path))
{
await getFilesBlock.SendAsync(filePath).ConfigureAwait(false);
totalFilesFound++;
ReportProgress();
}
getFilesBlock.Complete();
await writeToDatabaseBlock.Completion.ConfigureAwait(false);
ReportProgress();
}
}
public class ProgressReport
{
public ProgressReport(int totalFilesFound, int totalFilesRead, int totalFilesHashed, int totalFilesUploaded)
{
TotalFilesFound = totalFilesFound;
TotalFilesRead = totalFilesRead;
TotalFilesHashed = totalFilesHashed;
TotalFilesUploaded = totalFilesUploaded;
}
public int TotalFilesFound { get; }
public int TotalFilesRead{ get; }
public int TotalFilesHashed{ get; }
public int TotalFilesUploaded{ get; }
}
'1094353ms avec environ de 60 mille files' .... c'est un style bizarre chiffre significatif ;-) – Stefan
Quand vous pensez comme ça, il est vraiment pas très important, mais étant donné une grande partie des fichiers don Même pas 200 Ko de taille, je pense qu'il y a une certaine optimisation à faire. –
L'IO peut être un obstacle. Il est peut-être préférable de mesurer d'abord la charge d'E/S pour déterminer s'il s'agit d'un goulot d'étranglement. Cela dit, cela semble (pour moi) une opération typique 1 fois. Êtes-vous sûr de vouloir l'optimiser aussi loin? – Stefan