【RocketMQ 生产者和消费者】- 延时消息的使用
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
- 【RocketMQ 生产者和消费者】- 消费者启动源码
- 【RocketMQ 生产者和消费者】- 消费者重平衡(1)
- 【RocketMQ 生产者和消费者】- 消费者重平衡(2)- 分配策略
- 【RocketMQ 生产者和消费者】- 消费者重平衡(3)- 消费者 ID 对负载均衡的影响
- 【RocketMQ 生产者和消费者】- 消费者的订阅关系一致性
- 【RocketMQ 生产者和消费者】- 消费者发起消息拉取请求 PullMessageService
- 【RocketMQ 生产者和消费者】- broker 是如何处理消费者消息拉取的 Netty 请求的
- 【RocketMQ 生产者和消费者】- broker 处理消息拉取请求
- 【RocketMQ 生产者和消费者】- 消费者处理消息拉取结果
- 【RocketMQ 生产者和消费者】- 消费者处理消息拉取结果
- 【RocketMQ 生产者和消费者】- ConsumeMessageConcurrentlyService 并发消费消息
- 【RocketMQ 生产者和消费者】- ConsumeMessageOrderlyService 顺序消费消息
- 【RocketMQ 生产者和消费者】- sendMessageBack 发送重试消息
2. 延时消息
RocketMQ 的延时消息是一种非常实用的消息特性,允许生产者发送的消息在指定的延迟时间之后才被消费者消费,下面看下官网的图:延时消息介绍。
如果业务需要延时一段时间之后做一些逻辑,比如订单检测就可以用延时消息,RocketMQ 4.9.3 版本的延时消息是通过 JDK 的 ScheduledExecutorService 来完成,但是在 5.0 之后应该是支持了时间轮的方式来实现定时任务,有兴趣可以看下这篇文章,后面也会去学习 5.x 的源码,现在还是以 4.9 的为主,5.x 也改了不少东西,存储结构和高可用那些都有变动,这都是后面的了,官网地址:Apache RocketMQ 源码解析 —— 秒级定时消息介绍。
应用场景就是刚刚上面说的,举几个例子:
- 订单超时处理: 电商系统用户下单后如果在一定时间内未支付,就需要自动取消订单。这种情况下可以发送延时消息,当订单创建时发送一条延迟一定时间(如 30 分钟)的消息,消息到期后触发检查订单是否支付,若未支付则取消订单,JDK 的 ScheduledExecutorService 执行误差还是可以的。
- 缓存刷新: 对于一些定期需要刷新的缓存数据,可以通过延时消息来定时触发缓存刷新操作,不过这种应该用定时任务中间件会比较好,适配 cron 表达式的可用性和精确度也会更高一点。
- 消息重试: 消息处理失败时发送一条延时消息,在一定时间后再进行重试,避免因系统瞬时故障导致的频繁重试,这里就是前面消费者消费失败之后的重试逻辑了,要注意这里的消费失败重试指的是并发消费,因为前一篇文章也说了顺序消费达到最大重试次数之前都是在本地重试的。
3. 示例
下面做一个简单的示例,首先是消费者,为什么要启动消费者先呢,因为延时消息发送是按照消息存储的时间来计算的,而不是消费者拉到消息之后再开始延时消费。
public class ScheduledMessageConsumer {public static void main(String[] args) throws Exception {// 初始化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");// 订阅 topicconsumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TestScheduledTopic", "*");// 监听者consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {long now = System.currentTimeMillis();System.out.printf("%s Receive message[msgId=%s %s %s ms later]\n", format(now), message.getMsgId(),format(message.getStoreTimestamp()), format(message.getBornTimestamp()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}public static String format(long time){return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date(time));}}
下面再启动生产者,这里会发送 100 条消息,然后延时等级设置为 3,也就是消息发送之后 10s 开始消费。
public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化生产者DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 生产者启动producer.setNamesrvAddr("localhost:9876");producer.start();int totalMessagesToSend = 10;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestScheduledTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级,3 就是 10smessage.setDelayTimeLevel(3);// 发送消息SendResult send = producer.send(message);if(send.getSendStatus() == SendStatus.SEND_OK){System.out.println(format());}}// 关闭生产者producer.shutdown();}public static String format(){return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date());}}
来看下输出,首先是生产者的输出,生产者主要是将这 10 条消息的生产时间打印出来,可以看到都是 23:13:32
生产的。
接下来看下消费者的输出,消费者主要看三个参数:日志打印时间、消息存储时间和消息生成时间。
上面说了消息具体执行时间 = 消息存储时间 + 延时时间
,这个消息存储时间就是 storeTimestamp
,在生产者 asyncPutMessage
存储消息的时候设置进去的,具体源码可以看这篇文章:【RocketMQ 存储】- broker 端存储单条消息的逻辑。
然后我在 broker 这里也打印了一些日志,来看下,首先就是在 CommitLog 的 checkMessageAndReturnSize 中打印下延时级别的时间设置。因为最终 ScheduleMessageService 处理延时消息是通过 ConsumeQueue 索引的 tagsCode 来判断是否延时消息到期的,前面文章 【RocketMQ 存储】消息重放服务-ReputMessageService 也分析过消息重放的逻辑,如果感兴趣可以去看下。消息重放会对 CommitLog 里面的消息生成 ConsumeQueue 索引,这个 checkMessageAndReturnSize 就是重放的时候会去调用,延时消息的 tagsCode 也会在这里设置为真实要投递的时间。
最后一条日志打印在了 asyncPutMessage 这个方法的第二行,也就是在设置消息存储时间之后打印下这个时间。
做完准备后再来看下最终 broker 的输出。
可以看到的是,最终延时消息经过了两次存储,第一次就是延时消息存储到对应的延时队列里面,这时候当消息存储完了重放的时候就会输出两条日志,第二次当延时消息到期被还原成真实消息投递到实际的 topic,这时候存储到 CommitLog 的时候调用 asyncPutMessage 方法,打印了存储时间,然后后续重放生成 ConsumeQueue 索引,这时候就不会输出第一条日志了。
然后大家可能会有疑问,为什么这里打印的消息存储时间跟上面消费者打印的不一样,按道理第二次打印的时候,消息存储时间应该是和时间消费的时候打印的存储时间一样的。实际上 asyncPutMessage 这个方法后面在添加的时候会加锁,然后又会重新设置消息存储时间,所以这里会有几 ms 的差距。
4. 延时消息的等级
使用消息生产者的时候需要设置一个延时等级,RocketMQ 一共支持18个等级的延迟投递,具体时间如下:
延时等级 | 延时时间 | 延时等级 | 延时时间 | 延时等级 | 延时时间 |
---|---|---|---|---|---|
1 | 1s | 2 | 5s | 3 | 10s |
4 | 30s | 5 | 1min | 6 | 2min |
7 | 3min | 8 | 4min | 9 | 5min |
10 | 6min | 11 | 7min | 12 | 8min |
13 | 9min | 14 | 10min | 15 | 20min |
16 | 30min | 17 | 1h | 18 | 2h |
所以说 RocketMQ 并不支持任意时间的延时,需要设置几个固定的延时等级,从 1s 到 2h 分别对应着等级1到18,消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关。
如果将大量延时消息的定时时间设置为同一时刻,那么到了消息被处理的时候会有大量消息同时需要被处理,造成系统压力过大,有可能导致消息分发延迟,影响定时精度。
5. 小结
好了,这篇文章讲述了延时消息的使用,下一篇文章再来分析下延时消息的原理。
如有错误,欢迎指出!!!