2017-09-06 4 views
0

Chaque fois que je publie un nouveau message, je crée une nouvelle connexion. Je veux avoir seulement une connexion et un canal pour tous les appels publich.N'a besoin que d'une connexion avec RbbitMQ pour l'application de noeud, pas pour chaque appel de publication

Lecture à partir du site Web rabbitmq: Certaines applications nécessitent plusieurs connexions à un courtier AMQP. Cependant, il n'est pas souhaitable de garder plusieurs connexions TCP ouvertes en même temps, car cela consomme des ressources système et rend plus difficile la configuration des pare-feu. Les connexions AMQP 0-9-1 sont multiplexées avec des canaux qui peuvent être considérés comme des «connexions légères qui partagent une seule connexion TCP».

Mais comment ??? Voici mon code:

Channel.js

var amqp = require('amqplib/callback_api'); 

var url = process.env.AMQP_URL || 'amqp://guest:[email protected]:5672'; 

module.exports = createQueueChannel; 

function createQueueChannel(queue, cb) { 
    console.log("connecting................"); 
    amqp.connect(url, onceConnected); 

    function onceConnected(err, conn) { 
    if (err) { 
     console.error('Error connecting:', err.stack); 
    } 
    else { 
     console.log('connected'); 
     conn.createChannel(onceChannelCreated); 
    } 

    function onceChannelCreated(err, channel) { 
     if (err) { 
     cb(err); 
     } 
     else { 
     channel.assertQueue(queue, {durable: true}, onceQueueCreated); 
     } 

     function onceQueueCreated(err) { 
     if (err) { 
      cb(err); 
     } 
     else { 
      cb(null, channel, conn); 
     } 
     } 
    } 
    } 

} 

Publish.js

var Channel = require('./channel'); 

var queue = 'queue'; 

Channel(queue, function(err, channel, conn) { 
    if (err) { 
    console.error(err.stack); 
    } 
    else { 
    console.log('channel and queue created'); 
    var work = 'Do some work'; 
    channel.sendToQueue(queue, encode(work), { 
     persistent: true 
    }); 
    // setImmediate(function() { 
    // channel.close(); 
    // conn.close(); 
    // }); 
    } 
}); 


function encode(doc) { 
    return new Buffer(JSON.stringify(doc)); 
} 

Répondre

0

Définissez votre connexion (amqpConn) et le canal de l'éditeur (pubChannel) en dehors de la fonction de publication, et l'utilisation que canal lorsque vous publiez un message.

je vous recommande de jeter un oeil à l'exemple de code complet ici: https://gist.github.com/carlhoerberg/006b01ac17a0a94859ba (https://www.cloudamqp.com/blog/2015-05-19-part2-2-rabbitmq-for-beginners_example-and-sample-code-node-js.html) Où également une file d'attente hors-ligne est utilisé dans le cas où la connexion est en panne pendant un certain temps.

Et lorsque vous êtes connecté, vous démarrez l'éditeur.

function whenConnected() { 
    startPublisher() 
} 


var pubChannel = null; 
var offlinePubQueue = []; 
function startPublisher() { 
    amqpConn.createConfirmChannel(function(err, ch) { 
    if (closeOnErr(err)) return; 
    ch.on("error", function(err) { 
    console.error("[AMQP] channel error", err.message); 
    }); 
    ch.on("close", function() { 
    console.log("[AMQP] channel closed"); 
    }); 

    pubChannel = ch; 
    while (true) { 
    var m = offlinePubQueue.shift(); 
    if (!m) break; 
    publish(m[0], m[1], m[2]); 
    } 
}); 

}

Et la fonction de publication comme:

function publish(exchange, routingKey, content) { 
    try { 
    pubChannel.publish(exchange, routingKey, content, { persistent: true }, 
     function(err, ok) { 
     if (err) { 
      console.error("[AMQP] publish", err); 
      offlinePubQueue.push([exchange, routingKey, content]); 
      pubChannel.connection.close(); 
     } 
     } 
    ); 
    } catch (e) { 
    console.error("[AMQP] publish", e.message); 
    offlinePubQueue.push([exchange, routingKey, content]); 
    } 
}