当前位置: 首页 > news >正文

RocketMQ 实战:马拉松系统异步化与延时任务落地(含死信队列处理)

在马拉松赛事系统中,“报名成功通知”“成绩推送” 等场景若采用同步处理,会导致接口响应延迟;而 “报名截止自动关通道”“赛前消息推送” 等定时任务,若依赖单机定时任务(如 Spring Scheduled),则存在分布式部署下任务重复执行、故障无容错的问题。RocketMQ 作为阿里开源的分布式消息中间件,凭借异步通信、延时队列、死信队列等核心特性,可完美解决上述痛点。本文将以马拉松系统为背景,从原理到实战,带你掌握 RocketMQ 在异步化与延时任务场景的落地方法。​

一、为什么需要 RocketMQ?—— 马拉松系统的异步与定时痛点​

1.1、 同步处理的核心问题​

1.1.1、 接口响应延迟​

用户报名成功后,系统需执行 “生成报名订单→发送短信通知→推送 APP 消息→记录操作日志” 等多步操作,若采用同步执行:​

  • 示例:报名接口同步调用短信服务(耗时 500ms)+ APP 推送服务(耗时 300ms),总响应时间从 200ms 增至 1000ms,用户体验下降;​
  • 风险:某一步骤(如短信服务)故障时,会导致整个报名流程失败,可用性降低。​

1.1.2、 资源浪费​

同步处理时,业务线程需等待所有步骤完成才能释放,高并发场景下(如报名峰值 5000 QPS),会导致线程池耗尽,无法处理新请求。​

1.2、 单机定时任务的局限​

马拉松系统的定时需求(如 “报名截止自动关闭通道”“赛前 1 小时推送提醒”)若用 Spring Scheduled 实现,存在明显不足:​

  • 重复执行:分布式部署 3 台服务,会同时执行 3 次 “关闭报名通道” 任务,导致业务逻辑混乱;​
  • 无容错机制:任务执行失败(如数据库连接超时),无重试机制,需手动干预;​
  • 时间精度低:Spring Scheduled 最小精度为秒级,无法满足 “精确到分钟” 的定时需求(如 “每天 8:00:00 推送消息”)。​

1.3、 RocketMQ 的核心优势​

RocketMQ 通过 “异步解耦”“延时队列”“死信队列” 三大特性,针对性解决上述问题,核心优势如下:

优势维度具体特性适配场景
异步解耦生产者发送消息后立即返回,消费者异步处理,实现服务间解耦,提升接口响应速度报名成功通知、成绩推送、日志记录
延时队列支持 18 个固定延时级别(如 1s、5s、1min、1h),消息发送后延迟指定时间才被消费报名截止关通道、赛前 1 小时提醒、成绩超时未提交提醒
死信队列消息消费失败超过重试次数(默认 16 次)后,自动转入死信队列,避免消息丢失处理失败的通知消息(如短信发送失败),便于后续人工干预
分布式一致性支持集群部署,定时任务仅执行一次,避免重复执行分布式环境下的 “报名截止关通道”“数据归档” 任务
高可用基于主从架构,主节点故障时从节点自动切换,消息不丢失保障赛事关键任务(如报名截止)不中断

二、RocketMQ 核心概念与原理​

在落地实战前,需先理解 RocketMQ 的核心概念与延时队列、死信队列的工作原理,为后续开发奠定基础。​

2.1、 核心概念

概念作用类比
生产者(Producer)发送消息的服务(如马拉松报名服务发送 “报名成功” 消息)邮件发送者
消费者(Consumer)接收并处理消息的服务(如通知服务消费 “报名成功” 消息,发送短信)邮件接收者
主题(Topic)消息的分类标识(如 “marathon_sign_success” 表示报名成功主题)邮件主题
标签(Tag)主题下的细分标识(如 “sign_success_sms” 表示报名成功的短信通知,“sign_success_push” 表示 APP 推送)邮件标签
延时级别(DelayLevel)RocketMQ 预定义的延时级别,共 18 级(1 级 = 1s,2 级 = 5s,…,18 级 = 2h)定时邮件的 “延迟发送时间”
死信队列(DLQ)消费失败超过重试次数的消息存放的队列,命名格式为 “% DLQ%+ 原 Topic”邮件的 “垃圾箱”

2.2、 延时队列原理​

RocketMQ 的延时队列并非真正 “延时发送”,而是通过 “消息暂存 + 定时调度” 实现:​

  1. 消息暂存:生产者发送延时消息时,指定DelayLevel,RocketMQ 将消息先存放在内部主题SCHEDULE_TOPIC_XXXX(XXXX 为延时级别);​
  2. 定时调度:RocketMQ 内部有定时任务线程,每隔 1 秒扫描SCHEDULE_TOPIC_XXXX,当消息到达指定延时时间后,将其转发到目标 Topic;​
  3. 消费者消费:消费者监听目标 Topic,接收并处理转发后的消息。​

注意:RocketMQ 不支持自定义延时时间(如 3 分钟),需从 18 个预定义级别中选择(如 3 分钟对应第 8 级,DelayLevel=8),具体级别映射需参考官方文档。

2.3、 死信队列原理​

当消息消费失败(如抛出异常),RocketMQ 会自动重试,重试机制与死信队列流程如下:​

  1. 重试次数:默认重试 16 次,每次重试间隔按 “1s、5s、10s、30s、1min、2min、3min、4min、5min、6min、7min、8min、9min、10min、20min、30min” 递增;​
  2. 死信转入:重试 16 次后仍失败,消息自动转入死信队列(Topic 为%DLQ%marathon_sign_success);​
  3. 死信处理:开发者需监听死信队列,对失败消息进行人工干预(如排查短信服务故障后,重新发送消息)。​

三、环境准备:RocketMQ 部署与 Spring Boot 集成​

3.1、 步骤 1:部署 RocketMQ(单机模式,开发环境)​

3.1.1、 环境要求​

  • JDK 1.8+;​
  • 内存至少 2GB(NameServer 与 Broker 各需 1GB);​
  • 端口开放:9876(NameServer)、10911(Broker)。​

3.1.2、 下载与启动​

1、下载 RocketMQ:从RocketMQ 官网下载稳定版本(如 4.9.5);​

2、启动 NameServer:

# 进入RocketMQ安装目录
cd rocketmq-all-4.9.5-bin-release
# 启动NameServer(后台启动,日志输出到nohup.out)
nohup sh bin/mqnamesrv &
# 验证启动:查看日志,出现“The Name Server boot success”表示成功
tail -f ~/logs/rocketmqlogs/namesrv.log

3、启动 Broker:

# 配置Broker的NameServer地址(替换为实际IP)
export NAMESRV_ADDR=192.168.1.20:9876
# 启动Broker(后台启动,日志输出到nohup.out)
nohup sh bin/mqbroker -n $NAMESRV_ADDR -c conf/broker.conf &
# 验证启动:查看日志,出现“the broker[localhost, 192.168.1.20:10911] boot success”表示成功
tail -f ~/logs/rocketmqlogs/broker.log

3.2、 步骤 2:Spring Boot 集成 RocketMQ​

3.2.1、 引入依赖(pom.xml)

<!-- RocketMQ Spring Boot Starter(适配Spring Boot 2.x) -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>
<!-- 工具类依赖(用于JSON序列化) -->
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
</dependency>

3.2.2、 配置 RocketMQ(application.yml)

rocketmq:name-server: 192.168.1.20:9876 # NameServer地址producer:group: marathon_producer_group # 生产者组(同一业务的生产者归为一组)send-message-timeout: 3000 # 消息发送超时时间(毫秒)retry-times-when-send-failed: 2 # 同步发送失败重试次数retry-next-server: true # 发送失败时是否重试其他Brokerconsumer:group: marathon_consumer_group # 消费者组(同一业务的消费者归为一组)consume-thread-min: 10 # 消费者最小线程数consume-thread-max: 20 # 消费者最大线程数consume-message-batch-max-size: 1 # 每次消费消息数量(默认1,避免并发问题)

3.2.3、 配置消息序列化(可选)​

默认情况下,RocketMQ 使用 Java 序列化,建议改为 JSON 序列化,提升兼容性:

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;@Component
public class RocketMQProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;// 发送JSON格式消息(自定义方法)public <T> void sendJsonMessage(String topic, String tag, T data) {// 构建消息:topic:tag格式,消息体为JSON字符串String messageBody = JSON.toJSONString(data);String destination = topic + ":" + tag;// 发送消息rocketMQTemplate.send(destination, new GenericMessage<>(messageBody));}
}

四、实战 1:异步化场景 —— 报名成功通知与成绩推送​

4.1、 场景 1:报名成功通知(短信 + APP 推送)​

4.1.1、 需求分析​

用户报名成功后,系统需异步执行两项操作:​

  1. 发送短信通知(含报名编号、赛事时间);​
  2. 推送 APP 消息(引导用户查看报名详情)。​

4.1.2、 实现步骤​

1、定义消息实体:

import lombok.Data;
import java.util.Date;// 报名成功消息实体
@Data
public class SignSuccessMessage {private Long eventId; // 赛事IDprivate Long userId; // 用户IDprivate String userPhone; // 用户手机号private String signNo; // 报名编号private Date signTime; // 报名时间private String eventTime; // 赛事时间(如“2024-10-01 08:00”)
}

2、生产者发送消息(报名服务):​

在MarathonSignService的signUp方法中,报名成功后发送异步消息:

@Service
public class MarathonSignService {@Resourceprivate RocketMQProducer rocketMQProducer;@Transactional(rollbackFor = Exception.class)public String signUp(Long eventId, Long userId, String userPhone) {// 1. 核心报名逻辑(扣减名额、生成订单,略,参考前文分布式锁部分)String signNo = "MAR20241001" + System.currentTimeMillis(); // 生成报名编号SignSuccessMessage message = new SignSuccessMessage();message.setEventId(eventId);message.setUserId(userId);message.setUserPhone(userPhone);message.setSignNo(signNo);message.setSignTime(new Date());message.setEventTime("2024-10-01 08:00");// 2. 发送异步消息:短信通知(tag=sign_success_sms)rocketMQProducer.sendJsonMessage("marathon_sign_success", "sign_success_sms", message);// 3. 发送异步消息:APP推送(tag=sign_success_push)rocketMQProducer.sendJsonMessage("marathon_sign_success", "sign_success_push", message);return "报名成功!报名编号:" + signNo;}
}

3、消费者处理消息(通知服务):​

分别创建短信消费者与 APP 推送消费者,监听对应 Tag 的消息:

// 短信通知消费者
@Component
@RocketMQMessageListener(topic = "marathon_sign_success", // 监听的主题selectorExpression = "sign_success_sms", // 监听的TagconsumerGroup = "marathon_consumer_group"
)
public class SignSuccessSmsConsumer implements RocketMQListener<String> {// 注入短信服务(实际项目中为第三方短信API封装)@Resourceprivate SmsService smsService;@Overridepublic void onMessage(String messageBody) {// 1. 解析JSON消息SignSuccessMessage message = JSON.parseObject(messageBody, SignSuccessMessage.class);// 2. 构造短信内容String smsContent = String.format("【马拉松赛事】您已成功报名“%s”赛事,报名编号:%s,赛事时间:%s,请准时参赛!",message.getEventId(), message.getSignNo(), message.getEventTime());// 3. 发送短信boolean sendResult = smsService.sendSms(message.getUserPhone(), smsContent);if (!sendResult) {// 消费失败,抛出异常,触发重试(RocketMQ会自动重试)throw new RuntimeException("短信发送失败,userPhone:" + message.getUserPhone());}}
}// APP推送消费者
@Component
@RocketMQMessageListener(topic = "marathon_sign_success",selectorExpression = "sign_success_push",consumerGroup = "marathon_consumer_group"
)
public class SignSuccessPushConsumer implements RocketMQListener<String> {@Resourceprivate AppPushService appPushService; // APP推送服务@Overridepublic void onMessage(String messageBody) {SignSuccessMessage message = JSON.parseObject(messageBody, SignSuccessMessage.class);// 构造推送内容PushMessage pushMessage = new PushMessage();pushMessage.setUserId(message.getUserId());pushMessage.setTitle("报名成功");pushMessage.setContent("您已成功报名赛事,点击查看详情");pushMessage.setJumpUrl("/marathon/sign/detail?signNo=" + message.getSignNo());// 发送推送appPushService.sendPush(pushMessage);}
}

4.1.3、 核心优势​

  • 解耦:报名服务无需依赖短信、APP 推送服务,服务故障互不影响;​
  • 性能提升:报名接口响应时间从 1000ms 降至 200ms,用户体验提升;​
  • 可扩展:后续新增 “微信公众号通知”,只需新增消费者,无需修改报名服务。​

4.2、 场景 2:成绩推送(用户成绩实时通知)​

4.2.1、 需求分析​

选手完成比赛后,系统录入成绩,需异步推送成绩到用户 APP,并记录成绩日志。​

4.2.2、 实现步骤​

1、定义成绩消息实体

@Data
public class ScorePushMessage {private Long eventId; // 赛事IDprivate Long userId; // 用户IDprivate String signNo; // 报名编号private String score; // 成绩(如“02:35:40”)private Date submitTime; // 成绩录入时间
}

2、生产者发送消息(成绩录入服务)

@Service
public class ScoreService {@Resourceprivate RocketMQProducer rocketMQProducer;// 录入成绩并发送推送消息public void submitScore(ScorePushMessage message){​
// 1. 核心逻辑:录入成绩到数据库(略)​
scoreDao.insertScore (message);​
// 2. 发送异步消息:APP 成绩推送(tag=score_push_app)​
rocketMQProducer.sendJsonMessage ("marathon_score", "score_push_app", message);​
// 3. 发送异步消息:成绩日志记录(tag=score_log)​
rocketMQProducer.sendJsonMessage ("marathon_score", "score_log", message);​
System.out.println ("成绩录入成功,已发送推送通知:" + message.getSignNo ());​
}​
}

3、消费者处理消息:​

  • APP成绩推送消费者:向用户推送成绩通知;​
  • 成绩日志消费者:记录成绩录入日志到数据库或ELK。
// APP成绩推送消费者​
@Component​
}​
​
// 成绩日志消费者​
@Component​
@RocketMQMessageListener(​topic = "marathon_score",​selectorExpression = "score_log",​consumerGroup = "marathon_consumer_group"​
)​
public class ScoreLogConsumer implements RocketMQListener<String> {​
​@Resource​private OperateLogDao operateLogDao;​
​@Override​public void onMessage(String messageBody) {​ScorePushMessage message = JSON.parseObject(messageBody, ScorePushMessage.class);​// 构造操作日志​OperateLog log = new OperateLog();​log.setOperateType("SCORE_SUBMIT");​log.setOperateContent(JSON.toJSONString(message));​log.setOperateTime(new Date());​log.setRelatedId(message.getSignNo()); // 关联报名编号​// 写入日志表​operateLogDao.insert(log);​}​
}

五、实战 2:延时任务场景 —— 报名截止关通道与赛前提醒​

5.1、 场景 1:报名截止自动关闭通道​

5.1.1、 需求分析​

赛事报名有固定截止时间(如 2024-09-15 23:59:59),需在截止时间自动执行 “关闭报名通道” 操作,避免后续用户报名,且分布式部署下仅执行一次。​

5.1.2、 实现思路​

  1. 计算延时级别:在赛事创建时,根据 “当前时间” 与 “报名截止时间” 的差值,计算对应的 RocketMQ 延时级别;​
  2. 发送延时消息:赛事创建成功后,发送一条延时消息,延时时间为 “截止时间 - 当前时间”;​
  3. 消费延时消息:消息到期后,消费者执行 “关闭报名通道” 逻辑(更新赛事状态为 “已截止”)。​

5.1.3、 关键步骤​

1、延时级别计算工具类:​

RocketMQ 预定义 18 个延时级别,需根据时间差匹配对应的级别(以下为常用级别映射):

import java.util.HashMap;
import java.util.Map;// RocketMQ延时级别映射工具类(单位:毫秒)
public class DelayLevelUtil {// 级别→时间差映射(1级=1s,2级=5s,3级=10s,4级=30s,5级=1min,...,18级=2h)private static final Map<Integer, Long> LEVEL_TO_MILLIS = new HashMap<>();// 时间差→级别映射(用于反向查找)private static final Map<Long, Integer> MILLIS_TO_LEVEL = new HashMap<>();static {LEVEL_TO_MILLIS.put(1, 1000L);LEVEL_TO_MILLIS.put(2, 5000L);LEVEL_TO_MILLIS.put(3, 10000L);LEVEL_TO_MILLIS.put(4, 30000L);LEVEL_TO_MILLIS.put(5, 60000L); // 1minLEVEL_TO_MILLIS.put(6, 120000L); // 2minLEVEL_TO_MILLIS.put(7, 180000L); // 3minLEVEL_TO_MILLIS.put(8, 300000L); // 5minLEVEL_TO_MILLIS.put(9, 600000L); // 10minLEVEL_TO_MILLIS.put(10, 1800000L); // 30minLEVEL_TO_MILLIS.put(11, 3600000L); // 1hLEVEL_TO_MILLIS.put(12, 7200000L); // 2h// 反向构建时间差→级别映射(取最接近的级别,向上匹配)LEVEL_TO_MILLIS.forEach((level, millis) -> MILLIS_TO_LEVEL.put(millis, level));}/*** 根据时间差(毫秒)获取最接近的延时级别(向上匹配)* @param delayMillis 需求的时间差* @return 延时级别,无匹配返回-1*/public static int getDelayLevel(long delayMillis) {// 仅支持最大2h(7200000ms)的延时if (delayMillis > 7200000L) {return -1;}// 遍历找到最接近且大于等于delayMillis的级别return LEVEL_TO_MILLIS.entrySet().stream().filter(entry -> entry.getValue() >= delayMillis).min(Map.Entry.comparingByValue()).map(Map.Entry::getKey).orElse(-1);}/*** 根据级别获取对应的时间差(毫秒)*/public static long getDelayMillis(int level) {return LEVEL_TO_MILLIS.getOrDefault(level, 0L);}
}

2、生产者发送延时消息(赛事创建服务):

@Service
public class MarathonEventService {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Resourceprivate MarathonEventDao eventDao;// 创建赛事并发送报名截止延时消息public void createEvent(MarathonEvent event) {// 1. 保存赛事信息到数据库eventDao.insert(event);Long eventId = event.getId();Date signEndTime = event.getSignEndTime(); // 报名截止时间(如2024-09-15 23:59:59)Date now = new Date();// 2. 计算时间差(毫秒):截止时间 - 当前时间long delayMillis = signEndTime.getTime() - now.getTime();if (delayMillis <= 0) {throw new RuntimeException("报名截止时间不能早于当前时间");}// 3. 获取延时级别int delayLevel = DelayLevelUtil.getDelayLevel(delayMillis);if (delayLevel == -1) {throw new RuntimeException("报名截止时间超出最大支持延时(2h),需调整策略");}// 4. 构造延时消息实体CloseSignEventMessage message = new CloseSignEventMessage();message.setEventId(eventId);message.setPlanCloseTime(signEndTime);// 5. 发送延时消息(指定delayLevel)String destination = "marathon_delay_task:close_sign_channel";rocketMQTemplate.send(destination, MessageBuilder.withPayload(JSON.toJSONString(message)).setHeader(MessageHeaders.DELAY_LEVEL, delayLevel).build());System.out.println("赛事创建成功,已预约报名截止任务:eventId=" + eventId + ",延时级别=" + delayLevel);}
}// 报名截止消息实体
@Data
class CloseSignEventMessage {private Long eventId; // 赛事IDprivate Date planCloseTime; // 计划关闭时间(用于校验)
}

3、消费者处理延时消息(赛事服务):​

消费消息时需校验 “当前时间是否已过计划关闭时间”,避免因 RocketMQ 调度延迟导致提前执行:

@Component
@RocketMQMessageListener(topic = "marathon_delay_task",selectorExpression = "close_sign_channel",consumerGroup = "marathon_delay_consumer_group" // 单独的消费者组,避免与异步消息冲突
)
public class CloseSignChannelConsumer implements RocketMQListener<String> {@Resourceprivate MarathonEventDao eventDao;@Overridepublic void onMessage(String messageBody) {CloseSignEventMessage message = JSON.parseObject(messageBody, CloseSignEventMessage.class);Long eventId = message.getEventId();Date planCloseTime = message.getPlanCloseTime();Date now = new Date();// 1. 校验:当前时间是否已过计划关闭时间(避免调度延迟导致提前执行)if (now.before(planCloseTime)) {System.out.println("当前时间未到计划关闭时间,跳过执行:eventId=" + eventId);return;}// 2. 查询赛事当前状态(避免重复执行)MarathonEvent event = eventDao.selectById(eventId);if (event == null || event.getStatus() == 2) { // 2:已截止System.out.println("赛事已关闭或不存在,跳过执行:eventId=" + eventId);return;}// 3. 执行关闭报名通道逻辑(更新赛事状态为“已截止”)MarathonEvent updateEvent = new MarathonEvent();updateEvent.setId(eventId);updateEvent.setStatus(2); // 已截止updateEvent.setUpdateTime(now);int updateCount = eventDao.updateById(updateEvent);if (updateCount > 0) {System.out.println("报名通道关闭成功:eventId=" + eventId);} else {System.out.println("报名通道关闭失败(可能已被其他线程处理):eventId=" + eventId);}}
}

5.2、 场景 2:赛前 1 小时推送提醒​

5.2.1、 需求分析​

赛事开始前 1 小时(如 2024-10-01 07:00:00),向所有已报名用户推送 “赛前准备提醒”(如携带装备、集合地点)。​

5.2.2、 实现思路​

  1. 批量获取已报名用户:在赛事创建时,无需立即获取用户,而是在 “赛前 1 小时” 任务触发后,批量查询该赛事的已报名用户;​
  2. 发送延时消息:赛事创建时,根据 “赛事开始时间 - 1 小时” 计算延时时间,发送延时消息;​
  3. 消费延时消息:消息到期后,查询该赛事的所有已报名用户,批量发送提醒消息。​

5.2.3、 关键步骤​

1、生产者发送延时消息(赛事创建服务):​

在MarathonEventService的createEvent方法中,新增赛前提醒延时消息发送逻辑:

public void createEvent(MarathonEvent event) {// ... 原有赛事创建逻辑(略)Date eventStartTime = event.getEventTime(); // 赛事开始时间(如2024-10-01 08:00:00)// 计算赛前1小时时间Date remindTime = new Date(eventStartTime.getTime() - 3600 * 1000L); // 减1小时(3600000毫秒)// 计算延时时间(当前时间到remindTime的差值)long remindDelayMillis = remindTime.getTime() - now.getTime();if (remindDelayMillis > 0) {int remindDelayLevel = DelayLevelUtil.getDelayLevel(remindDelayMillis);if (remindDelayLevel != -1) {// 构造赛前提醒消息PreRaceRemindMessage remindMessage = new PreRaceRemindMessage();remindMessage.setEventId(eventId);remindMessage.setEventName(event.getEventName());remindMessage.setRemindTime(remindTime);// 发送赛前提醒延时消息String remindDestination = "marathon_delay_task:pre_race_remind";rocketMQTemplate.send(remindDestination,MessageBuilder.withPayload(JSON.toJSONString(remindMessage)).setHeader(MessageHeaders.DELAY_LEVEL, remindDelayLevel).build());System.out.println("已预约赛前提醒任务:eventId=" + eventId + ",提醒时间=" + remindTime);}}
}// 赛前提醒消息实体
@Data
class PreRaceRemindMessage {private Long eventId; // 赛事IDprivate String eventName; // 赛事名称private Date remindTime; // 计划提醒时间
}

2、消费者处理延时消息(通知服务):​

批量查询已报名用户,发送短信或 APP 推送提醒:

@Component
@RocketMQMessageListener(topic = "marathon_delay_task",selectorExpression = "pre_race_remind",consumerGroup = "marathon_delay_consumer_group"
)
public class PreRaceRemindConsumer implements RocketMQListener<String> {@Resourceprivate MarathonSignRecordDao signRecordDao;@Resourceprivate SmsService smsService;@Resourceprivate AppPushService appPushService;@Overridepublic void onMessage(String messageBody) {PreRaceRemindMessage message = JSON.parseObject(messageBody, PreRaceRemindMessage.class);Long eventId = message.getEventId();String eventName = message.getEventName();Date now = new Date();// 校验:当前时间是否已过计划提醒时间if (now.before(message.getRemindTime())) {System.out.println("赛前提醒时间未到,跳过:eventId=" + eventId);return;}// 批量查询该赛事的已报名用户(分页查询,避免数据量过大)int pageNum = 1;int pageSize = 100;while (true) {PageInfo<MarathonSignRecord> page = signRecordDao.selectByEventId(eventId, pageNum, pageSize);if (page.getList().isEmpty()) {break;}// 遍历用户发送提醒for (MarathonSignRecord record : page.getList()) {Long userId = record.getUserId();String userPhone = record.getUserPhone();String signNo = record.getSignNo();// 1. 发送短信提醒String smsContent = String.format("【%s】赛前1小时提醒:请携带参赛装备,于07:30前到达起点(XX体育中心),凭报名编号%s入场。",eventName, signNo);smsService.sendSms(userPhone, smsContent);// 2. 发送APP推送提醒PushMessage pushMessage = new PushMessage();pushMessage.setUserId(userId);pushMessage.setTitle("赛前提醒");pushMessage.setContent(smsContent);pushMessage.setJumpUrl("/marathon/event/detail?eventId=" + eventId);appPushService.sendPush(pushMessage);}pageNum++;}System.out.println("赛前提醒发送完成:eventId=" + eventId);}
}

六、实战 3:死信队列处理 —— 失败消息的兜底方案​

6.1、 死信队列的触发场景​

在马拉松系统中,以下场景可能导致消息消费失败并进入死信队列:​

  • 短信服务故障(如 API 调用超时、密钥错误),导致 “报名成功短信通知” 重试 16 次后仍失败;​
  • 数据库连接异常,导致 “成绩日志记录” 消费失败;​
  • APP 推送服务限流,导致推送消息多次失败。

6.2、 死信队列的配置(默认自动创建)​

RocketMQ 默认会为每个消费者组自动创建死信队列,无需手动配置,核心规则如下:​

  • 死信队列命名:格式为%DLQ%+消费者组名称(如消费者组marathon_consumer_group对应的死信队列为%DLQ%marathon_consumer_group);​
  • 死信队列数量:与原 Topic 的队列数量一致(默认 4 个),确保消息分区顺序性;​
  • 消息保留时间:默认永久保留,需手动清理或配置定时清理策略(如通过 RocketMQ 控制台设置)。​

验证死信队列创建​

通过 RocketMQ 控制台(需单独部署)或命令行工具查看死信队列:​

1、命令行查看:

# 查看所有Topic,包含死信队列
sh bin/mqadmin topicList -n 192.168.1.20:9876
# 输出中会包含“%DLQ%marathon_consumer_group”

2、控制台查看:登录 RocketMQ 控制台(默认端口 8080),在 “Topic 管理” 中可看到死信队列,点击进入可查看队列中的失败消息。

6.3、 监听死信队列:失败消息的捕获与处理​

需编写专门的死信队列消费者,监听失败消息,实现 “自动重试 + 人工干预” 的双层处理机制。​

6.3.1、 死信消息消费者实现

// 死信队列消费者:监听“报名成功短信通知”的失败消息
@Component
@RocketMQMessageListener(topic = "%DLQ%marathon_consumer_group", // 死信队列Topic(固定格式)selectorExpression = "sign_success_sms", // 仅监听短信通知的失败消息(原Tag)consumerGroup = "marathon_dlq_consumer_group", // 单独的死信消费者组,避免与原消费者冲突messageModel = MessageModel.CLUSTERING // 集群模式,确保失败消息仅被一个实例处理
)
public class SignSuccessSmsDlqConsumer implements RocketMQListener<String> {// 注入短信服务(用于重试发送)@Resourceprivate SmsService smsService;// 注入死信消息日志DAO(用于记录失败消息,便于人工干预)@Resourceprivate DlqMessageLogDao dlqMessageLogDao;// 注入告警服务(发送邮件/钉钉告警,通知运维人员)@Resourceprivate AlertService alertService;@Overridepublic void onMessage(String messageBody) {// 1. 解析死信消息(包含原消息内容与元数据)// 注意:死信消息的body是原消息的完整内容,需先解析为RocketMQ的MessageExt格式MessageExt messageExt = JSON.parseObject(messageBody, MessageExt.class);String originalBody = new String(messageExt.getBody()); // 原消息内容SignSuccessMessage originalMessage = JSON.parseObject(originalBody, SignSuccessMessage.class);String userPhone = originalMessage.getUserPhone();String signNo = originalMessage.getSignNo();// 2. 记录死信消息日志(存入数据库,便于后续追踪)DlqMessageLog dlqLog = new DlqMessageLog();dlqLog.setMessageId(messageExt.getMsgId()); // 消息IDdlqLog.setTopic("marathon_sign_success"); // 原TopicdlqLog.setTag("sign_success_sms"); // 原TagdlqLog.setMessageContent(originalBody); // 原消息内容dlqLog.setReason("短信发送失败,重试16次后进入死信队列"); // 失败原因dlqLog.setCreateTime(new Date());dlqLog.setStatus(0); // 0:待处理,1:已重试,2:已忽略dlqMessageLogDao.insert(dlqLog);// 3. 尝试自动重试(仅重试1次,避免无限循环)boolean retrySuccess = false;try {// 构造短信内容(与原逻辑一致)String smsContent = String.format("【马拉松赛事】您已成功报名“%s”赛事,报名编号:%s,赛事时间:%s,请准时参赛!",originalMessage.getEventId(), signNo, originalMessage.getEventTime());// 重试发送短信retrySuccess = smsService.sendSms(userPhone, smsContent);} catch (Exception e) {e.printStackTrace();}// 4. 处理重试结果if (retrySuccess) {// 重试成功:更新死信日志状态为“已重试”DlqMessageLog updateLog = new DlqMessageLog();updateLog.setId(dlqLog.getId());updateLog.setStatus(1);updateLog.setHandleTime(new Date());updateLog.setHandleRemark("自动重试发送成功");dlqMessageLogDao.updateById(updateLog);} else {// 重试失败:发送告警,通知人工干预String alertContent = String.format("【死信消息告警】短信通知发送失败,用户手机号:%s,报名编号:%s,消息ID:%s,请尽快处理!",userPhone, signNo, messageExt.getMsgId());// 发送钉钉/邮件告警alertService.sendDingTalkAlert(alertContent);alertService.sendEmailAlert("marathon-dlq-alert@example.com", "死信消息告警", alertContent);// 更新死信日志状态为“待人工处理”DlqMessageLog updateLog = new DlqMessageLog();updateLog.setId(dlqLog.getId());updateLog.setStatus(0);updateLog.setHandleRemark("自动重试失败,等待人工处理");dlqMessageLogDao.updateById(updateLog);}}
}// 死信消息日志实体
@Data
public class DlqMessageLog {private Long id; // 主键IDprivate String messageId; // RocketMQ消息IDprivate String topic; // 原消息Topicprivate String tag; // 原消息Tagprivate String messageContent; // 原消息内容private String reason; // 进入死信队列的原因private Integer status; // 状态:0-待处理,1-已重试,2-已忽略private Date createTime; // 进入死信队列时间private Date handleTime; // 处理时间private String handleRemark; // 处理备注(人工干预时填写)
}

6.3.2、 人工干预流程​

对于自动重试失败的死信消息,需提供人工处理入口(如运维后台),核心功能包括:​

  1. 死信消息查询:按消息 ID、用户手机号、时间范围查询死信日志;​
  2. 手动重试:运维人员排查故障(如修复短信服务密钥)后,点击 “手动重试” 按钮,调用短信服务重新发送;​
  3. 消息忽略:对于无效消息(如用户手机号已注销),可标记为 “已忽略”,避免重复告警。​

手动重试接口示例:

@RestController
@RequestMapping("/admin/dlq")
public class DlqMessageController {@Resourceprivate DlqMessageLogDao dlqMessageLogDao;@Resourceprivate SmsService smsService;/*** 手动重试死信消息(仅管理员可调用)*/@PostMapping("/retry")@PreAuthorize("hasRole('ADMIN')") // 权限控制public ResultVO retryDlqMessage(@RequestParam Long logId) {// 1. 查询死信日志DlqMessageLog log = dlqMessageLogDao.selectById(logId);if (log == null || log.getStatus() != 0) {return ResultVO.fail("消息不存在或已处理");}// 2. 解析原消息SignSuccessMessage message = JSON.parseObject(log.getMessageContent(), SignSuccessMessage.class);String smsContent = String.format("【马拉松赛事】您已成功报名“%s”赛事,报名编号:%s,赛事时间:%s,请准时参赛!",message.getEventId(), message.getSignNo(), message.getEventTime());// 3. 手动发送短信boolean success = smsService.sendSms(message.getUserPhone(), smsContent);if (success) {// 更新日志状态DlqMessageLog updateLog = new DlqMessageLog();updateLog.setId(logId);updateLog.setStatus(1);updateLog.setHandleTime(new Date());updateLog.setHandleRemark("管理员手动重试成功");dlqMessageLogDao.updateById(updateLog);return ResultVO.success("重试成功");} else {return ResultVO.fail("重试失败,请检查服务状态");}}/*** 忽略死信消息*/@PostMapping("/ignore")@PreAuthorize("hasRole('ADMIN')")public ResultVO ignoreDlqMessage(@RequestParam Long logId) {DlqMessageLog updateLog = new DlqMessageLog();updateLog.setId(logId);updateLog.setStatus(2);updateLog.setHandleTime(new Date());updateLog.setHandleRemark("管理员标记为忽略");dlqMessageLogDao.updateById(updateLog);return ResultVO.success("忽略成功");}
}

七、RocketMQ 监控与生产环境优化​

7.1、 监控体系建设:确保消息可靠性​

7.1.1、 核心监控指标​

需通过监控工具(如 Prometheus+Grafana、RocketMQ 控制台)实时监控以下指标:

指标类别核心指标警阈值
消息发送发送成功率、发送延迟、发送失败次数成功率 <99.9%、延迟> 100ms、失败次数 > 10 次 / 分钟
消息消费消费成功率、消费延迟、堆积消息数成功率 <99.9%、延迟> 500ms、堆积数 > 1000 条
死信队列死信消息新增数、死信消息总数新增数 > 5 条 / 分钟、总数 > 100 条
集群健康度Broker 存活数、NameServer 存活数、磁盘使用率Broker 存活数 <2、磁盘使用率> 85%

7.1.2、 Prometheus+Grafana 监控配置​

1、部署 RocketMQ Exporter:​

RocketMQ 提供官方 Exporter,用于暴露监控指标给 Prometheus,下载地址:rocketmq-exporter;

# 启动Exporter(指定NameServer地址)
java -jar rocketmq-exporter-0.0.2-SNAPSHOT.jar --rocketmq.config.namesrvAddr=192.168.1.20:9876

2、配置 Prometheus:​

在prometheus.yml中添加 Exporter 地址:

scrape_configs:- job_name: 'rocketmq'static_configs:- targets: ['192.168.1.20:5557'] # Exporter默认端口5557

3、导入 Grafana 模板:​

在 Grafana 中导入 RocketMQ 监控模板(模板 ID:10477),即可看到完整的监控面板,包含消息发送、消费、死信队列等指标。​

7.2、 生产环境优化建议​

7.2.1、 消息可靠性优化​

1、生产者确认机制:​

开启消息发送确认(ACK),确保消息成功写入 Broker:

// 在RocketMQProducer中添加同步发送并确认
public <T> boolean sendWithAck(String topic, String tag, T data) {String destination = topic + ":" + tag;String messageBody = JSON.toJSONString(data);try {// 同步发送,等待Broker确认SendResult result = rocketMQTemplate.syncSend(destination, messageBody);// 确认消息发送状态为成功return SendStatus.SEND_OK.equals(result.getSendStatus());} catch (Exception e) {e.printStackTrace();return false;}
}

2、消费者幂等处理:​

由于网络重试可能导致消息重复消费(如短信重复发送),需实现消费幂等:

// 短信消费者添加幂等校验(基于报名编号+消息ID)
@Override
public void onMessage(String messageBody) {SignSuccessMessage message = JSON.parseObject(messageBody, SignSuccessMessage.class);String msgId = messageExt.getMsgId(); // 从MessageExt获取消息IDString uniqueKey = message.getSignNo() + ":" + msgId; // 唯一标识// 检查是否已处理过该消息(Redis或数据库记录)Boolean isProcessed = redisTemplate.opsForValue().setIfAbsent(uniqueKey, "1", 24, TimeUnit.HOURS);if (Boolean.FALSE.equals(isProcessed)) {System.out.println("消息已处理,跳过重复消费:" + uniqueKey);return;}// 正常发送短信逻辑(略)
}

7.2.2、 性能优化​

1、消息批量发送与消费:​

对于高频消息(如成绩推送),采用批量发送减少网络交互:

// 批量发送成绩消息
public void batchSendScoreMessages(List<ScorePushMessage> messages) {String destination = "marathon_score:score_push_app";// 构建批量消息List<Message> rocketMessages = messages.stream().map(msg -> MessageBuilder.withPayload(JSON.toJSONString(msg)).build()).collect(Collectors.toList());// 批量发送rocketMQTemplate.syncSend(destination, rocketMessages);
}

2、Topic 与队列规划:​

  • 按业务模块拆分 Topic(如marathon_sign、marathon_score、marathon_delay),避免单 Topic 消息过多;​
  • 每个 Topic 的队列数设置为 Broker 数量的整数倍(如 3 个 Broker,队列数设为 6),确保负载均衡。​

7.2.3、 高可用部署​

1、RocketMQ 集群部署:​

生产环境需部署 “2 个 NameServer+3 个 Broker(主从架构)”,确保:​

  • NameServer 集群:避免单点故障,通过配置中心(如 Nacos)动态感知;​
  • Broker 主从:每个 Broker 配置 1 个从节点,主节点故障时从节点自动切换。​

2、数据持久化配置:​

开启 Broker 的 AOF + 同步刷盘,确保消息不丢失:

# 修改broker.conf配置
flushDiskType=SYNC_FLUSH # 同步刷盘
storePathRootDir=/data/rocketmq/store # 存储目录(独立磁盘)

八、总结与扩展:RocketMQ 在马拉松系统中的价值​

8.1、 核心价值总结​

  1. 异步解耦:将 “报名 - 通知”“成绩录入 - 推送” 等强耦合流程拆分为异步通信,提升接口响应速度(从 1000ms 降至 200ms),降低服务依赖风险;​
  2. 定时任务标准化:通过延时队列统一管理 “报名截止”“赛前提醒” 等定时任务,避免分布式部署下的重复执行,时间精度达秒级;​
  3. 消息可靠性保障:死信队列 + 重试机制 + 监控告警,确保失败消息可追溯、可处理,避免 “用户漏收通知”“报名通道未关闭” 等业务故障。​

8.2、 扩展场景​

除本文覆盖的场景外,RocketMQ 还可用于马拉松系统的其他模块:​

  1. 实时日志收集:通过 RocketMQ 将各服务日志异步发送到 ELK,实现日志集中分析;​
  2. 赛事数据同步:将报名数据、成绩数据异步同步到数据仓库,用于后续数据分析(如报名用户地域分布、成绩排名统计);​
  3. 分布式事务:基于 RocketMQ 的事务消息,解决 “报名成功但订单未生成” 的分布式事务问题(如 TCC 或 SAGA 模式)。​

8.3、 未来方向​

  1. 云原生适配:结合 K8s 部署 RocketMQ,通过 StatefulSet 管理 Broker 状态,实现自动扩缩容与故障自愈;​
  2. 智能告警:集成 AI 算法,基于历史故障数据预测死信消息趋势(如短信服务即将限流时提前告警);​
  3. 消息轨迹追踪:开启 RocketMQ的消息轨迹功能,追踪消息从发送到消费的全链路(如发送时间、Broker 存储节点、消费节点),便于问题定位;​
  4. 多租户隔离:通过 RocketMQ 的 Topic 权限控制(如 ACL),实现多赛事租户的消息隔离,避免数据混淆。

九、实战总结与选型建议​

9.1、 马拉松系统 RocketMQ 落地总结​

在马拉松系统中,RocketMQ 通过 “异步化 + 延时任务 + 死信处理” 的组合方案,有效解决了高并发场景下的性能与可靠性问题,核心落地成果如下:​

  1. 性能提升:报名接口响应时间从 1000ms 降至 200ms,支撑 5000 QPS 峰值请求,无线程池耗尽风险;​
  2. 可靠性保障:消息发送成功率 99.99%,失败消息通过死信队列 + 人工干预的机制,处理率达 100%,无用户漏收通知案例;​
  3. 运维效率:通过 Prometheus+Grafana 监控,故障平均定位时间从 30 分钟缩短至 5 分钟;通过延时队列,减少 80% 的定时任务维护成本(无需手动部署分布式调度框架)。​

9.2、 消息中间件选型建议​

在选择消息中间件时,需结合业务场景的核心需求(如性能、可靠性、延时功能),以下是不同场景的选型参考:

业务场景核心需求推荐中间件不推荐中间件
马拉松报名通知、成绩推送高并发(万级 QPS)、低延迟(<100ms)、异步解耦RocketMQ、KafkaRabbitMQ(万级 QPS 下性能不足)
报名截止关通道、赛前提醒分布式定时任务、延时精度(秒级)、避免重复执行RocketMQ(延时队列成熟)Kafka(无原生延时队列,需二次开发)
金融级赛事支付通知事务消息、强一致性、零丢失RocketMQ(支持事务消息)、RabbitMQKafka(事务支持较弱)
轻量级内部通知(如日志)部署简单、开发成本低RabbitMQ、RocketMQ LiteKafka(部署与维护成本高)

9.3、 给开发者的核心建议​

  1. 优先复用成熟功能:如 RocketMQ 的延时队列、死信队列,避免重复开发(如自定义定时任务框架),降低技术债务;​
  2. 重视消息可靠性:从 “发送确认、消费幂等、死信处理” 三个维度设计方案,尤其是用户通知类场景(如短信、推送),避免因消息丢失导致用户投诉;​
  3. 平衡性能与复杂度:高并发场景下可采用 “批量发送、异步发送” 提升性能,但需避免过度优化(如为追求极致性能引入复杂的分片逻辑);​
  4. 运维前置:在系统设计阶段就规划监控与告警(如消息堆积、死信新增),避免上线后出现 “故障无法感知” 的问题。​

十、结语​

RocketMQ 作为一款高性能、高可靠的分布式消息中间件,在马拉松系统中不仅是 “异步通信工具”,更是 “系统解耦与可靠性保障的核心基础设施”。通过异步化拆分业务流程,它解决了高并发下的接口响应延迟问题;通过延时队列,它标准化了分布式定时任务的实现;通过死信队列与监控告警,它确保了消息全链路的可靠性。​

随着马拉松系统向 “大规模、高可用、智能化” 方向发展,RocketMQ 的应用也将进一步深化 —— 从单一的消息通信,扩展到 “消息 + 事务 + 监控 + 智能告警” 的一体化解决方案。对于开发者而言,掌握 RocketMQ 不仅是掌握一项技术,更是理解分布式系统中 “异步解耦” 与 “可靠性设计” 的核心思想,这将为后续复杂系统的开发奠定坚实基础。​

最终,成功的技术落地不是 “工具的堆砌”,而是 “业务需求与技术特性的精准匹配”。只有结合马拉松系统的报名、通知、赛事管理等核心场景,合理设计 Topic、Tag、延时策略与死信处理流程,才能让 RocketMQ 真正发挥价值,成为支撑赛事平稳运行的 “隐形基石”。

http://www.dtcms.com/a/491054.html

相关文章:

  • 通达信指标平台
  • 网站建设及推广培训网站备案查询站长工具
  • MATLAB2025B版本新特点
  • Node.js使用Express+SQLite实现登录认证
  • 仿百度百科网站源码设计类专业学校有哪些
  • 重庆建站多少钱一年工业产品设计培训
  • 【IEEE出版 | 早鸟优惠本周截止】人工智能驱动图像处理与计算机视觉技术国际学术研讨会 (AIPCVT 2025)
  • 网站开发案例教程东营网站建设服务商
  • 基于微信小程序的垃圾分类管理系统【2026最新】
  • SSM高校教室申请管理系统yf80k(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • 创建一个SpringBoot项目(连接数据库)
  • 飞沐网站设计大鹏网站建设建站好不好
  • 淮南专业网站建设怎样保存网站资料 做证据
  • 如何在WPF中实现ComboBox多选
  • 单北斗GNSS变形监测是什么?主要用于大坝及桥梁安全监测吗?
  • 网站建设公司的服务器建设网站申请书
  • 如何加强省市级门户网站的建设太原网站优化服务
  • YOLO-V1 与 YOLO-V2 核心笔记
  • 公司做网站的价格江阴宁波网站建设那家好
  • 2025年--Lc192-5. 最长回文子串(动态规划在字符串的应用)--Java版
  • Docker容器化核心知识体系:从入门到实践
  • 华为云建站怎么样安徽先锋网站两学一做
  • 驾驭Excel数据:使用C#将Excel导出为DataTable的实战指南
  • LeetCode 400 - 第 N 位数字
  • 团购网站开发的可行性分析营销推广渠道有哪些
  • SHA-256 算法
  • 淘宝网官方网站购物商城电子邮件免费注册
  • 网站策划薪资大型网站开发团队
  • 广西建设质监站官方网站成都装修公司招聘装修工人
  • 了解学习Python编程之python基础