2010-01-22 8 views
1

Fondamentalement, j'ai multi-threads qui ajoute des données dans une file d'attente via SQLite. J'ai un autre thread qui les tire et les traite un à la fois (trop de ressources pour faire plusieurs à la fois). Le fil de traitement fait ceci:Comment gérer cette condition de course de file d'attente (db)?

  1. données DB tirer de
  2. foreach {} proccess
  3. si le nombre == 0 {Thread.Suspend()} (par s'éveiller thread.resume())
  4. répéter

mon thread de travail fait:

  1. données
  2. Valide
  3. insère dans DB
  4. appel Queue.Poke (QueueName)

Quand je touche du bout, si le fil est suspendu, je .resume() il. Ce qui m'inquiète, c'est que si le fil de processus voit count==0, mon ouvrier insère et pique alors mon processus continue dans le si et dort. Il ne réalisera pas qu'il y a quelque chose de nouveau dans la DB.

Comment dois-je écrire ceci de manière à ne pas avoir de condition de concurrence?

Répondre

1

fil de traitement:

  1. event.Reset
  2. données de traction de DB
  3. foreach {} proccess
  4. si count == 0 alors event.Wait
  5. répétition

Et l'autre fil:

  1. données
  2. Valide les insertions dans DB
  3. event.Set()

Vous aurez sillages supplémentaires (suite à une file d'attente vide, rien à traiter , retourne dormir) mais tu n'auras pas manqué d'inserts.

+0

Je ne comprends pas event.Reset, quel event.Wait peut être (sonne comme si je n'utilise pas la classe de thread) et event.set hmm, si cela ne course pas alors event.wait ne bloque pas sauf si l'événement est réinitialisé et effacer ... intéressant. Quelle classe est-ce que j'utilise? –

+0

événement est une instance ManualResetEvent deux thread ont une référence à. La partie clé est l'ordre: traitement d'abord Reset, puis dequeue et la première mise en file d'attente du worker puis Set.Cela garantit qu'il n'y a pas de fenêtre lorsque le traitement passe en veille et qu'il manque un élément mis en file d'attente. –

+0

Excellente réponse. –

0

Je pense que c'est peut-être la structure dont vous avez besoin.

private readonly Queue<object> _Queue = new Queue<object>(); 
private readonly object _Lock = new object(); 

void FillQueue() 
{ 
    while (true) 
    { 
     var dbData = new { Found = true, Data = new object() }; 
     if (dbData.Found) 
     { 
      lock (_Lock) 
      { 
       _Queue.Enqueue(dbData.Data); 
      } 
     } 

     // If you have multiple threads filling the queue you 
     // probably want to throttle it a little bit so the thread 
     // processing the queue won't be throttled. 
     // If 1ms is too long consider using 
     // TimeSpan.FromTicks(1000). 

     Thread.Sleep(1); 
    }  
} 

void ProcessQueue() 
{ 
    object data = null; 

    while (true) 
    { 
     lock (_Lock) 
     { 
      data = _Queue.Count > 0 ? _Queue.Dequeue() : null; 
     } 

     if (data == null) 
     { 
      Thread.Sleep(1); 
     } 
     else 
     { 
      // Proccess 
     }   
    }   
} 
+0

Ce qu'il fait ici, c'est utiliser la file d'attente comme mécanisme de notification au lieu de Thread.resume() que l'OP envisageait. Tant qu'il y a "quelque chose dans la file d'attente", la boucle est exécutée pour vérifier la base de données. –

Questions connexes