消息队列总结
为什么需要消息队列?
随着互联网快速发展,业务规模不断扩张,技术架构从单体演进到微服务,服务间调用复杂、流量激增。为了解耦服务、合理利用资源、缓冲流量高峰,「消息队列」应运而生,常用于异步处理、服务解耦和流量削峰等场景。
- 订单系统:在电商系统中,订单的创建、支付、发货等步骤可以通过消息队列进行异步处理和解耦。
- 日志处理:使用消息队列将日志从应用系统传输到日志处理系统,实现实时分析和监控。
- 任务调度:在批量任务处理、任务调度系统中,通过消息队列将任务分发给多个工作节点,进行并行处理。
- 数据同步:在数据同步系统中,消息队列可以用于将变更的数据异步同步到不同的存储系统或服务。
消息队列的模型有哪些?
1、队列模型(也称点对点模型)
在队列模型中,消息从生产者发送到队列,并且每条消息只能被一个消费者消费一次。消费之后,消息在队列中被删除。适用于任务处理类场景,如一个任务只需要一个处理者执行。
2、发布/订阅模型(Publish/Subscribe)
在发布/订阅模型中,生产者将消息发布到某个主题(Topic),所有订阅了该主题的消费者都会接收到该消息。每个订阅者都会接收到相同的消息,适用于广播通知、实时推送等场景
RabbitMQ、RocketMQ、Kafka 技术选型总结
对比维度 | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|
定位 | 功能丰富的消息中间件 | 分布式高可用消息中间件 | 高吞吐分布式日志系统 / 消息队列 |
协议支持 | AMQP,MQTT,STOMP,WebSocket | 自研协议,支持部分 RocketMQ 专属特性 | 自研 TCP 协议 |
吞吐量 | 中等(万级 TPS) | 高吞吐(10-20 万 TPS) | 极高吞吐(百万级 TPS) |
延迟 | 低延迟,毫秒级 | 毫秒级,RT 性能好 | 延迟相对高,适合大数据场景 |
可靠性 | 高,支持消息确认、持久化 | 高,支持消息可靠投递,事务消息 | 高,分区副本机制,适合大规模持久化 |
可用性 | 单节点或集群(镜像队列),集群复杂 | 自带 NameServer + Broker 集群,易扩展 | 高可用(分布式架构,Zookeeper/KRaft) |
功能特性 | 丰富(死信队列、延迟队列、插件机制) | 支持定时消息、事务消息、顺序消费 | 支持回溯消费、批量拉取、分布式存储 |
典型场景 | 业务解耦、异步削峰、轻量级系统 | 分布式事务、顺序消息、电商系统 | 大数据场景、日志收集、流式计算 |
运维复杂度 | 中等,Erlang 特殊运维 | 中等,自研协议,配套完善 | 相对高,依赖 Zookeeper(或 KRaft) |
社区 & 成熟度 | 社区活跃,成熟稳定 | 阿里主导,国内电商常用 | Apache 顶级项目,互联网公司主流 |
选型建议
- 轻量级、快速开发首选 RabbitMQ:比如内部系统解耦、异步发送邮件等。
- 高可靠、事务型场景推荐 RocketMQ:例如订单支付、分布式事务。
- 高吞吐、大数据日志场景推荐 Kafka:比如用户行为日志、埋点数据、日志分析。
需要 秒级延迟+事务性:选 RocketMQ
需要 批量消费+高吞吐:选 Kafka
需要 丰富的协议支持+易用性:选 RabbitMQ
RabbitMQ
一、 RabbitMQ如何实现延迟队列?
RabbitMQ 本身不支持延迟消息,但是可以通过它提供的两个特性 TTL(Time-To-Liveand Expiration ,消息存活时间)、DLX(Dead Letter Exchanges,死信交换器)来实现。
还可以利用 RabbitMQ 插件来实现。
使用TTL + 死信队列:
在 RabbitMQ 中,通过设置消息的 TTL 和死信交换器可以实现延迟队列。不给原队列(正常队列)设置消费者,当消息在原队列中达到 TTL后,由于还未被消费则会被转发到绑定的死信交换器,消费者从死信队列中消费消息,从而实现消息的延迟处理。
使用 RabbitMQ插件:延迟消息插件(rabbitmg-delayed-message-exchange)
通过安装 RabbitMQ 的延迟消息插件,可以直接创建延迟交换器(DelayedExchange)在发送消息时,指定消息的延迟时间,RabbitMQ 会在消息达到延迟时间后将其转发到对应的队列进行消费。
TTL 和 DLX 简要说明
TTL(Time-To-Live):指消息在队列中的存活时间。你可以为队列中的所有消息统-设置TTL,也可以为每条消息单独设置TTL。当消息超过TTL时,消息会被标记为过期。
死信队列(DLX):当消息在原队列中过期、被拒绝(nack/reject)或队列已满时消息会被转发到绑定的死信交换器(DLX)。DLX 可以将消息重新路由到死信队列(即这里的延迟队列)。
TTL + DLX 时序问题
因为队列的特点就是先进先出,如果发送的消息延迟的时间不同,例如第一个延迟 10s、第二个延迟 5s、第三个延迟 1s。
那么后面的消息,需要等 10s 的消息消费完才能消费。当 10s 消息未被消费,则后续的消息都会被阻塞,即使消息设置了更短的延迟。
延迟消息插件原理
插件提供了一种新的交换器类型x-delayed-message 。这种交换器可以像普通的交换器样,接收消息并根据路由键将消息路由到相应的队列。只不过x-delayed-message 类型的交换机接收消息投递后,不会直接路由到队列中,而是存储到 Mnesia(Mnesia 是 Erlang运行时中自带的一个数据库管理系统)中。等到消息达到可投递时间,消息才会被投递到目标队列中。
二、 RabbitMQ中消息什么时候会进入死信交换机?
消息会在以下几种情况进入死信交换机(Dead-LetterExchange,DLX)
- 消息被拒绝(Rejection):当消费者使用 basic.reject或 basic.nack明确拒绝消息,并且不要求重新投递(requeue 设置为 false)时,消息会被直接投递到死信交换机。
- 消息过期(TTL Expiration):RabbitMQ 支持为消息或队列设置TTL(Time-To-Live),即生存时间。当消息超过指定的存活时间后还未被消费,它会自动变为死信并被发送到死信交换机。
- 队列达到最大长度(或总大小):如果队列设置了最大长度(x-max-length 或 x-max-length-bytes),当消息数量或总大小超出限制时,最早进入队列的消息会被移入死信交换机。这些条件下进入死信交换机的消息,可以再通过死信队列进行日志记录、重新处理或监资。
各种场景下的死信队列应用
- 消费者拒绝(Rejection):当消费者不能处理消息时,使用 basic.reject或将其拒绝并路由到死信队列。死信队列可以用来监控和分析消费者无法处basic.nack理的异常消息。
- 过期消息处理:在电商订单系统中,TTL机制可用来控制支付超时的订单,将超时未支付的订单放入死信队列中,并在后续对这些订单进行自动取消。
- 队列限流:如果队列有容量限制,当消息超出队列容量时,最早进入的消息会被丢入死信队列,适用于系统负载较高的场景中进行限流和控制。
死信队列的注意事项
避免死信循环:在配置死信交换机时,避免将死信交换机和原队列配置为相互死信的循环,防止消息重复转发引起的系统资源浪费。
三、 RabbitMQ无法路由的消息会去到哪里?
在 RabbitMQ 中,无法被路由的消息通常有以下几种处理方式
- 丢弃消息:默认情况下,若消息无法找到符合条件的队列(即没有匹配的绑定关系),RabbitMQ 会直接丢弃消息,不会进行特殊处理。
- 备份交换机(Alternate Exchange):可以为交换机配置一个备份交换机,无法被路由的消息将被发送到备份交换机,再由备份交换机根据其绑定关系决定如何处理消息。例如,可以将这些消息发送到指定队列进行保存或处理。
- 消息回退(Return Listener):在使用 mandatory 参数的情况下,如果消息无法路由,则会触发返回机制,将消息退回到生产者,这样生产者可以自行处理未路由的消息
不同策略的应用场景
- 备份交换机:适用于业务场景中需要监控或重处理未路由消息的情况 Return Listener
- 回退机制:适用于生产者对消息流控制严格的场景,确保无路由消息被生产者感知。
- 死信交换机:适合管理超时、失败和未能路由的消息,适用于需要严格消息管控的系统。
备份交换机 VS 死信交换机
1、备份交换机(Alternate Exchange)场景:
关键词:交换机没人理我(没人绑定),我总得找个地方放消息吧)
假设你有一个订单服务,向 RabbitMQ 的某个“业务交换机”发订单通知:
• 你把消息发给了交换机 order-exchange;
• 理论上,它应该会被路由到 order.created.queue;
• 但 你搞错了路由键,或者压根没人绑定这个交换机;
• 这时,交换机找不到任何绑定的队列,它会:
①丢弃消息?NO!不合适!
②转发给一个你事先配置好的 备份交换机(AE),比如 backup-exchange;
③你可以监听这个 backup-exchange,做日志记录 / 告警 / 手动重试。
✅ 备份交换机适合场景:
• 想监控和记录“消息发出后却没人接”的情况;
• 比如有些关键系统要求生产消息不能丢(风控、结算等);
• 但并不一定要马上消费,只要先收着,慢慢分析也行。
2、死信交换机(DLX)场景:
关键词:队列能接,但消费失败 / 异常了,我得另找办法处理这些“烂尾消息”
比如你有个延迟支付队列 payment.delay.queue,你设置了 TTL(延迟 30 分钟):
• 有用户下单但没支付,消息进入该队列;
• 如果用户 30 分钟都没动静,消息 TTL 到期;
• RabbitMQ 就会把这个消息转发到你设置好的 死信交换机 DLX;
• DLX 会把这些消息路由到 payment.timeout.queue;
• 然后你再通知用户 “订单已失效” or 扣库存、释放资源。
✅ 死信交换机适合场景:
• 延迟队列(超时处理);
• 消费失败重试机制(超过次数后扔给 DLX);
• 消费者拒绝消息(如业务异常、数据错误);
RocketMQ
一、为什么RocketMQ不使用Zookeeper作为注册中心?而选择择自己实现NameServer?
💡 Zookeeper 是个“重型老大哥”,RocketMQ 想要“简单自由的小弟”
- Zookeeper 过于严谨,RocketMQ觉得“太重了”
Zookeeper 天生就走 强一致性,比如 所有节点必须保持一致再返回结果,适合搞分布式锁、选主这种严肃业务。
但 MQ 场景追求的是 快、实时路由、灵活扩展,服务发现偶尔不一致问题 完全能接受。
👉 RocketMQ 选了一个“轻量级 NameServer”,只负责告诉客户端 broker 在哪,怎么连,不用动不动就三方投票、同步,延迟自然更低。
- Zookeeper 节点互相“唠嗑”太频繁
ZK 节点需要互相同步元数据,三台五台节点隔三差五互相“心跳+投票”,集群一复杂网络抖动就可能导致卡顿或者重选主。
RocketMQ 的 NameServer 完全无状态、互不干涉,谁活着就提供服务,活着就干活,死了不管,简单粗暴,天然就高可用。
- RocketMQ 天然“弱一致”,没必要搞 Zookeeper 的架子
RocketMQ 的消息发送 实时性第一位,即使短时间内拿到的路由信息不是最新的,后面重试也能发出去,系统自己兜底。
👉 “能发出去消息就行,别搞那么复杂” —— RocketMQ 。
- 降低依赖,RocketMQ 想做“独立大哥”
Zookeeper 依赖 Java + 繁琐的配置、维护,对 RocketMQ 来说是外部“拖油瓶”,一旦挂了整个集群连消息路由都挂了。
自己造 NameServer,小而美,RocketMQ 完全自给自足,上线简单、部署灵活,稳得很。
✨ 1. NameServer 本质是一个 KV 路由中心
Broker 启动时,主动向所有 NameServer 注册,发送自己的路由信息(IP、端口、topic 列表、写队列数等)。
NameServer 本地内存维护一张路由表(topic -> broker地址列表)。
Client(Producer/Consumer)启动时,从 NameServer 拉取路由信息,后续自己缓存路由,本地直连 Broker。
Broker 定时上报心跳(默认 30s 一次),如果超过 2 分钟没有心跳,NameServer 自动剔除路由。
✨ 2. NameServer 完全无状态
NameServer 之间没有任何通信,每个节点维护自己的内存路由表,互不干涉。
扩容/缩容很简单:丢几台 NameServer,Broker 会自动向新 NameServer 注册,Client 会自动感知新 NameServer。
📌 为什么 RocketMQ 故意让 NameServer 之间不通信、路由表冗余?
1、无状态设计,活着就能用
NameServer 没有 Leader,没有互相同步的压力,挂了随便重启,活着的 NameServer 能继续提供服务。
Producer、Consumer 连多个 NameServer,只要有一个活着就能获取路由信息,不存在单点问题。
2、极简架构,强健易扩展
NameServer 无磁盘落盘,全内存维护路由表,数据来源于 Broker 的 定时心跳。
新 NameServer 加入,Broker 会主动注册,NameServer 重启后 Broker 也会重新注册,自动修复,无脑扩容。
3、“数据冗余” 换 “极高可用”
你可以理解为 RocketMQ 追求的不是路由表的存储效率,而是 “谁都可以提供路由服务”。
就算挂掉一半 NameServer,系统完全不影响正常发消息。
如何理解Broker?
💡 Broker 就是 RocketMQ 的“核心消息存储+分发中心”
二、RocketMQ中关于事务消息的实现
RocketMQ 中的事务消息通过两阶段提交的方式来确保消息与本地事务的一致性。
第一阶段(消息发送):生产者先将消息发送到 RocketMQ 的 Topic,此时消息的状态为半消息(HalfMessage),消费者不可见。
然后,生产者执行本地事务逻辑,并根据本地事务的执行结果来决定下一步的操作。
第二阶段(提交或回查):
如果本地事务成功,生产者会向 RocketMQ 提交 commit 操作,将半消息变为正式消息,消费者可见。
如果本地事务失败,生产者会向 RocketMQ 提交 Rollback操作,RocketMQ 会丢弃该半消息。
如果生产者没有及时提交commit或 Rollback操作,RocketMQ 会定时回查生产者本地事务状态,决定是否提交或回滚消息。
用打车开发票做类比:
RocketMQ 阶段 | Producer 类比动作 | Consumer 类比动作 |
---|---|---|
发送半消息 | 顾客说“我要开发票” | 暂时看不到 |
本地事务 | 顾客正在付款 | 不处理(消息未投递) |
Commit 提交事务 | 付款成功,系统发发票 | 发票打印(消费消息) |
Rollback 回滚事务 | 付款失败,取消发票 | 没有任何动作 |
RocketMQ 事务消息的缺点主要有:
- 改造成本高:需要改造业务逻辑,接入特定接口,手动处理回查逻辑。
- 功能有限:只支持单事务消息,不支持跨消息事务。
- 可用性依赖强:MQ 集群挂了,半消息无法发送,事务整体不可用,应用流程中断。
用本地消息机制替代二阶段提交:
- 将发消息操作写入数据库本地消息表,与业务操作同一事务提交,保证业务和消息持久化的原子性。
- 通过后台定时任务异步扫描未发送的消息,负责将消息投递到 MQ,避免因 MQ 挂掉导致事务阻塞。
- 失败的消息通过重试机制和死信队列,保证消息最终一致性,减少人工干预。
简单说,本地消息机制把“发消息”变成了数据库事务的一部分,借助异步投递和重试,绕过了传统二阶段提交对 MQ 可用性的强依赖,实现更可靠的事务补偿方案。
Kafka
一、Kafka中Zookeeper的作用
在 Kafka 中,Zookeeper 扮演了集群协调和管理的核心角色。它的主要作用是管理和协调Kafka 集群中的元数据,帮助 Kafka 实现高可用性、负载均衡和容错性。
以下是 Kafka 中 Zookeeper 的几个关键作用:
- 管理 Broker 元数据:Zookeeper 负责管理 Kafka 集群中 Broker 的注册、状态监控。当有新的 Broker加入或离开集群时,Zookeeper能够及时更新集群状态
- 协调分区副本 Leader 选举:当某个分区的 Leader 副本故障时,Zookeeper 协调副本的选举过程,为该分区选出新的 Leader,确保分区高可用。
- 管理消费者的Offset:在早期版本的 Kafka 中,消费者的 Offset 信息存储在Zookeeper 中,以便消费者在重启后可以从上次消费的位置继续消费。最新的 Kafka版本将 Offset 存储移至 Kafka自身的内部主题consumer offsets,减少了对Zookeeper 的依赖。
- 动态配置和负载均衡:Zookeeper 保存着 Kafka 配置和拓扑信息,当集群发生变化时(如增加或减少分区、调整副本因子),Zookeeper协助完成负载均衡。
二、Kafka中关于事务消息的实现
Kafka 的事务消息不同于我们理解的分布式事务消息,它的事务消息是实现了消息的Exacty Once 语义,即保证消息在生产、传输和消费过程中的“仅一次”传递。Kafka 的事务消息主要通过以下几个核心组件来实现:
- 事务协调器(Transaction Coordinator):负责事务的启动、提交和中止管理,并将事务状态记录到transaction_state 内部主题。
- 幂等生产者(ldempotent Producer):Kafka Producer 通过 Producer ID(PID)识别每个事务的唯一性,确保同一事务的每条消息只写入一次。
- 事务性消费:在消费过程中,消费者可以选择隔离未完成事务的数据(通过read_committed 设置),只消费已提交的事务消息,确保数据的最终一致性。
Kafka 事务消息流程
- 启动事务:事务性生产者向 Transaction coordinator 请求启动事务
- 生产消息:生产者开始向 Kafka 写入事务消息,每条消息都带有唯一的 Producer ID和 Sequence Number ,以保证幂等性。
- 提交事务:在所有消息写入完成后,生产者向事务调器发送commit或abort请3求,提交或中止事务。
- 事务性消费:消费者可以通过设置 read_committed 隔离级别,仅消费已提交的消息实现最终数据一致性。
Kafka 事务隔离级别
Kafka 提供了以下两种消费隔离级别:
- read committed:消费者只会消费事务已提交的消息,确保数据的最终一致性。
- read uncommitted:消费者可以消费所有消息,包括尚未提交的事务消息,可能会读到未提交的数据,适用于对事务一致性要求较低的场景
进一步理解 Kafka 的事务消息的 Exactly Once
Kafka 的事务消息和 RocketMQ 的事务消息是不一样的。RocketMQ 解决的是本地事务的执行和发消息这两个动作满足事务的约束。而 Kafka 事务消息则是用在一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败,就像下面代码所演示的。
类别 | RocketMQ 的事务消息 | Kafka 的事务消息 |
---|---|---|
像什么? | 我打车时付款和发票要一起成功 | 我叫了 3 个朋友一起拼车,要么都能上车,要么谁也上不了 |
场景 | 处理本地事务 + 发消息的强一致性 | 保证多条消息发出去的事务性(原子性) |
作用 | 发不发消息,取决于本地操作成功与否 | 多条消息,要么全 commit,要么全 rollback |
例子 | 用户下单成功才发“扣库存”消息 | 发“新建订单”“记账”“打点日志”这3条消息,要么都成功,要么都失败 |
三、如何处理重复消息?
只有让消费者的处理逻辑具有幂等性,保证无论同一条消息被消费多少次,结果都是一样的,从而避免因重复消费带来的副作用。
如何幂等处理重复消息?这需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果
主要是利用唯一标识(ID)去重。
在消息中引入全局唯一 ID,例如 UUID、订单号等,利用 redis 等缓存,或者数据库来存储消息 ID,然后消费者在处理消息时可以检查该消息 ID 是否存在代表此消息是否已经处理过。
去重缓存的使用
- 使用 Redis 等缓存系统存储已经处理的消息 ID,以减少数据库的查询压力。
- 可以为 Redis 中的去重记录设置过期时间,例如7天,以便自动清理历史消息,减小存储压力。
去重表的设计:
- 在数据库中创建一张去重表,用来存储已处理消息的 ID 及处理时间。在消费每条消息前,先查询该表。
- 对于高并发场景,可以结合数据库的唯一索引来避免多次插入同一个消息ID
如何保证消息的有序性?
保证消息有序性的常见方法如下:
1、单一生产者和单一消费者:
使用单个生产者发送消息到单个队列,并由单个消费者处理消息。这样可以确保消息按照生产者的发送顺序消费。这种方法简单但容易成为性能瓶颈无法充分利用并发的优势
2、分区与顺序键(Partition Key)
在支持分区(Partition)的消息队列中(如Kafka、RocketMQ),可以通过Partition Key 将消息发送到特定的分区。每个分区内部是有序的,这样可以保证相同Partition Key 的消息按顺序消费。
例如,在订单处理系统中,可以使用订单号作为 Partition Key,将同一个订单的所有消息路由到同一个分区,确保该订单的消息顺序。
3、顺序队列(Ordered Queue)
一些消息队列系统(如RabbitMQ)支持顺序队列,消息在队列中的存储顺序与投递顺序一致。如果使用单个顺序队列,消息将按顺序被消费。
可以使用多个顺序队列来提高并发处理能力,并使用特定规则将消息分配到不同的顺序队列中。
不同消息队列的顺序性如何保证?
1、Kafka:Kafka 中的消息在分区内部是有序的。生产者在发送消息时,可以指定分区(Partition)。如果所有相同 Key 的消息都发送到同一个分区,则可以保证这些消息的顺序。
通过配置生产者的hash函数,可以将同一类型的消息发送到相同的分区,保证顺序。
在消费端,使用单线程消费者从特定分区读取消息,可以确保消费的顺序性。
2、RabbitMQ:RabbitMQ 通过单个队列可以保证消息的顺序,如果消息需要并发消费,则需要将其路由到不同的顺序队列中。
使用Message Grouping技术,将具有相同属性的消息分组到一个队列中,以确保组内消息的顺序。
通过自定义路由策略,可以将同一业务逻辑的消息发送到相同的队列,从而保证顺序。
3、RocketMQ:RocketMQ 支持顺序消息(Ordered Messages),生产者可以使用 send方法将消息发送到指定的分区队列,并使用Message Queue Selector来选择目标队列(本质的实现和 kafka 是一样的)
消费者端通过顺序消费模式,可以从同一个消息队列中按顺序读取消息,确保消息的顺序性。
如何处理消息堆积?
消息堆积是指在消息队列中,消息的生产速度远大于消费速度,导致大量消息积压在队列
我们需要先定位消费慢的原因,如果是 bug 则处理 bug,同时可以临时扩容增加消费速率,减少线上的资损。
如果是因为本身消费能力较弱,则可以优化下消费逻辑常见有以下几种方式提升消费者的消费能力:
1、增加消费者线程数量:提高并发消费能力。
2、增加消费实例:在分布式系统中,可以水平扩展多个消费实例,从而提高消费速率。
3、优化消费者逻辑:检查消费者的代码,减少单个消息的处理时间。例如,减少 /O 操作、使用批量处理等.
注意上述的第二点
增加消费实例,一定要注意注意Topic 对应的分区/队列数需要大于等于消费实例数不然新增加的消费者是没东西消费的。因为一个Topic中,一个分区/队列只会分配给一个消费实例。
除此之外还可以进行限流和降级处理
·对消息生产端进行限流,降低生产速率,避免消息积压进一步恶化。
·对非关键消息进行丢弃或延迟处理,只保留高优先级的消息,提高系统的响应速度。
优化消费者逻辑常见做法
1、批量消费:通过一次性从队列中消费多条消息(如批量读取 100 条),可以减少每次拉取消息的网络开销,提高处理效率。
2、异步消费:在消费的同时不阻塞后续消息的消费。处理完一条消息后立即开使用异步处理方法,始处理下一条消息,提升并发度(但是要注意消息丢失的风险)
3、优化数据库操作:
- 如果消费者在处理消息时需要频繁访问数据库,可以通过数据库连接池、SQL优化、缓存等手段减少数据库操作的时间。
- 使用批量插入或更新操作,而不是逐条处理,可以显著提升效率,
4、临时扩展队列的策略
- 临时扩展多个消费者队列:在消息积压严重时,可以通过临时扩展多个消费者队列,将积压的消息分配到不同的队列中进行消费。消费完成后,可以将这些临时队列关闭。例如,在Kafka 中,可以增加分区数(Partition),同时扩展更多的消费者实例,分摊消费压力。
- 在 RocketMQ 中可以增加队列数。
5、使用多队列调度机制:
·例如,使用 RabbitMQ 的Exchange机制,将消息按照特定规则路由到多个队列中。这样可以在消息堆积时,将不同类型的消息分开处理。
6、限流与降级的实现方式
生产者限流:
在生产者端增加限流逻辑,使用令牌桶、漏桶算法等限流策略,限制生产者的发送速率,从而避免消息队列被快速填满。
来缓解消息发送。例如,在 Kafka 中可以通过配置生产者的 1inger.ms和 batch.size:的速度。
7、消费者降级:
在消息堆积严重时,对低优先级的消息进行丢弃或延迟处理。只保留高优先级消息的消费,确保系统核心功能的正常运行。
可以在消费端增加优先级队列或通过消息属性区分优先级,先处理高优先级的消息
消息队列设计成推消息还是拉消息?推拉模式的优缺点?
推拉模式主要讨论的是 Consumer 和 Broker 之间的消息交互方式。
Producer 到 Broker:默认是推模式(Push),Producer 主动把消息推给 Broker,理由是:①Broker 可以用多副本等机制保证可靠性;②让成百上千个 Producer 自己存消息、等 Broker 来拉,可靠性、实现成本太高,不现实。
所以:
- Producer → Broker:推(Producer 主动投递)
- Consumer ↔ Broker:推 or 拉(消费方式才区分推拉)
一句话:推拉模式只讨论消费端,生产者默认都是推!
1、推模式(Broker 主动推送)优缺点:
✅ 优点:
- 消息实时性强:Broker 一有消息立刻推送,延迟低;
- 消费者使用简单:Consumer 被动接收,不用管拉不拉,来就接着。
❌ 缺点:
- 推送速率不可控:Broker只管推,不关心消费者忙不忙,推多了容易**“消费堆积”**;
- 容易“打爆消费者”:如果生产速度快于消费速度,消费者可能**“被推死”**,相当于被DDoS了。
总结:推模式实时爽,但消费者扛不住容易“爆仓”。
2、拉模式(Consumer 主动拉取)优缺点一图流:
✅ 优点:
- 消费者掌控节奏:根据自己消费能力决定是否拉取,能控流、能限流。
- Broker 更轻松:只负责存消息,被动响应请求,没负担。
- 批量拉取更灵活:消费者可以按需一次拉多个消息,拉多少自己说了算。
❌ 缺点:
- 消息延迟高:得靠消费者定期轮询,消息到达时不能立马推送。
- 空轮询浪费资源:消息没到时,消费者不停拉取等于白请求,浪费 Broker 和网络资源。
总结:“拉模式控得住节奏,但消息延迟挡不住”
推 or 拉,怎么选?
推模式 vs 拉模式:各有优缺点
- 推模式:实时性高,但容易推爆消费者;
- 拉模式:可控但有延迟。
RocketMQ、Kafka 为啥选拉模式?
- Broker 只管存消息,不依赖消费者状态,职责单一;
- Broker 作为中心,越轻量越稳定;
- 消费端复杂多样,主动拉取更灵活。
如何优化拉模式的缺点?
靠 “长轮询”:
- 没消息就先挂起请求,有消息立马返回;
- 兼顾低延迟 + 不空轮询;
- 消费者不用一直无脑请求,也能第一时间拿到消息。
📌 总结:RocketMQ/Kafka选拉,但用“长轮询”整出了推的实时性、拉的灵活性。
如何保证消息不丢失?
关键看三个环节:
1️⃣ 生产阶段:生产者不丢
- 确认机制:发送消息后必须等待 Broker 确认(ACK);
- 异常重试:发送失败及时 重试 + 报警 + 日志,别悄无声息丢失;
- 同步/异步都要兜底:异步也得有回调错误处理,别掉包不管。
2️⃣ 存储阶段:Broker 不丢
- 刷盘确认:Broker 将消息持久化到磁盘上后再返回ACK,内存级别不算完事;
- 副本机制:集群时,可以配置 多副本(例如写入2台才返回ACK),即使主节点挂了,消息依然安全。
3️⃣ 消费阶段:消费者不丢
- 消费ACK机制:业务逻辑执行完成后才给 Broker 确认ACK;
- 失败重试机制:消费失败可 自动重试 / 进死信队列(DLQ);
- 避免假消费成功:不能拿到消息直接ACK,得真正处理完再确认。