2017-09-08 3 views
0

J'utilise RabbitMQ pour implémenter RPC dans Node.js.RabbitMQ RPC utilisant Node.js

Je suis le tutoriel, j'affirme le même nom de file d'attente pour chaque client comme « rpc_client » afin d'être haute autorité, voici le client.js d'appeler la fonction dans le serveur:

const amqp = require('amqplib'); 

async function client(){ 
    let args = process.argv.slice(2); 
    let corr = generateUuid(); 
    let num = parseInt(args[0]); 
    try { 
     let conn = await amqp.connect('amqp://127.0.0.1'); 
     let ch = await conn.createChannel(); 
     let q = await ch.assertQueue('rpc_client'); 

     console.log(' [x] Requesting fib(%d)', num); 
     console.log(q.queue) 

     await ch.consume(q.queue,msg=>{ 
      if (msg.properties.correlationId == corr) { 
       console.log(' [.] Got %s', msg.content.toString()); 
       ch.ack(msg) 
       setTimeout(function() { conn.close(); process.exit(0) }, 5500); 
      } 
     },{noAck:false}); 

     ch.sendToQueue('rpc_server', 
     new Buffer(num.toString()), 
     { correlationId: corr, replyTo: q.queue }); 

    } catch(err){ 
     console.error(err); 
    } 
} 

function generateUuid() { 
    return Math.random().toString() + 
      Math.random().toString() + 
      Math.random().toString(); 
} 

client(); 

mais je l'ai trouvé quand je cours plusieurs clients à la fois, ce dernier client n'exécutera pas le rappel de consommation (obtenir la réponse du serveur et l'imprimera) jusqu'à ce que la connexion de l'ancien client ait été fermée. par exemple. Le deuxième client obtiendra la réponse et l'imprimera et sa connexion sera fermée dans 5500 ms, et le second devra attendre la fermeture du premier et il imprimera la réponse puis attendra encore 5500 ms pour se fermer.

Alors, pourquoi cela devrait-il être? puisque la file d'attente peut consommer les deux massages en deux travailleurs parallèlement.

et est ici le server.js:

async function server(){ 
    try { 
     let conn = await amqp.connect('amqp://127.0.0.1'); 
     let ch = await conn.createChannel(); 
     process.once('SIGINT',()=>conn.close()); 

     let q = await ch.assertQueue('rpc_server'); 
     ch.prefetch(1); 
     console.log(' [x] Awaiting RPC requests'); 

     await ch.consume(q.queue,msg=>{ 
      let n = parseInt(msg.content.toString()); 

      console.log(" [.] fib(%d)", n); 

      let r = fibonacci(n); 

      ch.sendToQueue(msg.properties.replyTo, 
      new Buffer(r.toString()), 
      {correlationId: msg.properties.correlationId}); 

      ch.ack(msg); 
     },{noAck:false}); 


    } catch(err) { 
     console.error(err); 
    } 
} 

server(); 

function fibonacci (n , ac1 = 1 , ac2 = 1) { 
    if(n <= 1) {return ac2}; 

    return fibonacci (n - 1, ac2, ac1 + ac2); 
} 

Répondre

0

Si le nom de la file d'attente est la même, il est la même file d'attente. Pas deux files d'attente avec les mêmes noms. Dans ce cas, il est logique que les clients reçoivent des messages de manière séquentielle et non parallèle. Essayez donc avec différents noms de file d'attente et cela devrait fonctionner.

+0

mais pourquoi les serveurs avec la même file d'attente peuvent fonctionner parallèlement? se référer à http://www.rabbitmq.com/tutorials/tutorial-two-javascript.html – laoqiren

+0

Plusieurs consommateurs s'abonnent à la même file d'attente. Dans ce cas, ils recevront des messages alternatifs. Comme les bizarres iront o premier consommateur, et même ceux à la seconde. Plus de détails de ce modèle sont disponibles ici: https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html – tbking

+0

c'est vrai, donc la question de la mienne, il y a deux clients, ils appellent la fonction dans le serveur, Le serveur produira alors deux massages dans la file d'attente 'rpc_client', afin que les deux clients puissent les traiter parallèlement. Mais ils ne l'ont pas fait, pourquoi? – laoqiren