RabbitMQ Complete Guide | Message Queues, Exchanges, and Patterns

Key Points

RabbitMQ is one of the most widely deployed open-source message brokers. This guide covers the core AMQP model (exchanges, queues, and bindings) plus practical patterns for work queues, pub/sub, and dead-letter handling.

What This Guide Covers

RabbitMQ is a battle-tested message broker that decouples producers from consumers. This guide covers the AMQP model, key exchange types, and production patterns including dead letter queues and publisher confirms.

Real-world insight: Adding RabbitMQ between an image upload endpoint and the resize workers cut API response time from 8s to 50ms — the heavy work happens asynchronously.


Core Concepts

Producer → Exchange → Queue → Consumer
  • Producer: publishes messages to an exchange
  • Exchange: routes messages to queues based on rules
  • Queue: buffers messages until a consumer processes them
  • Consumer: reads and acknowledges messages from a queue
  • Binding: links an exchange to a queue, usually with a binding key that is compared against each message's routing key
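
To make the flow above concrete, here is a toy in-memory model of a direct exchange. It is only a sketch for building intuition: the DirectExchange class, the queue names, and the routing keys are made up for illustration, and nothing here talks to a real broker.

```javascript
// Toy in-memory model of the routing flow; not a RabbitMQ client.
class DirectExchange {
  constructor() {
    this.bindings = []; // each binding: { queue, bindingKey }
  }

  // Binding: link a queue (a plain array here) to this exchange under a key.
  bind(queue, bindingKey) {
    this.bindings.push({ queue, bindingKey });
  }

  // Exchange: copy the message into every queue whose binding key matches.
  publish(routingKey, message) {
    let routed = 0;
    for (const { queue, bindingKey } of this.bindings) {
      if (bindingKey === routingKey) {
        queue.push(message);
        routed += 1;
      }
    }
    return routed; // 0 means no queue matched (the message is unroutable)
  }
}

const emailQueue = []; // Queue: buffers messages until a consumer takes them
const auditQueue = [];

const exchange = new DirectExchange();
exchange.bind(emailQueue, 'user.signup');
exchange.bind(auditQueue, 'user.signup');
exchange.bind(auditQueue, 'user.delete');

// Producer: publishes to the exchange, never directly to a queue.
exchange.publish('user.signup', { id: 1 });
exchange.publish('user.delete', { id: 2 });

// Consumer: takes the oldest message from its queue.
const next = emailQueue.shift();
```

One message published with the key user.signup lands in both queues because both are bound with that key, which is the behavior the real direct exchange section below relies on.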

Setup with Docker

docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

Management UI: http://localhost:15672 (guest / guest; by default the guest account only works from localhost)


Node.js: amqplib

npm install amqplib

Producer

import amqp from 'amqplib';

async function publish(queue, message) {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  await channel.assertQueue(queue, { durable: true });

  channel.sendToQueue(
    queue,
    Buffer.from(JSON.stringify(message)),
    { persistent: true }  // survive broker restart
  );

  console.log('Published:', message);
  await channel.close();
  await connection.close();
}

publish('tasks', { type: 'resize-image', url: 'https://...', width: 800 });

Consumer

import amqp from 'amqplib';

async function consume(queue) {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  await channel.assertQueue(queue, { durable: true });
  await channel.prefetch(1);  // at most one unacknowledged message at a time

  console.log('Waiting for messages...');

  await channel.consume(queue, async (msg) => {
    if (!msg) return;  // null means the broker cancelled this consumer

    try {
      // Parse inside the try block so a malformed payload is rejected too
      const data = JSON.parse(msg.content.toString());
      await processTask(data);  // processTask is your own handler, defined elsewhere
      channel.ack(msg);  // acknowledge success
    } catch (err) {
      console.error('Task failed:', err);
      // requeue=false: dead-lettered if the queue has a DLX, otherwise discarded
      channel.nack(msg, false, false);
    }
  });
}

consume('tasks');
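
The third argument of nack controls whether the message goes back onto the queue. One possible policy, sketched below, is to requeue a failed message once and reject it for good on the second failure. makeHandler is a hypothetical helper (not part of amqplib); it assumes the msg.fields.redelivered flag that amqplib exposes on deliveries, and it takes the channel and task function as parameters so the logic can be exercised without a broker.

```javascript
// Sketch: requeue a failed message once, then reject it permanently.
// makeHandler and processTask are hypothetical names, not amqplib APIs.
function makeHandler(channel, processTask) {
  return async function onMessage(msg) {
    if (!msg) return; // null delivery: the broker cancelled the consumer

    try {
      const data = JSON.parse(msg.content.toString());
      await processTask(data);
      channel.ack(msg);
    } catch (err) {
      // redelivered is true when the broker has delivered this message before
      const requeue = !msg.fields.redelivered;
      channel.nack(msg, false, requeue);
    }
  };
}
```

It would be wired in as channel.consume('tasks', makeHandler(channel, processTask)). Note that this simple policy also requeues malformed payloads once, which a stricter version might reject immediately.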

Exchange Types

1. Direct Exchange (point-to-point routing)

Messages go to queues where the binding key matches the routing key exactly.

await channel.assertExchange('orders', 'direct', { durable: true });
await channel.assertQueue('order.created', { durable: true });
await channel.bindQueue('order.created', 'orders', 'created');

// Publish
channel.publish('orders', 'created', Buffer.from(JSON.stringify(order)), { persistent: true });

2. Topic Exchange (pattern routing)

Binding keys support wildcards: * (exactly one word) and # (zero or more words). Words in a routing key are separated by dots.

await channel.assertExchange('logs', 'topic', { durable: true });
await channel.assertQueue('error-logs', { durable: true });
await channel.assertQueue('all-logs', { durable: true });
await channel.bindQueue('error-logs', 'logs', '*.error');       // any service, error level
await channel.bindQueue('all-logs', 'logs', '#');               // everything

// Publish
channel.publish('logs', 'auth.error', Buffer.from('Login failed'));
channel.publish('logs', 'payment.warn', Buffer.from('Retry 2'));
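
To see which bindings a given routing key would hit, the wildcard rules can be approximated in plain JavaScript. This is a minimal sketch of the matching semantics, not RabbitMQ's actual implementation, and topicMatches is a hypothetical helper name.

```javascript
// Sketch of topic matching: '*' matches exactly one word,
// '#' matches zero or more words; words are separated by dots.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  function match(i, j) {
    if (i === p.length) return j === k.length; // pattern used up: key must be too
    if (p[i] === '#') {
      // Let '#' absorb 0, 1, 2, ... of the remaining words
      for (let n = j; n <= k.length; n += 1) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false; // key used up but pattern still has words
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches('*.error', 'auth.error'));   // error-logs binding
console.log(topicMatches('*.error', 'payment.warn')); // error-logs binding
console.log(topicMatches('#', 'payment.warn'));       // all-logs binding
```

With the bindings above, auth.error reaches both queues, while payment.warn only reaches all-logs.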

3. Fanout Exchange (broadcast)

Sends to all bound queues, ignoring routing keys.

await channel.assertExchange('notifications', 'fanout', { durable: true });

// Each consumer declares its own queue and binds it to the fanout
const q = await channel.assertQueue('', { exclusive: true }); // server-named; deleted when the connection closes
await channel.bindQueue(q.queue, 'notifications', '');

// All queues receive every published message
channel.publish('notifications', '', Buffer.from('System update at 2am'));

4. Headers Exchange

Routes based on message headers instead of routing key.

await channel.assertExchange('reports', 'headers', { durable: true });
await channel.assertQueue('pdf-queue', { durable: true });
await channel.bindQueue('pdf-queue', 'reports', '', {
  'x-match': 'all',
  format: 'pdf',
  priority: 'high',
});

channel.publish('reports', '', Buffer.from(data), {
  headers: { format: 'pdf', priority: 'high' }
});
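
The x-match argument decides whether all of the listed headers must match or any one of them is enough. The sketch below approximates that rule in plain JavaScript. headersMatch is a hypothetical helper, and it simplifies the broker's behavior: it assumes the default mode is 'all', compares values with strict equality, and skips binding arguments whose names start with "x-".

```javascript
// Sketch of headers-exchange matching for x-match 'all' and 'any'.
function headersMatch(bindingArgs, messageHeaders = {}) {
  const mode = bindingArgs['x-match'] || 'all';

  // Arguments such as x-match configure the binding; they are not compared.
  const required = Object.entries(bindingArgs)
    .filter(([name]) => !name.startsWith('x-'));

  const hit = ([name, value]) => messageHeaders[name] === value;
  return mode === 'any' ? required.some(hit) : required.every(hit);
}

const binding = { 'x-match': 'all', format: 'pdf', priority: 'high' };
console.log(headersMatch(binding, { format: 'pdf', priority: 'high' }));
console.log(headersMatch(binding, { format: 'pdf', priority: 'low' }));
console.log(headersMatch({ ...binding, 'x-match': 'any' }, { format: 'pdf', priority: 'low' }));
```

Switching the binding to 'any' lets a low-priority PDF report through, which 'all' would filter out.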

Dead Letter Queues

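Route failed messages to a DLQ for inspection and reprocessing. Queue arguments are fixed when the queue is created, so if 'tasks' already exists without these arguments (for example from the earlier examples), delete it first or the declaration fails with PRECONDITION_FAILED: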

// Main queue with DLQ config
await channel.assertQueue('tasks', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'failed',
    'x-message-ttl': 30000,  // expire after 30s → also goes to DLQ
  }
});

// Dead letter exchange and queue
await channel.assertExchange('dlx', 'direct', { durable: true });
await channel.assertQueue('tasks.failed', { durable: true });
await channel.bindQueue('tasks.failed', 'dlx', 'failed');

// Consumer: reject to send to DLQ
channel.nack(msg, false, false);
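
When reprocessing from the DLQ, it helps to know how often a message has already failed. RabbitMQ records dead-lettering history in an x-death header: an array of entries that each carry, among other fields, a queue, a reason, and a count. The sketch below sums those counts for one queue. deathCount is a hypothetical helper, and the sample message is hand-built in the shape amqplib delivers, so check it against real messages from your broker version.

```javascript
// Sketch: how many times was this message dead-lettered from a given queue?
function deathCount(msg, queueName) {
  const headers = msg.properties.headers || {};
  const deaths = headers['x-death'] || [];
  return deaths
    .filter((entry) => entry.queue === queueName)
    .reduce((sum, entry) => sum + Number(entry.count || 0), 0);
}

// Hand-built example of a message taken from 'tasks.failed'
const sample = {
  properties: {
    headers: {
      'x-death': [
        { queue: 'tasks', reason: 'rejected', count: 2 },
        { queue: 'tasks', reason: 'expired', count: 1 },
      ],
    },
  },
};

console.log(deathCount(sample, 'tasks'));
```

A DLQ consumer could use such a count to republish a message a limited number of times and park it for manual inspection after that.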

Publisher Confirms

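With publisher confirms, the broker acknowledges each message it has accepted, so the producer knows when a publish succeeded and when to retry: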

const channel = await connection.createConfirmChannel();

channel.sendToQueue(
  'tasks',
  Buffer.from(JSON.stringify(data)),
  { persistent: true },
  (err, ok) => {
    if (err) {
      console.error('Message not confirmed — retry');
    } else {
      console.log('Message confirmed by broker');
    }
  }
);
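
The callback style above can be wrapped in a Promise so that callers can simply await the confirmation. This is a minimal sketch: publishWithConfirm is a hypothetical helper, and it assumes the channel was created with createConfirmChannel() so that sendToQueue accepts a callback as its fourth argument.

```javascript
// Sketch: resolve when the broker confirms the message, reject when it nacks it.
function publishWithConfirm(channel, queue, payload) {
  return new Promise((resolve, reject) => {
    channel.sendToQueue(
      queue,
      Buffer.from(JSON.stringify(payload)),
      { persistent: true },
      (err) => (err ? reject(err) : resolve())
    );
  });
}
```

A caller would then write await publishWithConfirm(channel, 'tasks', data) inside a try/catch and retry or log in the catch branch.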

Python: pika

pip install pika

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='tasks', durable=True)

# Publish
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps({'type': 'send-email', 'to': 'user@example.com'}),
    properties=pika.BasicProperties(delivery_mode=2)  # persistent
)

# Consume
def callback(ch, method, properties, body):
    task = json.loads(body)
    print(f"Processing: {task}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()

Production Checklist

Setting              Recommendation
Queue durability     Always durable: true
Message persistence  Always persistent: true
Consumer prefetch    Set to 1–10 (avoid overwhelming slow consumers)
Publisher confirms   Enable for critical messages
Dead letter queue    Configure on every work queue
Quorum queues        Use instead of classic for critical data
Clustering           3+ nodes for HA; odd number for quorum
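
Several checklist items come down to the options passed to assertQueue. As a rough sketch, a small helper can keep those options consistent across services. workQueueOptions is a hypothetical name, and it assumes quorum queues are selected with the 'x-queue-type' argument and that the dead letter exchange is declared separately.

```javascript
// Sketch: build assertQueue options for a durable work queue with a DLX.
function workQueueOptions({ deadLetterExchange, deadLetterRoutingKey, quorum = true }) {
  const args = {
    'x-dead-letter-exchange': deadLetterExchange,
    'x-dead-letter-routing-key': deadLetterRoutingKey,
  };
  if (quorum) args['x-queue-type'] = 'quorum'; // replicated queue type
  return { durable: true, arguments: args };
}

const options = workQueueOptions({
  deadLetterExchange: 'dlx',
  deadLetterRoutingKey: 'failed',
});
console.log(options);
```

The result would be passed as channel.assertQueue('tasks', options). Because queue arguments cannot be changed after creation, an existing classic queue has to be deleted or replaced under a new name before it can be declared as a quorum queue.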

Key Takeaways

  • Direct → route by exact key (task routing)
  • Topic → route by pattern (log levels, service names)
  • Fanout → broadcast to all consumers (notifications)
  • Always use durable queues + persistent messages in production
  • prefetch(1) ensures fair dispatch among competing consumers
  • Dead letter queues prevent message loss on failures
  • Publisher confirms close the producer-side reliability gap

RabbitMQ’s exchange model handles nearly every messaging pattern. Start with a direct exchange and a simple work queue, then layer in topic routing and DLQs as your system grows.