当前位置: 首页 > wzjs >正文

残疾人服务平台seo网站推广经理

残疾人服务平台,seo网站推广经理,邢台做移动网站的地方,南昌做网站公司哪家好问题 1: 为什么使用消息队列?服务搭建KafkaRocketMQ编写 docker-compose.yml运行docker compose修改配置文件(解决网络问题)创建一个 topic运行skd尝试发送与接收功能 压力测试:Kafkabatch-size(批量大小)分区数:发送数据(MB/s)消费-线程数(15 分区) RocketMQ生产生产者数量批…

  • 问题 1: 为什么使用消息队列?
  • 服务搭建
    • Kafka
    • RocketMQ
      • 编写 docker-compose.yml
      • 运行docker compose
      • 修改配置文件(解决网络问题)
      • 创建一个 topic
      • 运行skd尝试发送与接收功能
  • 压力测试:
    • Kafka
      • batch-size(批量大小)
      • 分区数:发送数据(MB/s)
      • 消费-线程数(15 分区)
    • RocketMQ
      • 生产
        • 生产者数量
        • 批量大小(batch-size)
      • 消费
        • 消费者数量
  • 其他配置
    • RocketMQ
      • 生产者类型
      • 消费者类型:
      • 消息的类型:
    • kafka
      • 生产者类型
  • 高可用
  • 消息队列幂等

问题 1: 为什么使用消息队列?

消息队列的作用:

削峰填谷

解耦

异步

具体的使用场景:

问题 1: 实际没有用过

比如: IM 系统; 秒杀/抢票的削峰填谷

服务搭建

Kafka

  1. 写一个 docker-compose.yml
version: '3'name: kafka-groupservices:zookeeper-test:image: zookeeperports:- "2181:2181"volumes:- zookeeper_vol:/data- zookeeper_vol:/datalog- zookeeper_vol:/logscontainer_name: zookeeper-testkafka-test:image: wurstmeister/kafkaports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: "localhost"KAFKA_ZOOKEEPER_CONNECT: "zookeeper-test:2181"KAFKA_LOG_DIRS: "/kafka/logs"volumes:- kafka_vol:/kafkadepends_on:- zookeeper-testcontainer_name: kafka-testvolumes:zookeeper_vol: {}kafka_vol: {}
  1. 运行docker compose
docker compose -f <yml 的文件路径> up -d
  1. 运行 sdk(go-Kafka)功能测试
const (topic         = "test-topic"brokerAddress = "localhost:9092"
)func TestKafka() {ctx := context.Background()// 1. 发送消息produceMessage(ctx)// 2. 启动消费者go consumeMessages()// 3. 等待10秒time.Sleep(10 * time.Second)fmt.Println("测试完成,退出程序")
}func produceMessage(ctx context.Context) {writer := &kafka.Writer{Addr:     kafka.TCP(brokerAddress),Topic:    topic,Balancer: &kafka.LeastBytes{},}defer writer.Close()msg := kafka.Message{Key:   []byte("test-key"),Value: []byte(fmt.Sprintf("测试消息 - %s", time.Now().Format(time.RFC3339))),}if err := writer.WriteMessages(ctx, msg); err != nil {log.Fatalf("发送消息失败: %v", err)}fmt.Printf("已发送消息: %s\n", msg.Value)
}func consumeMessages() {reader := kafka.NewReader(kafka.ReaderConfig{Brokers:   []string{brokerAddress},Topic:     topic,GroupID:   "test-group",MinBytes:  10e3,MaxBytes:  10e6,})defer reader.Close()fmt.Println("消费者已启动...")for {msg, err := reader.ReadMessage(context.Background())if err != nil {log.Printf("消费错误: %v", err)continue}fmt.Printf("消费到消息: %s\n", msg.Value)}
}

RocketMQ

编写 docker-compose.yml

version: '3.8'
services:namesrv:image: sha256:a1f797e48d967647d4c61b67130891c7ef4763fd33bddc6c2eba3067330305e8container_name: rmqnamesrvports:- 9876:9876networks:- rocketmqcommand: sh mqnamesrvbroker:image: sha256:a1f797e48d967647d4c61b67130891c7ef4763fd33bddc6c2eba3067330305e8container_name: rmqbrokerports:- 10909:10909- 10911:10911- 10912:10912volumes:- ./broker.conf:/home/rocketmq/rocketmq-5.3.2/conf/broker.confenvironment:- NAMESRV_ADDR=rmqnamesrv:9876depends_on:- namesrvnetworks:- rocketmqcommand: sh mqbroker -c /home/rocketmq/rocketmq-5.3.2/conf/broker.confproxy:image: sha256:a1f797e48d967647d4c61b67130891c7ef4763fd33bddc6c2eba3067330305e8container_name: rmqproxynetworks:- rocketmqdepends_on:- broker- namesrvports:- 8080:8080- 8081:8081restart: on-failureenvironment:- NAMESRV_ADDR=rmqnamesrv:9876command: sh mqproxy
networks:rocketmq:driver: bridge

运行docker compose

cd 到存放 docker-compose.yml 的文件夹或者docker compose up 指定文件

  • up 指定文件
docker compose -f < compose.yml文件路径> up -d

修改配置文件(解决网络问题)

  1. 进入容器
docker exec -it rmqbroker bash
  1. 修改配置
echo "brokerIP1 = 127.0.0.1" >> ../conf/broker.conf
  1. 重新运行
docker-compose restart

或者使用命令强行设置 IP

docker exec rmqbroker sh mqadmin updateBrokerConfig -b broker-name -k brokerIP1 -v 127.0.0.1 -n 127.0.0.1:9876
  1. 检查IP 是否设置好
docker exec rmqnamesrv sh mqadmin clusterList -n 127.0.0.1:9876

创建一个 topic

sh ./mqadmin updateTopic -n localhost:9876 -t test-topic -c DefaultCluster -a +message.type=NORMAL

运行skd尝试发送与接收功能

发送

// 测试 1: 单元测试:生产者发消息
func TestProducer(t *testing.T) {// 创建生产者实例p, err := rocketmq.NewProducer(// 设置 NameServer 地址producer.WithNameServer([]string{"127.0.0.1:9876"}),// 设置生产者组名producer.WithGroupName("test_producer_group"),// 设置重试次数producer.WithRetry(2),)if err != nil {fmt.Printf("create producer error: %s\n", err.Error())return}// 启动生产者err = p.Start()if err != nil {fmt.Printf("start producer error: %s\n", err.Error())return}defer p.Shutdown()// 准备发送的消息msg := &primitive.Message{Topic: TestTopic,Body:  []byte("Hello RocketMQ From Go Client"),}// 设置消息标签msg.WithTag("TestTag")// 设置消息键msg.WithKeys([]string{"TestKey"})// 发送消息ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()res, err := p.SendSync(ctx, msg)if err != nil {fmt.Printf("send message error: %s\n", err.Error())} else {fmt.Printf("send message success: result=%s\n", res.String())}// 等待中断信号优雅关闭sig := make(chan os.Signal, 1)signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)<-sig
}

接收

func TestConsumer(t *testing.T) {// 创建消费者实例c := NewConsumer(//消费组consumer.WithGroupName("testGroup"),// namesrv地址consumer.WithNameServer([]string{"127.0.0.1:9876"}),//消费模式consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),)defer c.Shutdown()// 订阅主题c.Subscribe(TestTopic, Handler1)// 创建生产者p := NewProducer(// 设置 NameServer 地址producer.WithNameServer([]string{"127.0.0.1:9876"}),// 设置生产者组名producer.WithGroupName("test_producer_group"),// 设置重试次数producer.WithRetry(2),)// 启动生产者err := p.Producer.Start()if err != nil {fmt.Printf("start producer error: %s\n", err.Error())return}defer p.Producer.Shutdown()for i := 0; i < 10; i++ {err = p.SendMsg(TestTopic, []byte(fmt.Sprintf("Hello RocketMQ From Go Client %d", i)))if err != nil {fmt.Printf("send message error: %s\n", err.Error())return}}// 等待 5s 后退出time.Sleep(5 * time.Second)
}

event-bus 多消费者

func TestConsumer2(t *testing.T) {// 创建消费者实例1c1 := NewConsumer(//消费组consumer.WithGroupName("testGroup1"),// namesrv地址consumer.WithNameServer([]string{"127.0.0.1:9876"}),//消费模式consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),)defer c1.Shutdown()// 订阅主题c1.Subscribe(TestTopic, Handler1)// 创建消费者实例2c2 := NewConsumer(//消费组consumer.WithGroupName("testGroup2"),// namesrv地址// namesrv地址consumer.WithNameServer([]string{"127.0.0.1:9876"}),//消费模式consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),)defer c2.Shutdown()// 订阅主题c2.Subscribe(TestTopic, Handler2)// 创建生产者producerSend()// 等待 5s 后退出time.Sleep(5 * time.Second)
}

压力测试:

Kafka

batch-size(批量大小)

  • 批量传输是提升吞吐量的关键参数,主要是提高传输效率(减少发送的次数)

条件: 发送线程数:1; topic 分区数 15 ; 数据大小:79 字节

go-kafka sdk测试结果

Batch-Size吞吐量 (MB/s)发送数据 (条/s)
12.734,177
50021.3269,620
100022.1279,746
500024303,797
1000024.3307,595
1500025.8326,582
2000025.8326,582
2500025.2318,987
3000025.5322,785
3500026.1330,380
5000021.3269,620

Kafka 自带的压测脚本 测试结果

batch.size记录发送速率 (records/sec)吞吐量 (MB/sec)平均延迟 (ms)最大延迟 (ms)50th 百分位延迟 (ms)95th 百分位延迟 (ms)99th 百分位延迟 (ms)99.9th 百分位延迟 (ms)备注
无设置214,334.71140.43213.22500.00200310413480默认配置
5,00070,993.4846.51658.112,310.006337471,1422,058服务器负载较低,性能较差
10,000134,605.8188.19345.87637.00335428533631性能显著提升
15,000199,399.20130.64234.3378.0233343496872服务器接近负载上限
15,000 (二次)186,382.87122.11247.37948.00233343496872略有下降,服务器负载较高
20,000250,269.04163.97177.39514.00168264390504性能进一步提升
25,000297,300.51194.78119.14420.00127194371413最佳性能

根据两个压测结果来看:

batch-size 越大,吞吐量越高,性能的瓶颈在于 troughput 的大小(接口 QPS/程序每秒能发多少数据给 Kafka)


  • batch-size 如何选择: 根据业务接口的 QPS 进行确定;在固定的 QPS 调用的情况下,batch-size 增加到一定值之后变化就不大了
    • 比如在 go 的 sdk 的情况下测试,batch-size 1000-5000 波动并不多,5000 之后甚至还有下降的趋势

分区数:发送数据(MB/s)

条件: 发送线程数 1; batch-size:5000;数据大小 79 字节

go sdk测试结果

分区数吞吐量 (MB/s)发送数据 (条/s)
125.5322,785
525316,456
1025.4321,518
1525316,456
2024303,797

Kafka 自带脚本压测结果

分区数 记录发送速率 (records/sec) 吞吐量 (MB/sec) 平均延迟 (ms) 最大延迟 (ms) 50th 百分位延迟 (ms) 95th 百分位延迟 (ms) 99th 百分位延迟 (ms) 99.9th 百分位延迟 (ms)
5 457,163.76 299.52 25.63 596.00 2 140 263 585
10 483,582.38 316.83 37.15 423.00 5 172 244 375
15 603,718.91 395.54 28.11 540.00 4 122 172 513
20 594,459.64 389.47 17.25 558.00 2 88 246 529

结论:

  1. 两个压测的数据差距非常大,可以说是背道而驰
  2. 因为分区的作用是改变多线程下的磁盘 IO;
    1. Kafka 每个分区都是一个单独的文件,单线程读写其实就是一个文件 IO 的效率,会有上下波动;但是并不大也就是一个文件的 IO 效率
    2. 但是在多线程情况下,IO 的效率将是每个文件的 IO 的总和(这一点可以很明显的体现在多线程读取 情况下,多分区的读取效率明显大幅提升

消费-线程数(15 分区)

sdk测试结果

消费者数量每个线程消费数据 (条/5s)每秒消费的总条数 (条/s)吞吐量 (MB/s)
17,931,7771,586,355119.9
34,221,217; 4,263,885; 4,130,6022,523,141190.6
62,738,983; 2,735,468; 2,630,453; 2,649,757; 2,739,442; 2,641,3243,183,086240.5
92,009,912; 1,968,823; 1,975,723; 1,965,374; 2,039,036; 1,968,965; 1,961,928; 1,899,827; 1,887,8323,563,936269.3
121,634,192; 1,623,847; 1,608,319; 1,705,832; 1,621,312; 1,696,289; 1,587,789; 1,634,192; 1,639,367; 1,666,854; 1,633,038; 1,672,5443,950,318298.5
151,832,556; 1,766,294; 1,810,132; 1,758,385; 1,791,158; 1,780,809; 1,784,259; 1,826,931; 1,800,389; 1,769,654; 1,791,158; 1,758,385; 1,813,582; 1,798,058; 1,785,9845,337,870403.3


**结论: **

  1. 当 (消费者数量<=分区数量): 消费者越多,消费越快
  2. 当(消费者数量>分区数量): 消费者越多,消费速度影响不大
    1. 原因是一个分区只能同时被一个消费者持有(相同 groupId 情况下),当消费者数量>分区情况下,有的消费者将无法获取到分区,进而无法消费到消息,知道有消费者的分区释放,才会尝试获取分区
    2. **在在 15 分区,20 个消费者情况下压力测试: 有 5 个分区没有消费到一条消息
      **

RocketMQ

生产

生产者数量
生产者线程数吞吐量 (MB/s)折算消息量 (万条/s)
10.44.0
50.55.0
100.66.0
1000.66.0
10000.55.0

**
**

批量大小(batch-size)
批量大小(条)吞吐量 (MB/s)折算消息量(万条/s)
100.55.0
5005.050.0
10007.070.0
15008.686.0
20008.080.0
25009.090.0
300010.0100.0
400010.3103.0
500010.2102.0
600010.6106.0
1000012.0120.0

消费

消费者数量
消费者数量各消费者消息数 (5秒)总消费量 (条)每秒总消费量 (条/s)吞吐量 (MB/s)
1[206,712]206,71241,3421.97
2[92,960, 93,272]186,23237,2461.78
3[90,064, 45,784, 45,800]181,64836,3301.73
4[45,418, 45,480, 45,120, 44,968]180,98636,1971.73
5[35,304, 35,336, 34,920, 35,008, 0]140,56828,1141.34
10[42,840, 43,184, 42,880, 43,039, 0×6]171,94334,3891.64

问题: 为什么消费者越多,性能月底?

按照 Kafka 的压测数据来讲,生产者与消费者数量越高,效率应该增加(在数量<分区数时)



其他配置

RocketMQ

生产者类型

  1. sync:同步发送;阻塞等待发送结果;
  2. async:异步发送;不会阻塞等待发送结果;

消费者类型:

  1. push: 监听的形式,监听 broker 的消息
  2. pull: 接口调用的形式,从通过调用接口的方式获取 broker 消息

消息的类型:

  1. 普通消息:

特点: 不保证顺序,性能最高

  1. 顺序消息(FIFO): 消息保证顺序消费
  2. 延时(delay):延时消息(只有到延时时间才可以消费)
  3. 事务:两阶段提交,开始是半提交状态,可以进行 rollback 与 commit 操作

kafka

生产者类型

  1. sync:同步发送;阻塞等待发送结果;
  2. async:异步发送;不会阻塞等待发送结果;

高可用

参考:https://rocketmq-learning.com/faq/ons-user-question-history16752/

RocketMQ 的高可用主要分为两个方面:

  1. 数据的冗余: RocketMQmaster 可以配置 selve 节点,冗余数据,保证数据不丢失;当 master 故障,selve 节点会接替成为新的 master 进行工作
  2. leader 节点选举: 当 RocketMQ 的中心节点(leader)宕机,其他节点会进行选举(raft 算法);选择出新的 leader 节点;保证服务的正常运行

消息队列幂等

消息队列的幂等主要分为两个阶段

  1. 消息队列的幂等:
    1. 避免发送方的重试导致出现多条消息,确保消息队列同一条消息
    2. 一般会使用全局唯一的 id 对消息进行去重,确保不会出现相同的消息
  2. 消费者(客户端)的幂等
    1. 避免消费者超时重试导致的重复消费问题
    2. 每条消息都回有全局唯一 id,每次消费都回先检查消息是否消费过了

case:

订单问题:

每一笔订单都回有一个全局唯一的订单 id; 每次消费都回检查是否消费过

参考

https://kafka.apache.org/documentation/#introduction

https://rocketmq.apache.org/zh/docs/featureBehavior/01normalmessage

https://blog.csdn.net/m0_71513446/article/details/143386962

https://rocketmq-learning.com/faq/ons-user-question-history16752/

若有收获,就点个赞吧

http://www.dtcms.com/wzjs/177260.html

相关文章:

  • 政府网站建设指引简述什么是网络营销
  • 网站开发技能seo搜索推广
  • 佛山网站开发哪家专业搜狐财经峰会直播
  • 被国家禁止访问的网站怎么打开市场推广计划方案模板
  • 自己做网站挣钱吗优化大师软件下载
  • 虚拟网站建设指导百度指数教程
  • 百度网站开发合同范本企业建站公司
  • 青岛互联网企业廊坊seo关键词优化
  • wap网站设计规范上海搜索引擎优化公司排名
  • 临淄网站制作首选专家项目推广平台排行榜
  • 东莞网页制作最新招聘信息安全优化大师
  • 工信部网站备案查询 验证码错误免费推广
  • 学服装设计后悔死了手机网站搜索优化
  • 作网站建什么网站可以长期盈利
  • 新闻发布会是什么意思网站优化
  • 在网站上发消息做宣传百度快照
  • 青岛网站备案深圳优化服务
  • 企业网站建设与推广江苏提升关键词排名收费
  • 在哪个网站找地理题做百度知道下载安装
  • 南京建设网360优化大师官方网站
  • 网站规划与建设规划书外包平台
  • 做原型交互的网站工具怎样注册网站建立网页
  • 杭州网站推广优化公司免费广告发布平台
  • 用固定ip做访问网站服务器推广平台哪儿有怎么做
  • 点开文字进入网站是怎么做的b站推广网站入口2023是什么
  • 网做英文网站关键词排名优化官网
  • 用php做商城网站的设计论文西安做网站哪家好
  • wordpress网站流量统计插件网络营销模式有哪些
  • 无需域名网站建设seo网站排名助手
  • 杭州网站建设公司推荐网络营销产品概念