RocketMQ基础知识
RocketMQ基础知识
简介
RocketMQ是阿里巴巴开源,后来捐赠给Apache基金会的一款分布式消息中间件。
因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。
领域模型
RocketMQ使用异步通信方式,以及发布订阅的消息传输模型。
模型图(源自官网):
如上图所示,RocketMQ的领域模型主要分为三部分:消息生产、消息存储、消息消费。
生产者将消息发送到RocketMQ的服务端,消息被存储在服务端的主题中,消费者通过订阅主题来消费消息。
主题
定义
主题是RocketMQ中消息传输和存储的最外层容器(顶层容器),用于区分不同类业务逻辑的消息。主要作用:
- 隔离不同类数据:将不同类的数据发送到不同的主题中,就可以实现存储的隔离性、订阅的隔离性。
- 标识数据的身份及权限:消息本身是匿名的,可以将不同身份的消息发送到特定的主题中,就可以区分出不同消息的身份以进行权限管理。
主题实际是一个逻辑概念,每个主题内可以由多个队列,消息的存储是由队列来实际完成的。
内部属性
-
主题名称:由用户创建主题时定义,用于标识主题,集群内全局唯一。
-
队列列表:创建主题时可以定义队列的数量,系统会根据队列数量给主题分配队列,一个主题至少包含一个队列。
-
消息类型:一个主题只能接收一类消息:
Normal:普通消息,没有特殊用途,无需特殊处理的消息。
FIFO:顺序消息,可以保证消息的投递顺序严格按照消息的发送顺序(消费顺序应由消费者自己保证)。
Delay:定时/延时消息,消息发送后并不立即投递,在一定时间后才投递。
Transaction:事务消息,用于保障分布式事务一致性。
队列
定义
队列是主题内的次级容器,是消息的最小存储单元,也是消息传输和存储的实际物理容器。主要作用:
- 顺序性:同一队列的消息天然按照其存入的顺序保存。
- 流式操作:队列可以通过索引访问任意位置的消息,可以实现聚合读取、回溯读取等操作。
内部属性
- 读写权限:可以控制队列是否可以读写数据(读写、只读、只写、不可读写)。
消息
定义
消息是RocketMQ中的最小数据传输单元。主要特点:
- 不可变性:消息构建后,不可被修改。
- 持久化:RocketMQ默认会对消息进行持久化,以保证可回溯性和可恢复性。
内部属性
- 主题名称:该消息所属的主题的名称,主题名称集群内全局唯一。
- 消息类型:当前消息的类型(Normal、FIFO、Delay、Transaction)。
- 消息队列:该消息所属的主题内的队列。
- 消息位点:该消息在队列中的位置(0~Long.Max)。
- 消息ID:消息的集群内唯一标识,系统自动生成的由数字和大写字母组成的32位字符串。
- 索引Key(可选):可以给消息设置不同的Key,用于区分不同消息,以及快速查找消息。
- 标签Tag(可选):消费者可以通过标签Tag来过滤消息,一条消息最多一个Tag。
- 定时/延时时间(可选):若为定时/延时消息,则需要为消息设置定时/延时时间,最大为40天。
- 消息发送时间:生产者客户端系统自动填充的系统本地毫秒级时间戳。
- 消息保存时间:服务端完成消息存储时,服务端系统自动填充的系统本地毫秒级时间戳。
- 消费重试次数:消息消费失败后,服务端系统会自动记录消息的重新投递次数。
- 自定义属性:生产者可以自定义字符串键值对属性。
- 消息负载:消息的实际业务数据,生产者负责编码,会按照二进制字节传输。
约束
消息必须为对应主题指定的消息类型。
不得超过消息类型的最大限制:Normal/FIFO:4MB ,Delay/Transaction:64KB
生产者
定义
生产者是构建消息并将消息发送到服务端的运行实体。
生产者和主题是多对多的关系,可以据此实现生产者的水平扩展和容灾。
内部属性
-
客户端ID:由RocketMQ自动生成,用于区分不同的生产者,集群内全局唯一,无法修改。
-
通信参数:
接入点信息(必填):要连接的服务端的地址。
身份认证信息(可选):若服务端开启身份识别和认证,需要传输用于身份认证的凭证信息。
请求超时时间(可选):网络请求调用的超时时间。
-
预绑定主题列表:对于Transaction消息,必须要预绑定主题列表,因为生产者在故障、重启恢复时,需要检查事务消息的主题中是否有未提交的事务消息。对于非Transaction消息,可以根据后续发消息的目标主题动态变更。
-
事务检查器:在事务消息机制中,生产者需要提供事务检查器,用于返回业务事务是否成功。
-
发送重试策略:生产者在消息发送失败时的重试策略。
消费者分组
定义
消费者分组是承载多个消费行为一致的消费者的负载均衡分组。
消费者分组不是一个运行实体,而是一个逻辑资源。通过消费者分组内初始化多个消费者实现消费能力的水平扩展和高可用容灾。
同一消费者分组下的消费者安装统一的消费行为和负载均衡策略消费消息。
内部属性
- 消费者分组名称:消费者分组的名称,用于区分不同的消费者分组,集群内全局唯一。
- 投递顺序:RocketMQ向消费者客户端投递消息的顺序:并发投递(默认)、顺序投递。
- 消费重试策略:消费者消费失败时,系统会根据重试策略(最大重试次数、重试间隔)将消息重新投递给消费者。
- 订阅关系:该消费者分组的订阅关系集合。包括订阅的主题、消息过滤规则等。订阅关系由消费者动态注册到消费者分组中,服务端会持久化订阅关系并匹配消息的消费进度。
约束
同一消费者分组下的消费者必须具有一致的消息投递顺序,以及消费重试策略。
消费者
定义
消费者是RocketMQ中用来接收并处理数据的运行实体,通常被集成在业务系统中。可以定义行为:
- 消费者身份:消费者必须关联一个指定的消费者分组,以获取分组内统一定义的行为配置和消费状态。
- 消费者类型:多种消费者类型:PushConsumer、SimpleConsumer、PullConsumer。
- 消费者本地运行配置:控制消费者客户端本地的运行配置:线程数、并发度等。
内部属性
-
消费者分组名称:消费者关联的消费者分组名称,消费者必须关联一个指定的消费者分组。
-
客户端ID:由RocketMQ自动生成,用于区分不同的消费者,集群内全局唯一,不可修改。
-
通信参数:
接入点信息(必填):要连接的服务端地址。
身份认证信息(可选):若服务端开启身份识别和认证,需要传输用于身份认证的凭证信息。
请求超时时间(可选):网络请求调用的超时时间。
-
预绑定订阅关系列表:消费者订阅的主题列表,可以动态变更。
-
消费监听器:服务端将消息推送给消费者后,监听器会调用消费逻辑。
订阅关系
定义
订阅关系是消费者获取消息、处理消息的规则和状态配置。
订阅关系由消费者分组动态注册到服务端,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。
订阅关系可以控制如下传输行为:
- 消息过滤规则:控制消费者对主题内的哪些消息进行消费。
- 消费状态:RocketMQ默认提供订阅关系持久化功能,当消费者离线后再次上线,可以获取到离线前的消费进度并再次消费。
订阅关系判断原则
RocketMQ的订阅关系以消费者分组和主题为单位进行设计,一个订阅关系指的是某个消费者分组对某个主题的订阅。
不同消费者对同一主题的订阅相互独立:
图片源自官网
同一消费者分组对不同主题的订阅相互独立:
图片源自官网
内部属性
-
过滤类型:消息过滤方式的类型:
- TAG过滤:通过Tag标签进行过滤。
- SQL92过滤:通过SQL语法进行过滤。
-
过滤表达式:过滤规则的表达式。
领域模型
顺序消息
顺序消息强调多条消息间的先后顺序关系,可以支持消费者按照发送消息的先后顺序获取消息。
发送顺序消息时需要为每条消息指定一个消息组,同一消息组内的消息才可以保证先进先出的顺序关系。
消息生产顺序性的保证:
- 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者间的消息即使在同一消息组也无法保证顺序性。
- 串行发送:如果生产者使用多线程并行发送,则不同线程产生的消息无法保证顺序性。
保证了消息生产的顺序性后,RocketMQ会将同一消息组内的消息按照发送顺序存储在同一个队列中。不同消息组的消息可以存储在同一个队列中,但不保证消息的连续性。
图片源自官网
消费的顺序性应当由消费者自己保证。
顺序消息的重试次数应当在合理的范围内,不应长时间阻塞后续消息的消费。
事务消息
事务消息通过二阶段提交的方式保证了分布式事务的最终一致性。
事务消息处理流程:
-
生产者将事务消息发送给RocketMQ服务端。
-
服务端收到消息并持久化存储后返回给生产者ack确认,此时事务消息还暂时不能发送给消费者,这种状态下的事务消息即为半事务消息。
-
生产者开始执行本地事务。
-
生产者根据本地事务执行结果,选择向服务端提交二次确认结果(Commit/Rollback):
- Commit:服务端将半事务消息投递给消费者。
- Rollback:服务端将半事务消息回滚,不发送给消费者。
-
在网络异常等情况下,服务端可能长时间未收到生产者的二次确认结果,或则收到的二次确认结果异常,服务端将会更具设置的回查间隔时间和最大回查次数,对生产者发送回查消息。
-
生产者收到回查消息后,需要检查对应的本地事务执行结果,并向服务端提交二次确认(Commit/Rollback)。
消费者分类
RocketMQ提供了不同的消费者类型,用来满足不同业务场景下的消费需求。
同一消费者分组内,应只包含同一种消费者类型
PushConsumer
PushConsumer是一种高度封装的消费者类型,只需要在消费者初始化时注册一个监听器,并在监听器内部实现消息处理逻辑,就可以自动化实现消息的获取、触发监听器以及消息重试处理。
PushConsumer的消费者监听器要返回执行结果:
- ConsumeResult.SUCCESS:代表执行成功,服务端更新消费进度。
- ConsumeResult.FAILURE:代表执行失败,根据消费重试逻辑进行消费重试。
- 其它:例如抛出异常、消费超时等情况,按照执行失败的逻辑处理,根据消费重试逻辑进行消费重试。
PushConsumer的消息实时处理能力是基于Reactor线程模型实现的。通过一个轮询线程将消息异步拉取到缓存队列中,再分别提交到消费线程中,触发监听器调用本地消费逻辑:
图片源自官网
SimpleConsumer
SimpleConsumer是一种自由度较高的消费者类型,需要自定义业务逻辑来调用接口获取消息,然后分发给业务线程处理消息,最后调用提交接口将处理结果返回给服务端。
相较于PushConsumer:
- SimpleConsumer可以在消费消息时灵活自定义消息的预计处理时长。
- SimpleConsumer内部没有复杂的线程封装,可以自定义实现异步分发、批量消费等。
- SimpleConsumer由业务主动调取接口获取消息,因此可以自定义消息获取频率、消费频率。
消息过滤
消费者在订阅了某个主题后,RocketMQ会将该主题中的所有消息都投递给该消费者,消费者可以设置消息过滤规则来避免接收到大量无效消息。
消息过滤关键流程:
- 生产者在初始化消息时预先为消息设置一些属性和标签。
- 消费者通过调用订阅关系注册接口,上报自己的过滤规则。
- 服务端会根据消费者上报的过滤规则筛选合适的消息发送给消费者。
消息过滤分类为:
-
Tag标签过滤:
生产者在发送消息时,最多为每个消息设置一个Tag标签(建议长度不超过128字符)。
消费者在设置订阅关系时,可以配置需要的Tag标签。 -
SQL属性过滤:
生产者在发送消息时,可以为每个消息设置多个属性(Key-Value)。
Tag标签是一种系统属性,所以SQL过滤方式也兼容Tag过滤。
具体过滤语法请参考官方文档
消费者负载均衡
因为RocketMQ支持同一个主题被不同消费者分组订阅,因此,可以根据消费者分组和消费者的不同组合,实现两种消费效果:
图片源自官网
- 消费组间广播消费:每个消费者分组只包含一个消费者,每个消费者得到消费者分组内的所有消息,实现广播的效果。
- 消费组内共享消费:每个消费者分组内包含多个消费者,多个消费者共同分担消费者分组内的所有消息,实现水平拆分和负载均衡的效果。
在共享消费模式下,同一个消费者分组内的多个消费者,共同分担消费者分组内的所有消息,但是哪条消息分给哪个消费者,就涉及到了消费者负载均衡的问题。
消费者负载均衡可以分为两种模式:消息粒度负载均衡,队列粒度负载均衡。
消息粒度负载均衡
图片源自官网
如上图所示,在消息粒度负载均衡中,同一消费者分组中的消费者平分主题中的所有消息,主题中的消息将随机平均分配给消费者分组内的所有消费者。
当消费者获取到某条消息后,RocketMQ服务端会对该条消息加锁,保证该条消息对其他消费者不可见,直到该条消息消费成功或消费超时。
图片源自官网
如上图所示,对于同一个消息组内的顺序消息,RocketMQ服务端保证按照服务端存储的先后顺序将消息发送给消费者。当前置消息未消费完成时,将会锁定后置消息,保证同一消费组内的消息被串行消费。
基于消息粒度的负载均衡,可以保证每个消费者都可以平均分配到消息,而不会出先部分消费者空闲的情况。
队列粒度负载均衡
图片源自官网
如上图所示,在队列粒度负载均衡中,主题中的队列会根据统一的算法,分配给消费者分组中特定的消费者。为了避免消息被重复消费,每个队列仅能被一个消费者消费。
当队列数量大于消费者数量时,会出现部分消费者空闲的情况,因此没有消息粒度负载均衡策略灵活。但是队列粒度负载均衡,在流式处理场景下具有优势,因为可以保证同一队列中的消息被同一个消费者消费,对批量处理,聚合处理更友好。
消费进度管理
每条消息在队列中都有一个唯一的Long类型的坐标(0~Long.Max),这个坐标被定义为消息位点。
RocketMQ定义队列中的最早的一条消息位点为MinOffset,最新一条消息的位点为MaxOffset。
队列在理论上是可以无限延长的,但受限于物理机的空间,RocketMQ会滚动删除队列中最早的消息。因此MinOffset和MaxOffset会一直滚动递增。
如果RocketMQ在消费者消费完消息后,就删除该消息,那么其它订阅了该主题的消费者将无法消费该消息。因此,消息被消费过后并不是被立即删除,RocketMQ会为每个消费者分组维护一份消费记录,记录消费者分组消费主题中某一队列时消费过的最新一条消息的位点,也就是该消费者分组的消费位点。
当消费者分组重新上线后,就可以根据消费位点继续消费了。
重置消费位点:
- 如果业务需要消费历史消息,可以将消费位点重置到指定时刻,服务端会自动匹配最近的消费位点。
- 如果消息大量堆积,可以更新消费位点以绕过部分消息,减少下游处理的压力。
fset会一直滚动递增。
如果RocketMQ在消费者消费完消息后,就删除该消息,那么其它订阅了该主题的消费者将无法消费该消息。因此,消息被消费过后并不是被立即删除,RocketMQ会为每个消费者分组维护一份消费记录,记录消费者分组消费主题中某一队列时消费过的最新一条消息的位点,也就是该消费者分组的消费位点。
当消费者分组重新上线后,就可以根据消费位点继续消费了。
重置消费位点:
- 如果业务需要消费历史消息,可以将消费位点重置到指定时刻,服务端会自动匹配最近的消费位点。
- 如果消息大量堆积,可以更新消费位点以绕过部分消息,减少下游处理的压力。
- 如果出现消费错误等异常,导致需要重新消费历史消息,可以将消费位点更新到历史位点,实现消费回溯。