[2026] C++ Kafka 고급 활용 | 스트림 처리·트랜잭션·정확히 한 번 전달 완벽 가이드 [#52-6]
이 글의 핵심
C++ Kafka 심화: librdkafka 스트림 처리 패턴, 트랜잭션 프로듀서, 정확히 한 번 전달, KSQL 대안. 실무 문제 시나리오, 완전한 예제, 자주 발생하는 에러, 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.
들어가며: “스트림 처리·정확히 한 번이 막막해요”
실제 겪는 문제 시나리오
Kafka 기본(#52-5)에서 프로듀서·컨슈머·오프셋·리밸런싱을 다뤘다면, 이 글에서는 고급 기능을 다룹니다. 실무에서 자주 맞닥뜨리는 문제와 해결 방법을 제시합니다. 시나리오 1: 토픽 A → 처리 → 토픽 B 파이프라인
상황: 클릭 로그를 읽어 집계 후 요약 토픽에 저장하는 파이프라인
문제: C++에서 Kafka Streams/KSQL 없이 어떻게 구현하나?
결과: 컨슈머-프로듀서 패턴으로 read → transform → produce
시나리오 2: 주문 이벤트 중복 처리로 재고가 음수
상황: 컨슈머 재시작 시 같은 메시지를 다시 처리해 재고가 두 번 차감됨
문제: at-least-once만으로는 중복 제거 불가
결과: 멱등성 키 설계 또는 정확히 한 번(idempotence + 트랜잭션) 적용
시나리오 3: 여러 토픽에 원자적으로 쓰기
상황: 주문 생성 시 orders 토픽 + order-events 토픽에 동시에 저장해야 함
문제: 중간에 실패하면 일부만 반영되어 데이터 불일치
결과: 트랜잭션 프로듀서로 여러 토픽에 원자적 발행
시나리오 4: 실시간 윈도우 집계
상황: 1분 단위로 API 호출 수를 집계해 대시보드에 표시
문제: C++에서 Kafka Streams 없이 윈도우 집계를 어떻게?
결과: 시간 윈도우 버퍼 + 주기적 flush로 시뮬레이션
시나리오 5: 컨슈머 처리 중 오프셋 커밋 타이밍
상황: 배치 처리 후 커밋 vs 메시지별 커밋 시 리밸런싱 시 중복 유실
문제: 리밸런싱 시 처리 중인 메시지가 다른 컨슈머로 넘어감
결과: 처리 완료 직후 커밋, 리밸런싱 콜백에서 상태 정리
아래 코드는 mermaid를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
flowchart TB
subgraph 문제[실무 문제]
P1[스트림 파이프라인] --> S1[컨슈머-프로듀서]
P2[중복 처리] --> S2[멱등성·정확히 한 번]
P3[원자적 다중 토픽] --> S3[트랜잭션]
P4[윈도우 집계] --> S4[버퍼·주기 flush]
P5[리밸런싱] --> S5[상태·커밋 전략]
end
목표:
- 스트림 처리: read → transform → produce
- 트랜잭션: 여러 토픽 원자적 발행
- 정확히 한 번: idempotence·멱등성·트랜잭션
- 윈도우 집계: C++에서 시뮬레이션
- 프로덕션: 모니터링·백프레셔·재시도 요구 환경: C++17 이상, librdkafka 2.0+, Kafka 기본(#52-5) 선행 이 글을 읽으면:
- 스트림 처리 파이프라인을 C++로 구현할 수 있습니다.
- 트랜잭션·정확히 한 번 전달을 적용할 수 있습니다.
- 실무 에러·성능·프로덕션 패턴을 활용할 수 있습니다.
실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.
목차
1. 스트림 처리 패턴
C++에서 스트림 처리란?
Kafka Streams나 KSQL은 JVM 기반입니다. C++에서는 컨슈머로 읽고 → 변환 → 프로듀서로 쓰는 패턴으로 스트림 처리를 구현합니다. 아래 코드는 mermaid를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
sequenceDiagram
participant C as 컨슈머
participant App as C++ 앱
participant P as 프로듀서
C->>App: consume(topic-a)
App->>App: transform/filter/aggregate
App->>P: produce(topic-b)
1.1 단순 파이프라인: topic-a → topic-b
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// stream_pipeline.cpp
// 컴파일: g++ -std=c++17 -o stream_pipeline stream_pipeline.cpp -lrdkafka -lrdkafka++
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
#include <csignal>
static volatile sig_atomic_t run = 1;
void sig_handler(int) { run = 0; }
int main() {
std::string errstr;
std::string brokers = "localhost:9092";
// 1. 컨슈머 설정
RdKafka::Conf* c_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
c_conf->set("bootstrap.servers", brokers, errstr);
c_conf->set("group.id", "stream-pipeline-group", errstr);
c_conf->set("enable.auto.commit", "false", errstr);
RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(c_conf, errstr);
delete c_conf;
if (!consumer) {
std::cerr << "컨슈머 생성 실패: " << errstr << std::endl;
return 1;
}
// 2. 프로듀서 설정
RdKafka::Conf* p_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
p_conf->set("bootstrap.servers", brokers, errstr);
RdKafka::Producer* producer = RdKafka::Producer::create(p_conf, errstr);
delete p_conf;
if (!producer) {
std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
delete consumer;
return 1;
}
consumer->subscribe({"raw-logs"});
signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
std::cout << "스트림 파이프라인 시작 (raw-logs → processed-logs)" << std::endl;
while (run) {
RdKafka::Message* msg = consumer->consume(1000);
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR: {
std::string payload(static_cast<const char*>(msg->payload()),
msg->len());
// 변환: 대문자 변환 (예시)
std::string transformed;
for (char c : payload) {
transformed += (c >= 'a' && c <= 'z') ? (c - 32) : c;
}
// 출력 토픽에 발행
RdKafka::ErrorCode err = producer->produce(
"processed-logs",
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(transformed.data()),
transformed.size(),
msg->key() ? msg->key()->data() : nullptr,
msg->key() ? msg->key()->size() : 0,
0, nullptr);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "발행 실패: " << RdKafka::err2str(err) << std::endl;
} else {
consumer->commit(msg); // 처리 완료 후 커밋
}
break;
}
case RdKafka::ERR__TIMED_OUT:
case RdKafka::ERR__PARTITION_EOF:
break;
default:
std::cerr << "소비 에러: " << msg->errstr() << std::endl;
run = 0;
break;
}
delete msg;
producer->poll(0);
}
producer->flush(5000);
consumer->close();
delete producer;
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
코드 설명:
raw-logs→ 읽기 → 변환 →processed-logs발행- 처리 완료 후
commit(): 발행 성공 후에만 커밋해 유실 방지 producer->poll(0): delivery report 처리 주의: 발행 실패 시commit()하지 않으면 재시작 시 재처리됩니다. at-least-once 보장.
1.2 필터링: 조건에 맞는 메시지만 전달
다음은 cpp를 활용한 상세한 구현 코드입니다. 에러 처리를 통해 안정성을 확보합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// stream_filter.cpp - 에러 로그만 필터링
bool should_forward(const std::string& payload) {
return payload.find("ERROR") != std::string::npos ||
payload.find("FATAL") != std::string::npos;
}
// consume 루프 내부
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::string payload(static_cast<const char*>(msg->payload()), msg->len());
if (should_forward(payload)) {
producer->produce("error-logs", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(payload.data()), payload.size(),
nullptr, 0, 0, nullptr);
}
consumer->commit(msg); // 필터 통과 여부와 관계없이 소비 완료
}
1.3 시간 윈도우 집계 (1분 단위)
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// stream_window_aggregate.cpp - 1분 윈도우 카운트
#include <librdkafka/rdkafkacpp.h>
#include <chrono>
#include <map>
#include <string>
class WindowAggregator {
public:
using WindowKey = std::string;
using CountMap = std::map<WindowKey, int64_t>;
void add(const std::string& key, int64_t count = 1) {
auto window = current_window();
counts_[window][key] += count;
}
void flush_if_needed(RdKafka::Producer* producer) {
auto now = std::chrono::system_clock::now();
auto current = current_window();
for (auto it = counts_.begin(); it != counts_.end();) {
if (it->first < current) {
for (const auto& [k, v] : it->second) {
std::string msg = k + ":" + std::to_string(v);
producer->produce("aggregated", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(msg.data()), msg.size(),
nullptr, 0, 0, nullptr);
}
it = counts_.erase(it);
} else {
++it;
}
}
}
private:
std::string current_window() {
auto now = std::chrono::system_clock::now();
auto sec = std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()).count();
return std::to_string((sec / 60) * 60); // 1분 단위
}
std::map<std::string, CountMap> counts_;
};
2. 정확히 한 번 전달
2.1 전달 시맨틱 비교
| 시맨틱 | 설명 | 중복 | 유실 |
|---|---|---|---|
| at-most-once | 커밋 후 처리 | 없음 | 가능 |
| at-least-once | 처리 후 커밋 | 가능 | 없음 |
| exactly-once | 멱등성·트랜잭션 | 없음 | 없음 |
2.2 멱등성 키로 중복 제거
비즈니스 로직에서 멱등성 키(idempotency key)를 사용해 중복 처리를 방지합니다. 다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// exactly_once_idempotent.cpp - 멱등성 키 기반 중복 제거
#include <unordered_set>
#include <string>
class IdempotencyCache {
public:
bool try_acquire(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
if (seen_.count(key)) return false; // 이미 처리됨
seen_.insert(key);
if (seen_.size() > max_size_) {
// FIFO: 오래된 키 제거 (간단히 처음 것 제거)
seen_.erase(seen_.begin());
}
return true;
}
private:
std::unordered_set<std::string> seen_;
std::mutex mutex_;
size_t max_size_ = 100000;
};
// 메시지 처리 시
std::string idempotency_key = msg->key() ?
std::string(msg->key()->data(), msg->key()->size()) :
std::string(msg->topic_name()) + "-" + std::to_string(msg->partition()) +
"-" + std::to_string(msg->offset());
if (idempotency_cache.try_acquire(idempotency_key)) {
do_business_logic(msg); // 실제 처리
}
consumer->commit(msg); // 커밋은 항상 (재처리 시 멱등성으로 스킵)
주의: idempotency_key는 Redis·DB에 저장해 재시작 후에도 유지하는 것이 좋습니다. 메모리 캐시는 재시작 시 초기화됩니다.
2.3 프로듀서 멱등성 (enable.idempotence)
프로듀서가 메시지를 중복 발행하지 않도록 브로커 설정을 합니다. 다음은 간단한 cpp 코드 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// 프로듀서 설정
conf->set("enable.idempotence", "true", errstr);
// 이 설정 시 retries, acks, max.in.flight.requests.per.connection가 자동으로
// 안전한 값으로 설정됨
효과: 네트워크 오류로 재시도 시 브로커가 중복을 제거합니다. 단일 토픽 발행에만 적용됩니다.
3. 트랜잭션 프로듀서
3.1 트랜잭션이 필요한 경우
- 여러 토픽에 원자적으로 쓰기
- consume → transform → produce를 원자적으로 처리 (read-process-write) 아래 코드는 mermaid를 사용한 구현 예제입니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
sequenceDiagram
participant C as 컨슈머
participant App as 앱
participant P as 트랜잭션 프로듀서
P->>P: init_transactions()
P->>P: begin_transaction()
C->>App: consume
App->>P: produce (topic-a)
App->>P: produce (topic-b)
P->>P: commit_transaction()
App->>C: commit (offsets)
3.2 librdkafka 트랜잭션 API (C++)
librdkafka는 C API로 트랜잭션을 지원합니다. C++에서는 RdKafka::Producer의 init_transactions() 등이 있습니다.
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// transactional_producer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>
int main() {
std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092", errstr);
conf->set("transactional.id", "my-txn-id", errstr); // 트랜잭션 ID 필수
RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
delete conf;
if (!producer) {
std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
return 1;
}
// 1. 트랜잭션 초기화
RdKafka::ErrorCode err = producer->init_transactions(5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "init_transactions 실패: " << RdKafka::err2str(err) << std::endl;
delete producer;
return 1;
}
// 2. 트랜잭션 시작
err = producer->begin_transaction();
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "begin_transaction 실패: " << RdKafka::err2str(err) << std::endl;
delete producer;
return 1;
}
// 3. 여러 토픽에 발행
std::string msg1 = "order-123";
std::string msg2 = "event-order-123";
producer->produce("orders", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(msg1.data()), msg1.size(),
nullptr, 0, 0, nullptr);
producer->produce("order-events", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(msg2.data()), msg2.size(),
nullptr, 0, 0, nullptr);
// 4. 트랜잭션 커밋
err = producer->commit_transaction(5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "commit_transaction 실패: " << RdKafka::err2str(err) << std::endl;
producer->abort_transaction(5000);
}
producer->flush(5000);
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
주의: transactional.id는 클러스터 내에서 유일해야 합니다. enable.idempotence는 트랜잭션 사용 시 자동으로 활성화됩니다.
3.3 컨슈머 read_committed
트랜잭션으로 커밋된 메시지만 읽으려면:
conf->set("isolation.level", "read_committed", errstr);
3.4 트랜잭션 미지원 브로커
Kafka 2.8+ 및 transaction.state.log.replication.factor가 1 이상이어야 합니다. 구버전이면 트랜잭션 API가 실패합니다. 이 경우 멱등성 키 + at-least-once로 대체합니다.
4. 완전한 Kafka 예제
예제 1: 로그 집계 파이프라인 (에러율 계산)
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며, 에러 처리를 통해 안정성을 확보합니다, 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// log_aggregator.cpp - 에러 로그 비율 집계
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <chrono>
#include <atomic>
struct LogStats {
std::atomic<int64_t> total{0};
std::atomic<int64_t> errors{0};
};
void run_log_aggregator(const std::string& brokers) {
std::string errstr;
RdKafka::Conf* c_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
c_conf->set("bootstrap.servers", brokers, errstr);
c_conf->set("group.id", "log-aggregator", errstr);
c_conf->set("enable.auto.commit", "false", errstr);
auto* consumer = RdKafka::KafkaConsumer::create(c_conf, errstr);
delete c_conf;
RdKafka::Conf* p_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
p_conf->set("bootstrap.servers", brokers, errstr);
auto* producer = RdKafka::Producer::create(p_conf, errstr);
delete p_conf;
consumer->subscribe({"app-logs"});
LogStats stats;
while (true) {
RdKafka::Message* msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::string payload(static_cast<const char*>(msg->payload()), msg->len());
stats.total++;
if (payload.find("ERROR") != std::string::npos) stats.errors++;
// 1분마다 요약 발행
auto now = std::chrono::system_clock::now();
static auto last_flush = now;
if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_flush).count() > 60000) {
double rate = (stats.total > 0) ? (100.0 * stats.errors / stats.total) : 0;
std::string summary = "{\"total\":" + std::to_string(stats.total) +
",\"errors\":" + std::to_string(stats.errors) +
",\"error_rate\":" + std::to_string(rate) + "}";
producer->produce("log-summary", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(summary.data()), summary.size(),
nullptr, 0, 0, nullptr);
last_flush = now;
}
consumer->commit(msg);
}
delete msg;
producer->poll(0);
}
delete producer;
delete consumer;
}
예제 2: 주문 이벤트 처리 (멱등성)
다음은 cpp를 활용한 상세한 구현 코드입니다. 필요한 모듈을 import하고, 클래스를 정의하여 데이터와 기능을 캡슐화하며, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// order_processor.cpp - 멱등성 키로 중복 주문 방지
#include <librdkafka/rdkafkacpp.h>
#include <unordered_set>
#include <mutex>
#include <string>
class OrderProcessor {
public:
void process(RdKafka::Message* msg, RdKafka::KafkaConsumer* consumer) {
if (msg->err() != RdKafka::ERR_NO_ERROR) return;
std::string order_id = extract_order_id(msg);
if (order_id.empty()) {
consumer->commit(msg);
return;
}
{
std::lock_guard<std::mutex> lock(mutex_);
if (processed_.count(order_id)) {
consumer->commit(msg); // 이미 처리됨, 스킵
return;
}
processed_.insert(order_id);
if (processed_.size() > 100000) {
processed_.erase(processed_.begin());
}
}
// 실제 비즈니스 로직: 재고 차감, DB 저장 등
do_order_processing(order_id, msg);
consumer->commit(msg);
}
private:
std::string extract_order_id(RdKafka::Message* msg) {
if (msg->key()) {
return std::string(msg->key()->data(), msg->key()->size());
}
return "";
}
void do_order_processing(const std::string& order_id, RdKafka::Message* msg) {
(void)order_id;
(void)msg;
// 재고 차감, 주문 저장 등
}
std::unordered_set<std::string> processed_;
std::mutex mutex_;
};
예제 3: Graceful Shutdown + Flush
다음은 cpp를 활용한 상세한 구현 코드입니다. 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// graceful_shutdown.cpp
static std::atomic<bool> running{true};
void shutdown_handler(int sig) {
(void)sig;
running = false;
}
int main() {
signal(SIGINT, shutdown_handler);
signal(SIGTERM, shutdown_handler);
// ....프로듀서·컨슈머 생성
while (running) {
RdKafka::Message* msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
process(msg);
consumer->commit(msg);
}
delete msg;
}
// 1. 프로듀서: 대기 중인 메시지 전송 완료
producer->flush(10000);
delete producer;
// 2. 컨슈머: 정리
consumer->close();
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
5. 자주 발생하는 에러와 해결법
에러 1: “Transaction coordinator not found”
증상: init_transactions() 또는 begin_transaction() 실패.
원인: 브로커가 트랜잭션을 지원하지 않거나, transaction.state.log.replication.factor가 0.
해결법:
# 브로커 설정 확인
# broker 설정에서 transaction.state.log.replication.factor >= 1
아래 코드는 cpp를 사용한 구현 예제입니다. 조건문으로 분기 처리를 수행합니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// ✅ 트랜잭션 미지원 시 fallback: 멱등성 키 + at-least-once
err = producer->init_transactions(5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "트랜잭션 미지원, 멱등성 모드로 전환" << std::endl;
use_transaction_ = false;
}
에러 2: “Fatal error: Local: Invalid argument (transactional.id)”
증상: transactional.id 설정 후 오류.
원인: transactional.id가 비어 있거나, 이미 사용 중인 ID.
해결법:
// ✅ 유일한 ID 사용 (인스턴스별)
conf->set("transactional.id", "my-app-" + std::to_string(getpid()), errstr);
에러 3: “Commit failed: Local: No offset stored”
증상: 리밸런싱 직후 commit() 호출 시 ERR__REBALANCE_IN_PROGRESS.
원인: 컨슈머 그룹 리밸런싱 중에는 커밋이 거부됨.
해결법:
아래 코드는 cpp를 사용한 구현 예제입니다. 조건문으로 분기 처리를 수행합니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// ✅ 리밸런싱 콜백에서 assign 후에만 commit
if (msg->err() == RdKafka::ERR_NO_ERROR) {
consumer->commit(msg);
} else if (msg->err() == RdKafka::ERR__REBALANCE_IN_PROGRESS) {
// rebalance_cb에서 처리됨, 여기서는 consume 계속
}
에러 4: “Producer: Queue full”
증상: produce() 호출 시 ERR__QUEUE_FULL.
원인: 발행 속도 > 브로커 수신 속도.
해결법:
아래 코드는 cpp를 사용한 구현 예제입니다. 반복문으로 데이터를 처리합니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// ✅ 백프레셔: 큐가 비워질 때까지 대기
while (producer->outq_len() > 10000) {
producer->poll(100);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
producer->produce(...);
에러 5: 스트림 파이프라인에서 메시지 유실
증상: 컨슈머가 commit() 후 프로듀서 발행 전에 크래시.
원인: 처리 후 커밋 → 발행 순서가 잘못되면, 커밋은 됐는데 발행이 안 된 상태로 재시작됨.
해결법:
아래 코드는 cpp를 사용한 구현 예제입니다. 조건문으로 분기 처리를 수행합니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// ✅ 발행 성공 후 커밋 (at-least-once)
RdKafka::ErrorCode err = producer->produce(...);
if (err == RdKafka::ERR_NO_ERROR) {
producer->poll(0); // dr_cb 확인
consumer->commit(msg);
}
// ✅ 정확히 한 번이 필요하면 트랜잭션 + send_offsets_to_transaction
에러 6: “Broker: Not leader for partition” 반복
증상: 리더가 자주 바뀌는 파티션에서 지속적 실패. 원인: 브로커 불안정, 네트워크 지연. 해결법: 다음은 간단한 cpp 코드 예제입니다. 에러 처리를 통해 안정성을 확보합니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
// ✅ 재시도·백오프
conf->set("retries", "10", errstr);
conf->set("retry.backoff.ms", "500", errstr);
conf->set("message.send.max.retries", "5", errstr);
에러 7: “Consumer group is rebalancing” 후 메시지 중복
증상: 리밸런싱 시 처리 중이던 메시지가 다른 컨슈머로 재할당되어 중복 처리.
원인: rebalance_cb에서 unassign 시 처리 중인 메시지 정리 전에 새 컨슈머가 할당받음.
해결법:
아래 코드는 cpp를 사용한 구현 예제입니다. 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// ✅ rebalance_cb에서 파티션 해제 시 처리 중인 작업 완료 대기
void rebalance_cb(RdKafka::KafkaConsumer* consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*>& partitions) override {
if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
// 처리 중인 메시지 완료 대기
wait_for_pending_processing();
consumer->unassign();
} else if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
consumer->assign(partitions);
}
}
에러 8: “Message delivery failed: Broker: Message size too large”
증상: 큰 메시지 발행 시 실패. 해결법:
// 프로듀서
conf->set("message.max.bytes", "10485760", errstr); // 10MB
// 브로커: message.max.bytes, replica.fetch.max.bytes 동일하게
6. 성능 최적화 팁
6.1 스트림 파이프라인 배치 처리
메시지를 모아서 한 번에 처리하면 오버헤드가 줄어듭니다. 다음은 cpp를 활용한 상세한 구현 코드입니다. 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// 배치 크기만큼 모아서 처리
const size_t BATCH_SIZE = 100;
std::vector<RdKafka::Message*> batch;
while (batch.size() < BATCH_SIZE) {
RdKafka::Message* msg = consumer->consume(100);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
batch.push_back(msg);
} else {
bool timed_out = (msg->err() == RdKafka::ERR__TIMED_OUT);
delete msg;
if (timed_out && !batch.empty()) break;
}
}
for (auto* m : batch) {
process_and_produce(m);
consumer->commit(m);
delete m;
}
6.2 프로듀서 병렬화
// ✅ 여러 스레드에서 동일 프로듀서 사용 (librdkafka는 스레드 안전)
std::thread t1([&]() { producer->produce(...); producer->poll(100); });
std::thread t2([&]() { producer->produce(...); producer->poll(100); });
6.3 압축
conf->set("compression.type", "lz4", errstr); // 속도·압축률 균형
6.4 linger.ms vs 처리량
| linger.ms | 처리량 | 지연 |
|---|---|---|
| 0 | 낮음 | 1ms |
| 5 | 중간 | 5ms |
| 50 | 높음 | 50ms |
conf->set("linger.ms", "5", errstr);
conf->set("batch.size", "32768", errstr); // 32KB
6.5 파티션 수와 컨슈머 수
- 파티션 수 ≥ 컨슈머 수: 모든 컨슈머가 활용됨
- 파티션 수 < 컨슈머 수: 일부 컨슈머는 idle
# 토픽 생성 시 파티션 수 결정
kafka-topics --create --topic events --partitions 12 --replication-factor 2 \
--bootstrap-server localhost:9092
7. 프로덕션 패턴
7.1 헬스 체크
다음은 cpp를 활용한 상세한 구현 코드입니다. 반복문으로 데이터를 처리합니다, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
bool health_check(RdKafka::Producer* producer) {
RdKafka::Metadata* meta = nullptr;
RdKafka::ErrorCode err = producer->metadata(true, nullptr, &meta, 5000);
bool ok = (err == RdKafka::ERR_NO_ERROR && meta);
if (meta) delete meta;
return ok;
}
bool health_check(RdKafka::KafkaConsumer* consumer) {
std::vector<RdKafka::TopicPartition*> partitions;
RdKafka::ErrorCode err = consumer->assignment(partitions);
bool ok = (err == RdKafka::ERR_NO_ERROR);
for (auto* p : partitions) delete p;
return ok;
}
7.2 메트릭 수집
아래 코드는 cpp를 사용한 구현 예제입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
// librdkafka 통계 (JSON)
conf->set("statistics.interval.ms", "10000", errstr);
class StatsCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event& event) override {
if (event.type() == RdKafka::Event::EVENT_STATS) {
// event.str()에 JSON 통계
std::cout << event.str() << std::endl;
}
}
};
7.3 환경 변수 기반 설정
아래 코드는 cpp를 사용한 구현 예제입니다. 클래스를 정의하여 데이터와 기능을 캡슐화하며, 조건문으로 분기 처리를 수행합니다. 각 부분의 역할을 이해하면서 코드를 살펴보시기 바랍니다.
struct KafkaConfig {
std::string brokers = "localhost:9092";
std::string group_id = "cpp-consumer";
std::string transactional_id;
int linger_ms = 5;
};
KafkaConfig load_config() {
KafkaConfig c;
if (const char* b = std::getenv("KAFKA_BROKERS")) c.brokers = b;
if (const char* g = std::getenv("KAFKA_GROUP_ID")) c.group_id = g;
if (const char* t = std::getenv("KAFKA_TRANSACTIONAL_ID")) c.transactional_id = t;
return c;
}
7.4 SSL/TLS + SASL
아래 코드는 cpp를 사용한 구현 예제입니다. 코드를 직접 실행해보면서 동작을 확인해보세요.
conf->set("security.protocol", "sasl_ssl", errstr);
conf->set("sasl.mechanisms", "PLAIN", errstr);
conf->set("sasl.username", std::getenv("KAFKA_USER"), errstr);
conf->set("sasl.password", std::getenv("KAFKA_PASSWORD"), errstr);
conf->set("ssl.ca.location", "/etc/ssl/certs/ca-certificates.crt", errstr);
7.5 멀티 브로커
conf->set("bootstrap.servers",
"broker1:9092,broker2:9092,broker3:9092", errstr);
7.6 디버그 로깅
conf->set("debug", "broker,topic,msg", errstr);
// 프로덕션에서는 제거
8. 구현 체크리스트
스트림 처리
- consume → transform → produce 순서 준수
- 발행 성공 후 commit (유실 방지)
- producer->poll() 주기적 호출
정확히 한 번
- 멱등성 키 설계 (키 또는 topic-partition-offset)
- 멱등성 캐시 저장소 (Redis/DB 권장)
- enable.idempotence (프로듀서)
트랜잭션
- transactional.id 설정 (유일값)
- init_transactions → begin → produce → commit
- 실패 시 abort_transaction
- isolation.level=read_committed (컨슈머)
에러 처리
- produce() 반환값 검사
- msg->err() 검사
- ERR__QUEUE_FULL 시 백프레셔
- rebalance_cb 구현
프로덕션
- Graceful Shutdown (flush, close)
- SSL/TLS, SASL (필요 시)
- 환경 변수로 설정 외부화
- 헬스 체크·메트릭
9. 정리
| 항목 | 요약 |
|---|---|
| 스트림 처리 | consume → transform → produce, 발행 후 커밋 |
| 정확히 한 번 | 멱등성 키 + enable.idempotence + 트랜잭션 |
| 트랜잭션 | transactional.id, init_transactions, begin/commit/abort |
| 에러 | Transaction coordinator, Queue full, Rebalance |
| 성능 | 배치 처리, linger.ms, 압축, 파티션 수 |
| 프로덕션 | Graceful Shutdown, SSL/SASL, 헬스 체크 |
| 핵심 원칙: |
- 발행 후 커밋: 처리 완료 후 프로듀서 발행 성공 확인 후 commit
- 멱등성: 비즈니스 키로 중복 처리 방지
- 트랜잭션: 다중 토픽 원자적 쓰기
- 리밸런싱: rebalance_cb에서 상태 정리·재할당 처리
자주 묻는 질문 (FAQ)
Q. 이 내용을 실무에서 언제 쓰나요?
A. 이벤트 스트리밍, 로그 수집, 실시간 데이터 파이프라인, CDC, 금융·주문 시스템의 정확히 한 번 전달 등에 활용합니다. Kafka 기본(#52-5)을 선행으로 읽으세요.
Q. C++에서 Kafka Streams 대신 뭘 쓰나요?
A. Kafka Streams는 JVM 전용입니다. C++에서는 컨슈머-프로듀서 패턴으로 read → transform → produce를 구현합니다. 복잡한 윈도우·조인은 별도 상태 저장소(Redis, DB)와 함께 구현합니다.
Q. 정확히 한 번 전달이 꼭 필요한가요?
A. 금융·주문 시스템에서는 필수입니다. 로그·메트릭은 at-least-once로도 충분할 수 있습니다. 멱등성 키 설계가 가능하면 정확히 한 번 전달을 시뮬레이션할 수 있습니다.
Q. 트랜잭션 프로듀서가 실패하면?
A. abort_transaction()을 호출해 롤백합니다. 재시도 시 begin_transaction()부터 다시 시작합니다.
한 줄 요약: 스트림 처리·트랜잭션·정확히 한 번 전달을 C++로 구현해 실무에 적용할 수 있습니다.
다음 글: C++ 시리즈 목차
이전 글: Kafka 완벽 가이드(#52-5)
참고 자료
- librdkafka 공식 문서
- Apache Kafka 트랜잭션
- Kafka 완벽 가이드(#52-5) — 프로듀서·컨슈머 기초