2010-10-14 4 views
1

Mon application est constituée du processus principal et de deux threads, tous s'exécutant simultanément et utilisant trois FIFO:Comment synchroniser trois threads?

Les fifo-q sont Qmain, Q1 et Q2. En interne, les files d'attente utilisent chacune un compteur qui est incrémenté lorsqu'un élément est placé dans la file d'attente et décrémenté lorsqu'un élément est «extrait» de la file d'attente.

Le traitement comporte deux fils,
QMaster, qui obtiennent de Q1 et Q2, et mis en Qmain,
Monitor, qui a mis en Q2,
et le processus principal, qui obtenir de Qmain et mis en Q1 . La boucle QMaster-thread vérifie consécutivement les compteurs de Q1 et Q2 et si des éléments sont dans les q, ils les récupèrent et les placent dans Qmain.

La boucle Monitor-thread obtient des données de sources externes, les regroupe et les place dans Q2.

Le processus principal de l'application exécute également une boucle vérifiant le compte de Qmain, et si des éléments, obtenir un élément de Qmain à chaque itération de la boucle et le traiter plus loin. Au cours de ce traitement, place parfois un élément dans Q1 pour le traiter plus tard (lorsqu'il est obtenu à partir de Qmain à son tour).

Le problème:
J'ai implémenté tout comme décrit ci-dessus, et cela fonctionne pour un temps aléatoire (court) puis se bloque. J'ai réussi à identifier la source de l'écrasement de se produire dans l'incrément/décrément du compte d'un fifo-q (il peut arriver dans l'un d'entre eux).

Ce que j'ai essayé:
En utilisant trois de mutex: QMAIN_LOCK, Q1_LOCK et Q2_LOCK, que je verrouille chaque fois que get/put opération se fait sur une fifo-q pertinente. Résultat: l'application ne démarre pas, elle se bloque.

Le processus principal doit continuer à s'exécuter tout le temps, ne doit pas être bloqué sur un 'read' (named-pipes échoue, socketpair échoue).

Un conseil?
Je pense que je ne suis pas en train d'implémenter correctement le mutex, comment cela devrait-il être fait?
(Tous les commentaires sur l'amélioration de la conception ci-dessus également les bienvenus)

[modifier] ci-dessous sont les processus et la fifo-q-modèle:
Où & comment, dans ce que je devrais placer pour éviter les problèmes du mutex décrits ci-dessus ?

main-process: 
... 
start thread QMaster 
start thread Monitor 
... 
while (!quit) 
{ 
    ... 
    if (Qmain.count() > 0) 
    { 
     X = Qmain.get(); 
     process(X) 
      delete X; 
    } 
    ... 
    //at some random time: 
    Q2.put(Y); 
    ... 
} 

Monitor: 
{ 
    while (1) 
    { 
     //obtain & package data 
     Q2.put(data) 
    } 
} 

QMaster: 
{ 
    while(1) 
    { 
     if (Q1.count() > 0) 
      Qmain.put(Q1.get()); 

     if (Q2.count() > 0) 
      Qmain.put(Q2.get()); 
    } 
} 

fifo_q: 
template < class X* > class fifo_q 
{ 
    struct item 
    { 
     X* data; 
     item *next; 
     item() { data=NULL; next=NULL; } 
    } 
    item *head, *tail; 
    int count; 
public: 
    fifo_q() { head=tail=NULL; count=0; } 
    ~fifo_q() { clear(); /*deletes all items*/ } 
    void put(X x) { item i=new item(); (... adds to tail...); count++; } 
    X* get() { X *d = h.data; (...deletes head ...); count--; return d; } 
    clear() {...} 
}; 
+0

Comment utilisez-vous les mutex n'enveloppent que l'incrément et décrément avez-vous pensé à utiliser un secion critique – rerun

+0

J'ai vu "section critique" mentionné quand je googlé - s'il vous plaît mettre en place une réponse sur la façon dont vous le feriez mettre en œuvre pour ce qui précède, ou au moins pointer moi au bon matériel d'étude. – slashmais

+0

Le système se bloque-t-il sans les mutex? Quand il se bloque, certaines files d'attente sont-elles pleines? – mouviciel

Répondre

1

Un exemple de la façon dont j'adapter la conception et verrouiller l'accès de file d'attente le chemin posix. Remarque que j'enveloppez le mutex à utiliser RAII ou utiliser boost-threading et que j'utilisepasSTL :: deque ou stl :: file d'attente file d'attente, mais en restant le plus près possible de votre code:

main-process: 
... 
start thread Monitor 
... 
while (!quit) 
{ 
    ... 
    if (Qmain.count() > 0) 
    { 
     X = Qmain.get(); 
     process(X) 
      delete X; 
    } 
    ... 
    //at some random time: 
    QMain.put(Y); 
    ... 
} 

Monitor: 
{ 
    while (1) 
    { 
     //obtain & package data 
     QMain.put(data) 
    } 
} 

fifo_q: 
template < class X* > class fifo_q 
{ 
    struct item 
    { 
     X* data; 
     item *next; 
     item() { data=NULL; next=NULL; } 
    } 
    item *head, *tail; 
    int count; 
    pthread_mutex_t m; 
public: 
    fifo_q() { head=tail=NULL; count=0; } 
    ~fifo_q() { clear(); /*deletes all items*/ } 
    void put(X x) 
    { 
     pthread_mutex_lock(&m); 
     item i=new item(); 
     (... adds to tail...); 
     count++; 
     pthread_mutex_unlock(&m); 
    } 
    X* get() 
    { 
     pthread_mutex_lock(&m); 
     X *d = h.data; 
     (...deletes head ...); 
     count--; 
     pthread_mutex_unlock(&m); 
     return d; 
    } 
    clear() {...} 
}; 

Remarque aussi que le mutex doit encore être initialisé comme dans l'exemple here et que le nombre() devrait également utiliser le mutex

+0

Mis en œuvre à votre façon et tout est heureux - merci. J'ai réécrit un gros morceau, vous savez comment ça se passe: voyez quelque chose qui serait plus agréable, alors ça, alors ça, effet boule de neige; mais c'est fait maintenant :) Maintenant je vais faire une pause (c'est une soirée sympa en SA) – slashmais

0

Acquérir plusieurs verrous simultanément? C'est généralement quelque chose que vous voulez éviter. Si vous devez le faire, assurez-vous de toujours acquérir les verrous dans le même ordre dans chaque thread (ceci est plus restrictif pour votre concurrence et pourquoi vous voulez généralement l'éviter).

Autres conseils de concurrence: Acquérir le verrou avant de lire les tailles de la file d'attente? Si vous utilisez un mutex pour protéger les files d'attente, l'implémentation de votre file d'attente n'est pas simultanée et vous devez probablement acquérir le verrou avant de lire la taille de la file d'attente.

+0

Je fais un get-put dans une seule instruction (par exemple: Qmain.put (Q2.get())) et place les deux verrous avant et déverrouille après. – slashmais

+1

@slashmais: cela ne doit pas être fait: verrouillez toujours 1 file d'attente pour obtenir un message (localement), déverrouillez cette file puis verrouillez l'autre pour mettre le message. Si l'autre thread fait la même chose dans l'ordre inverse, alors vous aurez un blocage à un moment donné. – stefaanv

+0

Comment le faire sinon alors? – slashmais

1

Vous ne devez pas verrouiller le second mutex lorsque vous en avez déjà verrouillé un. Comme la question est marquée C++, je suggère d'implémenter le verrouillage dans la logique get/add de la classe de file d'attente (par exemple en utilisant des verrous boost) ou d'écrire un wrapper si votre file d'attente n'est pas une classe.

Cela vous permet de simplifier la logique de verrouillage.

En ce qui concerne les sources que vous avez ajoutés: vérification de la taille de la file d'attente et suivants PUT/GET doit être fait en une seule opération par ailleurs un autre thread peut modifier la file d'attente entre

0

1 problème peut se produire en raison de cette règle « La main- processus doit continuer à fonctionner tout le temps, ne doit pas être bloqué sur une 'lecture' ". Comment l'avez-vous mis en œuvre? Quelle est la différence entre 'get' et 'read'?

Problème semble être dans votre implémentation, pas dans la logique. Et comme vous l'avez indiqué, vous ne devriez pas être dans un verrou mort parce que vous n'achetez pas un autre verrou que ce soit dans une serrure.

+0

Utilisation d'une fifo-queue homebrew (template class) avec deux méthodes: get & put qui ne bloque pas. – slashmais

1

Utilisez le débogueur. Lorsque votre solution avec mutexes se bloque regardez ce que les threads font et vous aurez une bonne idée de la cause du problème.

Quelle est votre plateforme? Sous Unix/Linux, vous pouvez utiliser les files d'attente de messages POSIX (vous pouvez également utiliser les files d'attente de messages System V, les sockets, les FIFO, ...) de sorte que vous n'avez pas besoin de mutex.

En savoir plus sur les variables de condition. Par votre description, il semble que votre thread Qmaster est en train de boucler, en brûlant votre CPU.

une de vos réponses suggèrent que vous faites quelque chose comme:

Q2_mutex.lock() 
Qmain_mutex.lock() 
Qmain.put(Q2.get()) 
Qmain_mutex.unlock() 
Q2_mutex.unlock() 

mais vous voudrez probablement le faire comme:

Q2_mutex.lock() 
X = Q2.get() 
Q2_mutex.unlock() 

Qmain_mutex.lock() 
Qmain.put(X) 
Qmain_mutex.unlock() 

et comme Grégoire suggéré ci-dessus, encapsulent la logique dans le get /mettre.

EDIT: Maintenant que vous avez posté votre code je me demande, est-ce un exercice d'apprentissage? Parce que je vois que vous codez votre propre classe de file d'attente FIFO au lieu d'utiliser la norme std :: queue C++. Je suppose que vous avez très bien testé votre classe et que le problème n'est pas là.

En outre, je ne comprends pas pourquoi vous avez besoin de trois files d'attente différentes. Il semble que la file d'attente Qmain suffise, et vous n'aurez pas besoin du thread Qmaster qui est en effet occupé. A propos de l'encapsulation, vous pouvez créer une classe synch_fifo_q qui encapsule la classe fifo_q. Ajoutez une variable mutex privée, puis les méthodes publiques (put, get, clear, count, ...) devraient être comme put (X) {lock m_mutex; m_fifo_q.put (X); déverrouiller m_mutex; }

question: que se passerait-il si vous avez plus d'un lecteur dans la file d'attente? Est-il garanti qu'après un "count()> 0" vous pouvez faire un "get()" et obtenir un élément?

+0

J'ai essayé les deux manières dans vos exemples. Je suis sur Linux: avez-vous des liens pour POSIX et autres? Quand vous dites encapsuler dans get/put, voulez-vous dire placer des verrous autour des parties inc et dec? – slashmais

+0

@slashmais: Utilisez le google, Luke! Les parties intéressantes sont POSIX (IEEE 1003) 1b et 1c. 1b sont les extensions en temps réel, y compris les files d'attente de messages et les E/S asynchrones, et 1c sont les extensions de threads. Par encapsulation, je veux dire que la méthode "put" devrait probablement être comme put (X) {lock queue mutex; mettre X dans la file d'attente; déverrouiller la file d'attente mutex; } – src

1

j'ai écrit une simple application ci-dessous:

#include <queue> 
#include <windows.h> 
#include <process.h> 
using namespace std; 

queue<int> QMain, Q1, Q2; 
CRITICAL_SECTION csMain, cs1, cs2; 

unsigned __stdcall TMaster(void*) 
{ 
    while(1) 
    { 
     if(Q1.size() > 0) 
     { 
      ::EnterCriticalSection(&cs1); 
      ::EnterCriticalSection(&csMain); 
      int i1 = Q1.front(); 
      Q1.pop(); 
      //use i1; 
      i1 = 2 * i1; 
      //end use; 
      QMain.push(i1); 
      ::LeaveCriticalSection(&csMain); 
      ::LeaveCriticalSection(&cs1); 
     } 
     if(Q2.size() > 0) 
     { 
      ::EnterCriticalSection(&cs2); 
      ::EnterCriticalSection(&csMain); 
      int i1 = Q2.front(); 
      Q2.pop(); 
      //use i1; 
      i1 = 3 * i1; 
      //end use; 
      QMain.push(i1); 
      ::LeaveCriticalSection(&csMain); 
      ::LeaveCriticalSection(&cs2); 
     } 
    } 
    return 0; 
} 

unsigned __stdcall TMoniter(void*) 
{ 
    while(1) 
    { 
     int irand = ::rand(); 
     if (irand % 6 >= 3) 
     { 
      ::EnterCriticalSection(&cs2); 
      Q2.push(irand % 6); 
      ::LeaveCriticalSection(&cs2); 
     } 
    } 
    return 0; 
} 

unsigned __stdcall TMain(void) 
{ 
    while(1) 
    { 
     if (QMain.size() > 0) 
     { 
      ::EnterCriticalSection(&cs1); 
      ::EnterCriticalSection(&csMain); 
      int i = QMain.front(); 
      QMain.pop(); 
      i = 4 * i; 
      Q1.push(i); 
      ::LeaveCriticalSection(&csMain); 
      ::LeaveCriticalSection(&cs1); 
     } 
    } 
    return 0; 
} 

int _tmain(int argc, _TCHAR* argv[]) 
{ 
    ::InitializeCriticalSection(&cs1); 
    ::InitializeCriticalSection(&cs2); 
    ::InitializeCriticalSection(&csMain); 
    unsigned threadID; 
    ::_beginthreadex(NULL, 0, &TMaster, NULL, 0, &threadID); 
    ::_beginthreadex(NULL, 0, &TMoniter, NULL, 0, &threadID); 
    TMain(); 

    return 0; 
} 
+0

J'ai copié ceci comme exemple lorsque j'ai atteint ce niveau de problème. Merci. (merci outis) – slashmais

Questions connexes