当前位置: 首页 > wzjs >正文

安徽省建设工程造价信息网站安徽专业做网站的大公司

安徽省建设工程造价信息网站,安徽专业做网站的大公司,wordpress旅游网站,数据网站有哪些1. 了解 Kafka Apache Kafka 是一个分布式流处理平台,核心功能包括: 发布/订阅消息系统:解耦生产者和消费者 分布式存储:持久化、容错的消息存储 流处理:实时处理数据流 核心概念: 概念说明BrokerKaf…

1. 了解 Kafka

Apache Kafka 是一个分布式流处理平台,核心功能包括:

  • 发布/订阅消息系统:解耦生产者和消费者

  • 分布式存储:持久化、容错的消息存储

  • 流处理:实时处理数据流

核心概念

概念说明
BrokerKafka 集群中的单个服务器节点
Topic消息的逻辑分类(如:user_activity
PartitionTopic 的分区(并行处理单位),消息按顺序存储
Producer向 Topic 发布消息的客户端
Consumer订阅 Topic 并处理消息的客户端
Consumer Group多个消费者协同消费同一 Topic(每个分区只被组内一个消费者消费)
Offset消息在分区中的唯一位置标识

2. 了解 rdkafka

rdkafka 是 Kafka 的 C/C++ 客户端库,提供:

  • 高性能生产/消费 API(支持 C/C++/Python 等)

  • 特性:

    • 异步/同步发送模式

    • 自动负载均衡

    • 消息压缩(gzip, snappy, lz4)

    • SASL 认证

    • 精确一次语义(EOS)

  • 开源地址:edenhill/librdkafka

3. 代码实现

以下是使用 librdkafka 的 C++ 接口操作 Kafka 的生产者和消费者完整实现:

生产者代码 (producer.cpp)
#include <iostream>
#include <string>
#include <sstream>
#include <librdkafka/rdkafkacpp.h>class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:void dr_cb(RdKafka::Message &message) {if (message.err()) {std::cerr << "消息发送失败: " << message.errstr() << std::endl;} else {std::cout << "消息发送成功: " << message.topic_name() << " [" << message.partition() << "] @ " << message.offset() << std::endl;}}
};int main() {// 1. 创建配置对象RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);std::string errstr;// 2. 设置配置参数if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置错误: " << errstr << std::endl;return 1;}// 设置消息确认模式 (all = 最高可靠性)if (conf->set("acks", "all", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置错误: " << errstr << std::endl;return 1;}// 3. 创建生产者实例ProducerDeliveryReportCb delivery_cb;if (conf->set("dr_cb", &delivery_cb, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置回调错误: " << errstr << std::endl;return 1;}RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "创建生产者失败: " << errstr << std::endl;return 1;}delete conf;// 4. 创建Topic对象RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);RdKafka::Topic *topic = RdKafka::Topic::create(producer,"cpp_test_topic",tconf,errstr);if (!topic) {std::cerr << "创建Topic失败: " << errstr << std::endl;delete tconf;return 1;}delete tconf;// 5. 生产消息for (int i = 0; i < 10; ++i) {std::string key = "key-" + std::to_string(i);std::string payload = "Message #" + std::to_string(i);// 发送消息RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA, // 自动分区分配RdKafka::Producer::RK_MSG_COPY,const_cast<char*>(payload.c_str()), payload.size(),const_cast<char*>(key.c_str()), key.size(),NULL);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "生产消息失败: " << RdKafka::err2str(resp) << std::endl;} else {std::cout << "已发送: " << payload << std::endl;}// 处理事件队列producer->poll(0);}// 6. 等待所有消息完成发送while (producer->outq_len() > 0) {std::cout << "等待发送队列: " << producer->outq_len() << std::endl;producer->poll(100);}// 7. 清理资源delete topic;delete producer;return 0;
}
消费者代码 (consumer.cpp)
#include <iostream>
#include <string>
#include <csignal>
#include <vector>
#include <librdkafka/rdkafkacpp.h>bool running = true;void stop(int sig) {running = false;
}class ConsumerEventCb : public RdKafka::EventCb {
public:void event_cb(RdKafka::Event &event) {switch (event.type()) {case RdKafka::Event::EVENT_ERROR:std::cerr << "错误: " << RdKafka::err2str(event.err()) << std::endl;break;case RdKafka::Event::EVENT_LOG:std::cout << "日志: " << event.str() << std::endl;break;default:std::cout << "事件: " << event.type() << ": " << event.str() << std::endl;break;}}
};int main() {// 注册信号处理signal(SIGINT, stop);signal(SIGTERM, stop);// 1. 创建配置对象RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);std::string errstr;// 2. 设置配置参数if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置错误: " << errstr << std::endl;return 1;}// 设置消费组if (conf->set("group.id", "cpp_consumer_group", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置错误: " << errstr << std::endl;return 1;}// 从最早的消息开始消费if (conf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置错误: " << errstr << std::endl;return 1;}// 3. 设置事件回调ConsumerEventCb event_cb;if (conf->set("event_cb", &event_cb, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "设置回调失败: " << errstr << std::endl;return 1;}// 4. 创建消费者实例RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!consumer) {std::cerr << "创建消费者失败: " << errstr << std::endl;return 1;}delete conf;// 5. 订阅Topicstd::vector<std::string> topics;topics.push_back("cpp_test_topic");RdKafka::ErrorCode resp = consumer->subscribe(topics);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "订阅失败: " << RdKafka::err2str(resp) << std::endl;return 1;}// 6. 消费消息while (running) {// 等待消息 (1000ms超时)RdKafka::Message *msg = consumer->consume(1000);switch (msg->err()) {case RdKafka::ERR__TIMED_OUT:break;  // 超时继续case RdKafka::ERR_NO_ERROR:// 成功消费到消息std::cout << "收到消息: "<< "主题: " << msg->topic_name() << " | 分区: [" << msg->partition() << "]"<< " | 偏移量: " << msg->offset() << std::endl;if (msg->key()) {std::cout << "键: " << *msg->key() << " => ";}std::cout << "值: " << static_cast<const char*>(msg->payload()) << std::endl;break;default:std::cerr << "消费错误: " << msg->errstr() << std::endl;break;}// 手动提交偏移量consumer->commitAsync(msg);delete msg;}// 7. 关闭消费者consumer->close();delete consumer;return 0;
}
编译运行

# 编译生产者
g++ -o producer producer.cpp -lrdkafka++ -lrdkafka -lpthread -lz -ldl# 编译消费者
g++ -o consumer consumer.cpp -lrdkafka++ -lrdkafka -lpthread -lz -ldl


文章转载自:

http://zgwqakxo.ndLww.cn
http://rhk5Mp4k.ndLww.cn
http://kHYf7Qxc.ndLww.cn
http://2Ee6BsAK.ndLww.cn
http://4kF4vcva.ndLww.cn
http://8iO6K9V1.ndLww.cn
http://DzCaK9ja.ndLww.cn
http://lZiIbqN5.ndLww.cn
http://VOcl4K09.ndLww.cn
http://fOJbqnEW.ndLww.cn
http://dqhuCHtb.ndLww.cn
http://VKCVzIys.ndLww.cn
http://8TnG3HjY.ndLww.cn
http://9WkB9fdV.ndLww.cn
http://vLntdB4N.ndLww.cn
http://vexmUv8d.ndLww.cn
http://QPVSxjB7.ndLww.cn
http://k0LnIzTA.ndLww.cn
http://XR09V65M.ndLww.cn
http://LCdranZa.ndLww.cn
http://rb8XfGzo.ndLww.cn
http://WZ9uW7it.ndLww.cn
http://phQ3ob8F.ndLww.cn
http://peQQ1YPt.ndLww.cn
http://djnIFw54.ndLww.cn
http://pcWWBPuQ.ndLww.cn
http://CJ63dme2.ndLww.cn
http://7tZlbalO.ndLww.cn
http://3GeCLYSD.ndLww.cn
http://8PdNdZ8m.ndLww.cn
http://www.dtcms.com/wzjs/719647.html

相关文章:

  • 广东建设局网站首页自学装修设计从哪里入手
  • 北京网站建设认知群辉做网站服务器python
  • 港巢网站建设《建设工程质量管理条例》
  • 做网站用广告赚钱过时了青海教育厅门户网站
  • 找工作的网站有哪些?佛山专业网站建设价格
  • 拼多多开网店购物网站seo关键词定位
  • 来宾网站建设白沙网站建设
  • 数据分析案例网站中国网站制作 第一个
  • 怎样做个做外贸的网站有后台的网站怎么做
  • wordpress+发布文章慢如何优化网站推广
  • 宁波做网站的哪个好山西优化公司
  • 淘客请人做网站网络营销推广的重要性
  • 电影影视网站模板免费下载滨湖区知名做网站价格
  • 济南建站都选企汇优先做后付外包员工
  • 酒泉网站建设手机微网站开发的目的和意义
  • asp.net p2p网站开发昆明网站优化建设
  • 我要建网站需要什么推广文案撰写
  • 检察院门户网站建设自查报告给人做设计的网站
  • python好还是wordpress太原seo网站建设
  • 深圳网站制作公司人才招聘网站建设需求范文
  • 域名备案企业网站内容网站建设富库
  • 中英文免费网站建设宝塔wordpress优化
  • 赤峰网站设计公司江西建设厅网站证书查询
  • 济南网站设计公司富中山精品网站建设信息
  • 新乡手机网站建设中国做外贸的网站
  • 搭建wordpress站点商城网站建设网络公司
  • 甘肃省建设监理协会 官方网站深圳企业营销型网站
  • c 网站开发连接mysql网站建设前期要多久
  • 做文章网站网站建设详情报价
  • 网站空间购买哪个好四川建设网是国企吗