2010-06-29 14 views
5

bonjour je veux threads de collaborer un producteur et un consommateur. le consommateur est plutôt lent, et le producteur est très rapide et travaille en rafales. Par exemple, le consommateur peut traiter un message par 20 secondes, et le producteur peut produire 10 messages en une seconde, mais le fait environ une fois de temps en temps pour que le consommateur puisse le rattraper.C# communication inter-thread

je veux quelque chose comme:

Stream commonStream; 
AutoResetEvent commonLock; 

void Producer() 
{ 
    while (true) 
    { 
    magic.BlockUntilMagicAvalible(); 
    byte[] buffer = magic.Produce(); 
    commonStream.Write(buffer); 
    commonLock.Set(); 
    } 
} 

void Consumer() 
{ 
    while(true) 
    { 
    commonLock.WaitOne(); 
    MagicalObject o = binarySerializer.Deserialize(commonStream); 
    DoSomething(o); 
    } 
} 
+0

Quelle version de .Net utilisez-vous, il y a de nouvelles choses à v4 pour exactement ce genre de choses –

+0

.Net 3.5; Les commentaires doivent contenir au moins 15 caractères. –

Répondre

11

Si vous avez .Net 4.0 ou plus vous pouvez le faire de cette façon en utilisant un BlockingCollection

int maxBufferCap = 500; 
BlockingCollection<MagicalObject> Collection 
          = new BlockingCollection<MagicalObject>(maxBufferCap); 
void Producer() 
{ 
    while (magic.HasMoreMagic) 
    { 
     this.Collection.Add(magic.ProduceMagic()); 
    } 
    this.Collection.CompleteAdding(); 
} 

void Consumer() 
{ 
    foreach (MagicalObject magicalObject in this.Collection.GetConsumingEnumerable()) 
    { 
     DoSomthing(magicalObject); 
    } 
} 

La ligne foreach dormira s'il n'y a pas de données dans la mémoire tampon, il se réveillera automatiquement automatiquement quand quelque chose est ajouté à la collection. La raison pour laquelle je mets le tampon max est que si votre producteur est beaucoup plus rapide que le consommateur, vous pouvez finir par consommer beaucoup de mémoire car de plus en plus d'objets sont mis dans la collection. En configurant une taille de tampon maximale lorsque vous créez la collection de blocage lorsque la taille de la mémoire tampon est atteinte, l'appel Add sur le producteur se bloque jusqu'à ce qu'un élément ait été supprimé de la collection par le consommateur. Un autre bonus de la classe BlockingCollection est qu'il peut avoir autant de producteurs et de consommateurs que vous le souhaitez, il n'a pas besoin d'être un ratio 1: 1. Si DoSomthing prend en charge ce que vous pourriez avoir une boucle foreach par cœur de l'ordinateur (ou même utiliser Parallel.ForEach et utilisez la consommation dénombrable comme source de données)

void ConsumersInParalell() 
{ 
    //This assumes the method signature of DoSomthing is one of the following: 
    // Action<MagicalObject> 
    // Action<MagicalObject, ParallelLoopState> 
    // Action<MagicalObject, ParallelLoopState, long> 
    Paralell.ForEach(this.Collection.GetConsumingEnumerable(), DoSomthing); 
} 
+2

Notez que le TPL a été back-porté à .NET 3.5: http://codeblog.theg2.net/2010/02/tpl-and-parallelforeach-in- net-35-using.html –

0

Vous pouvez obtenir ce que vous voulez en utilisant une file d'attente et de la minuterie. Le producteur ajoute des valeurs à la file d'attente et lance le temporisateur consommateur. L'événement écoulé du temporisateur consommateur (qui se trouve sur un thread Threadpool) arrête le temporisateur et boucle dans la file jusqu'à ce qu'il soit vide, puis disparaît (pas d'interrogation inutile). Le producteur peut ajouter à la file d'attente pendant que le consommateur est toujours en cours d'exécution.

System.Timers.Timer consumerTimer; 
Queue<byte[]> queue = new Queue<byte[]>(); 

void Producer() 
{ 
    consumerTimer = new System.Timers.Timer(1000); 
    consumerTimer.Elapsed += new System.Timers.ElapsedEventHandler(consumerTimer_Elapsed); 
    while (true) 
    { 
     magic.BlockUntilMagicAvailable(); 
     lock (queue) 
     { 
      queue.Enqueue(magic.Produce()); 
      if (!consumerTimer.Enabled) 
      { 
       consumerTimer.Start(); 
      } 
     } 
    } 
} 

void consumerTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) 
{ 
    while (true) 
    { 
     consumerTimer.Stop(); 
     lock (queue) 
     { 
      if (queue.Count > 0) 
      { 
       DoSomething(queue.Dequeue()); 
      } 
      else 
      { 
       break; 
      } 
     } 
    } 
} 
+0

votre extrait n'est pas thread sûr ... et le mien implique aucune interrogation –

+0

Ce qui n'est pas thread sûr à ce sujet? Et il n'interroge pas - le minuteur est un one-shot qui n'est activé que lorsque le producteur ajoute à la file d'attente. –

-1

J'utilise Mutex. L'idée est que les deux fonctionnent dans des threads différents. Le thread Consumer est démarré verrouillé par un mutex, où il restera indéfiniment jusqu'à sa libération par le producteur. Il traitera ensuite les données en parallèle en laissant le producteur continuer. Le consommateur se verrouille à nouveau lorsqu'il est terminé.

(code de démarrage du fil, et d'autres bits de qualité ont été omis par souci de concision.)

// Pre-create mutex owned by Producer thread, then start Consumer thread. 
Mutex mutex = new Mutex(true); 
Queue<T> queue = new Queue<T>(); 

void Producer_AddData(T data) 
{ 
    lock (queue) { 
    queue.Enqueue(GetData()); 
    } 

    // Release mutex to start thread: 
    mutex.ReleaseMutex(); 
    mutex.WaitOne(); 
} 

void Consumer() 
{ 
    while(true) 
    { 
    // Wait indefinitely on mutex 
    mutex.WaitOne(); 
    mutex.ReleaseMutex(); 

    T data; 
    lock (queue) { 
     data = queue.Dequeue(); 
    } 
    DoSomething(data); 
    } 

}

Cela ralentit le producteur par un très quelques millisecondes tout est attend pour le consommateur de se réveiller et relâchez le mutex. Si tu peux vivre avec ça.

+0

L'utilisation d'un 'BlockingCollection' est beaucoup mieux. Tout d'abord, c'est beaucoup plus évident quand c'est correct que d'utiliser des mutex, et contrairement à votre modèle, le producteur et le consommateur peuvent travailler en parallèle; vous vous assurez que votre code * est * produisant * ou * consommant, mais jamais les deux. Il ne s'adapte pas bien à plus d'un producteur ou plus d'un consommateur, contrairement à une collection de blocage où cela est trivial. Vous pourriez utiliser une approche basée sur le mutex plus complexe qui avait les avantages d'une collection de blocage, mais ce serait un * lot * de travail, et serait beaucoup moins lisible/maintenable. – Servy

+0

BlockingColletion n'est pas disponible pour moi car je ne peux pas exécuter 4.5. Si je pouvais alors, ce serait probablement la bonne solution. Cependant, ce code fonctionne en parallèle. Je n'ai peut-être pas été clair, mais les deux sont dans des fils différents. Je l'utilise pour exécuter de lourdes requêtes SQL sur un thread, tout en collectant des données sur un autre thread et cela fonctionne bien pour moi. – Ben

+0

BlockingCollection a été ajouté dans 4.0, pas 4.5. – Servy