MQ常见问题分析——消息可靠性、消息积压、消息幂等问题
消息可靠性保证
消息在整个链路中可以分为三个部分:
- 生产者->broker
- broker集群内部
- broker->消费者
事实上在这三个阶段我们都要保证消息的可靠性。
生产者->broker
在生产者向broker发消息的过程中,可能因为网络抖动或者服务的原因。消息没有成功到达broker。
如何保证这个阶段的可靠性呢?
首先我们来回顾一下三种消费发送方法(详见第二课时)
- 同步发送:消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。
- 异步发送:发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。实现异步发送回调接口(SendCallback)
- 单向发送:发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
很显然单向发送生产者无法知道消息发送情况,那么这种方法自然是无法保证可靠性的。
同步发送
发送一个消息之后会阻塞,知道broker接收并返回ack,才会继续往下执行业务。
所以他适用于一致性较强的业务。
如下一个订单创建业务,如果消息发送失败要通过本地事务保证库存和订单的一致性。
@Transactional
public void createOrder(){//1.扣减库存//2.发mq给订单服务创建订单SendResult sendResult = producer.send(msg);
}
针对于这种一致性比较强的场景我们建议使用事务消息(下节课详细讲),这个事务消息同样保证消息发送可靠性
如果发送失败收不到ack,我们可以捕获异常并且根据业务场景进行任意处理。
- 重试
- 手动补偿
消息重试
RocketMQ在客户端中内置了请求重试逻辑,支持在初始化时配置消息发送最大重试次数,失败时会按照设置的重试次数重新发送,直到消息发送成功或达到最大重试次数。
// 设置同步发送失败时重试发送的次数,默认为2次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时限为5s,默认10s
producer.setSendMsgTimeout(5000);
重试流程如下:
- 如果发送失败,默认重试2次。
但在重试时会过滤掉上次发送失败的Broker,选择其它Broker。
异步发送
针对于实时性较高的场景我们用异步发送。
Message msg = new Message(MqConstant.SIMPLE_TOPIC,"TagA",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);//发送异常//重试 //手动补偿e.printStackTrace();}
});
异步发送,会收到broker的回调。
在回调函数中我们就可以根据业务实现重试和补偿。
异步发送的客户端内部也提供了重试,异步发送失败重试时,异步重试不会选择其他Broker,仅在当前Broker上做重试。
producer.setRetryTimesWhenSendAsyncFailed(0);
broker集群内部
broker收到到消息后,会先把消息存储到内存中,之后再刷到磁盘当中。
- 同步刷盘:broker存储消息到内存中后立刻刷到磁盘中,成功之后返回ack,不会有消息丢失的风险。
- 异步刷盘:broker存储消息到内存中,之后就直接返回ack,等内存中消息达到一定量之后,再刷到磁盘中。如果在刷盘过程中出现错误,那么这些消息也就丢失了。
具体的持久化机制我会在最后一个课时讲。
那么默认是异步刷盘,如果对可靠性较高的场景我们只需要更改配置
//broker.conf文件
flushDiskType=SYNC_FLUSH
broker->消费者
消费者重试
非广播模式下,Consumer消费消息失败后,要提供⼀种重试机制,令消息再消费⼀次。Consumer消费消息失败通常可以认为有以下几种情况:
- 由于消息本身的原因,例如反序列化失败,消息数据本身⽆法处理(例如话费充值,当前消息的⼿机号被注销,⽆法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,⽽这条失败的消息即使⽴刻重试消费,99%也不成功,所以最好提供⼀种定时重试机制,即过10秒后再重试。
- 由于依赖的下游应⽤服务不可⽤,例如db连接不可⽤,外系统⽹络不可达等。 遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应⽤sleep 30s,再消费下⼀条消息,这样可以减轻Broker重试消息的压⼒。
消费者方在接收到消息之后,成功完成业务处理之后,需要向服务端返回ack。如果不响应或者返回失败等都会触发重试。
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("收到的消息:" + msg);}return null;//returnConsumeConcurrentlyStatus.RECONSUME_LATER;//抛出异常}
});
消费者返回null,或者返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,或者抛出异常,都会触发重试。
重试队列
RocketMQ会为每个消费组都设置⼀个Topic名称为“%RETRY%+consumerGroup”的重试队列(这⾥需要注意的是,这个Topic的重试队列是针对消费组,⽽不是针对每个Topic设置的),⽤于暂时保存因为各种异常⽽导致Consumer端⽆法消费的消息。
考虑到异常恢复起来需要⼀些时间,会为重试队列设置多个重试级别,每个重试级 别都有与之对应的重新投递延时,重试次数越多投递延时就越⼤。
RocketMQ对于重试消息的处理是先保存⾄Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进⾏Delay后重新保存⾄“%RETRY%+consumerGroup”的重试队列中。与延迟队列的设置相同,消息默认会重试16次,每次重试的时间间隔如下:重试超过指定次数的消息,将会进⼊到死信队列中 %DLQ%my-consumer group1 。
**
注意: 若重试次数超过16次,后面每次重试间隔都为2小时。
**
consumer.setMaxReconsumeTimes(10);
死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
死信队列具备以下特点:
- RocketMQ会自动为需要死信队列的ConsumerGroup创建死信队列。
- 死信队列与ConsumerGroup对应,死信队列中包含该ConsumerGroup所有相关topic的死信消息。
- 死信队列中消息的有效期与正常消息相同,默认48小时。
- 若要消费死信队列中的消息,需在控制台将死信队列的权限设置为6,即可读可写。
消息幂等性
幂等必要性
消息幂等的必要性:
- 发送时消息重复:当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端名机,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。
- 投递时消息重复:消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。
- 负载均衡时消息重复(包括但不限于网络抖动、Broker重启以及订阅方应用重启):当RocketMQ的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息。
解决方案
业务幂等
将业务逻辑设计为幂等的,即多次执行相同的操作不会影响最终结果。例如:删除id=9的记录,无论我删除多少次结果都一样。
状态判断
在处理消息时,首先查询当前状态,看是否已经达到目标状态。只有当状态符合预期时,才进行下一步处理,并更新状态。这种方式适用于多步骤的业务逻辑,每个步骤都有明确的状态变化。
比如:订单服务将订单状态改为已支付
update orders set order_status='已支付'
where order_id = xxxx and order_status='交易中' ;
在条件中加上一条符合上一个状态的才能够被修改。
唯一ID去重
每个消息都包含一个唯一的 ID,生产者在生成消息时给每个消息赋予一个唯一的标识(如 UUID)。消费者在处理消息前,先检查该消息的 ID 是否已经处理过。如果已经处理过,则直接丢弃;如果没有处理过,则进行处理并记录这个 ID。
记录方式可以是数据库表或缓存(如 Redis),用于存储已经处理过的消息 ID。
数据库唯一约束
在数据库中为相关字段设置唯一约束。例如,在处理订单时,可以在订单表中为订单号设置唯一索引。
在插入数据时,如果因为唯一约束导致插入失败,则说明该消息已经被处理过,这样就保证了幂等性。
Redis
利用Redis的SETNX(SET if Not Exists)操作。先尝试将消息的唯一ID设置到Redis中,如果成功(返回1),则表明是第一次处理,可以继续处理消息;如果失败(返回0),则说明消息已经被处理过。使用Redis的TTL功能,可以给记录的消息ID设置过期时间,避免Redis中记录无限增长。包括也可以用set
消息积压
MQ(Message Queue)消息积压问题指的是在消息队列中累积了大量未处理的消息,导致消息队列中的消息积压严重,超出系统处理能力,影响系统性能和稳定性的现象。
消息积压原因:
- 消息生产者速率大于消费者消费速率
- 突发性,大概率是消费者服务出问题,生产者生产突增消费者来不及消费
- 长期累积的,生产者总是比消费者快,长期一定会消息积压
解决方案
消费者服务扩容
大部分情况下,我们首先想到的就是消费者服务扩容(增加消费者的实例数量)对堆积是有效果的。原因在于RocketMQ在设计上就是支持消费者的横向扩展的,所以整体的消费能力通常情况下会随着消费者数量的增加而增加。然而,这里有一个特殊的场景需要考虑,即队列数和消费者数量的比例。
如果扩容前,队列数比消费者数大或者队列数等于消费者实例数,那么扩容之后队列数可能小于消费者实例数。这样一来,多出来的消费者实际上是会空载的,也就是说这对消息的堆积丝毫没有帮助。
消费者线程扩容
消费者线程扩容相比较于消费者服务扩容可能会更加有效,RocketMQ虽然一个消费者也是只能分配一条队列,但是这个消费者的并发数是可以大于1的,也就是说这个消费者实际上是多线程地在消费队列中的数据。
同一个消费者示例,通过并发编程的方式提高消息处理能力。
为什么说消费者线程扩容通常情况下应对堆积会是更有效的扩容方式呢?有以下两点原因。
- 存量堆积数据的消费加速。对于已经堆积到队列的数据,因为队列无法同时分配给多个消费者,假设研发人员采取消费者实例数扩容,最终总会到达这样的一个状态:消费者数量=队列数量。到达这个状态之后,再扩容消费者实例已经没有意义了。但是队列里面堆积的数据是没办法迁移的,如果要加速这个队列的消费速度,最终只能提升单个消费者的消费度来加速堆积数据的处理。而消费者的线程数是可以继续扩容的,而且扩线程数就是扩并发数。也就是说只要扩容了线程数,在不考虑资源消耗(多线程的上下文切换、内存开销等)的情况下,对于整体的消费速度总会线性提升的。
- 线程池参数优化。很多情况下,开发者对于线程池的前期评估是不准的,大多数情况下都是按照一些简单的经验做的保守设置,如16。但是现阶段很多服务器的机器资源是很充足的,CPU的利用率远远不够,适当提升线程数是低成本实现消费能力扩展的很好手段。
Broker队列数扩容
有些时候我们看到队列大量堆积了,就病急乱投医,直接考虑扩充队列数去解决堆积问题,实际上这种扩容方式大部分情况下是没有效果的。原因在于 RocketMQ 的队列发生数量变化的时候,并不会做数据的搬迁。假设现有4个队列,每个队列都堆积了 10万条消息,那么当研发人员把队列数量扩容到8条的时候,原来的4条队列每条还是堆积了 10万条消息,而新创建的队列则没有堆积的消息。
但是这种方法有两种情况适用:
- 队列数大于等于消费者实例数,那么要扩展消费者实例,前提是队列数的扩容,同时扩展后的消费者数要小于等于队列数,要不然我们有的消费者就没有相对应的队列。
- 缓解新消息的处理延迟问题。堆积会导致新消息需要一直等待老消息的消费结束才能消费。很多时候,我们希望发生故障的时候能尽可能不影响新的请求。也就是说堆积的消息已经受影响了,可能没什么办法,但是我们是希望新的消息能在这些影响中得到隔离。
服务扩容
- 首先,RocketMQ的消费投递的性能极高,因为有大量的零拷贝、PageCache、批处理、长轮询等设计,RocketMQ无论是消息查找,还是消息拉取方面性能都是极高的,几乎不存在瓶颈。
- 排除 Broker 投递上遇到的瓶颈(网卡),堆积肯定是消费性能跟不上导致的。而消费性能一般又分为自身资源不足和下游服务性能不足导致的。如果是自身资源不启的情况,通过对消费者服务的扩容或者对消费者线程的扩容都是可以起到不错的效果的。但是如果瓶颈出现在下游的服务、存储上的话,那么扩容消费者本身的线程或者实例都只能是加重下游的负担,对于加速堆积消息的消费反而是有害的。
总结
快速扩容
遇到消息堆积,首要的目标是解决堆积问题。
恢复新消息消费
扩容之后,研发团队还需要考虑让新消息能有机会被消费到。前面说过,扩容即使可以提升消费能力,但还是无法做到让新消息插队。
- 更换消息主题。研发人员可以让生产者更换一个主题,让消息能进到另外一个主题中.这时候让消费者组也更换新的消费主题,从而实现新消息能及时分配资源。
- 前面说的扩容队列数也可以
定位问题预防
- 通过日志分析和监控,定位到问题所在
- 调整配置预防下次发生
- 消费者的实例数量。
- 每个消费者能分配到的队列数。
- 消费者线程池的线程数。