Dealing with "Oops! Something Went Wrong" in RabbitMQ: How to Make Failed Messages Behave

Exploring the RabbitMQ Delayed Message Exchange Plugin: Enhancing Message Delay and Dead Letter Queue Strategies

Introduction

So, you've come across this nifty tool called RabbitMQ that lets different parts of your software have a chat. You're probably pretty excited about getting messages to bounce around between different areas of your app. But, let's be real for a second - not all message exchanges go off without a hitch. Sometimes, messages trip over their own feet, and that's where two superheroes come in: the Delayed Message Exchange Plugin and the Dead Letter Queue (DLQ).

They're like the problem solvers of the messaging world, ensuring that even if a message takes a misstep or can't be delivered right away, it's not the end of the world. In this blog, we're going to take a beginner-friendly journey into the heart of these concepts. We'll learn how the Delayed Message Exchange Plugin adds a twist to the usual messaging game, and how the DLQ offers a safety net for messages that encounter a hiccup on their way to their destination. So, whether you're new to messaging or just curious to learn more, get ready to uncover the magic that keeps messages moving smoothly in the world of RabbitMQ!

Understanding the RabbitMQ Delayed Message Exchange Plugin

The plugin adds a new exchange type, x-delayed-message, which holds each message for a period you choose before routing it to its queues. That comes in handy for messages that failed to process: instead of retrying immediately, you give them a breather and try again after a little time has passed. When a message has been retried a set number of times and still can't get through, it gets moved to a special place where we can figure out what went wrong and make things better (the DLQ zone).
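To ask the plugin to hold a message, the publisher sets an x-delay header, in milliseconds, on a message sent to an x-delayed-message exchange. Here is a minimal sketch, assuming an already-open amqplib channel. publishWithDelay is a hypothetical helper name, not part of amqplib or the plugin:

```javascript
// Hypothetical helper: publish `payload` to a delayed-message exchange so the
// plugin holds it for `delayMs` milliseconds before routing it to its queues.
function publishWithDelay(channel, exchange, routingKey, payload, delayMs) {
  if (!Number.isInteger(delayMs) || delayMs < 0) {
    throw new RangeError("delayMs must be a non-negative integer");
  }
  const body = Buffer.from(JSON.stringify(payload));
  // The plugin reads the delay from the "x-delay" header
  return channel.publish(exchange, routingKey, body, {
    persistent: true,
    headers: { "x-delay": delayMs },
  });
}

// Usage with an open amqplib channel (payload is an example):
// publishWithDelay(channel, "delay_exchange", "", { orderId: 42 }, 5000);
```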

What's this DLQ Magic, Anyway?

The RabbitMQ Dead Letter Queue (DLQ) serves as a backup storage location for messages that could not be delivered or processed successfully. When a consumer rejects a message without requeueing it, when a message expires, or when it is dropped because its queue is full, RabbitMQ republishes it to a dead-letter exchange, which routes it to the DLQ so it can be inspected later. It's a way to determine what went wrong and improve the message flow. In a nutshell, the DLQ acts as a safety net, helping you understand and fix delivery issues to make your messaging system more reliable.
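Under the hood, dead-lettering is configured on the source queue and triggered by the consumer. A queue declared with the x-dead-letter-exchange argument (and optionally x-dead-letter-routing-key) sends messages that are rejected without requeueing to that exchange. A minimal sketch, assuming an amqplib channel; deadLetterArguments and settleFailedMessage are hypothetical helper names:

```javascript
// 1) Configure: dead-lettering is a property of the source queue, set through
//    the x-dead-letter-* queue arguments (hypothetical helper).
function deadLetterArguments(exchange, routingKey) {
  const args = { "x-dead-letter-exchange": exchange };
  // If no routing key is given, RabbitMQ keeps the message's original routing key
  if (routingKey !== undefined) {
    args["x-dead-letter-routing-key"] = routingKey;
  }
  return args;
}

// 2) Trigger: a consumer that gives up on a message rejects it without requeueing
//    (hypothetical helper). nack(message, allUpTo, requeue) is amqplib's signature.
function settleFailedMessage(channel, msg, retryable) {
  if (retryable) {
    // requeue = true: the message goes straight back onto its original queue
    channel.nack(msg, false, true);
  } else {
    // requeue = false: RabbitMQ republishes it to the queue's dead-letter exchange
    channel.nack(msg, false, false);
  }
}

// Usage with an open amqplib channel, using this article's names:
// await channel.assertQueue("primary_queue", {
//   durable: true,
//   arguments: deadLetterArguments("dead_letter_exchange", "dead_letter_routing"),
// });
```

Requeueing (requeue = true) redelivers the message right away, which can turn a persistent failure into a tight retry loop; spacing those retries out is exactly what the delayed exchange is for.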

Let's get started

Setting Up the Delayed Message Exchange

  1. Navigate to the RabbitMQ plugins directory. Its exact location depends on your RabbitMQ version and how it was installed; on Linux it is commonly /usr/lib/rabbitmq/lib/rabbitmq_server-<version>/plugins, and on Windows C:\Program Files\RabbitMQ Server\rabbitmq_server-<version>\plugins.

  2. Download the RabbitMQ Delayed Message Exchange Plugin from the plugin's GitHub releases page. Make sure the plugin release you download is compatible with your installed RabbitMQ version.

  3. Copy the plugin file (with a .ez extension) to the plugins directory.

  4. Enable the plugin by running the following command:

       rabbitmq-plugins enable rabbitmq_delayed_message_exchange

  5. If RabbitMQ was not running when you enabled the plugin, start it now. On a running node, rabbitmq-plugins enable activates the plugin right away, so a restart is optional.
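To confirm from application code that the plugin is active, you can try declaring a throwaway exchange of type x-delayed-message. If the plugin is not enabled, the broker refuses the declaration, typically with a 503 COMMAND_INVALID "unknown exchange type" error that closes the whole connection. A minimal sketch, assuming connect behaves like amqplib's connect(url); isDelayedExchangeAvailable and the probe exchange name are hypothetical:

```javascript
// Hypothetical helper: resolves to true if the broker accepts an exchange of
// type "x-delayed-message", i.e. the delayed message exchange plugin is enabled.
// `connect` is assumed to behave like amqplib's connect(url).
async function isDelayedExchangeAvailable(connect, url = "amqp://localhost") {
  // Connection failures are NOT caught: an unreachable broker should not be
  // reported as "plugin missing"
  const connection = await connect(url);
  // An unknown exchange type makes the broker close the connection; listen for
  // "error" events so they are not thrown as unhandled
  connection.on("error", () => {});
  const probe = `delayed_probe_${process.pid}_${Date.now()}`;
  try {
    const channel = await connection.createChannel();
    channel.on("error", () => {});
    await channel.assertExchange(probe, "x-delayed-message", {
      durable: false,
      arguments: { "x-delayed-type": "direct" },
    });
    // Clean up the throwaway exchange
    await channel.deleteExchange(probe);
    return true;
  } catch (_err) {
    return false;
  } finally {
    try {
      await connection.close();
    } catch (_) {
      // The broker may already have closed the connection
    }
  }
}

// Usage:
// isDelayedExchangeAvailable(require("amqplib").connect).then((ok) => console.log(ok));
```

The probe uses its own connection so that a refused declaration cannot take down the connection your application relies on.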

Declare the necessary queues and exchanges: the primary queue, the Dead Letter Queue (DLQ), and the delayed exchange.

const amqp = require("amqplib");

async function setup() {
  try {
    const connection = await amqp.connect("amqp://localhost");
    const channel = await connection.createChannel();

    // Define exchange names
    const PRIMARY_EXCHANGE = "primary_exchange";
    const DELAY_EXCHANGE = "delay_exchange";
    const DEAD_LETTER_EXCHANGE = "dead_letter_exchange";

    // Define queue names
    const PRIMARY_QUEUE = "primary_queue";
    const DEAD_LETTER_QUEUE = "dead_letter_queue";

    // Define dead letter routing name
    const DEAD_LETTER_ROUTING = "dead_letter_routing";

    // Setup primary queue
    const createPrimaryExchangeAndQueue = async () => {
      await channel.assertExchange(PRIMARY_EXCHANGE, "direct", {
        durable: true,
      });
      await channel.assertQueue(PRIMARY_QUEUE, {
        exclusive: false,
        durable: true,
        deadLetterExchange: DEAD_LETTER_EXCHANGE,
        deadLetterRoutingKey: DEAD_LETTER_ROUTING,
      });
      await channel.bindQueue(PRIMARY_QUEUE, PRIMARY_EXCHANGE);
    };

    // Setup delayed exchange
    const createDelayExchange = async () => {
      await channel.assertExchange(DELAY_EXCHANGE, "x-delayed-message", {
        autoDelete: false,
        durable: true,
        arguments: { "x-delayed-type": "direct" },
      });
      await channel.bindQueue(PRIMARY_QUEUE, DELAY_EXCHANGE);
    };

    // Setup DLQ
    const createDeadLetterExchangeAndQueue = async () => {
      await channel.assertExchange(DEAD_LETTER_EXCHANGE, "direct", {
        durable: true,
      });
      await channel.assertQueue(DEAD_LETTER_QUEUE, {
        exclusive: false,
        durable: true,
      });
      await channel.bindQueue(
        DEAD_LETTER_QUEUE,
        DEAD_LETTER_EXCHANGE,
        DEAD_LETTER_ROUTING
      );
    };

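    // Await each step: the delay exchange binds primary_queue, so the queue must
    // exist first, and the connection must not close before everything finishes
    await createPrimaryExchangeAndQueue();
    await createDelayExchange();
    await createDeadLetterExchangeAndQueue();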

    await connection.close();
    console.log("Setup complete.");
  } catch (error) {
    console.error("Error during setup:", error);
  }
}

setup();
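To double-check that the setup actually created the exchanges, you can ask the broker without declaring anything. amqplib's checkExchange performs a passive declare: it succeeds if the exchange exists, and otherwise the broker closes the channel with a 404 NOT_FOUND error. The sketch therefore uses a short-lived channel; exchangeExists is a hypothetical helper, not part of amqplib:

```javascript
// Hypothetical helper: check whether an exchange exists without creating it.
// A missing exchange makes the broker close the channel (404 NOT_FOUND),
// so a dedicated, short-lived channel is used.
async function exchangeExists(connection, name) {
  const channel = await connection.createChannel();
  // Swallow the "error" event emitted when the broker closes the channel
  channel.on("error", () => {});
  try {
    await channel.checkExchange(name);
    return true;
  } catch (err) {
    // amqplib puts the AMQP reply code on err.code; 404 means "not found"
    if (err && err.code === 404) return false;
    throw err;
  } finally {
    try {
      await channel.close();
    } catch (_) {
      // The broker may already have closed the channel
    }
  }
}

// Usage with an open amqplib connection:
// console.log(await exchangeExists(connection, "delay_exchange"));
```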

Let's break down the three main functions step by step:

  1. Creating the Primary Exchange and Queue:
const createPrimaryExchangeAndQueue = async () => {
  await channel.assertExchange(PRIMARY_EXCHANGE, "direct", {
    durable: true,
  });
  await channel.assertQueue(PRIMARY_QUEUE, {
    exclusive: false,
    durable: true,
    deadLetterExchange: DEAD_LETTER_EXCHANGE,
    deadLetterRoutingKey: DEAD_LETTER_ROUTING,
  });
  await channel.bindQueue(PRIMARY_QUEUE, PRIMARY_EXCHANGE);
};

This function creates a durable direct exchange named primary_exchange and a durable queue named primary_queue, and binds the queue to the exchange. The deadLetterExchange and deadLetterRoutingKey options configure dead-lettering: if a consumer rejects a message from primary_queue without requeueing it (a nack with requeue set to false), RabbitMQ republishes it to dead_letter_exchange with the routing key dead_letter_routing.

  2. Creating the Dead Letter Exchange and Queue:

     const createDeadLetterExchangeAndQueue = async () => {
       await channel.assertExchange(DEAD_LETTER_EXCHANGE, "direct", {
         durable: true,
       });
       await channel.assertQueue(DEAD_LETTER_QUEUE, {
         exclusive: false,
         durable: true,
       });
       await channel.bindQueue(
         DEAD_LETTER_QUEUE,
         DEAD_LETTER_EXCHANGE,
         DEAD_LETTER_ROUTING
       );
     };
    

    This function creates a direct exchange called dead_letter_exchange and a queue named dead_letter_queue, then binds the queue to the exchange using the dead_letter_routing key. Because primary_queue names this exchange and routing key in its dead-letter settings, any message that a consumer nacks from primary_queue without requeueing ends up in dead_letter_queue.

    These messages stay there until something consumes or purges them, which gives us the opportunity to investigate the root cause of any failure.

  3. Creating the Delay Exchange:

     const createDelayExchange = async () => {
       await channel.assertExchange(DELAY_EXCHANGE, "x-delayed-message", {
         autoDelete: false,
         durable: true,
         arguments: { "x-delayed-type": "direct" },
       });
       await channel.bindQueue(PRIMARY_QUEUE, DELAY_EXCHANGE);
     };
    

    This function sets up a delayed message exchange called delay_exchange, using the "x-delayed-message" exchange type provided by the plugin. Messages published to it wait for the number of milliseconds given in their x-delay header; once the delay has passed, the exchange routes them the way the type named in x-delayed-type would (here, direct). Note that amqplib's assertExchange always performs a regular declare, creating the exchange if it is missing, so it has no passive mode; checking whether an exchange exists without creating it is what channel.checkExchange is for.

    The exchange is bound to primary_queue. So, if a message needs another shot at getting through, it is republished to this exchange and lands back in primary_queue after its delay, following our retry schedule. This cycle repeats until the consumer either acks the message (thumbs-up) or nacks it to the DLQ (thumbs-down).
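To dig into the messages that collect in dead_letter_queue, you can read the x-death header that RabbitMQ adds during dead-lettering: an array with one entry per dead-lettering event (new entries are added at the front), each with fields such as queue, reason (for example rejected, expired or maxlen), count and exchange. A sketch of a helper that extracts the useful bits; summarizeDeadLetter is a hypothetical name:

```javascript
// Hypothetical helper: summarize why and where a message was dead-lettered,
// based on the "x-death" header RabbitMQ adds during dead-lettering.
function summarizeDeadLetter(msg) {
  const headers = (msg.properties && msg.properties.headers) || {};
  const deaths = Array.isArray(headers["x-death"]) ? headers["x-death"] : [];
  if (deaths.length === 0) {
    return null; // not dead-lettered (e.g. published to the DLQ directly)
  }
  // RabbitMQ prepends new entries, so deaths[0] is the most recent event
  const latest = deaths[0];
  return {
    queue: latest.queue,
    reason: latest.reason,
    count: latest.count,
    routingKeys: latest["routing-keys"] || [],
    body: msg.content.toString(),
  };
}

// Usage sketch with an open amqplib channel; acking removes the message from the DLQ
// await channel.consume("dead_letter_queue", (msg) => {
//   if (msg) { console.log(summarizeDeadLetter(msg)); channel.ack(msg); }
// });
```

Only ack a dead letter once you have recorded what you need from it, since the ack removes it from the queue.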

Now, let's create the consumer to see everything in action in a situation that mimics the real world.

const amqp = require("amqplib");

const PRIMARY_QUEUE = "primary_queue";
const DELAY_EXCHANGE = "delay_exchange";

const DELAY_DEFAULT = 1000; // base retry delay in ms
const RETRY_LIMIT = 3; // retries allowed before a message is dead-lettered

const consumer = async () => {
  const connection = await amqp.connect("amqp://localhost");
  const channel = await connection.createChannel();

  // Allow at most 10 unacknowledged messages in flight for this consumer
  await channel.prefetch(10);

  // Consume the msg
  await channel.consume(
    PRIMARY_QUEUE,
    async (msg) => {
      // amqplib passes null if the server cancels the consumer
      if (msg === null) return;
      try {
        //TODO: Business logic to process message

        channel.ack(msg);
      } catch (err) {
        await moveMessageToDelayExchange(channel, msg);
        console.log("Processing failed. Message scheduled for retry or dead-lettered.");
      }
    },
    { noAck: false }
  );
};

async function moveMessageToDelayExchange(channel, msg) {
  // Headers are undefined on a message that was published without any
  const headers = msg.properties.headers || {};
  const maxRetryLimit = headers["x-retry-limit"] ?? RETRY_LIMIT;
  // Custom header counting the retries already made
  // (msg.fields.deliveryTag is a per-channel sequence number, not a retry counter)
  const retryCount = headers["x-retry-count"] ?? 0;

  // Move msg to the DLQ once the maximum number of retries has been used up
  if (retryCount >= maxRetryLimit) {
    // nack(message, allUpTo, requeue): requeue = false makes RabbitMQ dead-letter
    // the message to the dead_letter_exchange configured on primary_queue
    channel.nack(msg, false, false);
    return;
  }

  // Republish a copy to the delay exchange; the plugin holds it for "x-delay" ms
  // and then routes it back to primary_queue
  const nextRetry = retryCount + 1;
  const newDelay = getDelayRetryInterval(nextRetry, DELAY_DEFAULT);
  console.log(`New delay time ${newDelay} ms`);
  channel.publish(DELAY_EXCHANGE, "", msg.content, {
    headers: {
      ...headers,
      "x-delay": newDelay,
      "x-retry-limit": maxRetryLimit,
      "x-retry-count": nextRetry,
    },
  });

  // Ack the original only after the copy is published, so it is never lost
  channel.ack(msg);
}

// Helper giving exponentially increasing delays: 1x, 2x, 4x ... the default delay
const getDelayRetryInterval = (retryCount, defaultDelay) => {
  const interval = Math.pow(2, retryCount - 1) * defaultDelay;
  console.log(`Next message will be retried after ${interval / 1000} sec`);
  return interval;
};

consumer().catch((err) => console.error("Consumer failed:", err));

  1. Setting Up the Consumer Function:

    The consumer function opens a connection, creates a channel, and starts consuming messages from the primary queue. The prefetch(10) call tells RabbitMQ to deliver at most 10 unacknowledged messages to this consumer at a time; further messages wait in the queue until some of those are acked or nacked. Think of prefetch as a way to control how many messages we handle at once. It's a crucial setting to prevent overwhelming the consumer, especially when thousands of messages arrive at once.

    Now, picture this: a message goes smoothly through processing and the code reaches the ack call inside the try block. That's us giving the message a thumbs-up, signaling that everything went well, and RabbitMQ removes it from the primary queue.

  2. Moving Messages to Delayed Exchange:

    If something goes wrong while we're handling a message, the catch block hands it to the moveMessageToDelayExchange function to deal with the hiccup. This function is the star of the show in this article: it's the reason we're here, talking about the Delayed Message Exchange and the DLQ in the first place.

    Now, a word about counting retries. It's tempting to use msg.fields.deliveryTag as a retry counter, but it isn't one: the delivery tag is a sequence number the broker assigns to every delivery on a channel, so it keeps growing with each message the channel receives and says nothing about how many times this particular message has been tried. The delayed exchange doesn't count retries for us either, so a reliable approach is to keep our own count in a custom header (for example x-retry-count) and increment it every time we republish the message. We also need a cap: if we kept retrying forever, messages that can never succeed would cycle through the system indefinitely. So we set a limit on how many times we'll give a message another shot. It's all about finding the right balance!

    1. The function reads the retry count and the maximum number of retries (maxRetryLimit) from the message headers, falling back to defaults when a header is missing, as it is on a message's first failure.

    2. If the retry count has reached the limit, the message is rejected with channel.nack(msg, false, false). Because requeue is false, RabbitMQ dead-letters it to the Dead Letter Queue, and the message will not be processed any further.

    3. Otherwise, a copy of the message is published to the delayed exchange (DELAY_EXCHANGE) with an x-delay header computed by getDelayRetryInterval and the retry count incremented by one.

    4. The original message is then acknowledged with channel.ack to remove it from the primary queue. Acking after publishing means that a crash between the two steps produces a duplicate delivery rather than a lost message.

    5. The getDelayRetryInterval function implements exponential backoff: the delay for retry n is 2^(n - 1) * DELAY_DEFAULT, so with DELAY_DEFAULT = 1000 the retries happen after 1, 2 and 4 seconds.
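The retry decision itself involves no I/O, so it can be pulled out into a pure function and unit tested without a broker. A sketch, assuming retries are counted in a custom x-retry-count header (RabbitMQ does not keep such a counter for you) and reusing the DELAY_DEFAULT and RETRY_LIMIT values from the consumer; decideRetry is a hypothetical name:

```javascript
// Values mirror the consumer's constants
const DELAY_DEFAULT = 1000; // base retry delay in ms
const RETRY_LIMIT = 3; // retries allowed before dead-lettering

// Hypothetical pure helper: decide what to do with a message that failed,
// based only on its headers. Assumes a custom "x-retry-count" header that
// holds the number of retries already made (missing means 0).
function decideRetry(headers, limit = RETRY_LIMIT, baseDelay = DELAY_DEFAULT) {
  const current = headers || {};
  const retryCount = Number(current["x-retry-count"] ?? 0);
  if (retryCount >= limit) {
    // Out of retries: the caller should nack(msg, false, false) to dead-letter it
    return { action: "dead-letter" };
  }
  const nextRetry = retryCount + 1;
  // Exponential backoff: baseDelay, 2 * baseDelay, 4 * baseDelay, ...
  const delay = Math.pow(2, nextRetry - 1) * baseDelay;
  return {
    action: "retry",
    delay,
    // Headers for the copy published to the delayed exchange
    headers: { ...current, "x-delay": delay, "x-retry-count": nextRetry },
  };
}
```

With these values a message that keeps failing is retried after 1, 2 and 4 seconds, and its fourth failure sends it to the DLQ. The consumer then only has to act on the returned action: publish to the delay exchange with the returned headers, or nack without requeue.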

Wrapping things up:

Dealing with messages that hit a bump in the road is a big deal when you're putting together strong and trustworthy systems with RabbitMQ. Luckily, the Delayed Message Exchange Plugin, combined with RabbitMQ's built-in dead-lettering, is your secret weapon here: the plugin spaces out retries, and the Dead Letter Queue catches whatever still fails, so you can handle message detours like a pro.

By fine-tuning how often we give messages another shot and how many times we'll try, along with the magic of Dead Letter Queues, we're making sure that even when things hiccup, we're on top of it. We're not just retrying blindly; we're investigating what went wrong and making things right. This smart strategy adds a hefty dose of stability and resilience to your distributed applications, ensuring they can handle whatever comes their way.