[2026] C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달
이 글의 핵심
C++에서 Apache Kafka 연동: librdkafka로 프로듀서·컨슈머 그룹·오프셋 관리 구현. Connection timeout·메시지 유실·리밸런싱 등 흔한 에러 해결, 배치·압축 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.
들어가며: C++에서 Kafka를 왜 쓰나요?
실제 겪는 문제 시나리오
시나리오 1: 로그 수집 시 DB 부하 폭증
여러 마이크로서비스에서 로그를 중앙 DB에 직접 INSERT합니다. 트래픽이 늘면 DB 연결 수가 폭증하고, 로그 쓰기 지연으로 애플리케이션 응답이 느려집니다. “로그 때문에 메인 서비스가 죽어요.”
시나리오 2: 주문 이벤트를 여러 서비스에 전달
주문 완료 시 재고 차감, 포인트 적립, 알림 발송 등 여러 서비스에 이벤트를 보내야 합니다. HTTP로 순차 호출하면 지연이 누적되고, 한 서비스 장애 시 전체가 멈춥니다. “어떤 서비스가 느리면 전체 주문 처리가 막혀요.”
시나리오 3: 실시간 분석 파이프라인
클릭 스트림, 센서 데이터를 실시간으로 수집해 분석 엔진에 전달해야 합니다. 폴링 방식은 지연이 크고, 직접 DB 조회는 부하가 큽니다. “1초 단위로 집계해야 하는데 10초씩 밀려요.”
시나리오 4: librdkafka 도입 후 “Connection refused” 에러
Kafka 브로커 주소를 설정했는데 연결이 안 됩니다. localhost:9092 vs 127.0.0.1:9092, 방화벽, Docker 네트워크 등 확인할 것이 많아 막막합니다.
시나리오 5: 컨슈머 재시작 시 메시지 중복 처리
컨슈머가 메시지 처리 중 크래시했습니다. 재시작 후 같은 메시지를 다시 읽어 중복 처리됩니다. “주문이 두 번 차감돼요.”
시나리오 6: 파티션 리밸런싱 시 처리 중단
컨슈머 그룹에 새 인스턴스를 추가했더니 기존 컨슈머가 담당하던 파티션이 재할당됩니다. 리밸런싱 중 메시지 처리 순서가 꼬이거나 유실될 수 있습니다.
Kafka로 해결:
- 이벤트 스트리밍: 프로듀서가 메시지를 토픽에 발행, 컨슈머가 구독. DB에 직접 쓰지 않아 메인 서비스 부하 감소
- 비동기·디커플링: 프로듀서는 발행만 하고, 여러 컨슈머가 각자 처리. 한 서비스 장애가 다른 서비스에 영향 없음
- 오프셋 관리: 컨슈머 그룹이 읽은 위치를 커밋해, 재시작 시 이어서 처리
- 파티션·스케일아웃: 파티션 수만큼 컨슈머를 늘려 처리량 확장 다음은 mermaid를 활용한 상세한 구현 코드입니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
flowchart LR
subgraph Producer["프로듀서 (C++)"]
P1[앱 로그]
P2[주문 이벤트]
P3[센서 데이터]
end
subgraph Kafka[Apache Kafka]
T1[logs 토픽]
T2[orders 토픽]
T3[events 토픽]
end
subgraph Consumer["컨슈머 (C++)"]
C1[로그 저장]
C2[재고 차감]
C3[실시간 분석]
end
P1 --> T1
P2 --> T2
P3 --> T3
T1 --> C1
T2 --> C2
T3 --> C3
Kafka 프로듀서-컨슈머 흐름
다음은 mermaid를 활용한 상세한 구현 코드입니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
sequenceDiagram
participant P as 프로듀서
participant B as 브로커
participant C as 컨슈머
P->>B: produce(topic, key, value)
B->>B: 파티션에 저장
B->>P: dr_cb (delivery report)
C->>B: subscribe(topic)
B->>C: 파티션 할당
loop consume
C->>B: poll() → 메시지 수신
C->>C: 비즈니스 로직 처리
C->>B: commit() 오프셋
end
RabbitMQ vs Kafka 비교
| 항목 | RabbitMQ | Kafka |
|---|---|---|
| 모델 | 큐, Exchange | 토픽, 파티션 |
| 메시지 보존 | 소비 후 삭제 (기본) | 보존 기간 동안 유지 |
| 재처리 | 별도 구현 | 오프셋 이동으로 가능 |
| 처리량 | 수만 msg/s | 수백만 msg/s |
| C++ 클라이언트 | rabbitmq-c | librdkafka |
| 이 글에서 다루는 것: |
- librdkafka 설치 및 CMake 연동
- 완전한 프로듀서·컨슈머 C++ 예제
- 자주 발생하는 에러와 해결법
- 성능 최적화 (배치, 압축, 파티션)
- 프로덕션 배포 패턴
실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.
목차
1. 환경 설정
필수 의존성
| 항목 | 버전 | 비고 |
|---|---|---|
| C++ | C++14 이상 | C++17 권장 |
| librdkafka | 2.0+ | vcpkg, Homebrew, 또는 소스 빌드 |
| Apache Kafka | 2.8+ | 브로커 (Docker 권장) |
| CMake | 3.16+ | FindPackage 지원 |
Kafka 브로커 실행 (Docker)
아래 코드는 bash를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
# 단일 브로커 (개발용)
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
apache/kafka:3.6
# 토픽 생성
docker exec kafka kafka-topics --create --topic app-logs --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
librdkafka 설치
아래 코드는 bash를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
# vcpkg (권장)
vcpkg install rdkafka
# macOS (Homebrew)
brew install librdkafka
# Ubuntu/Debian
sudo apt-get install librdkafka-dev
CMakeLists.txt 기본 설정
아래 코드는 cmake를 사용한 구현 예제입니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
cmake_minimum_required(VERSION 3.16)
project(kafka_example LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
find_package(RdKafka CONFIG REQUIRED)
add_executable(kafka_producer producer.cpp)
target_link_libraries(kafka_producer PRIVATE RdKafka::rdkafka)
add_executable(kafka_consumer consumer.cpp)
target_link_libraries(kafka_consumer PRIVATE RdKafka::rdkafka)
주의: vcpkg 사용 시 -DCMAKE_TOOLCHAIN_FILE=[vcpkg root]/scripts/buildsystems/vcpkg.cmake를 CMake에 전달해야 합니다.
2. 완전한 프로듀서 예제
2.1 최소 동작 프로듀서
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// producer.cpp
// 컴파일: g++ -std=c++17 -o producer producer.cpp -lrdkafka -lrdkafka++
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
int main(int argc, char** argv) {
std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
std::string topic = argc > 2 ? argv[2] : "app-logs";
std::string errstr;
// 1. 전역 설정
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
// 2. 프로듀서 생성
RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
delete conf;
if (!producer) {
std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
return 1;
}
// 3. 메시지 발행
std::string payload = "Hello, Kafka from C++!";
RdKafka::ErrorCode err = producer->produce(
topic,
RdKafka::Topic::PARTITION_UA, // 파티션 자동 할당
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(payload.data()),
payload.size(),
"key1", 5, // 키 (선택)
0, nullptr);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "발행 실패: " << RdKafka::err2str(err) << std::endl;
} else {
std::cout << "메시지 발행 완료" << std::endl;
}
// 4. 대기 중인 delivery report 처리
while (producer->outq_len() > 0) {
producer->poll(100);
}
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
코드 설명:
PARTITION_UA: 파티션을 자동 선택 (키 해시 또는 라운드로빈)RK_MSG_COPY: 페이로드를 복사해 저장 (호출 후 버퍼 수정 가능)outq_len(): 전송 대기 중인 메시지 수. 0이 될 때까지poll()호출 필요
2.2 Delivery Report 콜백 (실전용)
발행 결과를 콜백으로 받아 로깅·재시도할 수 있습니다. 다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// producer_with_dr_cb.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message& msg) override {
if (msg.err()) {
std::cerr << "[DR] 전달 실패: " << msg.errstr() << std::endl;
} else {
std::cout << "[DR] 전달 완료: " << msg.topic_name() << "["
<< msg.partition() << "] @ " << msg.offset() << std::endl;
}
}
};
int main() {
std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092", errstr);
DeliveryReportCb dr_cb;
conf->set("dr_cb", &dr_cb, errstr);
RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
delete conf;
if (!producer) {
std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
return 1;
}
std::string payload = "Test message with delivery report";
producer->produce("app-logs", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(payload.data()), payload.size(),
nullptr, 0, 0, nullptr);
// poll을 호출해야 dr_cb가 실행됨
for (int i = 0; i < 10 && producer->outq_len() > 0; ++i) {
producer->poll(100);
}
producer->flush(5000);
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
주의: poll()을 주기적으로 호출하지 않으면 delivery report 콜백이 실행되지 않습니다. 프로듀서 스레드에서 루프로 poll(100)을 호출하는 것이 일반적입니다.
2.3 RAII 래퍼 클래스 (재사용 가능)
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// kafka_producer.hpp
#pragma once
#include <librdkafka/rdkafkacpp.h>
#include <memory>
#include <stdexcept>
#include <string>
class KafkaProducer {
public:
KafkaProducer(const std::string& brokers,
const std::string& client_id = "cpp-producer") {
std::string errstr;
conf_.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
conf_->set("bootstrap.servers", brokers, errstr);
conf_->set("client.id", client_id, errstr);
conf_->set("dr_cb", &dr_cb_, errstr);
producer_.reset(RdKafka::Producer::create(conf_.get(), errstr));
if (!producer_) {
throw std::runtime_error("프로듀서 생성 실패: " + errstr);
}
}
void produce(const std::string& topic, const std::string& key,
const std::string& value) {
RdKafka::ErrorCode err = producer_->produce(
topic, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(value.data()), value.size(),
key.empty() ? nullptr : key.data(), key.size(),
0, nullptr);
if (err != RdKafka::ERR_NO_ERROR) {
throw std::runtime_error("발행 실패: " + RdKafka::err2str(err));
}
producer_->poll(0);
}
void flush(int timeout_ms = 10000) {
producer_->flush(timeout_ms);
}
void poll(int timeout_ms = 0) {
producer_->poll(timeout_ms);
}
private:
struct DeliveryReportCb : public RdKafka::DeliveryReportCb {
void dr_cb(RdKafka::Message& msg) override {
if (msg.err()) {
std::cerr << "[DR] 실패: " << msg.errstr() << std::endl;
}
}
} dr_cb_;
std::unique_ptr<RdKafka::Conf> conf_;
std::unique_ptr<RdKafka::Producer> producer_;
};
3. 완전한 컨슈머 예제
3.1 최소 동작 컨슈머
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// consumer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
#include <csignal>
static volatile sig_atomic_t run = 1;
void sigterm_handler(int) { run = 0; }
int main(int argc, char** argv) {
std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
std::string group_id = argc > 2 ? argv[2] : "cpp-consumer-group";
std::string topic = argc > 3 ? argv[3] : "app-logs";
std::string errstr;
signal(SIGINT, sigterm_handler);
signal(SIGTERM, sigterm_handler);
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("group.id", group_id, errstr);
conf->set("enable.auto.commit", "false", errstr); // 수동 커밋
RdKafka::KafkaConsumer* consumer =
RdKafka::KafkaConsumer::create(conf, errstr);
delete conf;
if (!consumer) {
std::cerr << "컨슈머 생성 실패: " << errstr << std::endl;
return 1;
}
std::vector<std::string> topics = {topic};
RdKafka::ErrorCode err = consumer->subscribe(topics);
if (err) {
std::cerr << "구독 실패: " << RdKafka::err2str(err) << std::endl;
delete consumer;
return 1;
}
std::cout << "메시지 수신 대기 중....(Ctrl+C로 종료)" << std::endl;
while (run) {
RdKafka::Message* msg = consumer->consume(1000);
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR:
std::cout << "[" << msg->topic_name() << ":" << msg->partition()
<< "@" << msg->offset() << "] "
<< std::string(static_cast<const char*>(msg->payload()),
msg->len())
<< std::endl;
consumer->commit(msg); // 오프셋 커밋
break;
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR__PARTITION_EOF:
break;
default:
std::cerr << "소비 에러: " << msg->errstr() << std::endl;
run = 0;
break;
}
delete msg;
}
consumer->close();
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
코드 설명:
enable.auto.commit=false: 메시지 처리 완료 후 수동으로commit()호출. 처리 중 크래시 시 재처리 가능consume(1000): 1초 타임아웃. 메시지 없으면ERR__TIMED_OUT반환commit(msg): 해당 메시지의 오프셋까지 커밋
3.2 리밸런싱 콜백 (컨슈머 그룹)
파티션 재할당 시 리밸런싱 콜백에서 assign/unassign를 처리해야 합니다.
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// consumer_rebalance.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <signal.h>
static volatile sig_atomic_t run = 1;
class RebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb(RdKafka::KafkaConsumer* consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*>& partitions) override {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
consumer->assign(partitions);
std::cerr << "[리밸런싱] 파티션 할당: ";
for (auto* p : partitions)
std::cerr << p->topic() << "[" << p->partition() << "] ";
std::cerr << std::endl;
} else {
consumer->unassign();
std::cerr << "[리밸런싱] 파티션 해제" << std::endl;
}
}
};
int main() {
std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092", errstr);
conf->set("group.id", "cpp-group", errstr);
conf->set("enable.auto.commit", "false", errstr);
RebalanceCb rebalance_cb;
conf->set("rebalance_cb", &rebalance_cb, errstr);
RdKafka::KafkaConsumer* consumer =
RdKafka::KafkaConsumer::create(conf, errstr);
delete conf;
consumer->subscribe({"app-logs"});
while (run) {
RdKafka::Message* msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::cout << std::string(static_cast<const char*>(msg->payload()),
msg->len()) << std::endl;
consumer->commit(msg);
}
delete msg;
}
consumer->close();
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
3.3 이벤트 콜백 (에러·통계)
다음은 cpp를 활용한 상세한 구현 코드입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// EventCb: 연결 에러, throttle 등 수신
class EventCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event& event) override {
switch (event.type()) {
case RdKafka::Event::EVENT_ERROR:
if (event.fatal()) {
std::cerr << "치명적 에러: " << event.str() << std::endl;
} else {
std::cerr << "에러: " << event.str() << std::endl;
}
break;
case RdKafka::Event::EVENT_THROTTLE:
std::cerr << "Throttled: " << event.throttle_time() << "ms by "
<< event.broker_name() << std::endl;
break;
default:
break;
}
}
};
// 설정 시
EventCb event_cb;
conf->set("event_cb", &event_cb, errstr);
4. 자주 발생하는 에러와 해결법
에러 1: “Connection refused” / “Broker: Connection refused”
증상: 프로듀서/컨슈머 생성은 되지만 메시지 발행·소비 시 연결 실패. 원인:
- Kafka 브로커가 실행 중이 아님
- 잘못된 주소/포트
- Docker 환경에서
localhostvs127.0.0.1혼동 - 방화벽 차단 해결법: 아래 코드는 bash를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
# 브로커 실행 확인
docker ps | grep kafka
# Docker 내부에서 접속 시
# bootstrap.servers = localhost:9092 (호스트에서)
# bootstrap.servers = host.docker.internal:9092 (Docker 컨테이너에서 호스트 접속)
아래 코드는 cpp를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// ✅ 여러 브로커 지정 (고가용성)
conf->set("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092", errstr);
// ✅ 연결 타임아웃 설정
conf->set("socket.connection.setup.timeout.ms", "5000", errstr);
에러 2: “Topic ‘xxx’ not present in metadata”
증상: produce() 호출 시 ERR__UNKNOWN_TOPIC 또는 토픽을 찾을 수 없음.
원인: 토픽이 아직 생성되지 않았거나, auto.create.topics.enable=true인데 브로커가 아직 메타데이터를 갱신하지 않음.
해결법:
# 토픽 사전 생성
kafka-topics --create --topic app-logs --partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
// ✅ 토픽 자동 생성 대기 (metadata.max.age.ms)
conf->set("metadata.max.age.ms", "5000", errstr);
에러 3: “Message delivery failed: Broker: Message size too large”
증상: 큰 메시지 발행 시 실패.
원인: 브로커의 message.max.bytes 설정보다 메시지가 큼.
해결법:
// 프로듀서: 메시지 크기 제한
conf->set("message.max.bytes", "10485760", errstr); // 10MB
// 브로커 설정: message.max.bytes, replica.fetch.max.bytes 동일하게
에러 4: “Commit failed: Local: No offset stored”
증상: consumer->commit(msg) 호출 시 에러.
원인: enable.auto.commit=true이면서 수동 commit()을 호출하거나, 아직 커밋할 오프셋이 없음.
해결법:
아래 코드는 cpp를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// ✅ 수동 커밋 사용 시
conf->set("enable.auto.commit", "false", errstr);
// ✅ commitSync 호출
consumer->commit(msg);
// 또는
consumer->commitSync(); // 현재 오프셋까지 커밋
에러 5: 메시지 중복 처리
증상: 컨슈머 재시작 후 같은 메시지를 다시 처리. 원인: 메시지 처리 완료 전에 커밋하거나, 처리 후 커밋 전에 크래시. 해결법: 아래 코드는 cpp를 사용한 구현 예제입니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// ✅ 처리 완료 후 커밋 (at-least-once)
void process_msg(RdKafka::Message* msg) {
// 1. 비즈니스 로직 처리
doBusinessLogic(msg);
// 2. 처리 성공 후 커밋
consumer->commit(msg);
}
// ❌ 나쁜 예: 처리 전 커밋
consumer->commit(msg); // 커밋
doBusinessLogic(msg); // 처리 중 크래시 시 재처리됨
정확히 한 번 전달이 필요하면 프로듀서의 enable.idempotence=true와 트랜잭션을 사용합니다. Kafka 고급(#52-6)에서 다룹니다.
에러 6: “Out of memory” / “Queue full”
증상: 프로듀서가 produce() 호출 시 ERR__QUEUE_FULL 반환.
원인: 브로커 전송 속도보다 발행 속도가 빠름. 내부 큐가 가득 참.
해결법:
아래 코드는 cpp를 사용한 구현 예제입니다. 반복문으로 데이터를 처리합니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// ✅ 큐 크기 증가
conf->set("queue.buffering.max.messages", "1000000", errstr);
conf->set("queue.buffering.max.kbytes", "1048576", errstr); // 1GB
// ✅ 발행 시 블로킹 대기
while (producer->outq_len() > 10000) {
producer->poll(100);
}
producer->produce(...);
에러 7: “Broker: Not leader for partition”
증상: 리더가 변경된 파티션에 발행 시도. 원인: 브로커 장애·리밸런싱으로 리더가 변경됨. librdkafka는 자동으로 새 리더로 재시도합니다. 해결법:
// ✅ 재시도 설정 (기본값으로 충분)
conf->set("retries", "10", errstr);
conf->set("retry.backoff.ms", "100", errstr);
에러 8: “Consumer group is rebalancing”
증상: 컨슈머가 commit() 호출 시 ERR__REBALANCE_IN_PROGRESS.
원인: 컨슈머 그룹에 인스턴스 추가/제거로 리밸런싱 중.
해결법:
// ✅ 리밸런싱 콜백에서 assign 후 재처리
// rebalance_cb에서 파티션 해제 시 처리 중인 메시지 정리
// assign 후 새 파티션에서 consume 재개
에러 9: DeliveryReportCb가 호출되지 않음
증상: dr_cb가 한 번도 실행되지 않음.
원인: poll()을 호출하지 않음. delivery report는 poll() 호출 시 처리됩니다.
해결법:
아래 코드는 cpp를 사용한 구현 예제입니다. 반복문으로 데이터를 처리합니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// ✅ 프로듀서 스레드에서 주기적 poll
void producer_loop() {
while (running) {
producer->produce(...);
producer->poll(100); // 필수!
}
producer->flush(10000);
}
5. 성능 최적화
5.1 배치 발행 (Linger)
메시지를 모아서 한 번에 전송하면 RTT를 줄일 수 있습니다. 아래 코드는 cpp를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// 배치 대기 시간 (ms)
conf->set("linger.ms", "5", errstr); // 5ms 대기 후 전송
// 배치 크기 (bytes)
conf->set("batch.size", "16384", errstr); // 16KB
주의: linger.ms가 크면 지연이 증가합니다. 처리량 vs 지연 트레이드오프를 고려하세요.
5.2 압축
네트워크 대역폭을 줄입니다.
conf->set("compression.type", "gzip", errstr);
// none, gzip, snappy, lz4, zstd
| 압축 | 속도 | 압축률 | CPU |
|---|---|---|---|
| none | 빠름 | 없음 | 낮음 |
| snappy | 빠름 | 중간 | 중간 |
| lz4 | 빠름 | 좋음 | 중간 |
| gzip | 느림 | 높음 | 높음 |
| zstd | 중간 | 매우 높음 | 중간 |
5.3 프로듀서 풀링
아래 코드는 cpp를 사용한 구현 예제입니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// ❌ 나쁜 예: 매 요청마다 새 프로듀서
void handle_request() {
auto producer = create_producer();
producer->produce(...);
delete producer;
}
// ✅ 좋은 예: 프로듀서 재사용 (싱글톤 또는 풀)
static KafkaProducer* get_producer() {
static KafkaProducer producer("localhost:9092");
return &producer;
}
5.4 파티션 수와 컨슈머 수
- 파티션 수 ≥ 컨슈머 수일 때만 모든 컨슈머가 활용됩니다.
- 파티션 수를 늘리면 처리량은 늘지만, 순서 보장은 파티션 내에서만입니다.
// 키가 같은 메시지는 같은 파티션으로
producer->produce(topic, partition, ..., key, key_len, ...);
// partition = PARTITION_UA면 키 해시로 파티션 선택
5.5 성능 비교 (참고)
| 설정 | 초당 메시지 (대략) | 지연 |
|---|---|---|
| linger.ms=0, batch 작음 | 5만 | 1ms |
| linger.ms=5, batch 16KB | 50만 | 5ms |
| linger.ms=5, 압축 | 30만 | 5ms |
6. 프로덕션 패턴
6.1 Graceful Shutdown
다음은 cpp를 활용한 상세한 구현 코드입니다. 반복문으로 데이터를 처리합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
static std::atomic<bool> running{true};
void sig_handler(int) {
running = false;
}
int main() {
signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
// ....프로듀서 생성
while (running) {
producer->produce(...);
producer->poll(100);
}
// 종료 전 모든 메시지 전송 완료 대기
producer->flush(10000);
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
6.2 메시지 키 설계
다음은 간단한 cpp 코드 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// 파티션 내 순서 보장: 같은 키 → 같은 파티션
// 예: 사용자별 이벤트 순서
std::string key = "user:" + std::to_string(user_id);
producer->produce(topic, RdKafka::Topic::PARTITION_UA, ..., key.data(), key.size(), ...);
6.3 헬스 체크
아래 코드는 cpp를 사용한 구현 예제입니다. 조건문으로 분기 처리를 수행합니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// 프로듀서: 메타데이터 조회로 브로커 연결 확인
RdKafka::Metadata* metadata = nullptr;
RdKafka::ErrorCode err = producer->metadata(true, nullptr, &metadata, 5000);
bool healthy = (err == RdKafka::ERR_NO_ERROR && metadata);
if (metadata) delete metadata;
6.4 환경 변수 기반 설정
아래 코드는 cpp를 사용한 구현 예제입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
struct KafkaConfig {
std::string brokers = "localhost:9092";
std::string group_id = "cpp-consumer";
int linger_ms = 5;
};
KafkaConfig load_config() {
KafkaConfig c;
if (const char* b = std::getenv("KAFKA_BROKERS")) c.brokers = b;
if (const char* g = std::getenv("KAFKA_GROUP_ID")) c.group_id = g;
return c;
}
6.5 로깅 (debug)
// 문제 디버깅 시
conf->set("debug", "broker,topic,msg", errstr);
// broker, topic, msg, protocol, cgrp 등
6.6 SSL/TLS (프로덕션)
다음은 간단한 cpp 코드 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
conf->set("security.protocol", "ssl", errstr);
conf->set("ssl.ca.location", "/path/to/ca-cert", errstr);
conf->set("ssl.certificate.location", "/path/to/client-cert", errstr);
conf->set("ssl.key.location", "/path/to/client-key", errstr);
6.7 SASL 인증
다음은 간단한 cpp 코드 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
conf->set("security.protocol", "sasl_plaintext", errstr);
conf->set("sasl.mechanisms", "PLAIN", errstr);
conf->set("sasl.username", "user", errstr);
conf->set("sasl.password", "pass", errstr);
7. 구현 체크리스트
환경 설정
- Kafka 브로커 실행 확인
- 토픽 사전 생성 (또는 auto.create 설정)
- librdkafka 설치 및 CMake 연동
프로듀서
-
bootstrap.servers설정 -
dr_cb등록 및poll()주기적 호출 -
flush()또는outq_len()==0대기 후 종료 - 배치·압축 설정 (성능 요구 시)
컨슈머
-
group.id설정 -
enable.auto.commit=false시 수동commit() -
rebalance_cb등록 (컨슈머 그룹 사용 시) -
event_cb등록 (에러 모니터링)
에러 처리
-
produce()반환값 검사 -
msg->err()검사 -
ERR__QUEUE_FULL시 대기 또는 백프레셔
프로덕션
- Graceful Shutdown (flush, wait_destroyed)
- SSL/TLS, SASL (필요 시)
- 환경 변수로 설정 외부화
정리
| 항목 | 요약 |
|---|---|
| librdkafka | C/C++ Kafka 클라이언트의 사실상 표준 |
| 프로듀서 | Conf → Producer::create → produce → poll → flush |
| 컨슈머 | Conf → KafkaConsumer::create → subscribe → consume → commit |
| 오프셋 | 수동 커밋으로 메시지 처리 완료 후 commit |
| 에러 | Connection refused, Topic not found, Queue full, Rebalance |
| 성능 | linger.ms, batch.size, 압축, 프로듀서 재사용 |
| 프로덕션 | Graceful Shutdown, SSL/TLS, SASL, 환경 변수 |
| 핵심 원칙: |
- poll() 필수: 프로듀서/컨슈머 모두 이벤트 루프에서 poll 호출
- 수동 커밋: 처리 완료 후 commit으로 중복·유실 최소화
- 리밸런싱: rebalance_cb에서 assign/unassign 처리
- 프로듀서 재사용: 매 요청마다 새 프로듀서 생성 금지
자주 묻는 질문 (FAQ)
Q. Kafka와 RabbitMQ 중 어떤 것을 써야 하나요?
A. 대용량 이벤트 스트리밍, 로그 수집, 재처리/이벤트 소싱이 필요하면 Kafka가 적합합니다. 작업 큐, 우선순위 큐, 복잡한 라우팅이 필요하면 RabbitMQ를 고려하세요.
Q. 정확히 한 번 전달(at-least-once vs exactly-once)은 어떻게 하나요?
A. at-least-once: 처리 후 커밋. 중복 가능성 있음. exactly-once: 프로듀서 enable.idempotence=true + 트랜잭션 + 컨슈머 read_committed. Kafka 고급(#52-6)에서 다룹니다.
Q. 메시지 순서가 보장되나요?
A. 파티션 내에서만 순서가 보장됩니다. 같은 키를 사용하면 같은 파티션으로 가므로, 키별 순서를 보장할 수 있습니다.
Q. librdkafka는 스레드 안전한가요?
A. 네. librdkafka API는 스레드 안전합니다. 단, 콜백 객체(dr_cb, rebalance_cb 등)는 프로듀서/컨슈머 생성 시점에 등록되어야 하며, 수명이 프로듀서/컨슈머보다 길어야 합니다. 한 줄 요약: librdkafka로 C++에서 Kafka 프로듀서·컨슈머를 구현하고, poll·commit·리밸런싱을 올바르게 처리하면 실무에 바로 적용할 수 있습니다. 다음 글: Kafka 고급: 스트림 처리·트랜잭션·정확히 한 번(#52-6) 이전 글: C++ 시리즈 목차
참고 자료
- librdkafka 공식 문서
- Apache Kafka 문서
- 메시지 큐 개요(#50-7) — Kafka vs RabbitMQ