2009-12-07 3 views
3

I possède une file d'attente dans laquelle je peux en file d'attente différents threads, je peux assurer deux choses:Meilleure approche pour la file d'attente synchronisé de fil

  1. appel sont traitées une par une.
  2. demande sont traitées dans l'ordre d'arriver

Deuxième point est important. Sinon, une section critique simple suffirait. J'ai différents groupes de requêtes et seulement dans un seul groupe ces points doivent être remplis. Les demandes provenant de différents groupes peuvent être simultanées.

Il ressemble à ceci:

FTaskQueue.Enqueu('MyGroup'); 
try 
    Do Something (running in context of some thread) 
finally 
    FTaskQueue.Dequeu('MyGroup'); 
end; 

EDIT: Je l'ai supprimé la mise en œuvre effective, car il cache le problème que je veux résoudre

J'ai besoin parce que j'ai un serveur web basé Indy qui accepte les requêtes http. D'abord je trouve une session correspondant à la demande. Ensuite, la demande (code) est exécutée pour cette session. Je peux obtenir plusieurs demandes pour la même session (lire Je peux obtenir de nouvelles demandes pendant que la première est en cours de traitement) et elles doivent s'exécuter une par une dans le bon ordre d'arrivée. Je recherche donc une file d'attente de synchronisation générique qui peut être utilisée dans de telles situations afin que les requêtes puissent être mises en file d'attente. Je n'ai aucun contrôle sur les threads et chaque requête peut être exécutée dans un thread différent. Quelle est la meilleure approche (ususal) pour ce genre de problème? Le problème est que Enqueue et Dequeue doivent être des opeations atomiques afin que l'ordre correct soit preserverd. Ma mise en œuvre actuelle a un goulot d'étranglement substantiel, mais cela fonctionne.

EDIT: Bellow est le problème des opérations atomiques Enqueue/DEQUEUE

Vous Wold normaly faire quelque chose comme ceci:

procedure Enqueue; 
begin 
    EnterCriticalSection(FCritSec); 
    try 
    DoEnqueue; 
    finally 
    LeaveCriticalSection(FCritSec); 
    end; 

    BlockTheCurrentThread; // here the thread blocks itself 
end; 

procedure Dequeue; 
begin 
    EnterCriticalSection(FCritSec); 
    try 
    DoDequeue; 
    UnblockTheNextThread; // here the thread unblocks another thread 
    finally 
    LeaveCriticalSection(FCritSec); 
    end; 
end; 

Maintenant, le problème ici est que ce n'est pas atomique. Si vous avez déjà un thread dans la file d'attente et qu'un autre vient et appelle Enqueue, il peut arriver que le second thread quitte la section critique et essaie de se bloquer. Maintenant, le planificateur de thread reprendra le premier thread, qui tentera de débloquer le prochain (second) thread. Mais le deuxième thread n'est pas encore bloqué, donc rien ne se passe. Maintenant, le deuxième thread continue et se bloque, mais ce n'est pas correct car il ne sera pas débloqué. Si le blocage est à l'intérieur de la section critique, que la section critique n'est jamais relâchée et que nous avons un blocage.

+0

Si vous avez plusieurs threads qui se suspendent et se reprennent pour s'assurer qu'un seul s'exécute à un moment donné - alors vous devriez vous rendre compte que tout votre design est faux. Dans ce cas, une requête ne doit pas * être * égale à un thread. – mghie

+0

Je soupçonne que l'approche n'est pas correcte oui. Mais disons que j'ai un serveur Indy. Je reçois des requêtes http dans un gestionnaire d'événements de serveur Web. Je trouve une session à travers une table de hachage puis j'exécute la requête = exécuter du code pour cette session. Maintenant, si pour une même session je reçois plusieurs requêtes, elles doivent être exécutées une par une. Et chacun sera dans un contexte de fil différent. Je n'ai aucun contrôle sur les threads. Si vous connaissez une meilleure approche, veuillez l'écrire comme réponse. – Runner

+0

Je vois, mais ces contraintes ne sont pas du tout visibles à partir de votre question. Je pense que vous obtiendriez de bien meilleures réponses si vous supprimez les détails de votre propre solution, énoncez le problème uniquement et demandez des approches pour le résoudre. – mghie

Répondre

9

Une autre approche:

Que chaque fil de demande ont un événement de réinitialisation manuelle qui est initialement hors service. Le gestionnaire de files d'attente est un objet simple qui conserve une liste de tels événements sans risque pour les threads. Les méthodes Enqueue() et Dequeue() prennent toutes deux l'événement du thread de requête en paramètre.

type 
    TRequestManager = class(TObject) 
    strict private 
    fCritSect: TCriticalSection; 
    fEvents: TList<TEvent>; 
    public 
    constructor Create; 
    destructor Destroy; override; 

    procedure Enqueue(ARequestEvent: TEvent); 
    procedure Dequeue(ARequestEvent: TEvent); 
    end; 

{ TRequestManager } 

constructor TRequestManager.Create; 
begin 
    inherited Create; 
    fCritSect := TCriticalSection.Create; 
    fEvents := TList<TEvent>.Create; 
end; 

destructor TRequestManager.Destroy; 
begin 
    Assert((fEvents = nil) or (fEvents.Count = 0)); 
    FreeAndNil(fEvents); 
    FreeAndNil(fCritSect); 
    inherited; 
end; 

procedure TRequestManager.Dequeue(ARequestEvent: TEvent); 
begin 
    fCritSect.Enter; 
    try 
    Assert(fEvents.Count > 0); 
    Assert(fEvents[0] = ARequestEvent); 
    fEvents.Delete(0); 
    if fEvents.Count > 0 then 
     fEvents[0].SetEvent; 
    finally 
    fCritSect.Release; 
    end; 
end; 

procedure TRequestManager.Enqueue(ARequestEvent: TEvent); 
begin 
    fCritSect.Enter; 
    try 
    Assert(ARequestEvent <> nil); 
    if fEvents.Count = 0 then 
     ARequestEvent.SetEvent 
    else 
     ARequestEvent.ResetEvent; 
    fEvents.Add(ARequestEvent); 
    finally 
    fCritSect.Release; 
    end; 
end; 

Chaque thread de demande appelle Enqueue() sur le gestionnaire de file d'attente et attend ensuite pour son propre événement pour devenir signalé. Ensuite, il traite la demande et appelle Dequeue():

{ TRequestThread } 

type 
    TRequestThread = class(TThread) 
    strict private 
    fEvent: TEvent; 
    fManager: TRequestManager; 
    protected 
    procedure Execute; override; 
    public 
    constructor Create(AManager: TRequestManager); 
    end; 

constructor TRequestThread.Create(AManager: TRequestManager); 
begin 
    Assert(AManager <> nil); 
    inherited Create(TRUE); 
    fEvent := TEvent.Create(nil, TRUE, FALSE, ''); 
    fManager := AManager; 
    Resume; 
end; 

procedure TRequestThread.Execute; 
begin 
    fManager.Enqueue(fEvent); 
    try 
    fEvent.WaitFor(INFINITE); 
    OutputDebugString('Processing request'); 
    Sleep(1000); 
    OutputDebugString('Request processed'); 
    finally 
    fManager.Dequeue(fEvent); 
    end; 
end; 

{ TForm1 } 

procedure TForm1.Button1Click(Sender: TObject); 
var 
    i: integer; 
begin 
    for i := 1 to 10 do 
    TRequestThread.Create(fRequestManager); 
end; 

Le gestionnaire de file d'attente verrouille la liste des événements à la fois dans Enqueue() et à Dequeue(). Si la liste est vide dans Enqueue(), il définit l'événement dans le paramètre, sinon il réinitialise l'événement. Ensuite, il ajoute l'événement à la liste. Ainsi, le premier thread peut continuer avec la requête, tous les autres bloqueront. Dans Dequeue() l'événement est supprimé du haut de la liste et l'événement suivant est défini (s'il y en a). De cette façon, le dernier thread de demande provoquera le déblocage du thread de requête suivant, complètement sans suspendre ou reprendre les threads. Cette solution n'a pas non plus besoin de threads ou de fenêtres supplémentaires, un seul objet événement par thread de requête est tout ce qui est nécessaire.

+0

C'est exactement l'approche que j'ai utilisée au début. La seule différence était que j'ai utilisé Suspend/Resume à la place si les événements, ce qui est inférieur (mon approche est). Mais regardez les problèmes que vous rencontrez (j'ai mis à jour la question). Les opérations doivent être atomiques.C'est pourquoi j'ai introduit un thread supplémentaire afin que toutes les opérations soient issues d'un autre thread et qu'elles soient complètement atomiques sans aucune section critique. Mais alors un bottlenect substantiel est introduit. Si vous pouvez résoudre ce problème, j'accepterai votre réponse car tout le reste est déjà optimal dans votre réponse. – Runner

+0

Maintenant, il y a deux options ici. Soit je ne vois pas quelque chose et la solution est simple, soit j'aboie complètement au mauvais arbre :) – Runner

+0

Ah vous avez posté le code. Oui, je pense que cela devrait fonctionner. Je savais que c'était simple et j'ai raté quelque chose :) Je vais essayer de l'implémenter de cette manière. Si elle retient l'eau, ce dont je suis sûr qu'elle le sera, j'accepterai votre réponse. Je ne pense pas que cela puisse être mieux fait. – Runner

2

Je vais répondre avec les informations supplémentaires de votre commentaire pris en considération.

Si vous avez un certain nombre de threads qui doivent être sérialisés, vous pouvez utiliser le mécanisme de sérialisation que Windows fournit gratuitement. Que chaque file soit un thread avec sa propre fenêtre et une boucle de message standard. Utilisez SendMessage() au lieu de PostThreadMessage(), et Windows se chargera de bloquer les threads expéditeurs jusqu'à ce que le message ait été traité, et de s'assurer que l'ordre d'exécution correct est maintenu. En utilisant un thread avec sa propre fenêtre pour chaque groupe de requêtes, vous vous assurez que plusieurs groupes sont toujours traités simultanément.

Ceci est une solution simple qui ne fonctionnera que si la requête elle-même peut être traitée dans un contexte de threads différent de son origine, ce qui ne devrait pas poser de problème dans de nombreux cas.

+1

Notez qu'il s'agit du même principe que celui utilisé pour sérialiser les appels de méthodes à un objet COM STA. – mghie

+1

Certes, cela pourrait être une bonne solution et oui ce n'est pas un problème d'introduire un nouveau thread, c'est d'écouter les messages. Le seul inconvénient (en dehors de fenêtres fictives) est, de cette façon, j'ai besoin d'un autre pool de threads. J'ai déjà d'excellents threads du serveur Indy. Mais c'est peut-être la seule solution viable. Laisse attendre si quelqu'un a une autre approche – Runner

+0

Notez qu'il n'y a pas de nouveau thread, puisque vous avez déjà 'TTaskQueueThread'. Vous le laissez juste avoir une fenêtre et utilisez le traitement des messages de blocage. – mghie

0

Avez-vous essayé l'objet TThreadList fourni par Delphi?

Il est thread-safe et gère les verrous pour vous. Vous gérez la liste "en dehors" du fil, au sein de votre fil principal.

Lorsque des demandes vous demandent une nouvelle tâche, vous l'ajoutez à la liste. Lorsqu'un thread se termine, avec l'événement OnTerminate, vous pouvez appeler le thread suivant dans la liste.

+0

Oui, je connais TThreadList. Le problème n'était pas la liste des threads sûrs, j'avais couvert cela. Le problème était de savoir comment synchroniser correctement la file d'attente FIFO. Les réponses acceptées de mghie ont résolu cela parfaitement. Mon problème était que j'ai pris la mauvaise approche avec suspendre/reprendre des discussions au lieu de simplement utiliser des événements. – Runner

Questions connexes