Bull Queue Complete Guide | Redis-Based Job Queue for Node.js

Bull Queue Complete Guide | Redis-Based Job Queue for Node.js

이 글의 핵심

Bull is a Redis-based queue for Node.js that handles job processing, retries, priorities, and delayed jobs. It's battle-tested and used by thousands of production apps.

Introduction

Bull is a Redis-based queue for Node.js that handles distributed job processing. It’s perfect for tasks that are too slow or unreliable to run in API request handlers.

Why Use Queues?

Without queue (blocking):

app.post('/send-email', async (req, res) => {
  await sendEmail(req.body); // Takes 2-3 seconds
  res.json({ message: 'Email sent' });
});

// User waits 2-3 seconds for response
// If email fails, request fails

With queue (non-blocking):

app.post('/send-email', async (req, res) => {
  await emailQueue.add(req.body); // <10ms
  res.json({ message: 'Email queued' });
});

// User gets instant response
// Email processed in background
// Automatic retries on failure

1. Installation

npm install bull

Requires Redis:

# Docker
docker run -d -p 6379:6379 redis

# macOS
brew install redis
redis-server

# Ubuntu
sudo apt install redis-server

2. Basic Queue

const Queue = require('bull');

// Create queue
const emailQueue = new Queue('email', {
  redis: {
    host: '127.0.0.1',
    port: 6379,
  }
});

// Add job to queue
await emailQueue.add({
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for signing up',
});

// Process jobs
emailQueue.process(async (job) => {
  console.log('Processing job:', job.id);
  await sendEmail(job.data);
  console.log('Job completed:', job.id);
});

3. Job Options

await emailQueue.add({
  to: 'user@example.com',
  subject: 'Hello',
}, {
  // Job options
  attempts: 3,              // Retry up to 3 times
  backoff: {
    type: 'exponential',    // Exponential backoff
    delay: 2000,            // Start with 2 second delay
  },
  delay: 5000,              // Delay job by 5 seconds
  priority: 1,              // Priority (1 = highest)
  timeout: 30000,           // Timeout after 30 seconds
  removeOnComplete: true,   // Remove from Redis when done
  removeOnFail: false,      // Keep failed jobs for debugging
});

4. Priority Queues

// Add jobs with different priorities
await queue.add({ task: 'critical' }, { priority: 1 });    // Highest
await queue.add({ task: 'normal' }, { priority: 5 });
await queue.add({ task: 'low' }, { priority: 10 });        // Lowest

// High priority jobs processed first

5. Delayed Jobs

// Process job after 1 hour
await queue.add({ task: 'reminder' }, {
  delay: 60 * 60 * 1000, // 1 hour in milliseconds
});

// Process at specific time
const scheduledTime = new Date('2026-12-25T00:00:00Z');
await queue.add({ task: 'holiday-email' }, {
  delay: scheduledTime.getTime() - Date.now(),
});

6. Repeatable Jobs (Cron)

// Run every 5 minutes
await queue.add({ task: 'cleanup' }, {
  repeat: {
    every: 5 * 60 * 1000, // 5 minutes
  }
});

// Cron syntax
await queue.add({ task: 'daily-report' }, {
  repeat: {
    cron: '0 9 * * *', // Every day at 9 AM
  }
});

// With timezone
await queue.add({ task: 'morning-email' }, {
  repeat: {
    cron: '0 8 * * *',
    tz: 'America/New_York',
  }
});

7. Job Progress

// Report progress
emailQueue.process(async (job) => {
  await job.progress(0);
  
  const emails = job.data.emails;
  
  for (let i = 0; i < emails.length; i++) {
    await sendEmail(emails[i]);
    await job.progress(Math.round(((i + 1) / emails.length) * 100));
  }
  
  return { sent: emails.length };
});

// Listen for progress
queue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} is ${progress}% done`);
});

8. Job Events

// Job completed
queue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

// Job failed
queue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
});

// Job stalled (taking too long)
queue.on('stalled', (job) => {
  console.warn(`Job ${job.id} stalled`);
});

// Job removed
queue.on('removed', (job) => {
  console.log(`Job ${job.id} removed`);
});

// Global completed (all jobs)
queue.on('global:completed', (jobId, result) => {
  console.log(`Job ${jobId} completed globally`);
});

9. Multiple Processors

// Processor 1: High priority
queue.process('high-priority', 5, async (job) => {
  // Process up to 5 high-priority jobs concurrently
  await processHighPriority(job.data);
});

// Processor 2: Low priority
queue.process('low-priority', 2, async (job) => {
  // Process up to 2 low-priority jobs concurrently
  await processLowPriority(job.data);
});

// Add jobs
await queue.add('high-priority', { task: 'urgent' });
await queue.add('low-priority', { task: 'background' });

10. Real-World Example: Email Service

const Queue = require('bull');
const nodemailer = require('nodemailer');

// Create queue
const emailQueue = new Queue('email', {
  redis: process.env.REDIS_URL,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000,
    },
    removeOnComplete: 100, // Keep last 100 completed
    removeOnFail: false,   // Keep all failed for debugging
  }
});

// Email transporter
const transporter = nodemailer.createTransport({ /* config */ });

// Process emails
emailQueue.process(async (job) => {
  const { to, subject, html } = job.data;
  
  console.log(`Sending email to ${to}`);
  
  try {
    await transporter.sendMail({ to, subject, html });
    return { sent: true, to };
  } catch (error) {
    console.error(`Failed to send email to ${to}:`, error);
    throw error; // Will retry
  }
});

// Listen for events
emailQueue.on('completed', (job, result) => {
  console.log(`Email sent to ${result.to}`);
});

emailQueue.on('failed', (job, err) => {
  console.error(`Email to ${job.data.to} failed after ${job.attemptsMade} attempts`);
  // Notify admin or log to monitoring service
});

// API endpoint
app.post('/api/send-email', async (req, res) => {
  const { to, subject, html } = req.body;
  
  const job = await emailQueue.add({
    to,
    subject,
    html,
  });
  
  res.json({
    message: 'Email queued',
    jobId: job.id,
  });
});

// Check job status
app.get('/api/jobs/:id', async (req, res) => {
  const job = await emailQueue.getJob(req.params.id);
  
  if (!job) {
    return res.status(404).json({ error: 'Job not found' });
  }
  
  const state = await job.getState();
  const progress = job.progress();
  
  res.json({
    id: job.id,
    state,
    progress,
    data: job.data,
  });
});

11. Rate Limiting

const queue = new Queue('api-calls', {
  redis: process.env.REDIS_URL,
  limiter: {
    max: 100,        // Max 100 jobs
    duration: 60000, // Per 60 seconds
  }
});

// Jobs are automatically rate-limited
await queue.add({ url: 'https://api.example.com/data' });

12. Bull Board (UI Dashboard)

npm install bull-board
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullAdapter(emailQueue),
    new BullAdapter(imageQueue),
    new BullAdapter(reportQueue),
  ],
  serverAdapter: serverAdapter,
});

app.use('/admin/queues', serverAdapter.getRouter());

// Visit http://localhost:3000/admin/queues

13. Job Cleanup

// Remove completed jobs older than 1 day
await queue.clean(24 * 3600 * 1000, 'completed');

// Remove failed jobs older than 1 week
await queue.clean(7 * 24 * 3600 * 1000, 'failed');

// Remove all jobs
await queue.empty();

// Automatic cleanup
emailQueue.on('completed', async (job) => {
  await job.remove();
});

14. Concurrency

// Process 5 jobs concurrently
queue.process(5, async (job) => {
  return await processJob(job.data);
});

// Named processors with different concurrency
queue.process('email', 10, async (job) => {
  // Up to 10 email jobs at once
});

queue.process('image', 3, async (job) => {
  // Up to 3 image jobs at once (CPU intensive)
});

15. Graceful Shutdown

// Handle shutdown
async function shutdown() {
  console.log('Shutting down gracefully...');
  
  // Close queue (waits for active jobs)
  await emailQueue.close();
  
  process.exit(0);
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

16. Best Practices

1. Use Named Jobs

// Good: named jobs
await queue.add('send-welcome-email', { userId: 123 });
await queue.add('generate-report', { reportId: 456 });

queue.process('send-welcome-email', sendWelcomeEmail);
queue.process('generate-report', generateReport);

2. Idempotent Jobs

// Ensure jobs can be retried safely
queue.process(async (job) => {
  const { userId, emailType } = job.data;
  
  // Check if already sent
  const sent = await db.emailLogs.findOne({ userId, emailType });
  if (sent) {
    console.log('Email already sent, skipping');
    return;
  }
  
  // Send email
  await sendEmail(userId, emailType);
  
  // Log
  await db.emailLogs.create({ userId, emailType, sentAt: new Date() });
});

3. Monitor Queue Health

setInterval(async () => {
  const waiting = await queue.getWaitingCount();
  const active = await queue.getActiveCount();
  const failed = await queue.getFailedCount();
  
  console.log(`Queue status - Waiting: ${waiting}, Active: ${active}, Failed: ${failed}`);
  
  // Alert if too many jobs waiting
  if (waiting > 1000) {
    console.warn('Queue backlog too high!');
    // Send alert
  }
}, 60000); // Every minute

Summary

Bull provides reliable background job processing:

  • Redis-based for distributed systems
  • Automatic retries with backoff
  • Job priorities and delays
  • Cron-like scheduling for recurring jobs
  • Progress tracking and events

Key Takeaways:

  1. Use for slow or unreliable tasks
  2. Configure retries and backoff
  3. Monitor queue health
  4. Use Bull Board for UI
  5. Graceful shutdown handling

Next Steps:

  • Deploy with Redis
  • Scale with Docker
  • Monitor with Winston

Resources: