2010-09-28 5 views
6

Je suis sur le point de mettre en œuvre un thread de travail avec mise en file d'élément de travail, et pendant que je pensais à ce problème, je voulais savoir si je fais la meilleure chose.primitive de synchronisation pour léger file d'attente de thread de travail

Le fil en question devra avoir un fil de données locales (préinitialisés à la construction) et parcourera les éléments de travail jusqu'à ce qu'une condition sera remplie.

pseudocode:

volatile bool run = true; 

int WorkerThread(param) 
{ 
    localclassinstance c1 = new c1(); 
    [other initialization] 

    while(true) { 
     [LOCK] 
     [unqueue work item] 
     [UNLOCK] 
     if([hasWorkItem]) { 
      [process data] 
      [PostMessage with pointer to data] 
     } 
     [Sleep] 

     if(!run) 
      break; 
    } 

    [uninitialize] 
    return 0; 
} 

Je suppose que je vais faire le verrouillage par section critique, que la file d'attente sera std :: vecteur ou std :: file d'attente, mais peut-être il y a une meilleure façon. La partie avec le sommeil ne semble pas trop grande, car il y aura beaucoup de sommeil supplémentaire avec de grandes valeurs de sommeil, ou beaucoup de verrouillage supplémentaire lorsque la valeur de sommeil est faible, et ce n'est certainement pas nécessaire.

Mais je ne peux pas penser à une WaitForSingleObject amicale primitive que je pourrais utiliser au lieu de la section critique, comme il pourrait y avoir deux fils qui font la queue des éléments de travail en même temps. Ainsi, Event, qui semble être le meilleur candidat, peut perdre le deuxième élément de travail si l'événement a déjà été défini, et cela ne garantit pas une exclusion mutuelle.

Peut-être il y a même une meilleure approche avec InterlockedExchange type de fonctions qui conduit à encore moins sérialisation.

P.S .: Je pourrais avoir besoin de prétraiter toute la file d'attente et déposez les éléments de travail obsolètes au cours de la phase de unqueuing.

+0

Si le verrouillage et le déverrouillage ne vous coûtent pas cher, est-ce que le fait de ne pas dormir est un problème? – DumbCoder

+0

La sérialisation de 3 ou 4 threads, la gravure de l'unité centrale de traitement et l'exécution éventuelle de verrous-convoys ne sont probablement pas une très bonne chose. Et si je comprends bien, c'est ce qui arrivera sans Sleep ou WaitForSingleObject. Là encore, peut-être que je me trompe. – Coder

+0

Comment les éléments de travail sont-ils livrés dans la file d'attente? –

Répondre

5

Il existe une multitude de façons de procéder.

Une option consiste à utiliser un sémaphore pour l'attente. Le sémaphore est signalé chaque fois qu'une valeur est poussée dans la file d'attente, de sorte que le thread de travail ne bloque que s'il n'y a aucun élément dans la file d'attente. Cela nécessitera toujours une synchronisation séparée sur la file d'attente elle-même.

Une deuxième option consiste à utiliser un événement de réinitialisation manuelle qui est défini lorsqu'il y a des éléments dans la file d'attente et effacé lorsque la file d'attente est vide. Encore une fois, vous devrez effectuer une synchronisation séparée dans la file d'attente. Une troisième option consiste à créer une fenêtre de message invisible uniquement sur le thread et à utiliser un message spécial WM_USER ou WM_APP pour publier des éléments dans la file d'attente, en associant l'élément au message via un pointeur. Une autre option consiste à utiliser condition variables. Les variables de condition Windows natives ne fonctionnent que si vous ciblez Windows Vista ou Windows 7, mais des variables de condition sont également disponibles pour Windows XP avec Boost ou une implémentation de la bibliothèque de threads C++ 0x. Un exemple de file d'attente utilisant des variables de condition boost est disponible sur mon blog: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

+0

La solution de l'événement semble être la meilleure, malheureusement, il semble que je devrais faire plus d'étude et de refactoring à cause de la COM. CoInitialize semble être incompatible avec WaitForSingleObject. Les variables conditionnelles ont l'air sympa, mais je n'ai pas de dépendance Boost dans ce projet et je dois supporter WinXp – Coder

+0

IIRC, COM nécessite une pompe de message, utilisez plutôt MsgWaitForMultipleObjects et pompez les messages Windows si vous en avez. –

+0

si COM est une exigence, alors vous pourriez tout aussi bien poster des messages de threads, ou utiliser une fenêtre de type MESSAGE_HWND comme destination car le mécanisme de pompage de messages Windows a déjà été instancié sur le thread. –

1

Il existe différentes façons de le faire

Pour un, vous pouvez créer un événement au lieu appelé « run » et utiliser alors que pour détecter lorsque le fil doit se terminer, le thread principal alors des signaux. Au lieu de dormir, vous utiliserez alors WaitForSingleObject avec un timeout, de cette façon vous quitterez directement au lieu d'attendre ms.

Une autre façon est d'accepter des messages dans votre boucle, puis inventer un message défini par l'utilisateur que vous publiez sur le fil

EDIT: en fonction de la situation, il peut aussi être sage d'avoir encore un autre thread qui surveille ce fil à vérifier si elle est morte ou non, cela peut être fait par la file d'attente de messages mentionnée ci-dessus, donc répondre à un certain message dans x ms signifierait que le thread n'a pas été verrouillé.

0

Utilisez un sémaphore au lieu d'un événement.

+0

dans ce cas, je pense qu'un événement suffirait car son booléen dans sa nature. –

1

Je restructure un peu:

WorkItem GetWorkItem() 
{ 
    while(true) 
    { 
     WaitForSingleObject(queue.Ready); 
     { 
      ScopeLock lock(queue.Lock); 
      if(!queue.IsEmpty()) 
      { 
       return queue.GetItem(); 
      } 
     } 
    } 
} 

int WorkerThread(param) 
{ 
    bool done = false; 
    do 
    { 
     WorkItem work = GetWorkItem(); 
     if(work.IsQuitMessage()) 
     { 
      done = true; 
     } 
     else 
     { 
      work.Process(); 
     } 
    } while(!done); 

    return 0; 
} 

Points d'intérêt:

  1. ScopeLock est une classe RAII pour faire usage de section critique plus sûre.
  2. Bloquer sur l'événement jusqu'à ce que le travail soit (éventuellement) prêt - puis verrouiller pendant en essayant pour la déquiler.
  3. N'utilisez pas un indicateur global "IsDone", mettez en file d'attente spécial quitmessage WorkItem s.
+0

> WaitForSingleObject (queue.Ready); Vous attendez un événement? Que faire si le fil 1 planifie un élément, signale l'événement, filme en même temps 2 plannings et éléments et signale également l'événement. Le thread de travail reprend, traite un élément, réinitialise l'événement et retourne dans WaitForSingleObject, avec un élément restant suspendu. Le RAII et quitmessage est une bonne idée. – Coder

+1

Si la mémoire me convient, ce qui suit devrait fonctionner: utiliser la réinitialisation manuelle des événements, et la réinitialiser dans la file d'attente si les éléments de travail tombent à zéro. – snemarch

0

Conserver la signalisation et la synchronisation séparées. Quelque chose le long de ces lignes ...

// in main thread 

HANDLE events[2]; 
events[0] = CreateEvent(...); // for shutdown 
events[1] = CreateEvent(...); // for work to do 

// start thread and pass the events 

// in worker thread 

DWORD ret; 
while (true) 
{ 
    ret = WaitForMultipleObjects(2, events, FALSE, <timeout val or INFINITE>); 

    if shutdown 
     return 
    else if do-work 
     enter crit sec 
     unqueue work 
     leave crit sec 
     etc. 
    else if timeout 
     do something else that has to be done 
} 
2

Le plus rapide verrouillait primitive est habituellement un spin-lock ou spin-sommeil-lock. CRITICAL_SECTION est juste un spin-sleep-lock (espace utilisateur). (En plus de ne pas utiliser de primitives de verrouillage, bien sûr, mais cela signifie utiliser des structures de données sans verrou, et celles-ci sont vraiment difficiles à obtenir correctement.)

Pour éviter le sommeil: regardez à condition-variables. Ils sont conçus pour être utilisés avec un "mutex", et je pense qu'ils sont beaucoup plus faciles à utiliser que les EVENT de Windows.

Boost.Thread a une belle mise en œuvre portable à la fois, rapide espace utilisateur spin-sommeil verrous et variables de condition:

http://www.boost.org/doc/libs/1_44_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref

Une file d'attente de travail à l'aide Boost.Thread pourrait ressembler à ceci :

template <class T> 
class Queue : private boost::noncopyable 
{ 
public: 
    void Enqueue(T const& t) 
    { 
     unique_lock lock(m_mutex); 

     // wait until the queue is not full 
     while (m_backingStore.size() >= m_maxSize) 
      m_queueNotFullCondition.wait(lock); // releases the lock temporarily 

     m_backingStore.push_back(t); 
     m_queueNotEmptyCondition.notify_all(); // notify waiters that the queue is not empty 
    } 

    T DequeueOrBlock() 
    { 
     unique_lock lock(m_mutex); 

     // wait until the queue is not empty 
     while (m_backingStore.empty()) 
      m_queueNotEmptyCondition.wait(lock); // releases the lock temporarily 

     T t = m_backingStore.front(); 
     m_backingStore.pop_front(); 

     m_queueNotFullCondition.notify_all(); // notify waiters that the queue is not full 

     return t; 
    } 

private: 
    typedef boost::recursive_mutex mutex; 
    typedef boost::unique_lock<boost::recursive_mutex> unique_lock; 

    size_t const m_maxSize; 

    mutex mutable m_mutex; 
    boost::condition_variable_any m_queueNotEmptyCondition; 
    boost::condition_variable_any m_queueNotFullCondition; 

    std::deque<T> m_backingStore; 
}; 
+0

Connaissez-vous un tutoriel expliquant les variables de condition? Je n'ai pas encore vu d'exemple de variable d'état démontrant comment ils sont meilleurs qu'un sémaphore Windows ou un objet événement. Oh, bien sûr, il y a un objet de section critique associé, mais la façon d'utiliser ceci pour en réaliser est totalement évidente pour les non-initiés. –

0

Étant donné que cette question est étiquetée, fenêtres réponse Ill ainsi:

Ne pas créer 1 thread de travail. Vos tâches de threads de travail sont probablement indépendantes, vous pouvez donc traiter plusieurs tâches à la fois. Si oui:

  • Dans votre thread principal, appelez CreateIOCompletionPort pour créer un objet de port d'achèvement io.
  • Créez un pool de threads de travail. Le nombre que vous devez créer dépend du nombre de travaux que vous souhaitez effectuer en parallèle. Un multiple du nombre de cœurs de CPU est un bon début.
  • Chaque fois qu'un travail arrive en appel PostQueuedCompletionStatus() en passant un pointeur sur la structure du travail en tant que struct lpOverlapped.
  • Chaque thread de travail appelle GetQueuedCompletionItem() - récupère l'élément de travail à partir du pointeur lpOverlapped et effectue le travail avant de retourner à GetQueuedCompletionStatus.

Cela semble lourd, mais io ports de terminaison sont mis en oeuvre en mode noyau et représentent une file d'attente qui peut être désérialisé dans l'un des threads de travail associées à la file d'attente (à savoir en attente sur un appel à GetQueuedCompletionStatus). Le port de complétion io sait combien des threads qui traitent un élément utilisent en fait un CPU par opposition à un appel d'E/S et libère plus de threads de travail du pool pour s'assurer que le compte de concurrence est respecté. Donc, ce n'est pas léger, mais c'est très très efficace ... Le port de complétion peut être associé à des poignées de pipe et de socket par exemple et peut déquiler les résultats des opérations asynchrones sur ces poignées. Les conceptions de ports de complétion peuvent évoluer vers la gestion de dizaines de milliers de connexions socket sur un seul serveur - mais sur le bureau, elles constituent un moyen très pratique de réduire le nombre de tâches sur les 2 ou 4 cœurs courants sur les ordinateurs de bureau.

+1

Le plus gros problème des conceptions basées sur les ports de complétion io écrites en C++ moderne est que tous les threads d'un processus partagent un accès à un seul tas, et que la STL a tendance à beaucoup déborder le tas. En conséquence, vous pouvez trouver des performances limitées par chacun des threads de travail en compétition pour les verrous de tas plus que toute autre chose. –

+0

Malheureusement, je dois utiliser des composants COM, et ThreadPools ne sont pas très amicaux avec eux. – Coder

+0

sans savoir comment vous utilisez les objets COM rend cela difficile à juger. Si les objets COM font partie du travail - oui - problème. Si les objets com sont utilisés pour faire le travail, alors chaque thread de travail est son propre appartement, vous avez juste besoin d'un magasin TLS pour obtenir les objets de travail relavant au thread en cours. –

3

Il est possible de partager une ressource entre des threads sans utiliser de verrous de blocage, si votre scénario répond à certaines exigences.

Vous avez besoin d'une primitive d'échange de pointeur atomique, telle que InterlockedExchange de Win32. La plupart des architectures de processeurs fournissent une sorte d'échange atomique, et c'est généralement beaucoup moins cher que d'acquérir un verrou formel.

Vous pouvez stocker votre file d'éléments de travail dans une variable de pointeur accessible à tous les threads qui seront intéressés par celle-ci. (var globale ou champ d'un objet auquel tous les threads ont accès)

Ce scénario suppose que les threads impliqués ont toujours quelque chose à faire, et ne regardent que de temps en temps la ressource partagée. Si vous voulez un design où les threads bloquent l'attente d'une entrée, utilisez un objet d'événement de blocage traditionnel. Avant toute chose, créez votre objet de liste de tâches ou d'éléments de travail et affectez-le à la variable de pointeur partagé. Maintenant, lorsque les producteurs veulent placer quelque chose dans la file d'attente, ils "acquièrent" un accès exclusif à l'objet file en remplaçant un null par la variable de pointeur partagée à l'aide d'InterlockedExchange. Si le résultat du swap renvoie une valeur nulle, alors quelqu'un d'autre est en train de modifier l'objet file d'attente. Sleep (0) pour libérer le reste de la tranche de temps de votre thread, puis boucle pour réessayer le swap jusqu'à ce qu'il retourne non-null. Même si vous finissez en boucle quelques fois, c'est beaucoup. beaucoup plus rapide que de faire un appel au noyau pour acquérir un objet mutex. Les appels du noyau nécessitent des centaines de cycles d'horloge pour passer en mode noyau.

Lorsque vous avez réussi à obtenir le pointeur, apportez vos modifications à la file d'attente, puis réinsérez le pointeur de file d'attente dans le pointeur partagé. Lorsque vous consommez des éléments de la file d'attente, vous faites la même chose: échangez une valeur null dans le pointeur partagé et bouclez jusqu'à ce que vous obteniez un résultat non nul, opérez sur l'objet dans la variable locale, puis l'échangez dans le pointeur partagé var.

Cette technique est une combinaison de permutation atomique et de boucles de courte durée. Cela fonctionne bien dans les scénarios où les threads impliqués ne sont pas bloqués et les collisions sont rares. La plupart du temps, le swap vous donnera un accès exclusif à l'objet partagé au premier essai, et tant que la longueur de l'objet file est exclusivement réservée à un thread, aucun thread ne doit plus boucler qu'un plusieurs fois avant que l'objet queue ne soit à nouveau disponible.Si vous pensez qu'il y a beaucoup de conflits entre les threads dans votre scénario, ou si vous voulez que les threads passent la plupart de leur temps à attendre l'arrivée du travail, vous pouvez être mieux servi par un objet de synchronisation mutex formel.

Questions connexes