kafka实践与C++操作kafka
一、集群
- Kafka 架构是由 producer(消息生产者)、consumer(消息消费者)、broker(kafka 集群的 server,负责处理消息读、写请求,存储消息,在 Kafka cluster 这一层里,其实里面是有很多个 broker 组成)、topic(消息队列 / 分类相当于队列,里面有生产者和消费者模型)、zookeeper 这些部分组成。
- kafka 里面的消息是有 topic 来组织的,简单的我们可以想象为一个队列,一个队列就是一个 topic,然后它把每个 topic 又分为很多个 partition,这个是为了做并行的,在每个 partition 内部消息是有顺序,相当于有序的队列,其中每个消息都有个序号 offset,比如 0 到 1,2,从前面读往后面写。一个 partition 对应一个 broker,一个 broker 可以管多个 partition,比如说,topic 有 6 个 partition,有两个 broker,那每个 broker 就管 3 个 partition,这个 partition 可以很简单想象为一个文件,当数据发过来的时候它就往这个 partition 上面 append,追加就行了,消息不经过内存缓冲,直接写入文件,kafka 的很多消息系统不一样,很多消息系统是消费完就把它删除掉,而 kafka 是按照时间策略删除,而不是消费完就删除,在 kafka 里面没有一个消费完这么个概念,只有过期这样一个概念。
- producer 自己决定往哪个 partition 里面面去写,这里有一些的策略,譬如如果 hash,不用多个 partition 之间去 queue 了。consumer 自己维护消费到哪个 offset,每个 consumer 都有对应的 group,group 内是数据消费模型(各个 consumer 要消费不同的 partition,因此一个消息在 group 内只消费一次),group 间是 publish-subscribe 消费模型,各个 group 各自独立消费,互不影响,因此一个消息在被每个 group 消费一次。
1. 搭建两台服务器
ip1: 192.168.31.249
ip2: 192.168.31.36
2. zookeeper部署
zookeeper还是先只部署一台,在ip2: 192.168.31.36 上启动zookeeper
3. 启动broker ip1:192.168.31.249
修改broker.id(也可以改为-1,自动分配)
broker.id=0
修改server.properties(在config目录), 增加zookeeper的配置,要配置对应的zookeeper ip地址。
zookeeper.connect=192.168.31.249:2181
启动kafka:
sh kafka-server-start.sh -daemon ../config/server.propertiessh kafka-server-start.sh ../config/server.properties
默认端口为:9092,可以通过命令lsof -i:9092查看kafka是否启动成功。
4. 启动broker ip2: 192.168.31.36
修改broker.id(也可以改为-1,自动分配)
broker.id=1
修改server.properties(在config目录), 增加zookeeper的配置,要配置对应的zookeeper ip地址。
zookeeper.connect=192.168.31.249:2181
启动kafka:
sh kafka-server-start.sh -daemon ../config/server.properties
默认端口为:9092,可以通过命令lsof -i:9092查看kafka是否启动成功。
5. 查看kafka集群
创建主题:
sh kafka-topics.sh --create --zookeeper 192.168.31.249:2181 -replication-factor 2 --partitions 2 - topic kafka-2
查看主题:
sh kafka-topics.sh --describe --zookeeper 192.168.31.249:2181 --topic kafka-2
显示信息:
Topic:kafka-2 PartitionCount:2
Topic: kafka-2 Partition: 0
Topic: kafka-2 Partition: 1
ReplicationFactor:2 Configs:Leader: 1 Replicas: 1,0 Isr: 1,0Leader: 0 Replicas: 0,1 Isr: 0
6. 测试集群
开三个终端:开启一个生产者,两个消费者
生产者: sh kafka-console-producer.sh --broker-list 192.168.31.249:9092 --topic kafka-2
消费者:
sh kafka-console-consumer.sh --bootstrap-server 192.168.31.249:9092 --topic kafka-2 --group 0 - from-beginning
sh kafka-console-consumer.sh --bootstrap-server 192.168.31.249:9092 --topic kafka-2 --group 0 - from-beginning
当两个消费者同属一个消费组开启后,消费者轮流收到发送者的数据。
kafka-console-consumer.sh部分支持的参数:
参数 | 值类型 | 说明 | 有效值 |
---|---|---|---|
--topic | string | 被消费的 topic | |
--partition | integer | 指定分区,除非指定--offset ,否则从分区结束(latest)开始消费 | |
--offset | string | 执行消费的起始 offset 位置,默认值:latest | latest, earliest |
--consumer-property | string | 将用户定义的属性以key=value 的形式传递给使用者 | |
--consumer.config | string | 消费者配置属性文件,请注意,[consumer-property] 优先于此配置 | |
--from-beginning | 从存在的最早消息开始,而不是从最新消息开始 | ||
--group | string | 指定消费者所属组的 ID |
二、代码案例
一、环境准备
- Kafka 集群部署(本地测试)
- 下载 Kafka:官网(建议 2.8.1 版本)
- 启动 Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
- 启动 Kafka Broker:
bin/kafka-server-start.sh config/server.properties
- 创建测试主题(分区数 2,副本数 1):
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic demo-topic --partitions 2 --replication-factor 1
二、完整代码实现
以下为生产者(
kafka_producer.cpp
)和消费者(kafka_consumer.cpp
)的完整代码,使用 librdkafka 库实现。1. 生产者代码(发送消息)
// kafka_producer.cpp #include <iostream> #include <string> #include <chrono> #include <thread> #include "rdkafkacpp.h"// 投递结果回调(消息是否成功发送到Kafka) class DeliveryReportCb : public RdKafka::DeliveryReportCb { public:void dr_cb(RdKafka::Message& msg) override {if (msg.err()) {std::cerr << "消息投递失败 [" << msg.topic_name() << "][" << msg.partition() << "]: " << msg.errstr() << std::endl;} else {std::cout << "消息投递成功 [" << msg.topic_name() << "][" << msg.partition() << "]: 偏移量=" << msg.offset() << ", 大小=" << msg.len() << "字节" << std::endl;}} };// 事件回调(如错误、日志) class EventCb : public RdKafka::EventCb { public:void event_cb(RdKafka::Event& event) override {switch (event.type()) {case RdKafka::Event::EVENT_ERROR:std::cerr << "错误事件: " << RdKafka::err2str(event.err()) << std::endl;break;case RdKafka::Event::EVENT_STATS:std::cout << "统计信息: " << event.str() << std::endl;break;default:break;}} };int main() {// 配置参数const std::string brokers = "localhost:9092"; // Kafka服务器地址const std::string topic_name = "demo-topic"; // 主题名称const int send_interval_ms = 2000; // 消息发送间隔(2秒)// 创建配置对象RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);std::string errstr;// 设置全局配置if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK ||conf->set("dr_cb", new DeliveryReportCb(), errstr) != RdKafka::Conf::CONF_OK ||conf->set("event_cb", new EventCb(), errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置失败: " << errstr << std::endl;return 1;}// 设置主题配置(可选)topic_conf->set("request.required.acks", "1", errstr); // Leader确认即可// 创建Producer实例RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "创建Producer失败: " << errstr << std::endl;return 1;}// 创建Topic句柄RdKafka::Topic* topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr);if (!topic) {std::cerr << "创建Topic失败: " << errstr << std::endl;delete producer;return 1;}// 循环发送消息std::cout << "开始发送消息到主题 [" << topic_name << "], 按 Ctrl+C 停止..." << std::endl;int msg_count = 0;while (true) {// 构造消息内容(带时间戳)auto now = std::chrono::system_clock::now();std::time_t time = std::chrono::system_clock::to_time_t(now);std::string payload = "Demo消息 #" + std::to_string(++msg_count) + " @ " + std::ctime(&time);// 发送消息(自动选择分区)RdKafka::ErrorCode err = producer->produce(topic,RdKafka::Topic::PARTITION_UA, // 自动分配分区RdKafka::Producer::RK_MSG_COPY,const_cast<char*>(payload.c_str()), payload.size(),nullptr, // 无消息键nullptr);if (err != RdKafka::ERR_NO_ERROR) {std::cerr << "发送失败: " << RdKafka::err2str(err) << std::endl;} else {std::cout << "已发送消息 #" << msg_count << ": " << payload;}// 触发回调处理(必须定期调用poll)producer->poll(0);// 等待2秒std::this_thread::sleep_for(std::chrono::milliseconds(send_interval_ms));}// 清理资源(实际不会执行到,需通过信号捕获退出)delete topic;delete producer;delete conf;delete topic_conf;return 0; }
2. 消费者代码(接收消息)
// kafka_consumer.cpp #include <iostream> #include <vector> #include <string> #include "rdkafkacpp.h"// Rebalance回调(处理分区分配/撤销) class RebalanceCb : public RdKafka::RebalanceCb { public:void rebalance_cb(RdKafka::KafkaConsumer* consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*>& partitions) override {if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {// 分配分区:手动指定从最早位置开始消费(可选)for (auto* part : partitions) {part->set_offset(RdKafka::TopicPartition::OFFSET_BEGINNING);}consumer->assign(partitions);std::cout << "分配分区: ";for (auto* part : partitions) {std::cout << part->topic() << "[" << part->partition() << "] ";}std::cout << std::endl;} else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {// 撤销分区consumer->unassign();std::cout << "撤销分区" << std::endl;}} };// 事件回调(如错误、日志) class EventCb : public RdKafka::EventCb { public:void event_cb(RdKafka::Event& event) override {switch (event.type()) {case RdKafka::Event::EVENT_ERROR:std::cerr << "错误事件: " << RdKafka::err2str(event.err()) << std::endl;break;case RdKafka::Event::EVENT_LOG:std::cout << "日志事件: " << event.str() << std::endl;break;default:break;}} };int main() {// 配置参数const std::string brokers = "localhost:9092"; // Kafka服务器地址const std::string group_id = "demo-consumer-group"; // 消费者组IDconst std::vector<std::string> topics = {"demo-topic"}; // 订阅主题// 创建配置对象RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);std::string errstr;// 设置全局配置if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK ||conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK ||conf->set("rebalance_cb", new RebalanceCb(), errstr) != RdKafka::Conf::CONF_OK ||conf->set("event_cb", new EventCb(), errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置失败: " << errstr << std::endl;return 1;}// 设置自动提交(可选,这里禁用自动提交,手动提交位移)conf->set("enable.auto.commit", "false", errstr);// 设置主题配置:从最早位置开始消费(若分区无已提交位移)topic_conf->set("auto.offset.reset", "earliest", errstr);conf->set("default_topic_conf", topic_conf, errstr);// 创建Consumer实例RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!consumer) {std::cerr << "创建Consumer失败: " << errstr << std::endl;return 1;}// 订阅主题if (consumer->subscribe(topics) != RdKafka::ERR_NO_ERROR) {std::cerr << "订阅主题失败" << std::endl;delete consumer;return 1;}std::cout << "开始消费主题 [" << topics[0] << "], 按 Ctrl+C 停止..." << std::endl;while (true) {// 拉取消息(超时1秒)RdKafka::Message* msg = consumer->consume(1000);if (!msg) continue;// 处理消息switch (msg->err()) {case RdKafka::ERR_NO_ERROR:std::cout << "消费消息 [" << msg->topic_name() << "][" << msg->partition() << "]: 偏移量=" << msg->offset() << ", 内容=" << static_cast<char*>(msg->payload()) << std::endl;// 手动提交位移(每消费5条提交一次)static int count = 0;if (++count % 5 == 0) {consumer->commitSync();std::cout << "已同步提交位移" << std::endl;}break;case RdKafka::ERR__PARTITION_EOF:std::cout << "到达分区末尾,等待新消息..." << std::endl;break;case RdKafka::ERR__TIMED_OUT:// 超时无消息,继续轮询break;default:std::cerr << "消费错误: " << msg->errstr() << std::endl;break;}delete msg;}// 清理资源(实际不会执行到,需通过信号捕获退出)consumer->unsubscribe();consumer->close();delete consumer;delete conf;delete topic_conf;return 0; }
三、编译与运行
1. 依赖安装
- librdkafka:需安装开发库(包含头文件和动态库)。
- Ubuntu/Debian:
sudo apt-get install librdkafka-dev
- CentOS/Fedora:
sudo yum install librdkafka-devel
- 源码编译(推荐最新稳定版):
git clone https://github.com/edenhill/librdkafka.git cd librdkafka git checkout v2.2.0 # 选择稳定版本 ./configure --prefix=/usr/local make -j4 sudo make install sudo ldconfig # 更新动态链接库缓存
2. 编译命令
使用
g++
编译生产者和消费者代码(需链接 librdkafka++ 和 librdkafka 库):# 编译生产者 g++ kafka_producer.cpp -o kafka_producer -lrdkafka++ -lrdkafka -lpthread# 编译消费者 g++ kafka_consumer.cpp -o kafka_consumer -lrdkafka++ -lrdkafka -lpthread
3. 运行步骤
启动 Kafka 集群(确保 Zookeeper 和 Broker 已运行)。
运行生产者(发送消息到
demo-topic
):./kafka_producer
输出示例:
开始发送消息到主题 [demo-topic], 按 Ctrl+C 停止... 已发送消息 #1: Demo消息 #1 @ Tue May 28 15:30:00 2024 消息投递成功 [demo-topic][0]: 偏移量=0, 大小=35字节 已发送消息 #2: Demo消息 #2 @ Tue May 28 15:30:02 2024 消息投递成功 [demo-topic][1]: 偏移量=0, 大小=35字节
运行消费者(从
demo-topic
消费消息):./kafka_consumer
输出示例:
开始消费主题 [demo-topic], 按 Ctrl+C 停止... 分配分区: demo-topic[0] demo-topic[1] 消费消息 [demo-topic][0]: 偏移量=0, 内容=Demo消息 #1 @ Tue May 28 15:30:00 2024 消费消息 [demo-topic][1]: 偏移量=0, 内容=Demo消息 #2 @ Tue May 28 15:30:02 2024 已同步提交位移
0voice · GitHub