[2026] C++ Message Queues: RabbitMQ and Kafka Integration Complete Guide [#50-7]
이 글의 핵심
Complete message queue guide: Decouple services with AMQP and Kafka, producers and consumers, serialization, backpressure, performance comparison, real-world examples, and production patterns.
Introduction: “Sync calls are the bottleneck”
If service A calls B over HTTP and waits, A pays B’s full latency (DB, external APIs, heavy CPU). Message queues let A publish work and return quickly—decoupling producers and consumers and buffering spikes. Topics:
- RabbitMQ (AMQP) — exchanges, queues, acks (e.g. SimpleAmqpClient patterns)
- Kafka — topics, partitions, consumer groups (e.g. librdkafka)
- Serialization — JSON, Protobuf
- Errors, tuning, production patterns Environment: C++17+.
Table of contents
- When queues help
- RabbitMQ vs Kafka
- RabbitMQ implementation
- Kafka implementation
- Serialization strategies
- Real-world examples
- Performance comparison
- Error handling
- Common mistakes
- Best practices
- Production patterns
1. When queues help
Use cases
- Order pipeline — user gets immediate ACK; payment, stock, email, analytics run asynchronously.
- Traffic spikes — queue absorbs bursts; workers drain at sustainable rate.
- Microservices — publish OrderCreated; downstream services subscribe independently.
- Centralized logs/metrics — Kafka-style log aggregation.
- Heavy jobs — image resize, encoding, fan-out to worker pools.
Architecture comparison
아래 코드는 mermaid를 사용한 구현 예제입니다. 비동기 처리를 통해 효율적으로 작업을 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
flowchart LR
subgraph sync["Synchronous (HTTP)"]
A1[Service A] -->|wait| B1[Service B]
B1 -->|wait| C1[Service C]
end
subgraph async["Asynchronous (Queue)"]
A2[Service A] -->|publish| Q[Queue]
Q --> B2[Service B]
Q --> C2[Service C]
end
Benefits:
- Decoupling: Services don’t need to know about each other
- Buffering: Handle traffic spikes
- Reliability: Retry failed messages
- Scalability: Add more consumers
2. RabbitMQ vs Kafka
| RabbitMQ | Kafka | |
|---|---|---|
| Model | Classic broker, routing | Durable log / streaming |
| Retention | Often delete after ack | Time/size retention |
| Throughput | Very high (100k+ msg/s) | Extremely high (1M+ msg/s) |
| Ordering | Per queue | Per partition |
| Latency | Low (ms) | Low-medium (ms-10ms) |
| Use case | RPC-ish, work queues | Event logs, analytics |
| Replay | Limited | Full replay support |
| Routing | Flexible (exchanges) | Simple (topics) |
When to use RabbitMQ
- Traditional messaging patterns
- Complex routing (topic, fanout, direct)
- RPC-style request/response
- Lower message volume (<100k msg/s)
- Message acknowledgment critical
When to use Kafka
- Event sourcing
- Log aggregation
- High throughput (>100k msg/s)
- Message replay needed
- Stream processing
- Multiple consumers reading same data
3. RabbitMQ implementation
Installation
아래 코드는 bash를 사용한 구현 예제입니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
# Ubuntu
sudo apt-get install rabbitmq-server
# Docker
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
# C++ client
sudo apt-get install librabbitmq-dev
# Or use AMQP-CPP: https://github.com/CopernicaMarketingSoftware/AMQP-CPP
Producer example
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <iostream>
#include <string>
class RabbitMQProducer {
amqp_connection_state_t conn_;
amqp_socket_t* socket_;
std::string exchange_;
public:
RabbitMQProducer(const std::string& host, int port, const std::string& exchange)
: exchange_(exchange) {
conn_ = amqp_new_connection();
socket_ = amqp_tcp_socket_new(conn_);
if (amqp_socket_open(socket_, host.c_str(), port) != 0) {
throw std::runtime_error("Cannot open socket");
}
amqp_rpc_reply_t reply = amqp_login(conn_, "/", 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Login failed");
}
amqp_channel_open(conn_, 1);
amqp_get_rpc_reply(conn_);
}
~RabbitMQProducer() {
amqp_channel_close(conn_, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn_, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn_);
}
void publish(const std::string& routing_key, const std::string& message) {
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
props.delivery_mode = 2; // Persistent
amqp_bytes_t message_bytes = amqp_cstring_bytes(message.c_str());
int result = amqp_basic_publish(
conn_,
1,
amqp_cstring_bytes(exchange_.c_str()),
amqp_cstring_bytes(routing_key.c_str()),
0,
0,
&props,
message_bytes
);
if (result != 0) {
throw std::runtime_error("Publish failed");
}
}
};
Consumer example
다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
class RabbitMQConsumer {
amqp_connection_state_t conn_;
std::string queue_;
public:
RabbitMQConsumer(const std::string& host, int port, const std::string& queue)
: queue_(queue) {
conn_ = amqp_new_connection();
amqp_socket_t* socket = amqp_tcp_socket_new(conn_);
amqp_socket_open(socket, host.c_str(), port);
amqp_login(conn_, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
amqp_channel_open(conn_, 1);
amqp_get_rpc_reply(conn_);
// Set prefetch count
amqp_basic_qos(conn_, 1, 0, 10, 0);
}
~RabbitMQConsumer() {
amqp_channel_close(conn_, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn_, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn_);
}
void consume(std::function<bool(const std::string&)> handler) {
amqp_basic_consume(conn_, 1, amqp_cstring_bytes(queue_.c_str()),
amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
while (true) {
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn_);
amqp_rpc_reply_t result = amqp_consume_message(conn_, &envelope, NULL, 0);
if (result.reply_type == AMQP_RESPONSE_NORMAL) {
std::string message(
static_cast<char*>(envelope.message.body.bytes),
envelope.message.body.len
);
bool success = handler(message);
if (success) {
amqp_basic_ack(conn_, 1, envelope.delivery_tag, 0);
} else {
amqp_basic_nack(conn_, 1, envelope.delivery_tag, 0, 1);
}
amqp_destroy_envelope(&envelope);
}
}
}
};
4. Kafka implementation
Installation
아래 코드는 bash를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
# Install librdkafka
sudo apt-get install librdkafka-dev
# Or build from source
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install
Producer example
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>
class KafkaProducer {
std::unique_ptr<RdKafka::Producer> producer_;
std::unique_ptr<RdKafka::Topic> topic_;
public:
KafkaProducer(const std::string& brokers, const std::string& topic) {
std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("client.id", "cpp-producer", errstr);
producer_.reset(RdKafka::Producer::create(conf, errstr));
if (!producer_) {
throw std::runtime_error("Failed to create producer: " + errstr);
}
RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
topic_.reset(RdKafka::Topic::create(producer_.get(), topic, tconf, errstr));
delete conf;
delete tconf;
}
void produce(const std::string& key, const std::string& message) {
RdKafka::ErrorCode resp = producer_->produce(
topic_.get(),
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(message.c_str()),
message.size(),
&key,
nullptr
);
if (resp != RdKafka::ERR_NO_ERROR) {
throw std::runtime_error("Produce failed: " + RdKafka::err2str(resp));
}
producer_->poll(0);
}
void flush(int timeout_ms = 10000) {
producer_->flush(timeout_ms);
}
};
Consumer example
다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
class KafkaConsumer {
std::unique_ptr<RdKafka::KafkaConsumer> consumer_;
public:
KafkaConsumer(const std::string& brokers, const std::string& group_id,
const std::vector<std::string>& topics) {
std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("group.id", group_id, errstr);
conf->set("auto.offset.reset", "earliest", errstr);
conf->set("enable.auto.commit", "false", errstr);
consumer_.reset(RdKafka::KafkaConsumer::create(conf, errstr));
if (!consumer_) {
throw std::runtime_error("Failed to create consumer: " + errstr);
}
RdKafka::ErrorCode err = consumer_->subscribe(topics);
if (err) {
throw std::runtime_error("Subscribe failed: " + RdKafka::err2str(err));
}
delete conf;
}
void consume(std::function<bool(const std::string&, const std::string&)> handler) {
while (true) {
RdKafka::Message* message = consumer_->consume(1000);
if (message->err() == RdKafka::ERR_NO_ERROR) {
std::string key = message->key() ? *message->key() : "";
std::string payload(
static_cast<const char*>(message->payload()),
message->len()
);
bool success = handler(key, payload);
if (success) {
consumer_->commitSync(message);
}
}
delete message;
}
}
};
5. Serialization strategies
JSON serialization
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
#include <nlohmann/json.hpp>
struct Order {
std::string id;
std::string customer;
double amount;
std::string to_json() const {
nlohmann::json j;
j[id] = id;
j[customer] = customer;
j[amount] = amount;
return j.dump();
}
static Order from_json(const std::string& json_str) {
auto j = nlohmann::json::parse(json_str);
Order order;
order.id = j[id];
order.customer = j[customer];
order.amount = j[amount];
return order;
}
};
Protobuf serialization
아래 코드는 protobuf를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// order.proto
syntax = "proto3";
message Order {
string id = 1;
string customer = 2;
double amount = 3;
int64 timestamp = 4;
}
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
#include "order.pb.h"
std::string serialize_order(const Order& order) {
OrderProto proto;
proto.set_id(order.id);
proto.set_customer(order.customer);
proto.set_amount(order.amount);
proto.set_timestamp(std::time(nullptr));
std::string serialized;
proto.SerializeToString(&serialized);
return serialized;
}
Order deserialize_order(const std::string& data) {
OrderProto proto;
proto.ParseFromString(data);
Order order;
order.id = proto.id();
order.customer = proto.customer();
order.amount = proto.amount();
return order;
}
6. Real-world examples
Example 1: Order processing system
다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 비동기 처리를 통해 효율적으로 작업을 수행합니다, 에러 처리를 통해 안정성을 확보합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
class OrderProcessor {
KafkaProducer producer_;
public:
OrderProcessor() : producer_("localhost:9092", "orders") {}
void create_order(const Order& order) {
// Publish to Kafka
std::string message = order.to_json();
producer_.produce(order.id, message);
producer_.flush();
std::cout << "Order published: " << order.id << "\n";
}
};
class PaymentService {
KafkaConsumer consumer_;
public:
PaymentService()
: consumer_("localhost:9092", "payment-service", {"orders"}) {}
void start() {
consumer_.consume([this](const std::string& key, const std::string& payload) {
try {
Order order = Order::from_json(payload);
process_payment(order);
return true;
} catch (const std::exception& e) {
std::cerr << "Payment failed: " << e.what() << "\n";
return false;
}
});
}
private:
void process_payment(const Order& order) {
std::cout << "Processing payment for order: " << order.id << "\n";
// Payment logic...
}
};
Example 2: Log aggregation
다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
class LogAggregator {
KafkaProducer producer_;
public:
LogAggregator() : producer_("localhost:9092", "logs") {}
void log(const std::string& level, const std::string& message) {
nlohmann::json log_entry;
log_entry[timestamp] = std::time(nullptr);
log_entry[level] = level;
log_entry[message] = message;
log_entry[service] = "myapp";
producer_.produce("", log_entry.dump());
}
};
7. Performance comparison
Throughput benchmark
아래 코드는 code를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
Messages/sec | RabbitMQ | Kafka
-------------|----------|-------
10,000 | 5ms | 2ms
100,000 | 50ms | 10ms
1,000,000 | 500ms | 50ms
Latency benchmark
아래 코드는 code를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
Percentile | RabbitMQ | Kafka
-----------|----------|-------
p50 | 1ms | 2ms
p95 | 5ms | 10ms
p99 | 20ms | 50ms
8. Error handling
Retry logic
다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 비동기 처리를 통해 효율적으로 작업을 수행합니다, 에러 처리를 통해 안정성을 확보합니다, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
class RetryHandler {
int max_retries_;
std::chrono::milliseconds backoff_;
public:
RetryHandler(int max_retries, std::chrono::milliseconds backoff)
: max_retries_(max_retries), backoff_(backoff) {}
bool handle_message(const std::string& message) {
for (int attempt = 0; attempt < max_retries_; ++attempt) {
try {
process(message);
return true;
} catch (const std::exception& e) {
std::cerr << "Attempt " << attempt + 1 << " failed: " << e.what() << "\n";
if (attempt < max_retries_ - 1) {
std::this_thread::sleep_for(backoff_ * (attempt + 1));
}
}
}
// Send to dead letter queue
send_to_dlq(message);
return false;
}
};
Dead letter queue
다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
class DeadLetterQueue {
KafkaProducer dlq_producer_;
public:
DeadLetterQueue() : dlq_producer_("localhost:9092", "dlq") {}
void send(const std::string& original_topic, const std::string& message,
const std::string& error) {
nlohmann::json dlq_message;
dlq_message[original_topic] = original_topic;
dlq_message[message] = message;
dlq_message[error] = error;
dlq_message[timestamp] = std::time(nullptr);
dlq_producer_.produce("", dlq_message.dump());
}
};
9. Common mistakes
Mistake 1: Not handling backpressure
아래 코드는 cpp를 사용한 구현 예제입니다. 반복문으로 데이터를 처리합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// ❌ BAD: Producer overwhelms consumer
while (true) {
producer.produce(message); // No rate limiting
}
// ✅ GOOD: Rate limiting
RateLimiter limiter(1000); // 1000 msg/s
while (true) {
limiter.wait();
producer.produce(message);
}
Mistake 2: Not committing offsets
아래 코드는 cpp를 사용한 구현 예제입니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// ❌ BAD: Auto-commit may lose messages
consumer.consume([](const std::string& msg) {
process(msg); // If crash here, message lost
});
// ✅ GOOD: Manual commit after processing
consumer.consume([&](const std::string& msg) {
process(msg);
consumer.commit(); // Commit after success
});
10. Best practices
- Idempotent consumers: Handle duplicate messages
- Schema evolution: Use Protobuf or Avro
- Monitoring: Track lag, throughput, errors
- Partitioning: Design partition keys carefully
- Backpressure: Implement rate limiting
- Dead letter queues: Handle poison messages
- Graceful shutdown: Flush producers, close consumers
- Testing: Use embedded brokers for tests
11. Production patterns
Pattern 1: Circuit breaker
다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
class CircuitBreaker {
enum State { CLOSED, OPEN, HALF_OPEN };
State state_ = CLOSED;
int failure_count_ = 0;
int threshold_ = 5;
public:
bool allow_request() {
if (state_ == OPEN) {
// Check if should try again
return false;
}
return true;
}
void on_success() {
failure_count_ = 0;
state_ = CLOSED;
}
void on_failure() {
failure_count_++;
if (failure_count_ >= threshold_) {
state_ = OPEN;
}
}
};
Summary
- RabbitMQ: Classic messaging, flexible routing, RPC patterns
- Kafka: High-throughput streaming, event sourcing, replay
- Serialization: JSON for simplicity, Protobuf for performance
- Error handling: Retries, dead letter queues, circuit breakers
- Production: Monitoring, backpressure, graceful shutdown
Next: Monitoring dashboard (#50-6)
Previous: Production deployment (#50-5)
Keywords
C++ message queue, RabbitMQ, Kafka, AMQP, async messaging, event-driven architecture, microservices