2011-03-29 3 views
2

J'essaie d'utiliser zeroMQ comme un moyen d'implémenter un système de messagerie entre plusieurs threads. J'ai essayé le code ci-dessous mais ça ne marche pas; dans le spécifique, l'appel à zmq_recv dans chaque thread n'attend pas/bloque tout message à exécuter.communication inter-thread utilisant des messages ZeroMQ

Pouvez-vous m'aider avec ce morceau de code?

J'utilise Linux OS et gcc

Cordialement

AFG

static void * 
    worker_routine (void *context) { 
     // Socket to talk to dispatcher 
     void *receiver = zmq_socket (context, ZMQ_REP); 
     zmq_connect (receiver, "inproc://workers"); 
     while (1) { 

      zmq_msg_t request; 
      zmq_msg_init(&request); 
      zmq_recv(receiver, &request, 0); 
      printf ("Received request\n"); 
      // Do some 'work' 
      usleep (1000); 
      // Send reply back to client 
      zmq_send (receiver, &request, 0); 
     } 
     zmq_close (receiver); 
     return NULL; 
    } 

    int main (void) { 

    void *context = zmq_init (1); 
    void *clients = zmq_socket (context, ZMQ_REP); 
    zmq_bind (clients, "inproc://workers"); 

    int thread_nbr; 
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) { 
     pthread_t worker; 
     pthread_create (&worker, NULL, worker_routine, context); 
    } 

    zmq_close (clients); 
    zmq_term (context); 
    return 0; 
    } 
+0

Je lisais à nouveau le guide ZeroMQ. Est-ce que quelqu'un sait si je dois créer un zmq_device comme QUEUE à mes fins? J'ai également remarqué qu'il y a des exemples utilisant "ipc" comme protocole .. J'ai toujours supposé que pour MT je DOIS utiliser "inproc" .. quelqu'un sait si cela peut avoir un impact? –

Répondre

3

Vous êtes arrêter la prise et ZeroMQ juste après que vous créez les fils. Ils n'ont probablement pas le temps d'atteindre un état bloquant, et s'ils le faisaient, ils échoueraient dès que vous détruiriez le contexte zmq. De l'zmq_term man page:

terminaison de contexte est effectuée dans les étapes suivantes:

Toutes les opérations de blocage actuellement en cours sur les sockets ouverts dans le contexte doivent retourner immédiatement avec un code d'erreur de ETERM.

+0

Je pourrais probablement besoin d'ajouter également essayer d'ajouter un peu de "sommeil"; de toute façon je ne sais pas si le comportement que je suis attendu est rempli avec cette configuration .. voir mes commentaires ci-dessous les questions. PS. Merci de votre aide! –

6

Les deux sockets sont REP. Ce que vous voulez, c'est REQ + REP.

0

Tout d'abord, comme @sustrik a noté que vous devez utiliser REQ et REP, le thread principal et les threads de travail ne peuvent pas être REP.

Deuxièmement, vous devez fournir une sorte de boucle de blocage dans votre thread principal:

int main (int argc, char **argv) 
{ 
    void *context = zmq_init (1); 
    void *clients = zmq_socket (context, ZMQ_REP); // use ZMQ_REQ on the clients 
    zmq_bind (clients, "inproc://workers"); 

    int thread_nbr; 
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) { 
     pthread_t worker; 
     pthread_create (&worker, NULL, worker_routine, context); 
    } 

    while (TRUE) 
    { 
        // worker thread connected asking for work 
        zmq_msg_t request; 
        zmq_msg_init (&request); 
     zmq_recv (clients, &request, 0); 
     zmq_msg_close (&request); 

     // do whatever you need to do with the clients' request here 

     // send work to clients 
     zmq_msg_t reply; 
     zmq_msg_init_data (&reply, "Reply", 5, NULL, NULL); 
     zmq_send (clients, &reply, 0); 
     zmq_msg_close (&reply); 
    } 

    zmq_close (clients); 
    zmq_term (context); 
    return 0; 
} 
Questions connexes