Apache Kafka Complete Guide | Producers, Consumers, Topics & Node.js

Key Takeaways

Kafka handles millions of events per second with durable, ordered, replayable message streams. This guide covers topics, partitions, consumer groups, offset management, and full Node.js TypeScript integration with kafkajs.

Why Kafka?

Most message queues delete a message once a consumer reads it. That works fine for simple task queues, but it creates problems when you need multiple independent services to react to the same event, or when you need to replay past events for debugging, analytics, or onboarding a new service.

Kafka solves this by treating messages as a persistent, ordered log. Messages are written to disk and retained for a configurable period (default: 7 days). Any number of consumer groups can read the same topic independently, each tracking its own position.

Traditional queue (RabbitMQ):
  Producer → Queue → Consumer (message deleted after consumption)
  One consumer per message
  No replay of past messages

Kafka:
  Producer → Topic (partitioned, stored on disk) → Consumer Group 1
                                                  → Consumer Group 2
                                                  → Consumer Group 3
  Multiple independent consumers
  Replay any message from any offset
  Retain messages for days/weeks
  Millions of messages per second

This architecture means you can add a new analytics service six months later and have it replay all historical events — without touching the producer or existing consumers. That’s the fundamental shift Kafka enables.

Kafka wins for:

  • Event sourcing (immutable event log)
  • Audit trails (every event persisted)
  • Stream processing (real-time analytics)
  • Decoupling microservices (producer doesn’t know consumers)
  • High-throughput pipelines (logs, metrics, clickstreams)

Core Concepts

Understanding four terms unlocks most of Kafka: topic, partition, offset, and consumer group.

A topic is a named category for events — think of it like a database table, but append-only. You write orders to the orders topic, user events to user-events, and so on.

A partition is how Kafka scales. Each topic is split into N partitions, and each partition is an independent ordered log. Messages within a partition are strictly ordered; across partitions they are not. More partitions means more parallelism — but you can’t reduce partitions later without recreating the topic.

An offset is a message’s position within a partition. Consumers track which offset they’ve processed and commit it to Kafka. If a consumer restarts, it resumes from the last committed offset.

A consumer group is a set of consumers that collectively read a topic. Kafka assigns each partition to exactly one consumer in the group — so throughput scales with partition count. Multiple consumer groups can read the same topic independently, each maintaining its own offsets.

Topic:        Named stream of records (like a database table for events)
Partition:    Topic is split into partitions — unit of parallelism
              Messages within a partition are ordered
              Messages across partitions are NOT ordered

Offset:       Position of a message within a partition
              Consumers track their offset — where they left off

Producer:     Writes messages to topics
Consumer:     Reads messages from topics
Consumer Group: Set of consumers that together read a topic
              Each partition assigned to exactly one consumer in the group
Broker:       A Kafka server instance
Cluster:      Multiple brokers (typically 3+ for production)

Topic: "orders"  (3 partitions)

Partition 0: [offset 0: order#1] [offset 1: order#4] [offset 2: order#7]
Partition 1: [offset 0: order#2] [offset 1: order#5] [offset 2: order#8]
Partition 2: [offset 0: order#3] [offset 1: order#6] [offset 2: order#9]

Consumer Group A (3 consumers = 1 consumer per partition):
  Consumer A1 → Partition 0
  Consumer A2 → Partition 1
  Consumer A3 → Partition 2

Consumer Group B (1 consumer):
  Consumer B1 → Partition 0, 1, 2 (reads all partitions independently)
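The key→partition mapping shown above can be sketched with a toy hash function. This is for intuition only — Kafka's real default partitioner uses murmur2 hashing — but the property that matters is the same: a given key always maps to the same partition.

```typescript
// Toy illustration of key-based partitioning: the same key always maps to
// the same partition, which is what preserves per-key ordering. Kafka's
// actual default partitioner uses murmur2; this rolling hash is for intuition.
function toyPartitionFor(key: string, numPartitions: number): number {
  let hash = 0;
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0; // simple 32-bit rolling hash
  }
  return hash % numPartitions;
}

// Every event keyed by "order-123" lands on the same partition:
const first = toyPartitionFor('order-123', 3);
const second = toyPartitionFor('order-123', 3);
console.log(first === second); // true — routing is deterministic
```

Because routing is deterministic, choosing the key is an ordering decision: key by order ID for per-order ordering, or by user ID for per-user ordering.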

Setup

# Docker Compose (local development)
# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1   # required on a single broker (default is 3)
      KAFKA_NUM_PARTITIONS: 3

# Start the cluster
docker compose up -d

# Install Node.js client
npm install kafkajs

Producer

The producer is responsible for writing messages to topics. The most important decision a producer makes is which partition to write to, because partition determines ordering and which consumer will process the message.

By default, kafkajs routes messages with a key to a consistent partition (same key always goes to the same partition) and distributes keyless messages round-robin. This means if you use orderId as the key, all events for a given order are guaranteed to arrive in order at the same consumer.

// src/producer.ts
import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092'],
  // For production: retry configuration
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
});

const producer = kafka.producer({
  // kafkajs v2 changed the default partitioner; LegacyPartitioner keeps the
  // pre-v2 key-hashing behavior and silences the startup warning
  createPartitioner: Partitioners.LegacyPartitioner,
});

await producer.connect();

// Send a single message
await producer.send({
  topic: 'orders',
  messages: [
    {
      key: 'order-123',         // Optional: determines partition
      value: JSON.stringify({
        orderId: 'order-123',
        userId: 'user-456',
        items: [{ productId: 'p1', quantity: 2 }],
        total: 59.98,
        timestamp: Date.now(),
      }),
      headers: {
        'event-type': 'order.created',
        'correlation-id': 'req-789',
      },
    },
  ],
});

// Send multiple messages in one batch — more efficient than one-by-one
// (`events` is assumed to be an array of order event objects built upstream)
await producer.send({
  topic: 'orders',
  messages: events.map(event => ({
    key: event.orderId,
    value: JSON.stringify(event),
  })),
});

await producer.disconnect();

Partitioning Strategy

// Key-based partitioning: same key → same partition (ordering guaranteed)
// Useful: all events for a user go to same partition → ordered per user
{ key: userId, value: JSON.stringify(event) }

// Round-robin: no key → distributed across partitions
{ key: null, value: JSON.stringify(event) }

// Custom partitioner
const producer = kafka.producer({
  createPartitioner: () => ({ message, partitionMetadata }) => {
    // Route high-priority orders to partition 0
    const order = JSON.parse(message.value?.toString() ?? '{}');
    if (order.priority === 'high') return 0;
    // Others: pick a random partition (note: random, not strict round-robin)
    return Math.floor(Math.random() * partitionMetadata.length);
  },
});

Consumer

Consumers read from topics by subscribing and running a message handler. The groupId is critical: all instances of your service should share the same group ID so Kafka distributes partitions among them. If you run three instances with the same group ID on a topic with six partitions, each instance handles two partitions — automatic horizontal scaling.

When a consumer commits an offset, it’s telling Kafka “I’ve successfully processed everything up to here.” If the consumer restarts, it resumes from that committed offset. This is why committing after successful processing matters: committing before means a crash will skip messages.
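Commit-after-processing gives at-least-once delivery: if the consumer crashes between processing and committing, the message is redelivered and processed again. Handlers should therefore be idempotent. A minimal in-memory sketch (a real service would keep the seen-ID set in a database or cache, not process memory):

```typescript
// At-least-once delivery means duplicates are possible: a crash after
// processing but before committing causes redelivery. An idempotent handler
// absorbs the duplicate by skipping IDs it has already seen.
const processed = new Set<string>();

function handleOnce(orderId: string, handler: (id: string) => void): boolean {
  if (processed.has(orderId)) return false; // duplicate — safely ignored
  handler(orderId);
  processed.add(orderId);
  return true;
}

let emailsSent = 0;
handleOnce('order-123', () => emailsSent++);
handleOnce('order-123', () => emailsSent++); // redelivery — no second email
console.log(emailsSent); // 1
```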

// src/consumer.ts
import { Kafka, EachMessagePayload } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'email-service',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({
  groupId: 'email-service-group',   // Consumer group ID
});

await consumer.connect();

// Subscribe to topic
await consumer.subscribe({
  topic: 'orders',
  fromBeginning: false,  // Start from latest (true = replay all messages)
});

// Process messages
await consumer.run({
  // Process one message at a time (default)
  eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
    const key = message.key?.toString();
    const value = message.value?.toString();

    if (!value) return;

    const order = JSON.parse(value);
    console.log(`Processing order ${order.orderId} from partition ${partition}`);

    try {
      await sendOrderConfirmationEmail(order);
    } catch (error) {
      console.error(`Failed to process order ${order.orderId}:`, error);
      // Handle error: retry, DLQ, alert
      throw error;  // rethrow — offset isn't committed, so the message is retried
    }
  },
});

Batch Processing

// Process multiple messages at once (higher throughput)
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
    for (const message of batch.messages) {
      const order = JSON.parse(message.value?.toString() ?? '{}');

      await processOrder(order);

      // Mark this offset as processed (the actual commit happens below)
      resolveOffset(message.offset);

      // Prevent consumer timeout for long-running batches
      await heartbeat();
    }

    await commitOffsetsIfNecessary();
  },
});

Manual Offset Management

// Disable auto-commit for more control
const consumer = kafka.consumer({
  groupId: 'my-group',
});

await consumer.run({
  autoCommit: false,   // Manual commits only
  eachMessage: async ({ topic, partition, message }) => {
    try {
      await processMessage(message);

      // Only commit after successful processing. The committed offset is
      // the NEXT message to read, hence the +1.
      await consumer.commitOffsets([{
        topic,
        partition,
        offset: (Number(message.offset) + 1).toString(),
      }]);
    } catch (error) {
      // Don't commit — message will be redelivered
      console.error('Processing failed, will retry:', error);
    }
  },
});
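Note the `+ 1` in the commit: Kafka's committed offset points at the *next* message to consume, not the last one processed. Since offsets are 64-bit values that kafkajs exposes as strings, a `BigInt`-based helper (a small sketch, not part of kafkajs) avoids precision loss on very large offsets:

```typescript
// The committed offset is the offset of the NEXT message to read, so after
// processing offset N you commit N + 1. Using BigInt keeps the arithmetic
// exact beyond Number.MAX_SAFE_INTEGER.
function nextOffset(processedOffset: string): string {
  return (BigInt(processedOffset) + 1n).toString();
}

console.log(nextOffset('41')); // "42" — the consumer resumes here after restart
console.log(nextOffset('9007199254740993')); // exact even beyond Number precision
```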

Topics and Partitions Management

const admin = kafka.admin();
await admin.connect();

// Create topic
await admin.createTopics({
  topics: [
    {
      topic: 'orders',
      numPartitions: 6,           // 6 = can have up to 6 parallel consumers
      replicationFactor: 3,       // 3 = tolerates 2 broker failures
      configEntries: [
        { name: 'retention.ms', value: String(7 * 24 * 60 * 60 * 1000) },  // 7 days
        { name: 'cleanup.policy', value: 'delete' },
      ],
    },
  ],
});

// List topics
const topics = await admin.listTopics();

// Get topic metadata
const metadata = await admin.fetchTopicMetadata({ topics: ['orders'] });

// List consumer groups
const groups = await admin.listGroups();

// Get consumer group offsets (see where consumers are)
const offsets = await admin.fetchOffsets({ groupId: 'email-service-group', topics: ['orders'] });

await admin.disconnect();
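A common use of `fetchOffsets` is monitoring consumer lag — how far a group's committed offset trails the latest offset in each partition. The helper below is a sketch; the commented wiring assumes kafkajs's `admin.fetchTopicOffsets`, which returns the latest offset per partition.

```typescript
// Consumer lag = latest offset in the partition minus the group's committed
// offset. Large or growing lag means consumers are falling behind producers.
function computeLag(committedOffset: string, latestOffset: string): number {
  // kafkajs reports offsets as strings; "-1" means the group has never committed
  const committed = committedOffset === '-1' ? 0 : Number(committedOffset);
  return Number(latestOffset) - committed;
}

// Wiring sketch (assumes a connected kafkajs admin client):
// const latest = await admin.fetchTopicOffsets('orders');     // latest offset per partition
// const committed = await admin.fetchOffsets({ groupId: 'email-service-group', topics: ['orders'] });
// ...then pair them up per partition and call computeLag

console.log(computeLag('100', '150')); // 50 — the group is 50 messages behind
console.log(computeLag('-1', '150')); // 150 — nothing committed yet
```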

Dead Letter Queue Pattern

const DLQ_TOPIC = 'orders.dlq';
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    let attempt = 0;
    const maxAttempts = 3;

    while (attempt < maxAttempts) {
      try {
        await processMessage(message);
        return;  // Success
      } catch (error) {
        attempt++;
        if (attempt === maxAttempts) {
          // Send to DLQ after max retries
          await producer.send({
            topic: DLQ_TOPIC,
            messages: [{
              key: message.key,
              value: message.value,
              headers: {
                ...message.headers,
                'dlq-original-topic': topic,
                'dlq-original-partition': String(partition),
                'dlq-error': String(error),
                'dlq-attempts': String(maxAttempts),
              },
            }],
          });
        } else {
          await sleep(1000 * Math.pow(2, attempt));  // Exponential backoff
        }
      }
    }
  },
});
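Messages parked in the DLQ eventually need to be reprocessed. One option is a small replay service that reads `orders.dlq` and republishes each message to its original topic, recovered from the headers set above. The header-parsing helper below is testable on its own; the kafkajs wiring is sketched in comments (the topology and names are illustrative, not a fixed API).

```typescript
// Recover the original topic from the headers written by the DLQ producer.
// Simplified header type — kafkajs headers can be Buffer or string values.
type DlqHeaders = Record<string, Buffer | string | undefined>;

function originalTopicOf(headers: DlqHeaders, fallback = 'orders'): string {
  const h = headers['dlq-original-topic'];
  if (h === undefined) return fallback;        // header missing: use fallback
  return typeof h === 'string' ? h : h.toString();
}

// Replay wiring sketch (assumes a connected kafkajs consumer and producer):
// await consumer.subscribe({ topic: 'orders.dlq', fromBeginning: true });
// await consumer.run({
//   eachMessage: async ({ message }) => {
//     await producer.send({
//       topic: originalTopicOf(message.headers ?? {}),
//       messages: [{ key: message.key, value: message.value }],
//     });
//   },
// });

console.log(originalTopicOf({ 'dlq-original-topic': 'orders' })); // "orders"
```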

Real-World: Order Processing Pipeline

// Order Service → Kafka → Email Service + Inventory Service + Analytics Service

// order-service/producer.ts
await producer.send({
  topic: 'orders',
  messages: [{
    key: order.id,                  // Same order → same partition → per-order ordering
    value: JSON.stringify({
      type: 'ORDER_CREATED',
      orderId: order.id,
      userId: order.userId,
      items: order.items,
      total: order.total,
      timestamp: Date.now(),
    }),
  }],
});

// email-service/consumer.ts (Consumer Group: email-service)
await consumer.subscribe({ topic: 'orders' });
// Receives ORDER_CREATED → sends confirmation email

// inventory-service/consumer.ts (Consumer Group: inventory-service)
await consumer.subscribe({ topic: 'orders' });
// Receives ORDER_CREATED → deducts inventory

// analytics-service/consumer.ts (Consumer Group: analytics-service)
await consumer.subscribe({ topic: 'orders', fromBeginning: true });
// Receives all events → updates dashboards

Each consumer group reads independently — adding a new service doesn’t affect existing ones.


Production Configuration

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: process.env.KAFKA_BROKERS!.split(','),

  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: process.env.KAFKA_USERNAME!,
    password: process.env.KAFKA_PASSWORD!,
  },

  retry: {
    initialRetryTime: 300,
    retries: 10,
  },
});

const producer = kafka.producer({
  allowAutoTopicCreation: false,    // Don't auto-create in production
  transactionTimeout: 30000,
  idempotent: true,                 // Prevents duplicate writes on broker-side retries
});
