Publish and subscribe in RabbitMQ Cluster

Publish and subscribe in RabbitMQ Cluster

Sidharth Patnaik's photo
Sidharth Patnaik

Published on May 31, 2021

4 min read

Subscribe to my newsletter and never miss my upcoming articles

Well, there are many resources that tell how to publish and subscribe on channels using which u can send and receive messages, but there are unlikely very few which tell how this entire thing works when it comes to clustering. This post is all related to a clustered RabbitMQ and a NodeJs application.

In the case of Node, we have amqplib, a wrapper for RabbitMQ. Unfortunately, it wouldn’t accept multiple connection strings. To have our application work with a clustered RabbitMQ, we will have to use ampq-connection-manager which accepts more than one connection string.

Well enough talk, what’s an exchange by the way?

In total, there are four types of exchange available.

  • Direct Exchange
  • Headers Exchange
  • Topic Exchange
  • Fanout Exchange

A detailed explanation of exchanges can be found in the official documentation of RabbitMQ. For the simplicity of this tutorial, we will consider the Fanout exchange.

So what are we planning to do for this walkthrough?

  • Setup AMQP Connection manager
  • Working with AMQP Connection manager and creating a channel
  • Creating an exchange, binding a queue with exchange
  • Publishing and consuming the message.

Please note, we will be using these dummy connection strings for RabbitMQ

amqp://user:awesomePass@node01
amqp://user:awesomePass@node02
amqp://user:awesomePass@node03

Setting Up AMPQ Connection manager

you can either visit the npmjs or run the below command directly in your terminal, to install the AMQP Connection Manager

npm i amqp-connection-manager --save

Working with AMQP Connection manager and creating a channel

Now, let’s create a connection string, and upon connection, let’s create a channel, using which we can perform the rest of our operations. Refer to the below gist for establishing a connection.

const amqp = require('amqp-connection-manager');
require('dotenv').config();

//process.env.mqConns is an array of connection string
let conn = amqp.connect(JSON.parse(process.env.mqConns));
conn.on('connect', () => console.log('Connected!'));
conn.on('disconnect', err => console.log('Disconnected.', err));

Now, we will be creating a channel from the connection instance,

conn.createChannel({
    json: true,
    setup: function (channel) {
        workOnChannel(channel);
    }
});

let workOnChannel = async (channel) => {
  //The channel here is a regular amqplib `ConfirmChannel`.
  //We will perform our operations here.
}

Creating an exchange, binding a queue with exchange

Now we will be adding the rest of the code, in function workOnChannel. We will be asserting an exchange to the channel initially where we will be mentioning the type of exchange, i.e. fanout in our case. After that, we will be asserting a queue to the channel where the exchange will be pushing messages. The following code block will help us achieve the same.

let workOnChannel = async (channel) => {
  //The channel here is a regular amqplib `ConfirmChannel`.
  let exchange = "myFavExchangeName";
  await channel.assertExchange(exchange, 'fanout', {
      durable: false
  });

  let { queue } = await channel.assertQueue('', {
      exclusive: true,
      autoDelete: true
  });

  console.log("[*] Waiting for messages in %s. To exit press CTRL+C", queue);
  channel.bindQueue(queue, exchange, '');

  // ...
}

Publishing and consuming the message

We are almost done, just a step away from publishing and consuming our first-ever message, in the exchange which we have just created. Our publisher would be publishing messages to the exchange, and from there, since we are using Fanout for our example, the messages from the exchange would be sent to all the queues which are bind to the same exchange. Below is a very small code snippet that will perform publishing and consuming the messages

let workOnChannel = async (channel) => {
  // ...

  //Consumer listening to the queue, which was ealier created and bind with exchange
  let { consumerTag } = await channel.consume(queue, function (msg) {
        if (msg.content) {
            let dataReceive = JSON.parse(msg.content.toString());
            console.log(`[✓] Incomming message ${JSON.stringify(dataReceive)}`);
        }
    }, {
        noAck: true
    });

  //Publisher pushing messages to the 'exchange' which was earlier defined 
  //Dummy data block
  let data = {
        message: "This is a sample message",
        action: "nothing"
    }
  channel.publish(exchange, '', Buffer.from(JSON.stringify(data)));
  // channel.sendToQueue(sessionId, Buffer.from(JSON.stringify(data)));
  console.log(`[!] Data Pushed to ${exchange}`)
}

And yes, there you go. We have successfully able to use exchange in RabbitMQ Cluster with the NodeJs client. Got any questions, drop them in the comments section. Let me know your views in the comments.

 
Share this