RocketMQ生产者多种API实战使用
一、前言
这些都是我对RocketMQTemplate中的生产者API包了一层,更加方便我们平常开发使用,如果想了解关于RocketMQ比较基础的知识或者单点部署RocketMQ可以看我下面两篇文章:
https://blog.csdn.net/qq_73440769/article/details/143443049?spm=1001.2014.3001.5502
https://blog.csdn.net/qq_73440769/article/details/151049115?spm=1001.2014.3001.5502
二、干货合集
有以下这些类型:
-
sendSyncSimpleMessage→ 同步普通消息发送。
📌 底层调用:rocketMQTemplate.syncSend(destination, message, timeout) -
sendAsyncMessage→ 异步普通消息发送,通过回调处理结果。
📌 底层调用:rocketMQTemplate.asyncSend(destination, message, callback, timeout) -
sendOneWayMessage→ 单向消息发送,只发不等,性能最高但不可靠。
📌 底层调用:rocketMQTemplate.sendOneWay(destination, message) -
sendDelayMessageByDelayLevel→ 按 RocketMQ 预定义延迟级别(1~18)发送延时消息。
📌 底层调用:rocketMQTemplate.syncSend(destination, message, timeout, delayLevel) -
sendDelayMessageBySeconds→ 按指定秒数发送延时消息
📌 底层调用:rocketMQTemplate.syncSendDelayTimeSeconds(destination, message, delaySeconds) -
sendSyncOrderlyMessage→ 同步顺序消息,相同shardingKey保证 FIFO。
📌 底层调用:rocketMQTemplate.syncSendOrderly(destination, message, hashKey, timeout) -
sendAsyncOrderlyMessage→ 异步顺序消息,兼顾顺序性与吞吐量。
📌 底层调用:rocketMQTemplate.asyncSendOrderly(destination, message, hashKey, callback, timeout) -
sendOneWayOrderMessage→ 单向顺序消息(极少使用)。
📌 底层调用:rocketMQTemplate.sendOneWayOrderly(destination, message, hashKey)
| 消息类型 | 方法示例(简化) | 是否阻塞 | 是否返回结果 | 可靠性 | 性能 | 典型使用场景 | 注意事项 |
|---|---|---|---|---|---|---|---|
| 同步普通消息 | sendSyncSimpleMessage | ✅ 是 | ✅ 是 | 高 | 中 | 支付成功通知、订单创建确认 | 超时可能失败;避免高频调用 |
| 异步普通消息 | sendAsyncMessage | ❌ 否 | 回调 | 中 | 高 | 用户行为埋点、日志上报、非核心通知 | 异常需在回调中处理;不阻塞主线程 |
| 单向消息 | sendOneWayMessage | ❌ 否 | ❌ 否 | 低 | 极高 | 心跳检测、监控指标、可丢失日志 | 绝不用于关键业务;消息可能静默丢失 |
| 延时消息(按级别) | sendDelayMessageByDelayLevel | ✅ 是 | ✅ 是 | 高 | 中 | 订单30分钟未支付自动取消 | delayLevel 仅支持 1~18;时间固定 |
| 延时消息(按秒) | sendDelayMessageBySeconds | ✅ 是 | ✅ 是 | 高 | 中 | 自定义延迟任务(如5分钟后提醒) | 按实际传的秒数 |
| 同步顺序消息 | sendSyncOrderlyMessage | ✅ 是 | ✅ 是 | 高 | 中 | 订单状态变更(创建→支付→发货) | 相同 shardingKey才保序;不同的key并行 |
| 异步顺序消息 | sendAsyncOrderlyMessage | ❌ 否 | 回调 | 中 | 高 | 需要顺序且高吞吐的场景 | 回调中处理异常;保证 key 一致性 |
| 单向顺序消息 | sendOneWayOrderMessage | ❌ 否 | ❌ 否 | 低 | 高 | 顺序但可容忍丢失的日志流 | 极少使用;顺序+不可靠,矛盾场景 |
三、生产者API详讲
这里的所有code只是结合讲解,文章最后会有最完整的代码
1、RocketMQ 消息模型
- Topic:消息主题,一类业务消息的逻辑分类(如
OrderTopic)。 - Tag:子标签,用于消费者端过滤(如
TagA,TagB)。 - Keys:消息唯一键(非强制),用于运维追踪、去重、按 key 查询。
- ShardingKey / HashKey:顺序消息的分区键,决定消息发往哪个队列(Queue),相同 key 保证 FIFO。
2、同步普通消息(Sync Send)
✅ 方法:
sendSyncSimpleMessage(String topic, Object message, String keys)
sendSyncSimpleMessage(String topic, String tag, Object message, String keys)
🔧 原理:
- 调用
rocketMQTemplate.syncSend(),阻塞等待 Broker 返回结果(成功/失败)。 - 默认超时 2 秒(可配置)。
🎯 使用场景:
- 关键业务链路,必须确保消息发送成功才能继续(如支付成功后发通知)。
- 需要获取
msgId做日志追踪或落库。 - 调用方能容忍短暂延迟(毫秒级)。
⚠️ 注意:
- 不适合高并发场景(会阻塞线程)。
- 若 Broker 不可用,会抛异常,需做好降级(如记录 DB + 定时重试)。
🖊️ code:
// 后续的方法也会用到这个
private GeneralMessageEntity buildMessagePayload(Object message, String keys) {return GeneralMessageEntity.builder().body(JSONUtil.toJsonStr(message)).keys(keys).build();
}
// ==================== 1. 同步普通消息 需要等待broker返回结果 ====================/*** <p>* description: 无Tag标签的发送同步普通消息* </p>** @param topic 发送消息的主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 发送的keys,关联每条消息的唯一标识* @return: org.apache.rocketmq.client.producer.SendResult* @author: bluefoxyu* @date: 2025-01-08 09:22:38*/
public SendResult sendSyncSimpleMessage(String topic, Object message, String keys) {return sendSyncSimpleMessage(topic, null, message, keys);
}/*** <p>* description: 有Tag标签的发送同步普通消息* </p>** @param topic 发送消息的主题* @param tag 发送消息的标签* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 发送的keys,关联每条消息的唯一标识* @return: org.apache.rocketmq.client.producer.SendResult 发送rocketmq的结果* @author: bluefoxyu* @date: 2025-01-08 09:20:39*/
public SendResult sendSyncSimpleMessage(String topic, String tag, Object message, String keys) {// 构造消息体GeneralMessageEntity payload = buildMessagePayload(message, keys);// 发送消息return this.doSendSync(topic, tag, keys, payload);
}/*** 发送普通消息** @param topic 消息发送主题,用于标识同一类业务逻辑的消息* @param tag 消息的过滤标签,消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。* @param keys 消息索引键,可根据关键字精确查找某条消息* @param generalMessage 普通消息发送事件,自定义对象,最终都会序列化为字符串* @return 消息发送 RocketMQ 返回结果*/
private SendResult doSendSync(String topic, String tag, String keys, GeneralMessageEntity generalMessage) {try {String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(generalMessage).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();SendResult result = rocketMQTemplate.syncSend(destination, msg, 2000L);log.info("[同步消息] 发送成功 | status={}, msgId={}, keys={}",result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[同步消息] 发送失败 | keys={}, message={}", keys, JSON.toJSONString(generalMessage), e);throw e;}
}
3、异步普通消息(Async Send)
✅ 方法:
sendAsyncMessage(String topic, Object message, String keys)
sendAsyncMessage(String topic, String tag, Object message, String keys)
🔧 原理:
- 调用
asyncSend(),立即返回,不阻塞主线程。 - 结果通过
SendCallback回调处理(成功/异常)。
🎯 使用场景:
- 非核心链路,允许丢失少量消息(如用户行为埋点、日志上报)。
- 高吞吐场景,避免同步阻塞影响主流程性能。
- 需要异步处理发送结果(如失败告警、持久化重试)。
⚠️ 注意:
- 回调在独立线程执行,注意线程安全。
- 异常不会抛给调用方,需在
onException中处理(如打日志、发告警)。
🖊️ code:
// ==================== 2. 异步普通消息 不需要等待broker返回结果 ====================/*** 异步普通消息 不需要等待broker返回结果** @param topic 发送消息的主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 发送的keys,关联每条消息的唯一标识*/
public void sendAsyncMessage(String topic, Object message, String keys) {sendAsyncMessage(topic, null, message, keys);
}/*** 异步普通消息** @param topic 发送消息的主题* @param tag 发送消息的标签* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 索引键,关联每条消息的唯一标识*/
public void sendAsyncMessage(String topic, String tag, Object message, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.asyncSend(destination, msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("[异步消息] 发送成功 | status={}, msgId={}, keys={}",sendResult.getSendStatus(), sendResult.getMsgId(), keys);}@Overridepublic void onException(Throwable throwable) {log.error("[异步消息] 发送失败 | keys={}, message={}", keys, JSON.toJSONString(payload), throwable);// TODO: 可接入告警或持久化失败消息}}, 2000L);
}
4、单向消息(One-Way Send)
✅ 方法:
sendOneWayMessage(String topic, Object message, String keys)
sendOneWayMessage(String topic, String tag, Object message, String keys)
🔧 原理:
- 调用
sendOneWay(),只发不等,连回调都没有。 - 性能最高,但完全不关心是否成功。
🎯 使用场景:
- 超高频、可丢失的消息(如心跳、监控指标)。
- 对可靠性要求极低,只求“尽力而为”。
- 系统资源紧张,不能承受任何额外开销。
⚠️ 注意:
- 绝对不要用于关键业务!消息可能静默丢失。
- 无法追踪
msgId,无法重试。
🖊️ code:
// ==================== 3. 单向消息(不关心结果)====================/*** 发送单向消息(不关心结果)** @param topic 待发送的 topic* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 索引键,关联每条消息的唯一标识*/
public void sendOneWayMessage(String topic, Object message, String keys) {sendOneWayMessage(topic, null, message, keys);
}/*** 发送单向消息(不关心结果)** @param topic 待发送的 topic* @param tag 待发送的 tag* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 索引键,关联每条消息的唯一标识*/
public void sendOneWayMessage(String topic, String tag, Object message, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.sendOneWay(destination, msg);log.info("[单向消息] 已发送 | keys={}", keys);
}
5、延时消息(Delay Message)
✅ 方法:
// 按预定义级别(1~18)
sendDelayMessageByDelayLevel(...)// 按秒数(更灵活)
sendDelayMessageBySeconds(...)
🔧 原理:
- RocketMQ 内部有 18 个延时队列(SCHEDULE_TOPIC_XXXX),消息先投递到延时队列,到期后再转到真实 Topic。
syncSendDelayTimeSeconds()是 Spring Boot Starter 提供的封装。
🎯 使用场景:
- 订单超时取消(30 分钟未支付自动关闭)。
- 任务延迟执行(如 5 分钟后提醒用户)。
- 重试退避机制(第一次失败 10 秒后重试,第二次 1 分钟后...)。
⚠️ 注意:
- 原生 RocketMQ 只支持固定 18 个级别,自定义秒数依赖客户端映射(可能不精确)。
- 延时精度受 Broker 调度影响,不是实时(通常误差 ±1 秒)。
- 不支持任意时间(如“明天上午 10 点”),需结合定时任务。
🖊️ code:
// ==================== 4. 延时消息(Delay Level 1~18)====================/*** 发送 RocketMQ 延时消息(支持 delayLevel 1~18)不带 Tag** <p>⚠️ 注意:RocketMQ 的延时消息仅支持预定义的 18 个延迟级别,对应如下默认时间:* <pre>* Level | 延迟时间* ------|----------* 1 | 1 秒* 2 | 5 秒* 3 | 10 秒* 4 | 30 秒* 5 | 1 分钟* 6 | 2 分钟* 7 | 3 分钟* 8 | 4 分钟* 9 | 5 分钟* 10 | 6 分钟* 11 | 7 分钟* 12 | 8 分钟* 13 | 9 分钟* 14 | 10 分钟* 15 | 20 分钟* 16 | 30 分钟* 17 | 1 小时* 18 | 2 小时* </pre>** @param topic 消息主题(必须)* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 消息唯一键(用于去重、查询等,建议使用 UUID)* @param delayLevel 延迟级别,取值范围 [1, 18]* @return SendResult 发送结果* @throws IllegalArgumentException 当 delayLevel 不在 [1,18] 范围内* @throws RuntimeException 当发送过程中发生异常*/
public SendResult sendDelayMessageByDelayLevel(String topic, Object message, String keys, int delayLevel) {return sendDelayMessageByDelayLevel(topic, null, message, keys, delayLevel);
}/*** 发送 RocketMQ 延时消息(支持 delayLevel 1~18)** <p>⚠️ 注意:RocketMQ 的延时消息仅支持预定义的 18 个延迟级别,对应如下默认时间:* <pre>* Level | 延迟时间* ------|----------* 1 | 1 秒* 2 | 5 秒* 3 | 10 秒* 4 | 30 秒* 5 | 1 分钟* 6 | 2 分钟* 7 | 3 分钟* 8 | 4 分钟* 9 | 5 分钟* 10 | 6 分钟* 11 | 7 分钟* 12 | 8 分钟* 13 | 9 分钟* 14 | 10 分钟* 15 | 20 分钟* 16 | 30 分钟* 17 | 1 小时* 18 | 2 小时* </pre>** @param topic 消息主题(必须)* @param tag 消息标签(可为 null)* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 消息唯一键(用于去重、查询等,建议使用 UUID)* @param delayLevel 延迟级别,取值范围 [1, 18]* @return SendResult 发送结果* @throws IllegalArgumentException 当 delayLevel 不在 [1,18] 范围内* @throws RuntimeException 当发送过程中发生异常*/
public SendResult sendDelayMessageByDelayLevel(String topic, String tag, Object message, String keys, int delayLevel) {if (delayLevel < 1 || delayLevel > 18) {throw new IllegalArgumentException("RocketMQ delayLevel must be between 1 and 18");}GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {SendResult result = rocketMQTemplate.syncSend(destination, msg, 2000L, delayLevel);log.info("[延时消息] 发送成功 | delayLevel={}, status={}, msgId={}, keys={}",delayLevel, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[延时消息] 发送失败 | delayLevel={}, keys={}", delayLevel, keys, e);throw e;}
}/*** 发送延时消息(指定延迟秒数)** @param topic 消息主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 消息唯一键* @param delaySeconds 延迟秒数(必须 > 0)* @return SendResult*/
public SendResult sendDelayMessageBySeconds(String topic, Object message, String keys, long delaySeconds) {return sendDelayMessageBySeconds(topic, null, message, keys, delaySeconds);
}/*** 发送延时消息(指定延迟秒数 + Tag)** @param topic 消息主题* @param tag 消息标签(可为 null)* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 消息唯一键* @param delaySeconds 延迟秒数(>0)* @return SendResult*/
public SendResult sendDelayMessageBySeconds(String topic, String tag, Object message, String keys, long delaySeconds) {if (delaySeconds <= 0) {throw new IllegalArgumentException("delaySeconds must be greater than 0");}GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {// 使用 RocketMQ Spring 提供的秒级延迟 APISendResult result = rocketMQTemplate.syncSendDelayTimeSeconds(destination, msg, delaySeconds);log.info("[延时消息] 发送成功(按秒)| delaySeconds={}, status={}, msgId={}, keys={}",delaySeconds, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[延时消息] 发送失败(按秒)| delaySeconds={}, keys={}", delaySeconds, keys, e);throw e;}
}
6、顺序消息(Ordered Message)
✅ 方法:
// 同步
sendSyncOrderlyMessage(...)
// 异步
sendAsyncOrderlyMessage(...)
// 单向
sendOneWayOrderMessage(...)
🔧 原理:
- 通过
shardingKey(也叫 hashKey)计算队列索引,相同 key 的消息发往同一 Queue。 - Consumer 端单线程消费该 Queue,从而保证 FIFO。
🎯 使用场景:
- 强顺序业务:
- 订单状态变更:创建 → 支付 → 发货 → 完成
- 账户余额变动:充值 → 消费 → 退款
- 数据库 binlog 同步
⚠️ 注意:
- 全局顺序 vs 局部顺序:
- 全局顺序:整个 Topic 只有一个 Queue(性能差,不推荐)。
- 局部顺序:按业务 key 分区(如 orderId),不同订单并行,同订单串行(推荐)。
- Producer 和 Consumer 都必须使用相同的
shardingKey。 - 若 Consumer 处理慢,会导致该 Queue 积压,影响其他 key 的消费(需监控)。
🖊️ code:
// ==================== 5. 顺序消息(保证同一 shardingKey 有序)====================/*** 同步顺序消息** @param topic 消息主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey 顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证有序* @param keys 消息唯一键* @return SendResult*/
public SendResult sendSyncOrderlyMessage(String topic, Object message, String shardingKey, String keys) {return sendSyncOrderlyMessage(topic, null, message, shardingKey, keys);
}/*** 同步顺序消息(Tag)** @param topic 消息主题* @param tag 消息标签(可为 null)* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey 顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证* @param keys 消息唯一键* @return SendResult*/
public SendResult sendSyncOrderlyMessage(String topic, String tag, Object message, String shardingKey, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {SendResult result = rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000L);log.info("[顺序消息] 发送成功 | shardingKey={}, status={}, msgId={}, keys={}",shardingKey, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[顺序消息] 发送失败 | shardingKey={}, keys={}", shardingKey, keys, e);throw e;}
}/*** 异步顺序消息(保证同一 shardingKey 有序)** @param topic 消息主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey 顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证有序* @param keys 消息唯一键*/
public void sendAsyncOrderlyMessage(String topic, Object message, String shardingKey, String keys) {sendAsyncOrderlyMessage(topic, null, message, shardingKey, keys);
}/*** 异步顺序消息(带 Tag)** @param topic 消息主题* @param tag 消息标签(可为 null)* @param message 发送的消息* @param shardingKey 顺序分片键* @param keys 消息唯一键*/
public void sendAsyncOrderlyMessage(String topic, String tag, Object message, String shardingKey, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.asyncSendOrderly(destination, msg, shardingKey, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("[异步顺序消息] 发送成功 | shardingKey={}, status={}, msgId={}, keys={}",shardingKey, sendResult.getSendStatus(), sendResult.getMsgId(), keys);}@Overridepublic void onException(Throwable throwable) {log.error("[异步顺序消息] 发送失败 | shardingKey={}, keys={}, message={}",shardingKey, keys, JSON.toJSONString(payload), throwable);// TODO: 可接入告警或持久化失败消息}}, 2000L);
}/*** 发送单向顺序消息** @param topic 消息主题* @param message 发送的消息内容(任意对象,内部会转为 JSON 字符串)* @param keys 消息的唯一业务键(用于消息追踪、去重、查询),建议使用 UUID 或业务 ID* @param shardingKey 顺序消息的分区键(也称 sharding key 或 ordering key)。* <p>* RocketMQ 会根据该值的哈希结果选择队列(queue),* 相同 hashKey 的消息会被发送到同一个队列,从而保证消费顺序。* <br>* 例如:订单 ID、用户 ID、流水号等需要保序的业务维度。* </p>*/
public void sendOneWayOrderMessage(String topic, Object message, String keys, String shardingKey) {sendOneWayOrderMessage(topic, null, message, keys, shardingKey);
}/*** 发送带 Tag 的单向顺序消息** @param topic 消息主题* @param tag 消息标签,用于消费者端过滤(可为 null)* @param message 消息体* @param keys 消息唯一键(用于运维追踪)* @param shardingKey 顺序分片键(sharding key):* <ul>* <li>相同 hashKey 的消息会路由到同一 MessageQueue</li>* <li>从而保证 FIFO 顺序消费</li>* <li>不同 hashKey 之间不保证顺序</li>* </ul>* 示例:订单系统中可用 orderId 作为 hashKey,确保同一订单的操作按序处理。*/
public void sendOneWayOrderMessage(String topic, String tag, Object message, String keys, String shardingKey) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.sendOneWayOrderly(destination, msg, shardingKey);log.info("[单向顺序消息] 已发送 | keys={}, hashKey={}", keys, shardingKey);
}
| 类型 | 推荐使用 | 谨慎使用 |
|---|---|---|
| 同步消息 | 关键业务、需确认结果 | 高频调用、性能敏感 |
| 异步消息 | 主流程解耦、高吞吐 | 需强一致性的场景 |
| 单向消息 | 监控/心跳/日志 | 任何重要业务 |
| 延时消息 | 超时控制、延迟任务 | 精确时间调度 |
| 顺序消息 | 状态机、流水操作 | 非顺序业务(浪费性能) |
四、实战
大致结构如下:

1、依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.bluefoxyu</groupId><artifactId>RocketMQ-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>3.2.4</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.4</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.27</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.16</version></dependency></dependencies></project>
2、配置
application.yml:
server:port: 8080spring:profiles:active: devrocketmq:name-server: rocketmq所在服务器IP:9876producer:group: rocketmq-v3-demo-sent-message-group_bluefoxyu # 全局发送者组定义send-message-timeout: 2000# 发送消息失败时的重试次数。设置为 1 表示如果发送失败,会再重试一次(总共尝试两次)。适用于同步发送消息失败时的重试次数。retry-times-when-send-failed: 1# 异步发送失败时的重试次数。设置为 1 表示在异步发送失败时会再尝试一次。适用于异步发送消息失败时的重试次数。retry-times-when-send-async-failed: 1logging:level:com:bluefoxyu:producer: infoconsumer: infocontroller: info
org.springframework.boot.autoconfigure.AutoConfiguration.imports:
# RocketMQ 2.2.3 version does not adapt to SpringBoot3
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
3、统一消息实体
GeneralMessageEntity:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serial;
import java.io.Serializable;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GeneralMessageEntity implements Serializable {@Serialprivate static final long serialVersionUID = 1L;private String body;private String keys;}
4、常量类
RocketMQConstant:

/*** RocketMQ 常量类* @author bluefoxyu*/
public class RocketMQConstant {// =============== 【专用于 SimpleController 的常量】===============public static final String GENERAL_MESSAGE_CONSUMER_GROUP = "general_message_consumer_group";public static final String MESSAGE_CONSUMER_GROUP_A = "message_consumer_group_A";public static final String MESSAGE_CONSUMER_GROUP_B = "message_consumer_group_B";public static final String MESSAGE_CONSUMER_GROUP_C = "message_consumer_group_C";public static final String MESSAGE_TOPIC_1 = "message_topic_1";public static final String MESSAGE_TOPIC_2 = "message_topic_2";public static final String MESSAGE_TAG_A = "message_tag_A";public static final String MESSAGE_TAG_B = "message_tag_B";public static final String MESSAGE_TAG_C = "message_tag_C";// =============== 【新增:专用于 MultipleController 的常量】===============// Topicspublic static final String MULTIPLE_SYNC_TOPIC = "multiple_sync_topic";public static final String MULTIPLE_ASYNC_TOPIC = "multiple_async_topic";public static final String MULTIPLE_ONEWAY_TOPIC = "multiple_oneway_topic";public static final String MULTIPLE_DELAY_TOPIC = "multiple_delay_topic";public static final String MULTIPLE_ORDERLY_TOPIC = "multiple_orderly_topic";// Consumer Groupspublic static final String MULTIPLE_SYNC_CONSUMER_GROUP = "multiple_sync_consumer_group";public static final String MULTIPLE_ASYNC_CONSUMER_GROUP = "multiple_async_consumer_group";public static final String MULTIPLE_ONEWAY_CONSUMER_GROUP = "multiple_oneway_consumer_group";public static final String MULTIPLE_DELAY_CONSUMER_GROUP = "multiple_delay_consumer_group";public static final String MULTIPLE_ORDERLY_CONSUMER_GROUP = "multiple_orderly_consumer_group";// Tags(可选,这里统一用 "*" 或简单 tag)public static final String MULTIPLE_TAG_SYNC = "SYNC";public static final String MULTIPLE_TAG_ASYNC = "ASYNC";public static final String MULTIPLE_TAG_ONEWAY = "ONEWAY";public static final String MULTIPLE_TAG_DELAY = "DELAY";public static final String MULTIPLE_TAG_ORDERLY = "ORDERLY";// RocketMQConstant.javapublic static final String MULTIPLE_ONEWAY_ORDERLY_TOPIC = "MULTIPLE_ONEWAY_ORDERLY_TOPIC";public static final String MULTIPLE_TAG_ONEWAY_ORDERLY = "TAG_ONEWAY_ORDERLY";public static final String MESSAGE_TAG_ORDER = "ORDER";}
5、生产者API封装
RocketMQMessageProducer:

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 封装全体的消息生产者*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RocketMQMessageProducer {private final RocketMQTemplate rocketMQTemplate;// ==================== 1. 同步普通消息 需要等待broker返回结果 ====================/*** <p>* description: 无Tag标签的发送同步普通消息* </p>** @param topic 发送消息的主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 发送的keys,关联每条消息的唯一标识* @return: org.apache.rocketmq.client.producer.SendResult* @author: bluefoxyu* @date: 2025-01-08 09:22:38*/public SendResult sendSyncSimpleMessage(String topic, Object message, String keys) {return sendSyncSimpleMessage(topic, null, message, keys);}/*** <p>* description: 有Tag标签的发送同步普通消息* </p>** @param topic 发送消息的主题* @param tag 发送消息的标签* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 发送的keys,关联每条消息的唯一标识* @return: org.apache.rocketmq.client.producer.SendResult 发送rocketmq的结果* @author: bluefoxyu* @date: 2025-01-08 09:20:39*/public SendResult sendSyncSimpleMessage(String topic, String tag, Object message, String keys) {// 构造消息体GeneralMessageEntity payload = buildMessagePayload(message, keys);// 发送消息return this.doSendSync(topic, tag, keys, payload);}/*** 发送普通消息** @param topic 消息发送主题,用于标识同一类业务逻辑的消息* @param tag 消息的过滤标签,消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。* @param keys 消息索引键,可根据关键字精确查找某条消息* @param generalMessage 普通消息发送事件,自定义对象,最终都会序列化为字符串* @return 消息发送 RocketMQ 返回结果*/private SendResult doSendSync(String topic, String tag, String keys, GeneralMessageEntity generalMessage) {try {String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(generalMessage).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();SendResult result = rocketMQTemplate.syncSend(destination, msg, 2000L);log.info("[同步消息] 发送成功 | status={}, msgId={}, keys={}",result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[同步消息] 发送失败 | keys={}, message={}", keys, JSON.toJSONString(generalMessage), e);throw e;}}// ==================== 2. 异步普通消息 不需要等待broker返回结果 ====================/*** 异步普通消息 不需要等待broker返回结果* @param topic 发送消息的主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 发送的keys,关联每条消息的唯一标识*/public void sendAsyncMessage(String topic, Object message, String keys) {sendAsyncMessage(topic, null, message, keys);}/*** 异步普通消息* @param topic 发送消息的主题* @param tag 发送消息的标签* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 索引键,关联每条消息的唯一标识*/public void sendAsyncMessage(String topic, String tag, Object message, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.asyncSend(destination, msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("[异步消息] 发送成功 | status={}, msgId={}, keys={}",sendResult.getSendStatus(), sendResult.getMsgId(), keys);}@Overridepublic void onException(Throwable throwable) {log.error("[异步消息] 发送失败 | keys={}, message={}", keys, JSON.toJSONString(payload), throwable);// TODO: 可接入告警或持久化失败消息}}, 2000L);}// ==================== 3. 单向消息(不关心结果)====================/*** 发送单向消息(不关心结果)* @param topic 待发送的 topic* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 索引键,关联每条消息的唯一标识*/public void sendOneWayMessage(String topic, Object message, String keys) {sendOneWayMessage(topic, null, message, keys);}/*** 发送单向消息(不关心结果)* @param topic 待发送的 topic* @param tag 待发送的 tag* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 索引键,关联每条消息的唯一标识*/public void sendOneWayMessage(String topic, String tag, Object message, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.sendOneWay(destination, msg);log.info("[单向消息] 已发送 | keys={}", keys);}// ==================== 4. 延时消息(Delay Level 1~18)====================/*** 发送 RocketMQ 延时消息(支持 delayLevel 1~18)不带 Tag** <p>⚠️ 注意:RocketMQ 的延时消息仅支持预定义的 18 个延迟级别,对应如下默认时间:* <pre>* Level | 延迟时间* ------|----------* 1 | 1 秒* 2 | 5 秒* 3 | 10 秒* 4 | 30 秒* 5 | 1 分钟* 6 | 2 分钟* 7 | 3 分钟* 8 | 4 分钟* 9 | 5 分钟* 10 | 6 分钟* 11 | 7 分钟* 12 | 8 分钟* 13 | 9 分钟* 14 | 10 分钟* 15 | 20 分钟* 16 | 30 分钟* 17 | 1 小时* 18 | 2 小时* </pre>** @param topic 消息主题(必须)* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 消息唯一键(用于去重、查询等,建议使用 UUID)* @param delayLevel 延迟级别,取值范围 [1, 18]* @return SendResult 发送结果* @throws IllegalArgumentException 当 delayLevel 不在 [1,18] 范围内* @throws RuntimeException 当发送过程中发生异常*/public SendResult sendDelayMessageByDelayLevel(String topic, Object message, String keys, int delayLevel) {return sendDelayMessageByDelayLevel(topic, null, message, keys, delayLevel);}/*** 发送 RocketMQ 延时消息(支持 delayLevel 1~18)** <p>⚠️注意:RocketMQ 的延时消息仅支持预定义的 18 个延迟级别,对应如下默认时间:* <pre>* Level | 延迟时间* ------|----------* 1 | 1 秒* 2 | 5 秒* 3 | 10 秒* 4 | 30 秒* 5 | 1 分钟* 6 | 2 分钟* 7 | 3 分钟* 8 | 4 分钟* 9 | 5 分钟* 10 | 6 分钟* 11 | 7 分钟* 12 | 8 分钟* 13 | 9 分钟* 14 | 10 分钟* 15 | 20 分钟* 16 | 30 分钟* 17 | 1 小时* 18 | 2 小时* </pre>** @param topic 消息主题(必须)* @param tag 消息标签(可为 null)* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 消息唯一键(用于去重、查询等,建议使用 UUID)* @param delayLevel 延迟级别,取值范围 [1, 18]* @return SendResult 发送结果* @throws IllegalArgumentException 当 delayLevel 不在 [1,18] 范围内* @throws RuntimeException 当发送过程中发生异常*/public SendResult sendDelayMessageByDelayLevel(String topic, String tag, Object message, String keys, int delayLevel) {if (delayLevel < 1 || delayLevel > 18) {throw new IllegalArgumentException("RocketMQ delayLevel must be between 1 and 18");}GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {SendResult result = rocketMQTemplate.syncSend(destination, msg, 2000L, delayLevel);log.info("[延时消息] 发送成功 | delayLevel={}, status={}, msgId={}, keys={}",delayLevel, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[延时消息] 发送失败 | delayLevel={}, keys={}", delayLevel, keys, e);throw e;}}/*** 发送延时消息(指定延迟秒数)** @param topic 消息主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 消息唯一键* @param delaySeconds 延迟秒数(必须 > 0)* @return SendResult*/public SendResult sendDelayMessageBySeconds(String topic, Object message, String keys, long delaySeconds) {return sendDelayMessageBySeconds(topic, null, message, keys, delaySeconds);}/*** 发送延时消息(指定延迟秒数 + Tag)** @param topic 消息主题* @param tag 消息标签(可为 null)* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 消息唯一键* @param delaySeconds 延迟秒数(>0)* @return SendResult*/public SendResult sendDelayMessageBySeconds(String topic, String tag, Object message, String keys, long delaySeconds) {if (delaySeconds <= 0) {throw new IllegalArgumentException("delaySeconds must be greater than 0");}GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {// 使用 RocketMQ Spring 提供的秒级延迟 APISendResult result = rocketMQTemplate.syncSendDelayTimeSeconds(destination, msg, delaySeconds);log.info("[延时消息] 发送成功(按秒)| delaySeconds={}, status={}, msgId={}, keys={}",delaySeconds, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[延时消息] 发送失败(按秒)| delaySeconds={}, keys={}", delaySeconds, keys, e);throw e;}}// ==================== 5. 顺序消息(保证同一 shardingKey 有序)====================/*** 同步顺序消息* @param topic 消息主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey 顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证有序* @param keys 消息唯一键* @return SendResult*/public SendResult sendSyncOrderlyMessage(String topic, Object message, String shardingKey, String keys) {return sendSyncOrderlyMessage(topic, null, message, shardingKey, keys);}/*** 同步顺序消息(Tag)* @param topic 消息主题* @param tag 消息标签(可为 null)* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey 顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证* @param keys 消息唯一键* @return SendResult*/public SendResult sendSyncOrderlyMessage(String topic, String tag, Object message, String shardingKey, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {SendResult result = rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000L);log.info("[顺序消息] 发送成功 | shardingKey={}, status={}, msgId={}, keys={}",shardingKey, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[顺序消息] 发送失败 | shardingKey={}, keys={}", shardingKey, keys, e);throw e;}}/*** 异步顺序消息(保证同一 shardingKey 有序)** @param topic 消息主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey 顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证有序* @param keys 消息唯一键*/public void sendAsyncOrderlyMessage(String topic, Object message, String shardingKey, String keys) {sendAsyncOrderlyMessage(topic, null, message, shardingKey, keys);}/*** 异步顺序消息(带 Tag)** @param topic 消息主题* @param tag 消息标签(可为 null)* @param message 发送的消息* @param shardingKey 顺序分片键* @param keys 消息唯一键*/public void sendAsyncOrderlyMessage(String topic, String tag, Object message, String shardingKey, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.asyncSendOrderly(destination, msg, shardingKey, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("[异步顺序消息] 发送成功 | shardingKey={}, status={}, msgId={}, keys={}",shardingKey, sendResult.getSendStatus(), sendResult.getMsgId(), keys);}@Overridepublic void onException(Throwable throwable) {log.error("[异步顺序消息] 发送失败 | shardingKey={}, keys={}, message={}",shardingKey, keys, JSON.toJSONString(payload), throwable);// TODO: 可接入告警或持久化失败消息}}, 2000L);}/*** 发送单向顺序消息** @param topic 消息主题* @param message 发送的消息内容(任意对象,内部会转为 JSON 字符串)* @param keys 消息的唯一业务键(用于消息追踪、去重、查询),建议使用 UUID 或业务 ID* @param shardingKey 顺序消息的分区键(也称 sharding key 或 ordering key)。* <p>* RocketMQ 会根据该值的哈希结果选择队列(queue),* 相同 hashKey 的消息会被发送到同一个队列,从而保证消费顺序。* <br>* 例如:订单 ID、用户 ID、流水号等需要保序的业务维度。* </p>*/public void sendOneWayOrderMessage(String topic, Object message, String keys, String shardingKey) {sendOneWayOrderMessage(topic, null, message, keys, shardingKey);}/*** 发送带 Tag 的单向顺序消息** @param topic 消息主题* @param tag 消息标签,用于消费者端过滤(可为 null)* @param message 消息体* @param keys 消息唯一键(用于运维追踪)* @param shardingKey 顺序分片键(sharding key):* <ul>* <li>相同 hashKey 的消息会路由到同一 MessageQueue</li>* <li>从而保证 FIFO 顺序消费</li>* <li>不同 hashKey 之间不保证顺序</li>* </ul>* 示例:订单系统中可用 orderId 作为 hashKey,确保同一订单的操作按序处理。*/public void sendOneWayOrderMessage(String topic, String tag, Object message, String keys, String shardingKey) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.sendOneWayOrderly(destination, msg, shardingKey);log.info("[单向顺序消息] 已发送 | keys={}, hashKey={}", keys, shardingKey);}/*** 构建消息体* @param message 消息* @param keys 消息唯一键* @return GeneralMessageEntity*/private GeneralMessageEntity buildMessagePayload(Object message, String keys) {return GeneralMessageEntity.builder().body(JSONUtil.toJsonStr(message)).keys(keys).build();}}
6、消费者

AsyncMessageConsumer:
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** 异步消息消费者*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MULTIPLE_ASYNC_TOPIC,consumerGroup = MULTIPLE_ASYNC_CONSUMER_GROUP
)
public class AsyncMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {log.info("[AsyncMessageConsumer] 收到异步消息 | keys={}, body={}",message.getKeys(), JSON.toJSONString(message.getBody()));}
}
DelayMessageConsumer:
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** 延时消息消费者*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MULTIPLE_DELAY_TOPIC,consumerGroup = MULTIPLE_DELAY_CONSUMER_GROUP
)
public class DelayMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {log.info("[DelayMessageConsumer] 收到延时消息 | keys={}, body={}",message.getKeys(), JSON.toJSONString(message.getBody()));}
}
OneWayMessageConsumer:
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** 单向消息消费者*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MULTIPLE_ONEWAY_TOPIC,consumerGroup = MULTIPLE_ONEWAY_CONSUMER_GROUP
)
public class OneWayMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {log.info("[OneWayMessageConsumer] 收到单向消息 | keys={}, body={}",message.getKeys(), JSON.toJSONString(message.getBody()));}
}
OneWayOrderlyMessageConsumer:
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;import static com.bluefoxyu.constant.RocketMQConstant.MULTIPLE_ONEWAY_ORDERLY_TOPIC;/*** 单向顺序消息消费者* * 注意:* - 使用 CONSUME_FROM_LAST_OFFSET 避免消费历史消息(可根据需求调整)* - 顺序消息必须使用 MessageListenerOrderly,Spring 自动处理*/
@Slf4j
@Service
@RocketMQMessageListener(topic = MULTIPLE_ONEWAY_ORDERLY_TOPIC,selectorExpression = "*", // 接收该 Topic 下所有 Tag 的消息consumerGroup = "cg_multiple_oneway_orderly"
)
public class OneWayOrderlyMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {try {log.info("[单向顺序消息] 收到消息 | keys={}, body={}", message.getKeys(), message.getBody());// TODO: 在此处实现业务逻辑(如更新订单状态、写入 DB 等)// 注意:顺序消息要求消费逻辑不能抛异常(否则会阻塞后续消息),建议 try-catch} catch (Exception e) {log.error("[单向顺序消息] 消费异常 | keys={}", message.getKeys(), e);// 顺序消息中,异常会导致队列阻塞!务必谨慎处理// 可考虑记录失败日志 + 补偿机制,但不要让异常抛出}}
}
OrderlyMessageConsumer:
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** 延时消息消费者*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MULTIPLE_ORDERLY_TOPIC,consumerGroup = MULTIPLE_ORDERLY_CONSUMER_GROUP
)
public class OrderlyMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {log.info("[OrderlyMessageConsumer] 收到顺序消息 | keys={}, shardingKey={}, body={}",message.getKeys(), message.getKeys(), JSON.toJSONString(message.getBody()));// 注意:实际顺序由 RocketMQ 内部按 shardingKey 保证,消费者只需处理即可}
}
SyncMessageConsumer:
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** 同步消息消费者*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MULTIPLE_SYNC_TOPIC,consumerGroup = MULTIPLE_SYNC_CONSUMER_GROUP
)
public class SyncMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {log.info("[SyncMessageConsumer] 收到同步消息 | keys={}, body={}",message.getKeys(), JSON.toJSONString(message.getBody()));}
}
7、接口测试类

import com.bluefoxyu.message.GeneralMessageEntity;
import com.bluefoxyu.producer.RocketMQMessageProducer;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;import static com.bluefoxyu.constant.RocketMQConstant.*;@RestController
@RequestMapping("/multiple")
@RequiredArgsConstructor
public class MultipleController {private final RocketMQMessageProducer generalMessageProducer;/*** 同步发送消息 需要等待broker返回结果*/@PostMapping("/sync")public String sendSyncMessage(@RequestParam(value = "type", defaultValue = "default") String type) {String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("Sync message from /multiple/sync, type=" + type);SendResult result = generalMessageProducer.sendSyncSimpleMessage(MULTIPLE_SYNC_TOPIC,MULTIPLE_TAG_SYNC,event,keys);return "Sync Send: " + result.getSendStatus().name() + " | msgId=" + result.getMsgId();}/*** 异步发送消息 不需要等待broker返回结果*/@PostMapping("/async")public String sendAsyncMessage(@RequestParam(value = "type", defaultValue = "default") String type) {String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("Async message from /multiple/async, type=" + type);generalMessageProducer.sendAsyncMessage(MULTIPLE_ASYNC_TOPIC,MULTIPLE_TAG_ASYNC,event,keys);return "Async message sent (non-blocking), keys=" + keys;}/*** 单向发送消息*/@PostMapping("/oneway")public String sendOneWayMessage(@RequestParam(value = "type", defaultValue = "default") String type) {String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("One-way message from /multiple/oneway, type=" + type);generalMessageProducer.sendOneWayMessage(MULTIPLE_ONEWAY_TOPIC,MULTIPLE_TAG_ONEWAY,event,keys);return "One-way message fired, keys=" + keys;}/*** 延时发送消息*/@PostMapping("/delay")public String sendDelayMessage(@RequestParam(value = "delayLevel", defaultValue = "3") int delayLevel) {if (delayLevel < 1 || delayLevel > 18) {return "Invalid delayLevel, must be 1-18";}String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("Delayed message (level=" + delayLevel + ") from /multiple/delay");SendResult result = generalMessageProducer.sendDelayMessageByDelayLevel(MULTIPLE_DELAY_TOPIC,MULTIPLE_TAG_DELAY,event,keys,delayLevel);return "Delay Send: " + result.getSendStatus().name() + " | level=" + delayLevel + " | msgId=" + result.getMsgId();}/*** 延时发送消息(按指定延迟秒数,例如 15 表示 15 秒后投递)** <p>⚠️ 注意:* <ul>* <li>底层仍受 RocketMQ 4.x 的 18 级延迟限制(最大 2 小时)</li>* <li>实际延迟时间可能映射到最接近的预定义级别(如 15 秒 → 可能为 10 秒或 30 秒)</li>* <li>若 delaySeconds 超出支持范围(>7200 秒),将抛出异常</li>* </ul>** @param delaySeconds 延迟秒数(必须 > 0)* @return 发送结果描述*/@PostMapping("/delay/seconds")public String sendDelayMessageBySeconds(@RequestParam("delaySeconds") long delaySeconds) {if (delaySeconds <= 0) {return "Invalid delaySeconds, must be greater than 0";}// 可选:限制最大值(避免传入过大值导致异常)if (delaySeconds > 7200) { // 2 小时 = 7200 秒return "delaySeconds too large, max allowed is 7200 seconds (2 hours)";}String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("Delayed message (seconds=" + delaySeconds + ") from /multiple/delay/seconds");SendResult result = generalMessageProducer.sendDelayMessageBySeconds(MULTIPLE_DELAY_TOPIC,MULTIPLE_TAG_DELAY,event,keys,delaySeconds);return "Delay Send (by seconds): " + result.getSendStatus().name()+ " | delaySeconds=" + delaySeconds+ " | msgId=" + result.getMsgId();}/*** 顺序发送消息*/@PostMapping("/orderly")public String sendOrderlyMessage(@RequestParam("shardingKey") String shardingKey) {String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("Orderly message with shardingKey=" + shardingKey);SendResult result = generalMessageProducer.sendSyncOrderlyMessage(MULTIPLE_ORDERLY_TOPIC,MULTIPLE_TAG_ORDERLY,event,shardingKey,keys);return "Orderly Send: " + result.getSendStatus().name() + " | shardingKey=" + shardingKey + " | msgId=" + result.getMsgId();}/*** 单向顺序消息(fire-and-forget + 保序)** <p>适用于对发送结果不关心、但要求同一 shardingKey 消息顺序处理的场景。* 示例:订单状态变更日志、用户行为流水等。** @param shardingKey 用于保证顺序的分片键(如 orderId)* @param type 消息类型标识* @return 提示信息*/@PostMapping("/oneway/orderly")public String sendOneWayOrderlyMessage(@RequestParam("shardingKey") String shardingKey,@RequestParam(value = "type", defaultValue = "default") String type) {String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("One-way orderly message [shardingKey=" + shardingKey + ", type=" + type + "]");// 注意:这里传的是对象,内部会自动 JSON 序列化(见 buildMessagePayload)generalMessageProducer.sendOneWayOrderMessage(MULTIPLE_ONEWAY_ORDERLY_TOPIC, // 需在常量类中定义event,keys,shardingKey);return "One-way orderly message fired | shardingKey=" + shardingKey + " | keys=" + keys;}}
8、postman测试
这里主要测试代码是否可用和各个API性能对比:

同步发送消息(POST):http://localhost:8080/multiple/sync?type=test
异步发送消息(POST):http://localhost:8080/multiple/async?type=event
单向发送消息(POST):http://localhost:8080/multiple/oneway?type=ping
延时发送消息 指定延迟级别(POST):http://localhost:8080/multiple/delay?delayLevel=2
延时发送消息 指定秒(POST):http://localhost:8080/multiple/delay/seconds?delaySeconds=15
顺序同步发送消息(POST):http://localhost:8080/multiple/orderly?shardingKey=order_789

各性能对比:



对比可见:异步和单向的响应时间远远的小同步,不过也不能盲目使用异步和单向,需要根据具体实际业务进行。
