[2026] C++ Message Queues: RabbitMQ and Kafka Integration Complete Guide [#50-7]

[2026] C++ Message Queues: RabbitMQ and Kafka Integration Complete Guide [#50-7]

이 글의 핵심

Complete message queue guide: Decouple services with AMQP and Kafka, producers and consumers, serialization, backpressure, performance comparison, real-world examples, and production patterns.

Introduction: “Sync calls are the bottleneck”

If service A calls B over HTTP and waits, A pays B’s full latency (DB, external APIs, heavy CPU). Message queues let A publish work and return quickly—decoupling producers and consumers and buffering spikes. Topics:

  • RabbitMQ (AMQP) — exchanges, queues, acks (e.g. SimpleAmqpClient patterns)
  • Kafka — topics, partitions, consumer groups (e.g. librdkafka)
  • Serialization — JSON, Protobuf
  • Errors, tuning, production patterns Environment: C++17+.

Table of contents

  1. When queues help
  2. RabbitMQ vs Kafka
  3. RabbitMQ implementation
  4. Kafka implementation
  5. Serialization strategies
  6. Real-world examples
  7. Performance comparison
  8. Error handling
  9. Common mistakes
  10. Best practices
  11. Production patterns

1. When queues help

Use cases

  1. Order pipeline — user gets immediate ACK; payment, stock, email, analytics run asynchronously.
  2. Traffic spikes — queue absorbs bursts; workers drain at sustainable rate.
  3. Microservices — publish OrderCreated; downstream services subscribe independently.
  4. Centralized logs/metrics — Kafka-style log aggregation.
  5. Heavy jobs — image resize, encoding, fan-out to worker pools.

Architecture comparison

아래 코드는 mermaid를 사용한 구현 예제입니다. 비동기 처리를 통해 효율적으로 작업을 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

flowchart LR
    subgraph sync["Synchronous (HTTP)"]
        A1[Service A] -->|wait| B1[Service B]
        B1 -->|wait| C1[Service C]
    end
    
    subgraph async["Asynchronous (Queue)"]
        A2[Service A] -->|publish| Q[Queue]
        Q --> B2[Service B]
        Q --> C2[Service C]
    end

Benefits:

  • Decoupling: Services don’t need to know about each other
  • Buffering: Handle traffic spikes
  • Reliability: Retry failed messages
  • Scalability: Add more consumers

2. RabbitMQ vs Kafka

RabbitMQKafka
ModelClassic broker, routingDurable log / streaming
RetentionOften delete after ackTime/size retention
ThroughputVery high (100k+ msg/s)Extremely high (1M+ msg/s)
OrderingPer queuePer partition
LatencyLow (ms)Low-medium (ms-10ms)
Use caseRPC-ish, work queuesEvent logs, analytics
ReplayLimitedFull replay support
RoutingFlexible (exchanges)Simple (topics)

When to use RabbitMQ

  • Traditional messaging patterns
  • Complex routing (topic, fanout, direct)
  • RPC-style request/response
  • Lower message volume (<100k msg/s)
  • Message acknowledgment critical

When to use Kafka

  • Event sourcing
  • Log aggregation
  • High throughput (>100k msg/s)
  • Message replay needed
  • Stream processing
  • Multiple consumers reading same data

3. RabbitMQ implementation

Installation

아래 코드는 bash를 사용한 구현 예제입니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

# Ubuntu
sudo apt-get install rabbitmq-server
# Docker
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management
# C++ client
sudo apt-get install librabbitmq-dev
# Or use AMQP-CPP: https://github.com/CopernicaMarketingSoftware/AMQP-CPP

Producer example

다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <iostream>
#include <string>
class RabbitMQProducer {
    amqp_connection_state_t conn_;
    amqp_socket_t* socket_;
    std::string exchange_;
    
public:
    RabbitMQProducer(const std::string& host, int port, const std::string& exchange)
        : exchange_(exchange) {
        conn_ = amqp_new_connection();
        socket_ = amqp_tcp_socket_new(conn_);
        
        if (amqp_socket_open(socket_, host.c_str(), port) != 0) {
            throw std::runtime_error("Cannot open socket");
        }
        
        amqp_rpc_reply_t reply = amqp_login(conn_, "/", 0, 131072, 0,
            AMQP_SASL_METHOD_PLAIN, "guest", "guest");
        
        if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
            throw std::runtime_error("Login failed");
        }
        
        amqp_channel_open(conn_, 1);
        amqp_get_rpc_reply(conn_);
    }
    
    ~RabbitMQProducer() {
        amqp_channel_close(conn_, 1, AMQP_REPLY_SUCCESS);
        amqp_connection_close(conn_, AMQP_REPLY_SUCCESS);
        amqp_destroy_connection(conn_);
    }
    
    void publish(const std::string& routing_key, const std::string& message) {
        amqp_basic_properties_t props;
        props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
        props.content_type = amqp_cstring_bytes("application/json");
        props.delivery_mode = 2;  // Persistent
        
        amqp_bytes_t message_bytes = amqp_cstring_bytes(message.c_str());
        
        int result = amqp_basic_publish(
            conn_,
            1,
            amqp_cstring_bytes(exchange_.c_str()),
            amqp_cstring_bytes(routing_key.c_str()),
            0,
            0,
            &props,
            message_bytes
        );
        
        if (result != 0) {
            throw std::runtime_error("Publish failed");
        }
    }
};

Consumer example

다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

class RabbitMQConsumer {
    amqp_connection_state_t conn_;
    std::string queue_;
    
public:
    RabbitMQConsumer(const std::string& host, int port, const std::string& queue)
        : queue_(queue) {
        conn_ = amqp_new_connection();
        amqp_socket_t* socket = amqp_tcp_socket_new(conn_);
        
        amqp_socket_open(socket, host.c_str(), port);
        amqp_login(conn_, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
        amqp_channel_open(conn_, 1);
        amqp_get_rpc_reply(conn_);
        
        // Set prefetch count
        amqp_basic_qos(conn_, 1, 0, 10, 0);
    }
    
    ~RabbitMQConsumer() {
        amqp_channel_close(conn_, 1, AMQP_REPLY_SUCCESS);
        amqp_connection_close(conn_, AMQP_REPLY_SUCCESS);
        amqp_destroy_connection(conn_);
    }
    
    void consume(std::function<bool(const std::string&)> handler) {
        amqp_basic_consume(conn_, 1, amqp_cstring_bytes(queue_.c_str()),
            amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
        
        while (true) {
            amqp_envelope_t envelope;
            amqp_maybe_release_buffers(conn_);
            
            amqp_rpc_reply_t result = amqp_consume_message(conn_, &envelope, NULL, 0);
            
            if (result.reply_type == AMQP_RESPONSE_NORMAL) {
                std::string message(
                    static_cast<char*>(envelope.message.body.bytes),
                    envelope.message.body.len
                );
                
                bool success = handler(message);
                
                if (success) {
                    amqp_basic_ack(conn_, 1, envelope.delivery_tag, 0);
                } else {
                    amqp_basic_nack(conn_, 1, envelope.delivery_tag, 0, 1);
                }
                
                amqp_destroy_envelope(&envelope);
            }
        }
    }
};

4. Kafka implementation

Installation

아래 코드는 bash를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.

# Install librdkafka
sudo apt-get install librdkafka-dev
# Or build from source
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install

Producer example

다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>
class KafkaProducer {
    std::unique_ptr<RdKafka::Producer> producer_;
    std::unique_ptr<RdKafka::Topic> topic_;
    
public:
    KafkaProducer(const std::string& brokers, const std::string& topic) {
        std::string errstr;
        
        RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
        conf->set("bootstrap.servers", brokers, errstr);
        conf->set("client.id", "cpp-producer", errstr);
        
        producer_.reset(RdKafka::Producer::create(conf, errstr));
        if (!producer_) {
            throw std::runtime_error("Failed to create producer: " + errstr);
        }
        
        RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
        topic_.reset(RdKafka::Topic::create(producer_.get(), topic, tconf, errstr));
        
        delete conf;
        delete tconf;
    }
    
    void produce(const std::string& key, const std::string& message) {
        RdKafka::ErrorCode resp = producer_->produce(
            topic_.get(),
            RdKafka::Topic::PARTITION_UA,
            RdKafka::Producer::RK_MSG_COPY,
            const_cast<char*>(message.c_str()),
            message.size(),
            &key,
            nullptr
        );
        
        if (resp != RdKafka::ERR_NO_ERROR) {
            throw std::runtime_error("Produce failed: " + RdKafka::err2str(resp));
        }
        
        producer_->poll(0);
    }
    
    void flush(int timeout_ms = 10000) {
        producer_->flush(timeout_ms);
    }
};

Consumer example

다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

class KafkaConsumer {
    std::unique_ptr<RdKafka::KafkaConsumer> consumer_;
    
public:
    KafkaConsumer(const std::string& brokers, const std::string& group_id,
                  const std::vector<std::string>& topics) {
        std::string errstr;
        
        RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
        conf->set("bootstrap.servers", brokers, errstr);
        conf->set("group.id", group_id, errstr);
        conf->set("auto.offset.reset", "earliest", errstr);
        conf->set("enable.auto.commit", "false", errstr);
        
        consumer_.reset(RdKafka::KafkaConsumer::create(conf, errstr));
        if (!consumer_) {
            throw std::runtime_error("Failed to create consumer: " + errstr);
        }
        
        RdKafka::ErrorCode err = consumer_->subscribe(topics);
        if (err) {
            throw std::runtime_error("Subscribe failed: " + RdKafka::err2str(err));
        }
        
        delete conf;
    }
    
    void consume(std::function<bool(const std::string&, const std::string&)> handler) {
        while (true) {
            RdKafka::Message* message = consumer_->consume(1000);
            
            if (message->err() == RdKafka::ERR_NO_ERROR) {
                std::string key = message->key() ? *message->key() : "";
                std::string payload(
                    static_cast<const char*>(message->payload()),
                    message->len()
                );
                
                bool success = handler(key, payload);
                
                if (success) {
                    consumer_->commitSync(message);
                }
            }
            
            delete message;
        }
    }
};

5. Serialization strategies

JSON serialization

다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

#include <nlohmann/json.hpp>
struct Order {
    std::string id;
    std::string customer;
    double amount;
    
    std::string to_json() const {
        nlohmann::json j;
        j[id] = id;
        j[customer] = customer;
        j[amount] = amount;
        return j.dump();
    }
    
    static Order from_json(const std::string& json_str) {
        auto j = nlohmann::json::parse(json_str);
        Order order;
        order.id = j[id];
        order.customer = j[customer];
        order.amount = j[amount];
        return order;
    }
};

Protobuf serialization

아래 코드는 protobuf를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.

// order.proto
syntax = "proto3";
message Order {
  string id = 1;
  string customer = 2;
  double amount = 3;
  int64 timestamp = 4;
}

다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

#include "order.pb.h"
std::string serialize_order(const Order& order) {
    OrderProto proto;
    proto.set_id(order.id);
    proto.set_customer(order.customer);
    proto.set_amount(order.amount);
    proto.set_timestamp(std::time(nullptr));
    
    std::string serialized;
    proto.SerializeToString(&serialized);
    return serialized;
}
Order deserialize_order(const std::string& data) {
    OrderProto proto;
    proto.ParseFromString(data);
    
    Order order;
    order.id = proto.id();
    order.customer = proto.customer();
    order.amount = proto.amount();
    return order;
}

6. Real-world examples

Example 1: Order processing system

다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 비동기 처리를 통해 효율적으로 작업을 수행합니다, 에러 처리를 통해 안정성을 확보합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

class OrderProcessor {
    KafkaProducer producer_;
    
public:
    OrderProcessor() : producer_("localhost:9092", "orders") {}
    
    void create_order(const Order& order) {
        // Publish to Kafka
        std::string message = order.to_json();
        producer_.produce(order.id, message);
        producer_.flush();
        
        std::cout << "Order published: " << order.id << "\n";
    }
};
class PaymentService {
    KafkaConsumer consumer_;
    
public:
    PaymentService() 
        : consumer_("localhost:9092", "payment-service", {"orders"}) {}
    
    void start() {
        consumer_.consume([this](const std::string& key, const std::string& payload) {
            try {
                Order order = Order::from_json(payload);
                process_payment(order);
                return true;
            } catch (const std::exception& e) {
                std::cerr << "Payment failed: " << e.what() << "\n";
                return false;
            }
        });
    }
    
private:
    void process_payment(const Order& order) {
        std::cout << "Processing payment for order: " << order.id << "\n";
        // Payment logic...
    }
};

Example 2: Log aggregation

다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

class LogAggregator {
    KafkaProducer producer_;
    
public:
    LogAggregator() : producer_("localhost:9092", "logs") {}
    
    void log(const std::string& level, const std::string& message) {
        nlohmann::json log_entry;
        log_entry[timestamp] = std::time(nullptr);
        log_entry[level] = level;
        log_entry[message] = message;
        log_entry[service] = "myapp";
        
        producer_.produce("", log_entry.dump());
    }
};

7. Performance comparison

Throughput benchmark

아래 코드는 code를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.

Messages/sec | RabbitMQ | Kafka
-------------|----------|-------
10,000       | 5ms      | 2ms
100,000      | 50ms     | 10ms
1,000,000    | 500ms    | 50ms

Latency benchmark

아래 코드는 code를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.

Percentile | RabbitMQ | Kafka
-----------|----------|-------
p50        | 1ms      | 2ms
p95        | 5ms      | 10ms
p99        | 20ms     | 50ms

8. Error handling

Retry logic

다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 비동기 처리를 통해 효율적으로 작업을 수행합니다, 에러 처리를 통해 안정성을 확보합니다, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

class RetryHandler {
    int max_retries_;
    std::chrono::milliseconds backoff_;
    
public:
    RetryHandler(int max_retries, std::chrono::milliseconds backoff)
        : max_retries_(max_retries), backoff_(backoff) {}
    
    bool handle_message(const std::string& message) {
        for (int attempt = 0; attempt < max_retries_; ++attempt) {
            try {
                process(message);
                return true;
            } catch (const std::exception& e) {
                std::cerr << "Attempt " << attempt + 1 << " failed: " << e.what() << "\n";
                if (attempt < max_retries_ - 1) {
                    std::this_thread::sleep_for(backoff_ * (attempt + 1));
                }
            }
        }
        
        // Send to dead letter queue
        send_to_dlq(message);
        return false;
    }
};

Dead letter queue

다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

class DeadLetterQueue {
    KafkaProducer dlq_producer_;
    
public:
    DeadLetterQueue() : dlq_producer_("localhost:9092", "dlq") {}
    
    void send(const std::string& original_topic, const std::string& message,
              const std::string& error) {
        nlohmann::json dlq_message;
        dlq_message[original_topic] = original_topic;
        dlq_message[message] = message;
        dlq_message[error] = error;
        dlq_message[timestamp] = std::time(nullptr);
        
        dlq_producer_.produce("", dlq_message.dump());
    }
};

9. Common mistakes

Mistake 1: Not handling backpressure

아래 코드는 cpp를 사용한 구현 예제입니다. 반복문으로 데이터를 처리합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

// ❌ BAD: Producer overwhelms consumer
while (true) {
    producer.produce(message);  // No rate limiting
}
// ✅ GOOD: Rate limiting
RateLimiter limiter(1000);  // 1000 msg/s
while (true) {
    limiter.wait();
    producer.produce(message);
}

Mistake 2: Not committing offsets

아래 코드는 cpp를 사용한 구현 예제입니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

// ❌ BAD: Auto-commit may lose messages
consumer.consume([](const std::string& msg) {
    process(msg);  // If crash here, message lost
});
// ✅ GOOD: Manual commit after processing
consumer.consume([&](const std::string& msg) {
    process(msg);
    consumer.commit();  // Commit after success
});

10. Best practices

  1. Idempotent consumers: Handle duplicate messages
  2. Schema evolution: Use Protobuf or Avro
  3. Monitoring: Track lag, throughput, errors
  4. Partitioning: Design partition keys carefully
  5. Backpressure: Implement rate limiting
  6. Dead letter queues: Handle poison messages
  7. Graceful shutdown: Flush producers, close consumers
  8. Testing: Use embedded brokers for tests

11. Production patterns

Pattern 1: Circuit breaker

다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.

class CircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN };
    State state_ = CLOSED;
    int failure_count_ = 0;
    int threshold_ = 5;
    
public:
    bool allow_request() {
        if (state_ == OPEN) {
            // Check if should try again
            return false;
        }
        return true;
    }
    
    void on_success() {
        failure_count_ = 0;
        state_ = CLOSED;
    }
    
    void on_failure() {
        failure_count_++;
        if (failure_count_ >= threshold_) {
            state_ = OPEN;
        }
    }
};

Summary

  • RabbitMQ: Classic messaging, flexible routing, RPC patterns
  • Kafka: High-throughput streaming, event sourcing, replay
  • Serialization: JSON for simplicity, Protobuf for performance
  • Error handling: Retries, dead letter queues, circuit breakers
  • Production: Monitoring, backpressure, graceful shutdown Next: Monitoring dashboard (#50-6)
    Previous: Production deployment (#50-5)

Keywords

C++ message queue, RabbitMQ, Kafka, AMQP, async messaging, event-driven architecture, microservices

... 996 lines not shown ... Token usage: 63706/1000000; 936294 remaining Start-Sleep -Seconds 3