当前位置: 首页 > news >正文

RocketMQ生产者多种API实战使用

一、前言

这些都是我对RocketMQTemplate中的生产者API包了一层,更加方便我们平常开发使用,如果想了解关于RocketMQ比较基础的知识或者单点部署RocketMQ可以看我下面两篇文章:

https://blog.csdn.net/qq_73440769/article/details/143443049?spm=1001.2014.3001.5502

https://blog.csdn.net/qq_73440769/article/details/151049115?spm=1001.2014.3001.5502

二、干货合集

有以下这些类型:

  1. sendSyncSimpleMessage → 同步普通消息发送。
    📌 底层调用:rocketMQTemplate.syncSend(destination, message, timeout)

  2. sendAsyncMessage → 异步普通消息发送,通过回调处理结果。
    📌 底层调用:rocketMQTemplate.asyncSend(destination, message, callback, timeout)

  3. sendOneWayMessage → 单向消息发送,只发不等,性能最高但不可靠。
    📌 底层调用:rocketMQTemplate.sendOneWay(destination, message)

  4. sendDelayMessageByDelayLevel → 按 RocketMQ 预定义延迟级别(1~18)发送延时消息。
    📌 底层调用:rocketMQTemplate.syncSend(destination, message, timeout, delayLevel)

  5. sendDelayMessageBySeconds → 按指定秒数发送延时消息
    📌 底层调用:rocketMQTemplate.syncSendDelayTimeSeconds(destination, message, delaySeconds)

  6. sendSyncOrderlyMessage → 同步顺序消息,相同 shardingKey 保证 FIFO。
    📌 底层调用:rocketMQTemplate.syncSendOrderly(destination, message, hashKey, timeout)

  7. sendAsyncOrderlyMessage → 异步顺序消息,兼顾顺序性与吞吐量。
    📌 底层调用:rocketMQTemplate.asyncSendOrderly(destination, message, hashKey, callback, timeout)

  8. sendOneWayOrderMessage → 单向顺序消息(极少使用)。
    📌 底层调用:rocketMQTemplate.sendOneWayOrderly(destination, message, hashKey)

消息类型方法示例(简化)是否阻塞是否返回结果可靠性性能典型使用场景注意事项
同步普通消息sendSyncSimpleMessage✅ 是✅ 是支付成功通知、订单创建确认超时可能失败;避免高频调用
异步普通消息sendAsyncMessage❌ 否回调用户行为埋点、日志上报、非核心通知异常需在回调中处理;不阻塞主线程
单向消息sendOneWayMessage❌ 否❌ 否极高心跳检测、监控指标、可丢失日志绝不用于关键业务;消息可能静默丢失
延时消息(按级别)sendDelayMessageByDelayLevel✅ 是✅ 是订单30分钟未支付自动取消delayLevel 仅支持 1~18;时间固定
延时消息(按秒)sendDelayMessageBySeconds✅ 是✅ 是自定义延迟任务(如5分钟后提醒)按实际传的秒数
同步顺序消息sendSyncOrderlyMessage✅ 是✅ 是订单状态变更(创建→支付→发货)

相同 shardingKey才保序;不同的key并行

异步顺序消息sendAsyncOrderlyMessage❌ 否回调需要顺序且高吞吐的场景回调中处理异常;保证 key 一致性
单向顺序消息sendOneWayOrderMessage❌ 否❌ 否顺序但可容忍丢失的日志流极少使用;顺序+不可靠,矛盾场景

三、生产者API详讲

这里的所有code只是结合讲解,文章最后会有最完整的代码

1、RocketMQ 消息模型

  • Topic:消息主题,一类业务消息的逻辑分类(如 OrderTopic)。
  • Tag:子标签,用于消费者端过滤(如 TagATagB)。
  • Keys:消息唯一键(非强制),用于运维追踪、去重、按 key 查询。
  • ShardingKey / HashKey:顺序消息的分区键,决定消息发往哪个队列(Queue),相同 key 保证 FIFO。

2、同步普通消息(Sync Send)

✅ 方法:

sendSyncSimpleMessage(String topic, Object message, String keys)
sendSyncSimpleMessage(String topic, String tag, Object message, String keys)

🔧 原理:

  • 调用 rocketMQTemplate.syncSend()阻塞等待 Broker 返回结果(成功/失败)。
  • 默认超时 2 秒(可配置)。

🎯 使用场景:

  • 关键业务链路,必须确保消息发送成功才能继续(如支付成功后发通知)。
  • 需要获取 msgId 做日志追踪或落库。
  • 调用方能容忍短暂延迟(毫秒级)。

⚠️ 注意:

  • 不适合高并发场景(会阻塞线程)。
  • 若 Broker 不可用,会抛异常,需做好降级(如记录 DB + 定时重试)。

🖊️ code:

// 后续的方法也会用到这个
private GeneralMessageEntity buildMessagePayload(Object message, String keys) {return GeneralMessageEntity.builder().body(JSONUtil.toJsonStr(message)).keys(keys).build();
}
// ==================== 1. 同步普通消息 需要等待broker返回结果 ====================/*** <p>* description: 无Tag标签的发送同步普通消息* </p>** @param topic  发送消息的主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys    发送的keys,关联每条消息的唯一标识* @return: org.apache.rocketmq.client.producer.SendResult* @author: bluefoxyu* @date: 2025-01-08 09:22:38*/
public SendResult sendSyncSimpleMessage(String topic, Object message, String keys) {return sendSyncSimpleMessage(topic, null, message, keys);
}/*** <p>* description: 有Tag标签的发送同步普通消息* </p>** @param topic   发送消息的主题* @param tag     发送消息的标签* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys    发送的keys,关联每条消息的唯一标识* @return: org.apache.rocketmq.client.producer.SendResult 发送rocketmq的结果* @author: bluefoxyu* @date: 2025-01-08 09:20:39*/
public SendResult sendSyncSimpleMessage(String topic, String tag, Object message, String keys) {// 构造消息体GeneralMessageEntity payload = buildMessagePayload(message, keys);// 发送消息return this.doSendSync(topic, tag, keys, payload);
}/*** 发送普通消息** @param topic          消息发送主题,用于标识同一类业务逻辑的消息* @param tag            消息的过滤标签,消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。* @param keys           消息索引键,可根据关键字精确查找某条消息* @param generalMessage 普通消息发送事件,自定义对象,最终都会序列化为字符串* @return 消息发送 RocketMQ 返回结果*/
private SendResult doSendSync(String topic, String tag, String keys, GeneralMessageEntity generalMessage) {try {String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(generalMessage).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();SendResult result = rocketMQTemplate.syncSend(destination, msg, 2000L);log.info("[同步消息] 发送成功 | status={}, msgId={}, keys={}",result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[同步消息] 发送失败 | keys={}, message={}", keys, JSON.toJSONString(generalMessage), e);throw e;}
}

3、异步普通消息(Async Send)

✅ 方法:

sendAsyncMessage(String topic, Object message, String keys)
sendAsyncMessage(String topic, String tag, Object message, String keys)

🔧 原理:

  • 调用 asyncSend()立即返回,不阻塞主线程
  • 结果通过 SendCallback 回调处理(成功/异常)。

🎯 使用场景:

  • 非核心链路,允许丢失少量消息(如用户行为埋点、日志上报)。
  • 高吞吐场景,避免同步阻塞影响主流程性能。
  • 需要异步处理发送结果(如失败告警、持久化重试)。

⚠️ 注意:

  • 回调在独立线程执行,注意线程安全。
  • 异常不会抛给调用方,需在 onException 中处理(如打日志、发告警)。

🖊️ code:

// ==================== 2. 异步普通消息 不需要等待broker返回结果 ====================/*** 异步普通消息 不需要等待broker返回结果** @param topic   发送消息的主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys    发送的keys,关联每条消息的唯一标识*/
public void sendAsyncMessage(String topic, Object message, String keys) {sendAsyncMessage(topic, null, message, keys);
}/*** 异步普通消息** @param topic   发送消息的主题* @param tag     发送消息的标签* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys    索引键,关联每条消息的唯一标识*/
public void sendAsyncMessage(String topic, String tag, Object message, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.asyncSend(destination, msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("[异步消息] 发送成功 | status={}, msgId={}, keys={}",sendResult.getSendStatus(), sendResult.getMsgId(), keys);}@Overridepublic void onException(Throwable throwable) {log.error("[异步消息] 发送失败 | keys={}, message={}", keys, JSON.toJSONString(payload), throwable);// TODO: 可接入告警或持久化失败消息}}, 2000L);
}

4、单向消息(One-Way Send)

✅ 方法:

sendOneWayMessage(String topic, Object message, String keys)
sendOneWayMessage(String topic, String tag, Object message, String keys)

🔧 原理:

  • 调用 sendOneWay()只发不等,连回调都没有。
  • 性能最高,但完全不关心是否成功

🎯 使用场景:

  • 超高频、可丢失的消息(如心跳、监控指标)。
  • 对可靠性要求极低,只求“尽力而为”。
  • 系统资源紧张,不能承受任何额外开销。

⚠️ 注意:

  • 绝对不要用于关键业务!消息可能静默丢失。
  • 无法追踪 msgId,无法重试。

🖊️ code:

// ==================== 3. 单向消息(不关心结果)====================/*** 发送单向消息(不关心结果)** @param topic   待发送的 topic* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys    索引键,关联每条消息的唯一标识*/
public void sendOneWayMessage(String topic, Object message, String keys) {sendOneWayMessage(topic, null, message, keys);
}/*** 发送单向消息(不关心结果)** @param topic   待发送的 topic* @param tag     待发送的 tag* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys    索引键,关联每条消息的唯一标识*/
public void sendOneWayMessage(String topic, String tag, Object message, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.sendOneWay(destination, msg);log.info("[单向消息] 已发送 | keys={}", keys);
}

5、延时消息(Delay Message)

✅ 方法:

// 按预定义级别(1~18)
sendDelayMessageByDelayLevel(...)// 按秒数(更灵活)
sendDelayMessageBySeconds(...)

🔧 原理:

  • RocketMQ 内部有 18 个延时队列(SCHEDULE_TOPIC_XXXX),消息先投递到延时队列,到期后再转到真实 Topic。
  • syncSendDelayTimeSeconds() 是 Spring Boot Starter 提供的封装。

🎯 使用场景:

  • 订单超时取消(30 分钟未支付自动关闭)。
  • 任务延迟执行(如 5 分钟后提醒用户)。
  • 重试退避机制(第一次失败 10 秒后重试,第二次 1 分钟后...)。

⚠️ 注意:

  • 原生 RocketMQ 只支持固定 18 个级别,自定义秒数依赖客户端映射(可能不精确)。
  • 延时精度受 Broker 调度影响,不是实时(通常误差 ±1 秒)。
  • 不支持任意时间(如“明天上午 10 点”),需结合定时任务。

🖊️ code:

// ==================== 4. 延时消息(Delay Level 1~18)====================/*** 发送 RocketMQ 延时消息(支持 delayLevel 1~18)不带 Tag** <p>⚠️ 注意:RocketMQ 的延时消息仅支持预定义的 18 个延迟级别,对应如下默认时间:* <pre>* Level | 延迟时间* ------|----------*   1   | 1 秒*   2   | 5 秒*   3   | 10 秒*   4   | 30 秒*   5   | 1 分钟*   6   | 2 分钟*   7   | 3 分钟*   8   | 4 分钟*   9   | 5 分钟*  10   | 6 分钟*  11   | 7 分钟*  12   | 8 分钟*  13   | 9 分钟*  14   | 10 分钟*  15   | 20 分钟*  16   | 30 分钟*  17   | 1 小时*  18   | 2 小时* </pre>** @param topic      消息主题(必须)* @param message    发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys       消息唯一键(用于去重、查询等,建议使用 UUID)* @param delayLevel 延迟级别,取值范围 [1, 18]* @return SendResult 发送结果* @throws IllegalArgumentException 当 delayLevel 不在 [1,18] 范围内* @throws RuntimeException         当发送过程中发生异常*/
public SendResult sendDelayMessageByDelayLevel(String topic, Object message, String keys, int delayLevel) {return sendDelayMessageByDelayLevel(topic, null, message, keys, delayLevel);
}/*** 发送 RocketMQ 延时消息(支持 delayLevel 1~18)** <p>⚠️ 注意:RocketMQ 的延时消息仅支持预定义的 18 个延迟级别,对应如下默认时间:* <pre>* Level | 延迟时间* ------|----------*   1   | 1 秒*   2   | 5 秒*   3   | 10 秒*   4   | 30 秒*   5   | 1 分钟*   6   | 2 分钟*   7   | 3 分钟*   8   | 4 分钟*   9   | 5 分钟*  10   | 6 分钟*  11   | 7 分钟*  12   | 8 分钟*  13   | 9 分钟*  14   | 10 分钟*  15   | 20 分钟*  16   | 30 分钟*  17   | 1 小时*  18   | 2 小时* </pre>** @param topic      消息主题(必须)* @param tag        消息标签(可为 null)* @param message    发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys       消息唯一键(用于去重、查询等,建议使用 UUID)* @param delayLevel 延迟级别,取值范围 [1, 18]* @return SendResult 发送结果* @throws IllegalArgumentException 当 delayLevel 不在 [1,18] 范围内* @throws RuntimeException         当发送过程中发生异常*/
public SendResult sendDelayMessageByDelayLevel(String topic, String tag, Object message, String keys, int delayLevel) {if (delayLevel < 1 || delayLevel > 18) {throw new IllegalArgumentException("RocketMQ delayLevel must be between 1 and 18");}GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {SendResult result = rocketMQTemplate.syncSend(destination, msg, 2000L, delayLevel);log.info("[延时消息] 发送成功 | delayLevel={}, status={}, msgId={}, keys={}",delayLevel, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[延时消息] 发送失败 | delayLevel={}, keys={}", delayLevel, keys, e);throw e;}
}/*** 发送延时消息(指定延迟秒数)** @param topic        消息主题* @param message      发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys         消息唯一键* @param delaySeconds 延迟秒数(必须 > 0)* @return SendResult*/
public SendResult sendDelayMessageBySeconds(String topic, Object message, String keys, long delaySeconds) {return sendDelayMessageBySeconds(topic, null, message, keys, delaySeconds);
}/*** 发送延时消息(指定延迟秒数 + Tag)** @param topic        消息主题* @param tag          消息标签(可为 null)* @param message      发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys         消息唯一键* @param delaySeconds 延迟秒数(>0)* @return SendResult*/
public SendResult sendDelayMessageBySeconds(String topic, String tag, Object message, String keys, long delaySeconds) {if (delaySeconds <= 0) {throw new IllegalArgumentException("delaySeconds must be greater than 0");}GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {// 使用 RocketMQ Spring 提供的秒级延迟 APISendResult result = rocketMQTemplate.syncSendDelayTimeSeconds(destination, msg, delaySeconds);log.info("[延时消息] 发送成功(按秒)| delaySeconds={}, status={}, msgId={}, keys={}",delaySeconds, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[延时消息] 发送失败(按秒)| delaySeconds={}, keys={}", delaySeconds, keys, e);throw e;}
}

6、顺序消息(Ordered Message)

✅ 方法:

// 同步
sendSyncOrderlyMessage(...)
// 异步
sendAsyncOrderlyMessage(...)
// 单向
sendOneWayOrderMessage(...)

🔧 原理:

  • 通过 shardingKey(也叫 hashKey)计算队列索引,相同 key 的消息发往同一 Queue
  • Consumer 端单线程消费该 Queue,从而保证 FIFO。

🎯 使用场景:

  • 强顺序业务
    • 订单状态变更:创建 → 支付 → 发货 → 完成
    • 账户余额变动:充值 → 消费 → 退款
    • 数据库 binlog 同步

⚠️ 注意:

  • 全局顺序 vs 局部顺序
    • 全局顺序:整个 Topic 只有一个 Queue(性能差,不推荐)。
    • 局部顺序:按业务 key 分区(如 orderId),不同订单并行,同订单串行(推荐)。
  • Producer 和 Consumer 都必须使用相同的 shardingKey
  • 若 Consumer 处理慢,会导致该 Queue 积压,影响其他 key 的消费(需监控)。

🖊️ code:

// ==================== 5. 顺序消息(保证同一 shardingKey 有序)====================/*** 同步顺序消息** @param topic        消息主题* @param message      发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey  顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证有序* @param keys         消息唯一键* @return SendResult*/
public SendResult sendSyncOrderlyMessage(String topic, Object message, String shardingKey, String keys) {return sendSyncOrderlyMessage(topic, null, message, shardingKey, keys);
}/*** 同步顺序消息(Tag)** @param topic        消息主题* @param tag          消息标签(可为 null)* @param message      发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey  顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证* @param keys         消息唯一键* @return SendResult*/
public SendResult sendSyncOrderlyMessage(String topic, String tag, Object message, String shardingKey, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {SendResult result = rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000L);log.info("[顺序消息] 发送成功 | shardingKey={}, status={}, msgId={}, keys={}",shardingKey, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[顺序消息] 发送失败 | shardingKey={}, keys={}", shardingKey, keys, e);throw e;}
}/*** 异步顺序消息(保证同一 shardingKey 有序)** @param topic        消息主题* @param message      发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey  顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证有序* @param keys         消息唯一键*/
public void sendAsyncOrderlyMessage(String topic, Object message, String shardingKey, String keys) {sendAsyncOrderlyMessage(topic, null, message, shardingKey, keys);
}/*** 异步顺序消息(带 Tag)** @param topic        消息主题* @param tag          消息标签(可为 null)* @param message      发送的消息* @param shardingKey  顺序分片键* @param keys         消息唯一键*/
public void sendAsyncOrderlyMessage(String topic, String tag, Object message, String shardingKey, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.asyncSendOrderly(destination, msg, shardingKey, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("[异步顺序消息] 发送成功 | shardingKey={}, status={}, msgId={}, keys={}",shardingKey, sendResult.getSendStatus(), sendResult.getMsgId(), keys);}@Overridepublic void onException(Throwable throwable) {log.error("[异步顺序消息] 发送失败 | shardingKey={}, keys={}, message={}",shardingKey, keys, JSON.toJSONString(payload), throwable);// TODO: 可接入告警或持久化失败消息}}, 2000L);
}/*** 发送单向顺序消息** @param topic       消息主题* @param message     发送的消息内容(任意对象,内部会转为 JSON 字符串)* @param keys        消息的唯一业务键(用于消息追踪、去重、查询),建议使用 UUID 或业务 ID* @param shardingKey 顺序消息的分区键(也称 sharding key 或 ordering key)。*                    <p>*                    RocketMQ 会根据该值的哈希结果选择队列(queue),*                    相同 hashKey 的消息会被发送到同一个队列,从而保证消费顺序。*                    <br>*                    例如:订单 ID、用户 ID、流水号等需要保序的业务维度。*                    </p>*/
public void sendOneWayOrderMessage(String topic, Object message, String keys, String shardingKey) {sendOneWayOrderMessage(topic, null, message, keys, shardingKey);
}/*** 发送带 Tag 的单向顺序消息** @param topic       消息主题* @param tag         消息标签,用于消费者端过滤(可为 null)* @param message     消息体* @param keys        消息唯一键(用于运维追踪)* @param shardingKey 顺序分片键(sharding key):*                    <ul>*                      <li>相同 hashKey 的消息会路由到同一 MessageQueue</li>*                      <li>从而保证 FIFO 顺序消费</li>*                      <li>不同 hashKey 之间不保证顺序</li>*                    </ul>*                    示例:订单系统中可用 orderId 作为 hashKey,确保同一订单的操作按序处理。*/
public void sendOneWayOrderMessage(String topic, String tag, Object message, String keys, String shardingKey) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.sendOneWayOrderly(destination, msg, shardingKey);log.info("[单向顺序消息] 已发送 | keys={}, hashKey={}", keys, shardingKey);
}
类型推荐使用谨慎使用
同步消息关键业务、需确认结果高频调用、性能敏感
异步消息主流程解耦、高吞吐需强一致性的场景
单向消息监控/心跳/日志任何重要业务
延时消息超时控制、延迟任务精确时间调度
顺序消息状态机、流水操作非顺序业务(浪费性能)

四、实战

大致结构如下:

1、依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.bluefoxyu</groupId><artifactId>RocketMQ-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>3.2.4</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.4</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.27</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.16</version></dependency></dependencies></project>

2、配置

application.yml:
server:port: 8080spring:profiles:active: devrocketmq:name-server: rocketmq所在服务器IP:9876producer:group: rocketmq-v3-demo-sent-message-group_bluefoxyu # 全局发送者组定义send-message-timeout: 2000# 发送消息失败时的重试次数。设置为 1 表示如果发送失败,会再重试一次(总共尝试两次)。适用于同步发送消息失败时的重试次数。retry-times-when-send-failed: 1# 异步发送失败时的重试次数。设置为 1 表示在异步发送失败时会再尝试一次。适用于异步发送消息失败时的重试次数。retry-times-when-send-async-failed: 1logging:level:com:bluefoxyu:producer: infoconsumer: infocontroller: info
org.springframework.boot.autoconfigure.AutoConfiguration.imports:
# RocketMQ 2.2.3 version does not adapt to SpringBoot3
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

3、统一消息实体

GeneralMessageEntity:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serial;
import java.io.Serializable;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GeneralMessageEntity implements Serializable {@Serialprivate static final long serialVersionUID = 1L;private String body;private String keys;}

4、常量类

RocketMQConstant:

/*** RocketMQ 常量类* @author bluefoxyu*/
public class RocketMQConstant {// =============== 【专用于 SimpleController 的常量】===============public static final String GENERAL_MESSAGE_CONSUMER_GROUP = "general_message_consumer_group";public static final String MESSAGE_CONSUMER_GROUP_A = "message_consumer_group_A";public static final String MESSAGE_CONSUMER_GROUP_B = "message_consumer_group_B";public static final String MESSAGE_CONSUMER_GROUP_C = "message_consumer_group_C";public static final String MESSAGE_TOPIC_1 = "message_topic_1";public static final String MESSAGE_TOPIC_2 = "message_topic_2";public static final String MESSAGE_TAG_A = "message_tag_A";public static final String MESSAGE_TAG_B = "message_tag_B";public static final String MESSAGE_TAG_C = "message_tag_C";// =============== 【新增:专用于 MultipleController 的常量】===============// Topicspublic static final String MULTIPLE_SYNC_TOPIC       = "multiple_sync_topic";public static final String MULTIPLE_ASYNC_TOPIC      = "multiple_async_topic";public static final String MULTIPLE_ONEWAY_TOPIC     = "multiple_oneway_topic";public static final String MULTIPLE_DELAY_TOPIC      = "multiple_delay_topic";public static final String MULTIPLE_ORDERLY_TOPIC    = "multiple_orderly_topic";// Consumer Groupspublic static final String MULTIPLE_SYNC_CONSUMER_GROUP       = "multiple_sync_consumer_group";public static final String MULTIPLE_ASYNC_CONSUMER_GROUP      = "multiple_async_consumer_group";public static final String MULTIPLE_ONEWAY_CONSUMER_GROUP     = "multiple_oneway_consumer_group";public static final String MULTIPLE_DELAY_CONSUMER_GROUP      = "multiple_delay_consumer_group";public static final String MULTIPLE_ORDERLY_CONSUMER_GROUP    = "multiple_orderly_consumer_group";// Tags(可选,这里统一用 "*" 或简单 tag)public static final String MULTIPLE_TAG_SYNC       = "SYNC";public static final String MULTIPLE_TAG_ASYNC      = "ASYNC";public static final String MULTIPLE_TAG_ONEWAY     = "ONEWAY";public static final String MULTIPLE_TAG_DELAY      = "DELAY";public static final String MULTIPLE_TAG_ORDERLY    = "ORDERLY";// RocketMQConstant.javapublic static final String MULTIPLE_ONEWAY_ORDERLY_TOPIC = "MULTIPLE_ONEWAY_ORDERLY_TOPIC";public static final String MULTIPLE_TAG_ONEWAY_ORDERLY = "TAG_ONEWAY_ORDERLY";public static final String MESSAGE_TAG_ORDER = "ORDER";}

5、生产者API封装

RocketMQMessageProducer:

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 封装全体的消息生产者*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RocketMQMessageProducer {private final RocketMQTemplate rocketMQTemplate;// ==================== 1. 同步普通消息 需要等待broker返回结果 ====================/*** <p>* description: 无Tag标签的发送同步普通消息* </p>** @param topic 发送消息的主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 发送的keys,关联每条消息的唯一标识* @return: org.apache.rocketmq.client.producer.SendResult* @author: bluefoxyu* @date: 2025-01-08 09:22:38*/public SendResult sendSyncSimpleMessage(String topic, Object message, String keys) {return sendSyncSimpleMessage(topic, null, message, keys);}/*** <p>* description: 有Tag标签的发送同步普通消息* </p>** @param topic 发送消息的主题* @param tag 发送消息的标签* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 发送的keys,关联每条消息的唯一标识* @return: org.apache.rocketmq.client.producer.SendResult 发送rocketmq的结果* @author: bluefoxyu* @date: 2025-01-08 09:20:39*/public SendResult sendSyncSimpleMessage(String topic, String tag, Object message, String keys) {// 构造消息体GeneralMessageEntity payload = buildMessagePayload(message, keys);// 发送消息return this.doSendSync(topic, tag, keys, payload);}/*** 发送普通消息** @param topic            消息发送主题,用于标识同一类业务逻辑的消息* @param tag              消息的过滤标签,消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。* @param keys             消息索引键,可根据关键字精确查找某条消息* @param generalMessage   普通消息发送事件,自定义对象,最终都会序列化为字符串* @return 消息发送 RocketMQ 返回结果*/private SendResult doSendSync(String topic, String tag, String keys, GeneralMessageEntity generalMessage) {try {String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(generalMessage).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();SendResult result = rocketMQTemplate.syncSend(destination, msg, 2000L);log.info("[同步消息] 发送成功 | status={}, msgId={}, keys={}",result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[同步消息] 发送失败 | keys={}, message={}", keys, JSON.toJSONString(generalMessage), e);throw e;}}// ==================== 2. 异步普通消息 不需要等待broker返回结果 ====================/*** 异步普通消息 不需要等待broker返回结果* @param topic 发送消息的主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 发送的keys,关联每条消息的唯一标识*/public void sendAsyncMessage(String topic, Object message, String keys) {sendAsyncMessage(topic, null, message, keys);}/*** 异步普通消息* @param topic 发送消息的主题* @param tag 发送消息的标签* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 索引键,关联每条消息的唯一标识*/public void sendAsyncMessage(String topic, String tag, Object message, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.asyncSend(destination, msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("[异步消息] 发送成功 | status={}, msgId={}, keys={}",sendResult.getSendStatus(), sendResult.getMsgId(), keys);}@Overridepublic void onException(Throwable throwable) {log.error("[异步消息] 发送失败 | keys={}, message={}", keys, JSON.toJSONString(payload), throwable);// TODO: 可接入告警或持久化失败消息}}, 2000L);}// ==================== 3. 单向消息(不关心结果)====================/*** 发送单向消息(不关心结果)* @param topic 待发送的 topic* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 索引键,关联每条消息的唯一标识*/public void sendOneWayMessage(String topic, Object message, String keys) {sendOneWayMessage(topic, null, message, keys);}/*** 发送单向消息(不关心结果)* @param topic 待发送的 topic* @param tag 待发送的 tag* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys 索引键,关联每条消息的唯一标识*/public void sendOneWayMessage(String topic, String tag, Object message, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.sendOneWay(destination, msg);log.info("[单向消息] 已发送 | keys={}", keys);}// ==================== 4. 延时消息(Delay Level 1~18)====================/*** 发送 RocketMQ 延时消息(支持 delayLevel 1~18)不带 Tag** <p>⚠️ 注意:RocketMQ 的延时消息仅支持预定义的 18 个延迟级别,对应如下默认时间:* <pre>* Level | 延迟时间* ------|----------*   1   | 1 秒*   2   | 5 秒*   3   | 10 秒*   4   | 30 秒*   5   | 1 分钟*   6   | 2 分钟*   7   | 3 分钟*   8   | 4 分钟*   9   | 5 分钟*  10   | 6 分钟*  11   | 7 分钟*  12   | 8 分钟*  13   | 9 分钟*  14   | 10 分钟*  15   | 20 分钟*  16   | 30 分钟*  17   | 1 小时*  18   | 2 小时* </pre>** @param topic      消息主题(必须)* @param message    发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys       消息唯一键(用于去重、查询等,建议使用 UUID)* @param delayLevel 延迟级别,取值范围 [1, 18]* @return SendResult 发送结果* @throws IllegalArgumentException 当 delayLevel 不在 [1,18] 范围内* @throws RuntimeException         当发送过程中发生异常*/public SendResult sendDelayMessageByDelayLevel(String topic, Object message, String keys, int delayLevel) {return sendDelayMessageByDelayLevel(topic, null, message, keys, delayLevel);}/*** 发送 RocketMQ 延时消息(支持 delayLevel 1~18)** <p>⚠️注意:RocketMQ 的延时消息仅支持预定义的 18 个延迟级别,对应如下默认时间:* <pre>* Level | 延迟时间* ------|----------*   1   | 1 秒*   2   | 5 秒*   3   | 10 秒*   4   | 30 秒*   5   | 1 分钟*   6   | 2 分钟*   7   | 3 分钟*   8   | 4 分钟*   9   | 5 分钟*  10   | 6 分钟*  11   | 7 分钟*  12   | 8 分钟*  13   | 9 分钟*  14   | 10 分钟*  15   | 20 分钟*  16   | 30 分钟*  17   | 1 小时*  18   | 2 小时* </pre>** @param topic      消息主题(必须)* @param tag        消息标签(可为 null)* @param message    发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys       消息唯一键(用于去重、查询等,建议使用 UUID)* @param delayLevel 延迟级别,取值范围 [1, 18]* @return SendResult 发送结果* @throws IllegalArgumentException 当 delayLevel 不在 [1,18] 范围内* @throws RuntimeException         当发送过程中发生异常*/public SendResult sendDelayMessageByDelayLevel(String topic, String tag, Object message, String keys, int delayLevel) {if (delayLevel < 1 || delayLevel > 18) {throw new IllegalArgumentException("RocketMQ delayLevel must be between 1 and 18");}GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {SendResult result = rocketMQTemplate.syncSend(destination, msg, 2000L, delayLevel);log.info("[延时消息] 发送成功 | delayLevel={}, status={}, msgId={}, keys={}",delayLevel, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[延时消息] 发送失败 | delayLevel={}, keys={}", delayLevel, keys, e);throw e;}}/*** 发送延时消息(指定延迟秒数)** @param topic       消息主题* @param message     发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys        消息唯一键* @param delaySeconds 延迟秒数(必须 > 0)* @return SendResult*/public SendResult sendDelayMessageBySeconds(String topic, Object message, String keys, long delaySeconds) {return sendDelayMessageBySeconds(topic, null, message, keys, delaySeconds);}/*** 发送延时消息(指定延迟秒数 + Tag)** @param topic        消息主题* @param tag          消息标签(可为 null)* @param message      发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param keys         消息唯一键* @param delaySeconds 延迟秒数(>0)* @return SendResult*/public SendResult sendDelayMessageBySeconds(String topic, String tag, Object message, String keys, long delaySeconds) {if (delaySeconds <= 0) {throw new IllegalArgumentException("delaySeconds must be greater than 0");}GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {// 使用 RocketMQ Spring 提供的秒级延迟 APISendResult result = rocketMQTemplate.syncSendDelayTimeSeconds(destination, msg, delaySeconds);log.info("[延时消息] 发送成功(按秒)| delaySeconds={}, status={}, msgId={}, keys={}",delaySeconds, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[延时消息] 发送失败(按秒)| delaySeconds={}, keys={}", delaySeconds, keys, e);throw e;}}// ==================== 5. 顺序消息(保证同一 shardingKey 有序)====================/*** 同步顺序消息* @param topic 消息主题* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey 顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证有序* @param keys 消息唯一键* @return SendResult*/public SendResult sendSyncOrderlyMessage(String topic, Object message, String shardingKey, String keys) {return sendSyncOrderlyMessage(topic, null, message, shardingKey, keys);}/*** 同步顺序消息(Tag)* @param topic 消息主题* @param tag   消息标签(可为 null)* @param message 发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey 顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证* @param keys  消息唯一键* @return SendResult*/public SendResult sendSyncOrderlyMessage(String topic, String tag, Object message, String shardingKey, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();try {SendResult result = rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000L);log.info("[顺序消息] 发送成功 | shardingKey={}, status={}, msgId={}, keys={}",shardingKey, result.getSendStatus(), result.getMsgId(), keys);return result;} catch (Exception e) {log.error("[顺序消息] 发送失败 | shardingKey={}, keys={}", shardingKey, keys, e);throw e;}}/*** 异步顺序消息(保证同一 shardingKey 有序)** @param topic       消息主题* @param message     发送的消息(传随意格式,在producer再进行转json,注意消费者要json转实体)* @param shardingKey 顺序消息的 key,用于区分不同顺序,相同 shardingKey 的消息会保证有序* @param keys        消息唯一键*/public void sendAsyncOrderlyMessage(String topic, Object message, String shardingKey, String keys) {sendAsyncOrderlyMessage(topic, null, message, shardingKey, keys);}/*** 异步顺序消息(带 Tag)** @param topic       消息主题* @param tag         消息标签(可为 null)* @param message     发送的消息* @param shardingKey 顺序分片键* @param keys        消息唯一键*/public void sendAsyncOrderlyMessage(String topic, String tag, Object message, String shardingKey, String keys) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.asyncSendOrderly(destination, msg, shardingKey, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("[异步顺序消息] 发送成功 | shardingKey={}, status={}, msgId={}, keys={}",shardingKey, sendResult.getSendStatus(), sendResult.getMsgId(), keys);}@Overridepublic void onException(Throwable throwable) {log.error("[异步顺序消息] 发送失败 | shardingKey={}, keys={}, message={}",shardingKey, keys, JSON.toJSONString(payload), throwable);// TODO: 可接入告警或持久化失败消息}}, 2000L);}/*** 发送单向顺序消息** @param topic   消息主题* @param message 发送的消息内容(任意对象,内部会转为 JSON 字符串)* @param keys    消息的唯一业务键(用于消息追踪、去重、查询),建议使用 UUID 或业务 ID* @param shardingKey 顺序消息的分区键(也称 sharding key 或 ordering key)。*                <p>*                RocketMQ 会根据该值的哈希结果选择队列(queue),*                相同 hashKey 的消息会被发送到同一个队列,从而保证消费顺序。*                <br>*                例如:订单 ID、用户 ID、流水号等需要保序的业务维度。*                </p>*/public void sendOneWayOrderMessage(String topic, Object message, String keys, String shardingKey) {sendOneWayOrderMessage(topic, null, message, keys, shardingKey);}/*** 发送带 Tag 的单向顺序消息** @param topic   消息主题* @param tag     消息标签,用于消费者端过滤(可为 null)* @param message 消息体* @param keys    消息唯一键(用于运维追踪)* @param shardingKey 顺序分片键(sharding key):*                <ul>*                  <li>相同 hashKey 的消息会路由到同一 MessageQueue</li>*                  <li>从而保证 FIFO 顺序消费</li>*                  <li>不同 hashKey 之间不保证顺序</li>*                </ul>*                示例:订单系统中可用 orderId 作为 hashKey,确保同一订单的操作按序处理。*/public void sendOneWayOrderMessage(String topic, String tag, Object message, String keys, String shardingKey) {GeneralMessageEntity payload = buildMessagePayload(message, keys);String destination = StrUtil.isBlank(tag) ? topic : topic + ":" + tag;Message<GeneralMessageEntity> msg = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();rocketMQTemplate.sendOneWayOrderly(destination, msg, shardingKey);log.info("[单向顺序消息] 已发送 | keys={}, hashKey={}", keys, shardingKey);}/*** 构建消息体* @param message 消息* @param keys 消息唯一键* @return GeneralMessageEntity*/private GeneralMessageEntity buildMessagePayload(Object message, String keys) {return GeneralMessageEntity.builder().body(JSONUtil.toJsonStr(message)).keys(keys).build();}}

6、消费者

AsyncMessageConsumer:
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** 异步消息消费者*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MULTIPLE_ASYNC_TOPIC,consumerGroup = MULTIPLE_ASYNC_CONSUMER_GROUP
)
public class AsyncMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {log.info("[AsyncMessageConsumer] 收到异步消息 | keys={}, body={}",message.getKeys(), JSON.toJSONString(message.getBody()));}
}
DelayMessageConsumer:
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** 延时消息消费者*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MULTIPLE_DELAY_TOPIC,consumerGroup = MULTIPLE_DELAY_CONSUMER_GROUP
)
public class DelayMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {log.info("[DelayMessageConsumer] 收到延时消息 | keys={}, body={}",message.getKeys(), JSON.toJSONString(message.getBody()));}
}
OneWayMessageConsumer:
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** 单向消息消费者*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MULTIPLE_ONEWAY_TOPIC,consumerGroup = MULTIPLE_ONEWAY_CONSUMER_GROUP
)
public class OneWayMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {log.info("[OneWayMessageConsumer] 收到单向消息 | keys={}, body={}",message.getKeys(), JSON.toJSONString(message.getBody()));}
}
OneWayOrderlyMessageConsumer:
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;import static com.bluefoxyu.constant.RocketMQConstant.MULTIPLE_ONEWAY_ORDERLY_TOPIC;/*** 单向顺序消息消费者* * 注意:* - 使用 CONSUME_FROM_LAST_OFFSET 避免消费历史消息(可根据需求调整)* - 顺序消息必须使用 MessageListenerOrderly,Spring 自动处理*/
@Slf4j
@Service
@RocketMQMessageListener(topic = MULTIPLE_ONEWAY_ORDERLY_TOPIC,selectorExpression = "*",  // 接收该 Topic 下所有 Tag 的消息consumerGroup = "cg_multiple_oneway_orderly"
)
public class OneWayOrderlyMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {try {log.info("[单向顺序消息] 收到消息 | keys={}, body={}", message.getKeys(), message.getBody());// TODO: 在此处实现业务逻辑(如更新订单状态、写入 DB 等)// 注意:顺序消息要求消费逻辑不能抛异常(否则会阻塞后续消息),建议 try-catch} catch (Exception e) {log.error("[单向顺序消息] 消费异常 | keys={}", message.getKeys(), e);// 顺序消息中,异常会导致队列阻塞!务必谨慎处理// 可考虑记录失败日志 + 补偿机制,但不要让异常抛出}}
}
OrderlyMessageConsumer:
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** 延时消息消费者*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MULTIPLE_ORDERLY_TOPIC,consumerGroup = MULTIPLE_ORDERLY_CONSUMER_GROUP
)
public class OrderlyMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {log.info("[OrderlyMessageConsumer] 收到顺序消息 | keys={}, shardingKey={}, body={}",message.getKeys(), message.getKeys(), JSON.toJSONString(message.getBody()));// 注意:实际顺序由 RocketMQ 内部按 shardingKey 保证,消费者只需处理即可}
}
SyncMessageConsumer:
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** 同步消息消费者*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MULTIPLE_SYNC_TOPIC,consumerGroup = MULTIPLE_SYNC_CONSUMER_GROUP
)
public class SyncMessageConsumer implements RocketMQListener<GeneralMessageEntity> {@Overridepublic void onMessage(GeneralMessageEntity message) {log.info("[SyncMessageConsumer] 收到同步消息 | keys={}, body={}",message.getKeys(), JSON.toJSONString(message.getBody()));}
}

7、接口测试类

import com.bluefoxyu.message.GeneralMessageEntity;
import com.bluefoxyu.producer.RocketMQMessageProducer;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;import static com.bluefoxyu.constant.RocketMQConstant.*;@RestController
@RequestMapping("/multiple")
@RequiredArgsConstructor
public class MultipleController {private final RocketMQMessageProducer generalMessageProducer;/*** 同步发送消息 需要等待broker返回结果*/@PostMapping("/sync")public String sendSyncMessage(@RequestParam(value = "type", defaultValue = "default") String type) {String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("Sync message from /multiple/sync, type=" + type);SendResult result = generalMessageProducer.sendSyncSimpleMessage(MULTIPLE_SYNC_TOPIC,MULTIPLE_TAG_SYNC,event,keys);return "Sync Send: " + result.getSendStatus().name() + " | msgId=" + result.getMsgId();}/*** 异步发送消息 不需要等待broker返回结果*/@PostMapping("/async")public String sendAsyncMessage(@RequestParam(value = "type", defaultValue = "default") String type) {String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("Async message from /multiple/async, type=" + type);generalMessageProducer.sendAsyncMessage(MULTIPLE_ASYNC_TOPIC,MULTIPLE_TAG_ASYNC,event,keys);return "Async message sent (non-blocking), keys=" + keys;}/*** 单向发送消息*/@PostMapping("/oneway")public String sendOneWayMessage(@RequestParam(value = "type", defaultValue = "default") String type) {String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("One-way message from /multiple/oneway, type=" + type);generalMessageProducer.sendOneWayMessage(MULTIPLE_ONEWAY_TOPIC,MULTIPLE_TAG_ONEWAY,event,keys);return "One-way message fired, keys=" + keys;}/*** 延时发送消息*/@PostMapping("/delay")public String sendDelayMessage(@RequestParam(value = "delayLevel", defaultValue = "3") int delayLevel) {if (delayLevel < 1 || delayLevel > 18) {return "Invalid delayLevel, must be 1-18";}String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("Delayed message (level=" + delayLevel + ") from /multiple/delay");SendResult result = generalMessageProducer.sendDelayMessageByDelayLevel(MULTIPLE_DELAY_TOPIC,MULTIPLE_TAG_DELAY,event,keys,delayLevel);return "Delay Send: " + result.getSendStatus().name() + " | level=" + delayLevel + " | msgId=" + result.getMsgId();}/*** 延时发送消息(按指定延迟秒数,例如 15 表示 15 秒后投递)** <p>⚠️ 注意:* <ul>*   <li>底层仍受 RocketMQ 4.x 的 18 级延迟限制(最大 2 小时)</li>*   <li>实际延迟时间可能映射到最接近的预定义级别(如 15 秒 → 可能为 10 秒或 30 秒)</li>*   <li>若 delaySeconds 超出支持范围(>7200 秒),将抛出异常</li>* </ul>** @param delaySeconds 延迟秒数(必须 > 0)* @return 发送结果描述*/@PostMapping("/delay/seconds")public String sendDelayMessageBySeconds(@RequestParam("delaySeconds") long delaySeconds) {if (delaySeconds <= 0) {return "Invalid delaySeconds, must be greater than 0";}// 可选:限制最大值(避免传入过大值导致异常)if (delaySeconds > 7200) { // 2 小时 = 7200 秒return "delaySeconds too large, max allowed is 7200 seconds (2 hours)";}String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("Delayed message (seconds=" + delaySeconds + ") from /multiple/delay/seconds");SendResult result = generalMessageProducer.sendDelayMessageBySeconds(MULTIPLE_DELAY_TOPIC,MULTIPLE_TAG_DELAY,event,keys,delaySeconds);return "Delay Send (by seconds): " + result.getSendStatus().name()+ " | delaySeconds=" + delaySeconds+ " | msgId=" + result.getMsgId();}/*** 顺序发送消息*/@PostMapping("/orderly")public String sendOrderlyMessage(@RequestParam("shardingKey") String shardingKey) {String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("Orderly message with shardingKey=" + shardingKey);SendResult result = generalMessageProducer.sendSyncOrderlyMessage(MULTIPLE_ORDERLY_TOPIC,MULTIPLE_TAG_ORDERLY,event,shardingKey,keys);return "Orderly Send: " + result.getSendStatus().name() + " | shardingKey=" + shardingKey + " | msgId=" + result.getMsgId();}/*** 单向顺序消息(fire-and-forget + 保序)** <p>适用于对发送结果不关心、但要求同一 shardingKey 消息顺序处理的场景。* 示例:订单状态变更日志、用户行为流水等。** @param shardingKey 用于保证顺序的分片键(如 orderId)* @param type        消息类型标识* @return 提示信息*/@PostMapping("/oneway/orderly")public String sendOneWayOrderlyMessage(@RequestParam("shardingKey") String shardingKey,@RequestParam(value = "type", defaultValue = "default") String type) {String keys = UUID.randomUUID().toString();GeneralMessageEntity event = new GeneralMessageEntity();event.setKeys(keys);event.setBody("One-way orderly message [shardingKey=" + shardingKey + ", type=" + type + "]");// 注意:这里传的是对象,内部会自动 JSON 序列化(见 buildMessagePayload)generalMessageProducer.sendOneWayOrderMessage(MULTIPLE_ONEWAY_ORDERLY_TOPIC,  // 需在常量类中定义event,keys,shardingKey);return "One-way orderly message fired | shardingKey=" + shardingKey + " | keys=" + keys;}}

8、postman测试

这里主要测试代码是否可用和各个API性能对比:

同步发送消息(POST):http://localhost:8080/multiple/sync?type=test

异步发送消息(POST):http://localhost:8080/multiple/async?type=event

单向发送消息(POST):http://localhost:8080/multiple/oneway?type=ping

延时发送消息 指定延迟级别(POST):http://localhost:8080/multiple/delay?delayLevel=2

延时发送消息 指定秒(POST):http://localhost:8080/multiple/delay/seconds?delaySeconds=15

顺序同步发送消息(POST):http://localhost:8080/multiple/orderly?shardingKey=order_789

各性能对比:

对比可见:异步和单向的响应时间远远的小同步,不过也不能盲目使用异步和单向,需要根据具体实际业务进行。

http://www.dtcms.com/a/617693.html

相关文章:

  • UI设计公司审美积累|办公类软件界面设计巧思,效率与视觉的双重升级
  • 力扣1513——仅含 1 的子串数
  • Kali Linux 中对某(靶机)监控设备进行漏洞验证的完整流程(卧室监控学习)
  • 将LabelMe工具目标检测标注生成的json文件转换成COCO json格式
  • 什么是求解器?
  • 课后作业-2025年11月16号作业
  • C#面试题及详细答案120道(116-120)-- 综合应用
  • 【报错解决】宝塔nginx404
  • 生信数据分析流程自动化:Snakemake实战全攻略
  • 网站建设什么专业重庆品牌餐饮加盟网站建设
  • 数据库 搭建 网站泉州手机网站建设价格
  • 小米电脑管家 V5.2.0.207 新版分享,镜像链接更稳定,AI自动亮度上线,分布式文件开放使用
  • 深入理解 Vue 3 中的计算属性与侦听器:联系、区别及与函数的对比
  • 2.FPGA板卡通过电脑映射连接上网
  • RTCP包之SR和RR
  • 40 token
  • 如何在 Celestia 区块链上构建验证者节点的详细手册
  • Linux权限知识点
  • MySQL: 数据库读写分离与负载均衡的实现方式及深度分析
  • 红帽企业Linux:企业级开源操作系统领航者
  • 怎么做网站开发建一个电商平台多少钱
  • 人工智能技术- 语音语言- 05 GPT-4o 自然人机对话
  • HarmonyOS实用指南:harmonyos + 华为
  • 什么是Spring Boot 应用开发?
  • uniapp实现android/IOS消息推送
  • 汽车网站开发流程html5 网站开发软件
  • HarmonyOS:harmonyos从入门到落地
  • OpenCV(二十九):高通滤波-索贝尔算子
  • 幽冥大陆(二十一)go语言智慧农业电子秤读取——东方仙盟炼气期
  • 北京网站建设需要花多少钱视觉冲击力的网站设计