当前位置: 首页 > news >正文

64.微服务保姆教程 (七) RocketMQ--分布式消息中间件

RocketMQ–分布式消息中间件

一、MQ

1、什么是MQ

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。

2、MQ的应用场景

消息队列在实际应用中常用的使用场景有异步处理,应用解耦,流量削锋和消息通讯。

1、异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种:串行的方式和并行方式。

串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户。
在这里插入图片描述

并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

在这里插入图片描述

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。

因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)。

小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

在这里插入图片描述

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍!

2、应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:

在这里插入图片描述

传统模式的缺点:

假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合。

如何解决以上问题呢?引入应用消息队列后的方案,如下图:

在这里插入图片描述

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功

库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作

假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

3、流量削锋

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛!

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

可以控制活动的人数,可以缓解短时间内高流量压垮应用。

在这里插入图片描述

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。

秒杀业务根据消息队列中的请求信息,再做后续处理。

4、消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

点对点通讯:

在这里插入图片描述

客户端A和客户端B使用同一队列,进行消息通讯。

聊天室通讯:

在这里插入图片描述

客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。

3、几大常见MQ产品比较

市场上比较常见的 MQ 产品有 ActiveMQ、RabbitMQ、RocketMQ 和 Kafka。

在这里插入图片描述

二、RocketMQ

1.什么是RocketMQ

RocketMQ是阿里巴巴开源的一款高性能、高吞吐量的分布式消息中间件,RocketMQ使用Java语言开发,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

2.RocketMQ 特点

1.支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型

2.在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 (RocketMQ可以保证严格的消息顺序,而ActiveMQ无法保证)

3.支持拉(pull)和推(push)两种消息模式

pull其实就是消费者主动从MQ中去拉消息,而push则像rabbit MQ一样,是MQ给消费者推送消息。但是RocketMQ的push其实是基于pull来实现的。
它会先由一个业务代码从MQ中pull消息,然后再由业务代码push给特定的应用/消费者。其实底层就是一个pull模式

4.单一队列百万消息的堆积能力 (RocketMQ提供亿级消息的堆积能力,这不是重点,重点是堆积了亿级的消息后,依然保持写入低延迟)

5.支持多种消息协议,如 JMS、MQTT 等

6.分布式高可用的部署架构,满足至少一次消息传递语义(RocketMQ原生就是支持分布式的,而ActiveMQ原生存在单点性)

7.提供 docker 镜像用于隔离测试和云集群部署

8.提供配置、指标和监控等功能丰富的 Dashboard

3 RocketMQ 优势

目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:

支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
支持 18 个级别的延迟消息(Kafka 不支持)
支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
支持 Consumer 端 Tag 过滤,减少不必要的网络传输(即过滤由MQ完成,而不是由消费者完成。RabbitMQ 和 Kafka 不支持)
支持重复消费(RabbitMQ 不支持,Kafka 支持)

4、RocketMQ 核心组成部分

RocketMQ主要有四大核心组成部分:NameServerBrokerProducer以及Consumer四部分。

在这里插入图片描述

1、Name Server

NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。

Name Server 主要包括两个功能:

  • Broker管理:NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
  • 路由信息管理:每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。

Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。任意一个 Name Server 包含所有集群的信息。

2、Broker

集群最核心模块,主要负责Topic消息存储、消费者的消费位点管理(消费进度);

Broker会注册到 Name Server上去,无论是否是主从, 每个 Broker 都会注册到 Name Server 上;

Broker部署相对复杂,Broker分为Master和Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

每个Broker 与Name Server集群中的所有节点建立长连接,定时(每隔30秒)注册Topic信息到所有Name Server。Name Server定时(每隔10秒)扫描所有存活的Broker连接,如果Name Server超过两分钟没有收到心跳。则Name Server断开与Broker 的连接。

每个 Broker 都会创建一个 consumerOffset.json 的文件,记录当前消费的节点指向了哪条消息,即消费的偏移量;偏移量是由 Consumer 上报的,Consumer 会定时或者 Kill 阶段提交各自对应 queue 的 offset 位置,为了避免消息的重复推送;consumerOffset.json 的文件格式:

**
在这里插入图片描述

3、producer

Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从Name Server中取Topic路由信息,并向提供Topic服务的Master建立长连接(基于 Netty),且定时向Master发送心跳。Producer 完全无状态,可集群部署。

Producer 每隔30秒(由ClientConfig的pollNameServerInterval)从Name Server 获取所有的Topic 队列的最新情况,这意味着如果Broker 不可用,Producer 最多30秒感知到。在此期间内发往Broker的所有消息都会失败。

Producer 每隔30秒 (由ClientConfig中heartbeatBrokerInterval决定) 向所有关联的 Broker 发送心跳,Broker 每隔10秒扫描所有存活的的连接,如果Borker 在2分钟内没有收到心跳数据,则关闭与Producer的连接。

4、Consumer

Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从Name Server 取 Topic 路由信息,并向提供Topic 服务的Master、 Slave 建立长连接(基于 Netty)且定时向 Master、Slave 发送心跳。Consumer 既可以从Master 订阅消息,也可以从Slave 订阅消息,订阅规则由 Broker 配置决定。

Consumer 每隔30秒 从Name Server 获取Topic 的最新队列情况,这意味着Broker 不可用时,Consumer 最多需要30秒 即可感知。

Consumer 每隔30秒(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的Broker 发送心跳,Broker 每隔 10 秒扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group 的所有Consumer发出通知,Group内的所有Consumer 重新分配队列,然后继续消费。

当Consumer得到 Master 宕机通知后,转向Slave 消费,Slave 的消息不对保证100%都同步过来了,因此会有少量的消息丢失。但是一旦Master 恢复,未同步过去的消息会被最终消费掉。

消费者队列是消费者连接之后(或之前连接过)才创建的。我们将原生的消费者标识由{IP}@{消费者group}扩展为 {IP}@{消费者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)。任何一个元素不同都认为是不同的消费端,每个消费端会拥有一份自己的消费队列(默认是Borker队列数量*Broker数量)。新挂载的消费都队列中拥有CommitLog 的所有数据。

5、RocketMQ消息模型的术语

a) 消息:消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。

b) 主题:Topic表示一类消息的集合,每个主题包含若干消息,是RocketMQ进行消息订阅的基本单位。一个发送者可以发送消息给一个或者多个Topic;一个消息接受者可以订阅一个或多个Topic消息;

c) 标签:为消息设置的标签,用于将同一个topic下区分不同类型的消息,可以理解为Topic是消息的一级分类,Tag是消息的二级分类。

d) 队列:存储消息的物理实体,一个Topic可以包含多个Queue,Queue也叫消息分区,一个Queue中的消息只能被一个消费者组中的一个消费者消费,一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。每个 Topic 在创建之初都会默认创建 4 个队列(queue-0,queue-1,queue-2,queue-3),每个队列都会对应一个持久化的文件;Producer 向 Broker 上的 Topic 发送消息,若发现队列没有创建持久化的文件,则会创建相应的持久化文件 queueLog,queueLog 记录的每条消息在 CommitLog 中的位置等信息。

e) 消息标识
RocketMQ中每个消息都有唯一的消息ID,且可以携带具有业务标识的key,以便对消息的查询。

f) 组(Group),可分为生产者组和消费者组:
生产者组:同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息,且生产者发送后崩溃,则Broker服务器会联系同一个生产者组的其他生产者实例以提交或者回溯消费。
消费者组:同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面实现了负载均衡和容错。消费者组的消费者实例必须订阅完全相同的topic。

g) 偏移量:队列Queue中的offset,可以认为就是下标,消息队列可看做数组。

各角色之间的关系

Producer:消息的发送者;举例:发信者
Consumer:消息接收者;举例:收信者
Broker:暂存和传输消息;举例:邮局
NameServer:管理Broker;举例:各个邮局的管理机构
Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic

相关文章:

  • Excel 数据 可视化 + 自动化!Excel 对比软件
  • IoTDB磁盘I/O性能监控与优化指南
  • 力扣-hot100 (矩阵置零)
  • 机器学习实操 第二部分 神经网路和深度学习 第13章 使用TensorFlow加载和预处理数据
  • 高等数学第六章---定积分(§6.2定积分在几何上的应用2)
  • Elasticsearch知识汇总之 ElasticSearch高可用方案
  • [ linux-系统 ] 常见指令2
  • 开源向量大模型推荐:2025年技术选型指南
  • 模板模式 VS 建造者模式
  • Sublime Text快速搭建Lua语言运行环境
  • 可以下载blender/fbx格式模型网站
  • 【C++游戏引擎开发】第31篇:物理引擎(Bullet)—碰撞检测系统
  • 学习Python网络爬虫的实例
  • SpringBoot 集成滑块验证码AJ-Captcha行为验证码 Redis分布式 接口限流 防爬虫
  • 数据清洗-电商双11美妆数据分析
  • Python入门(一)
  • 怎样通过API 实现python调用Chatgpt,gemini
  • 爱情的本质是什么--deepseek
  • 20250506联想Lenovo笔记本电脑的USB鼠标失效之后在WIN10下的关机的方法【触摸板被禁用】
  • Hologres x 函数计算 x Qwen3,对接MCP构建企业级数据分析 Agent
  • 特朗普称不会为了和中国谈判而取消对华关税,外交部回应
  • 云南禄丰尾矿坍塌事故搜救正在进行,被掩埋的四辆工程车已找到
  • 甘肃临夏州政协委员马全成涉嫌诈骗罪,被撤销政协委员资格
  • 降雪致长白山天池景区关闭,有游客在户外等待一小时,景区回应
  • 经济日报头版评论:矢志奋斗筑梦青春中国
  • 俄罗斯期望乌克兰在停火期间采取行动缓和局势