当前位置: 首页 > wzjs >正文

哪些网站做企业招聘不要花钱seo全称英文怎么说

哪些网站做企业招聘不要花钱,seo全称英文怎么说,北京自己怎样做网站,前端网页设计师一、集群 Kafka 架构是由 producer(消息生产者)、consumer(消息消费者)、broker(kafka 集群的 server,负责处理消息读、写请求,存储消息,在 Kafka cluster 这一层里,其实…

一、集群

  1. Kafka 架构是由 producer(消息生产者)、consumer(消息消费者)、broker(kafka 集群的 server,负责处理消息读、写请求,存储消息,在 Kafka cluster 这一层里,其实里面是有很多个 broker 组成)、topic(消息队列 / 分类相当于队列,里面有生产者和消费者模型)、zookeeper 这些部分组成。
  2. kafka 里面的消息是有 topic 来组织的,简单的我们可以想象为一个队列,一个队列就是一个 topic,然后它把每个 topic 又分为很多个 partition,这个是为了做并行的,在每个 partition 内部消息是有顺序,相当于有序的队列,其中每个消息都有个序号 offset,比如 0 到 1,2,从前面读往后面写。一个 partition 对应一个 broker,一个 broker 可以管多个 partition,比如说,topic 有 6 个 partition,有两个 broker,那每个 broker 就管 3 个 partition,这个 partition 可以很简单想象为一个文件,当数据发过来的时候它就往这个 partition 上面 append,追加就行了,消息不经过内存缓冲,直接写入文件,kafka 的很多消息系统不一样,很多消息系统是消费完就把它删除掉,而 kafka 是按照时间策略删除,而不是消费完就删除,在 kafka 里面没有一个消费完这么个概念,只有过期这样一个概念。
  3. producer 自己决定往哪个 partition 里面面去写,这里有一些的策略,譬如如果 hash,不用多个 partition 之间去 queue 了。consumer 自己维护消费到哪个 offset,每个 consumer 都有对应的 group,group 内是数据消费模型(各个 consumer 要消费不同的 partition,因此一个消息在 group 内只消费一次),group 间是 publish-subscribe 消费模型,各个 group 各自独立消费,互不影响,因此一个消息在被每个 group 消费一次。


1. 搭建两台服务器

        ip1: 192.168.31.249

         ip2: 192.168.31.36

2. zookeeper部署

        zookeeper还是先只部署一台,在ip2: 192.168.31.36 上启动zookeeper

3. 启动broker ip1:192.168.31.249

        修改broker.id(也可以改为-1,自动分配)

broker.id=0

        修改server.properties(在config目录), 增加zookeeper的配置,要配置对应的zookeeper ip地址。

zookeeper.connect=192.168.31.249:2181

        启动kafka:

 sh kafka-server-start.sh -daemon ../config/server.propertiessh kafka-server-start.sh  ../config/server.properties

        默认端口为:9092,可以通过命令lsof -i:9092查看kafka是否启动成功。

4. 启动broker ip2: 192.168.31.36

        修改broker.id(也可以改为-1,自动分配)

broker.id=1

        修改server.properties(在config目录), 增加zookeeper的配置,要配置对应的zookeeper ip地址。

zookeeper.connect=192.168.31.249:2181

        启动kafka:

 sh kafka-server-start.sh -daemon ../config/server.properties

        默认端口为:9092,可以通过命令lsof -i:9092查看kafka是否启动成功。

5. 查看kafka集群

        创建主题:

sh kafka-topics.sh --create --zookeeper 192.168.31.249:2181 -replication-factor 2 --partitions 2 - topic kafka-2

        查看主题:

sh kafka-topics.sh --describe --zookeeper 192.168.31.249:2181 --topic kafka-2

        显示信息:

Topic:kafka-2   PartitionCount:2    
Topic: kafka-2  Partition: 0    
Topic: kafka-2  Partition: 1    
ReplicationFactor:2 Configs:Leader: 1   Replicas: 1,0   Isr: 1,0Leader: 0   Replicas: 0,1   Isr: 0

6. 测试集群

        开三个终端:开启一个生产者,两个消费者

        生产者: sh kafka-console-producer.sh --broker-list 192.168.31.249:9092 --topic kafka-2

        消费者:

        sh kafka-console-consumer.sh --bootstrap-server 192.168.31.249:9092 --topic kafka-2 --group 0 - from-beginning

        sh kafka-console-consumer.sh --bootstrap-server 192.168.31.249:9092 --topic kafka-2 --group 0 - from-beginning

        当两个消费者同属一个消费组开启后,消费者轮流收到发送者的数据。

        kafka-console-consumer.sh部分支持的参数:

参数值类型说明有效值
--topicstring被消费的 topic
--partitioninteger指定分区,除非指定--offset,否则从分区结束(latest)开始消费
--offsetstring执行消费的起始 offset 位置,默认值:latestlatest, earliest
--consumer-propertystring将用户定义的属性以key=value的形式传递给使用者
--consumer.configstring消费者配置属性文件,请注意,[consumer-property]优先于此配置
--from-beginning从存在的最早消息开始,而不是从最新消息开始
--groupstring指定消费者所属组的 ID

二、代码案例

一、环境准备
  1. Kafka 集群部署(本地测试)
    • 下载 Kafka:官网(建议 2.8.1 版本)
    • 启动 Zookeeper:bin/zookeeper-server-start.sh config/zookeeper.properties
    • 启动 Kafka Broker:bin/kafka-server-start.sh config/server.properties
    • 创建测试主题(分区数 2,副本数 1):
      bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic demo-topic --partitions 2 --replication-factor 1
      
二、完整代码实现

以下为生产者(kafka_producer.cpp)和消费者(kafka_consumer.cpp)的完整代码,使用 librdkafka 库实现。

1. 生产者代码(发送消息)
// kafka_producer.cpp
#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include "rdkafkacpp.h"// 投递结果回调(消息是否成功发送到Kafka)
class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:void dr_cb(RdKafka::Message& msg) override {if (msg.err()) {std::cerr << "消息投递失败 [" << msg.topic_name() << "][" << msg.partition() << "]: " << msg.errstr() << std::endl;} else {std::cout << "消息投递成功 [" << msg.topic_name() << "][" << msg.partition() << "]: 偏移量=" << msg.offset() << ", 大小=" << msg.len() << "字节" << std::endl;}}
};// 事件回调(如错误、日志)
class EventCb : public RdKafka::EventCb {
public:void event_cb(RdKafka::Event& event) override {switch (event.type()) {case RdKafka::Event::EVENT_ERROR:std::cerr << "错误事件: " << RdKafka::err2str(event.err()) << std::endl;break;case RdKafka::Event::EVENT_STATS:std::cout << "统计信息: " << event.str() << std::endl;break;default:break;}}
};int main() {// 配置参数const std::string brokers = "localhost:9092";  // Kafka服务器地址const std::string topic_name = "demo-topic";   // 主题名称const int send_interval_ms = 2000;             // 消息发送间隔(2秒)// 创建配置对象RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);std::string errstr;// 设置全局配置if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK ||conf->set("dr_cb", new DeliveryReportCb(), errstr) != RdKafka::Conf::CONF_OK ||conf->set("event_cb", new EventCb(), errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置失败: " << errstr << std::endl;return 1;}// 设置主题配置(可选)topic_conf->set("request.required.acks", "1", errstr);  // Leader确认即可// 创建Producer实例RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "创建Producer失败: " << errstr << std::endl;return 1;}// 创建Topic句柄RdKafka::Topic* topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr);if (!topic) {std::cerr << "创建Topic失败: " << errstr << std::endl;delete producer;return 1;}// 循环发送消息std::cout << "开始发送消息到主题 [" << topic_name << "], 按 Ctrl+C 停止..." << std::endl;int msg_count = 0;while (true) {// 构造消息内容(带时间戳)auto now = std::chrono::system_clock::now();std::time_t time = std::chrono::system_clock::to_time_t(now);std::string payload = "Demo消息 #" + std::to_string(++msg_count) + " @ " + std::ctime(&time);// 发送消息(自动选择分区)RdKafka::ErrorCode err = producer->produce(topic,RdKafka::Topic::PARTITION_UA,  // 自动分配分区RdKafka::Producer::RK_MSG_COPY,const_cast<char*>(payload.c_str()), payload.size(),nullptr,  // 无消息键nullptr);if (err != RdKafka::ERR_NO_ERROR) {std::cerr << "发送失败: " << RdKafka::err2str(err) << std::endl;} else {std::cout << "已发送消息 #" << msg_count << ": " << payload;}// 触发回调处理(必须定期调用poll)producer->poll(0);// 等待2秒std::this_thread::sleep_for(std::chrono::milliseconds(send_interval_ms));}// 清理资源(实际不会执行到,需通过信号捕获退出)delete topic;delete producer;delete conf;delete topic_conf;return 0;
}
2. 消费者代码(接收消息)
// kafka_consumer.cpp
#include <iostream>
#include <vector>
#include <string>
#include "rdkafkacpp.h"// Rebalance回调(处理分区分配/撤销)
class RebalanceCb : public RdKafka::RebalanceCb {
public:void rebalance_cb(RdKafka::KafkaConsumer* consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*>& partitions) override {if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {// 分配分区:手动指定从最早位置开始消费(可选)for (auto* part : partitions) {part->set_offset(RdKafka::TopicPartition::OFFSET_BEGINNING);}consumer->assign(partitions);std::cout << "分配分区: ";for (auto* part : partitions) {std::cout << part->topic() << "[" << part->partition() << "] ";}std::cout << std::endl;} else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {// 撤销分区consumer->unassign();std::cout << "撤销分区" << std::endl;}}
};// 事件回调(如错误、日志)
class EventCb : public RdKafka::EventCb {
public:void event_cb(RdKafka::Event& event) override {switch (event.type()) {case RdKafka::Event::EVENT_ERROR:std::cerr << "错误事件: " << RdKafka::err2str(event.err()) << std::endl;break;case RdKafka::Event::EVENT_LOG:std::cout << "日志事件: " << event.str() << std::endl;break;default:break;}}
};int main() {// 配置参数const std::string brokers = "localhost:9092";   // Kafka服务器地址const std::string group_id = "demo-consumer-group";  // 消费者组IDconst std::vector<std::string> topics = {"demo-topic"};  // 订阅主题// 创建配置对象RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);std::string errstr;// 设置全局配置if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK ||conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK ||conf->set("rebalance_cb", new RebalanceCb(), errstr) != RdKafka::Conf::CONF_OK ||conf->set("event_cb", new EventCb(), errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置失败: " << errstr << std::endl;return 1;}// 设置自动提交(可选,这里禁用自动提交,手动提交位移)conf->set("enable.auto.commit", "false", errstr);// 设置主题配置:从最早位置开始消费(若分区无已提交位移)topic_conf->set("auto.offset.reset", "earliest", errstr);conf->set("default_topic_conf", topic_conf, errstr);// 创建Consumer实例RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!consumer) {std::cerr << "创建Consumer失败: " << errstr << std::endl;return 1;}// 订阅主题if (consumer->subscribe(topics) != RdKafka::ERR_NO_ERROR) {std::cerr << "订阅主题失败" << std::endl;delete consumer;return 1;}std::cout << "开始消费主题 [" << topics[0] << "], 按 Ctrl+C 停止..." << std::endl;while (true) {// 拉取消息(超时1秒)RdKafka::Message* msg = consumer->consume(1000);if (!msg) continue;// 处理消息switch (msg->err()) {case RdKafka::ERR_NO_ERROR:std::cout << "消费消息 [" << msg->topic_name() << "][" << msg->partition() << "]: 偏移量=" << msg->offset() << ", 内容=" << static_cast<char*>(msg->payload()) << std::endl;// 手动提交位移(每消费5条提交一次)static int count = 0;if (++count % 5 == 0) {consumer->commitSync();std::cout << "已同步提交位移" << std::endl;}break;case RdKafka::ERR__PARTITION_EOF:std::cout << "到达分区末尾,等待新消息..." << std::endl;break;case RdKafka::ERR__TIMED_OUT:// 超时无消息,继续轮询break;default:std::cerr << "消费错误: " << msg->errstr() << std::endl;break;}delete msg;}// 清理资源(实际不会执行到,需通过信号捕获退出)consumer->unsubscribe();consumer->close();delete consumer;delete conf;delete topic_conf;return 0;
}
三、编译与运行
1. 依赖安装
  • librdkafka:需安装开发库(包含头文件和动态库)。
    • Ubuntu/Debian:
      sudo apt-get install librdkafka-dev
      
    • CentOS/Fedora:
      sudo yum install librdkafka-devel
      
    • 源码编译(推荐最新稳定版):
      git clone https://github.com/edenhill/librdkafka.git
      cd librdkafka
      git checkout v2.2.0  # 选择稳定版本
      ./configure --prefix=/usr/local
      make -j4
      sudo make install
      sudo ldconfig  # 更新动态链接库缓存
      
2. 编译命令

使用g++编译生产者和消费者代码(需链接 librdkafka++ 和 librdkafka 库):

 
# 编译生产者
g++ kafka_producer.cpp -o kafka_producer -lrdkafka++ -lrdkafka -lpthread# 编译消费者
g++ kafka_consumer.cpp -o kafka_consumer -lrdkafka++ -lrdkafka -lpthread
3. 运行步骤
  1. 启动 Kafka 集群(确保 Zookeeper 和 Broker 已运行)。

  2. 运行生产者(发送消息到demo-topic):

    ./kafka_producer
    

    输出示例:

    开始发送消息到主题 [demo-topic], 按 Ctrl+C 停止...
    已发送消息 #1: Demo消息 #1 @ Tue May 28 15:30:00 2024
    消息投递成功 [demo-topic][0]: 偏移量=0, 大小=35字节
    已发送消息 #2: Demo消息 #2 @ Tue May 28 15:30:02 2024
    消息投递成功 [demo-topic][1]: 偏移量=0, 大小=35字节
    
  3. 运行消费者(从demo-topic消费消息):

    ./kafka_consumer
    

    输出示例:

    开始消费主题 [demo-topic], 按 Ctrl+C 停止...
    分配分区: demo-topic[0] demo-topic[1] 
    消费消息 [demo-topic][0]: 偏移量=0, 内容=Demo消息 #1 @ Tue May 28 15:30:00 2024
    消费消息 [demo-topic][1]: 偏移量=0, 内容=Demo消息 #2 @ Tue May 28 15:30:02 2024
    已同步提交位移

 

0voice · GitHub 

http://www.dtcms.com/wzjs/311189.html

相关文章:

  • 专做批发的网站电子全网媒体发布平台
  • 新开家政如何做网站重庆百度
  • 东莞php网站建设百度知道提问
  • 郑州建站公司网站百度官网
  • 深圳网站建设响应式北大青鸟培训机构官网
  • 找外包公司开发app要注意什么北京seo诊断
  • 四川响应式网站哪家好优化网站标题
  • 上海网站建设升企业策划方案怎么做
  • 轻量级应用服务器wordpress优化关键词推广
  • 批量域名注册查询武汉网站推广优化
  • 网站建设倒计时代码奉化首页的关键词优化
  • 网站中在线咨询怎么做网络推广哪个平台效果最好
  • 南宁网页制作招聘网seo排名赚钱
  • 做论坛网站时应该注意什么有了域名怎么建网站
  • 网站做动态虚线百度关键词指数
  • 工信部门备案网站松原新闻头条
  • 忘了网站链接怎么做微商软文范例大全100
  • 友汇网网站建设管理后台网站免费广告投放网站
  • 自己建设网站平台步骤北京正规seo搜索引擎优化价格
  • 网站建设 风险防控四年级新闻摘抄大全
  • 做名片的网站叫什么来着b站2020推广网站
  • 做网站要买多少服务器空间全国新冠疫情最新情况
  • 域名搭建网站网站推广途径和推广要点
  • 网站开发的课程设置网站推广公司
  • 徐州市鼓楼区建设局网站网络优化需要哪些知识
  • 世界军事新闻视频长沙网站seo优化
  • 关于做真实的自己视频网站企业网络推广最简单方法
  • 南浔建设网站网站推广的技术有哪些
  • 一个域名两个网站欧洲站fba
  • 电子商务网站建设实验原理百seo排名优化