Java 设计模式——观察者模式进阶:分布式场景扩展与实战配置
Java 设计模式——观察者模式进阶:分布式场景扩展与实战配置
前文已覆盖单体应用中观察者模式的核心写法与应用,但在实际项目中,随着业务发展,往往会从 “单体” 走向 “分布式”—— 此时传统的本地观察者模式已无法满足 “跨服务通知” 的需求。本节将补充分布式场景下的观察者模式扩展方案,以及单体应用中观察者模式的完整运行配置,让你不仅能在本地用,更能在分布式架构中落地。
文章目录
- Java 设计模式——观察者模式进阶:分布式场景扩展与实战配置
- 一、分布式场景:观察者模式的扩展方案
- 1. 基于消息队列(MQ)的分布式观察者模式
- 核心原理
- 实战代码(基于 RocketMQ + SpringBoot)
- (1)依赖引入(pom.xml)
- (2)配置文件(application.yml)
- (3)被观察者(订单服务:发布消息)
- (4)观察者 1(库存服务:订阅消息)
- (5)观察者 2(账户服务:订阅消息)
- 核心优势与注意事项
- 2. 基于 Spring Cloud Stream 的分布式观察者模式
- 核心原理
- 实战代码(简化版)
- (1)依赖引入(pom.xml)
- (2)配置文件(application.yml)
- (3)定义通道接口(公共模块)
- (4)被观察者(订单服务:发布消息)
- (5)观察者(库存服务:订阅消息)
- 核心优势
- 二、分布式场景:事务一致性解决方案
- 1. 基于MQ事务消息(强一致性)
- 核心原理
- 实战代码(RocketMQ事务消息)
- (1)订单服务:事务消息发送
- (2)订单服务:事务监听器
- (3)库存服务:订阅事务消息
- 2. 基于最终一致性(柔性事务)
- 核心代码(定时补偿任务)
- 方案 1:全量确认(所有消费者处理完成才标记)
- 方案 2:独立确认(每个消费者单独标记,补偿时按类型重试)
- 两种方案对比与选择
- 三、观察者模式性能优化技巧
- 1. 单体应用性能优化
- (1)异步执行观察者
- (2)观察者分组执行
- 2. 分布式应用性能优化
- (1)消息批量处理
- (2)消息过滤
- 四、总结:观察者模式的全场景应用图谱
一、分布式场景:观察者模式的扩展方案
分布式环境下,被观察者与观察者可能部署在不同服务节点(如订单服务取消订单后,需通知库存服务、账户服务、日志服务,且这些服务可能是独立部署的),此时需借助 “中间件” 实现跨服务的 “发布 - 订阅”,本质是观察者模式的 “远程化”。
1. 基于消息队列(MQ)的分布式观察者模式
核心原理
用 MQ 的 “主题(Topic)” 机制替代本地观察者列表:
-
被观察者(发布者):订单服务取消订单后,向 MQ 的
order.cancel
主题发送消息(相当于 “发布事件”); -
观察者(订阅者):库存服务、账户服务订阅
order.cancel
主题,接收消息后执行本地业务(相当于 “接收通知”); -
中间件角色:MQ(如 RocketMQ、Kafka)负责消息的存储、转发,确保跨服务通知的可靠性(如重试、死信队列)。
实战代码(基于 RocketMQ + SpringBoot)
(1)依赖引入(pom.xml)
<!-- RocketMQ SpringBoot Starter -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>
(2)配置文件(application.yml)
rocketmq:name-server: 127.0.0.1:9876 # MQ服务地址producer:group: order-producer-group # 生产者组(订单服务作为生产者)consumer:group: storage-consumer-group # 消费者组(库存服务作为消费者,每个服务独立组)
(3)被观察者(订单服务:发布消息)
package com.boke.order.service;import com.boke.desginpattern.bo.CancelOrderBO;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class OrderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 取消订单:发布消息到MQ的order.cancel主题public void cancelOrder(CancelOrderBO cancelOrderBO) {System.out.printf("订单服务:处理订单[%s]取消,发布消息到MQ%n", cancelOrderBO.getOrderNo());// 发送消息(主题:order.cancel,消息体:cancelOrderBO)rocketMQTemplate.convertAndSend("order.cancel", cancelOrderBO);}
}
(4)观察者 1(库存服务:订阅消息)
package com.boke.storage.service;import com.boke.desginpattern.bo.CancelOrderBO;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
// 订阅order.cancel主题,消费者组为storage-consumer-group
@RocketMQMessageListener(topic = "order.cancel", consumerGroup = "storage-consumer-group")
public class StorageConsumer implements RocketMQListener<CancelOrderBO> {// 接收消息:执行库存恢复逻辑@Overridepublic void onMessage(CancelOrderBO cancelOrderBO) {System.out.printf("库存服务:收到订单[%s]取消消息,恢复商品库存%n", cancelOrderBO.getOrderNo());// ps 分布式锁:防止并发处理同一订单(可选,视并发量而定),记得最后释放锁
// RLock lock = redissonClient.getLock("storage:confirm:" + orderNo + ":" + consumerType);// 1. 幂等性校验:查询订单是否已处理过库存
// boolean hasProcessed = storageDAO.checkOrderStockProcessed(cancelOrderBO.getOrderNo());
// if (hasProcessed) {
// System.out.printf("库存服务:订单[%s]已处理过库存,跳过%n", cancelOrderBO.getOrderNo());
// return;
// }// 2. 实际业务:调用库存DAO修改库存// storageDAO.increaseStock(cancelOrderBO.getProductId(), cancelOrderBO.getQuantity());// 3. 更新自身确认状态(标记库存服务已完成)
// confirmDAO.updateConsumerStatus(bo.getOrderNo(), "STORAGE", "COMPLETED");
//
// // 4. 检查是否所有消费者都已完成
// if (confirmDAO.isAllConfirmed(bo.getOrderNo())) {
// // 全量完成,更新订单整体状态
// orderFeign.updateOrderNotifyStatus(bo.getOrderNo(), "NOTIFIED");
// }}
}
(5)观察者 2(账户服务:订阅消息)
package com.boke.account.service;import com.boke.desginpattern.bo.CancelOrderBO;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "order.cancel", consumerGroup = "account-consumer-group")
public class AccountConsumer implements RocketMQListener<CancelOrderBO> {@Overridepublic void onMessage(CancelOrderBO cancelOrderBO) {System.out.printf("账户服务:收到订单[%s]取消消息,给用户[%s]退款[%s]元%n", cancelOrderBO.getOrderNo(), cancelOrderBO.getUserId(), cancelOrderBO.getAmount());// ps 分布式锁:防止并发处理同一订单(可选,视并发量而定),记得最后释放锁
// RLock lock = redissonClient.getLock("account:confirm:" + orderNo + ":" + consumerType);// 1. 幂等性校验:查询订单是否已处理过退款
// boolean hasProcessed = accountDAO.checkOrderRefundProcessed(cancelOrderBO.getOrderNo());
// if (hasProcessed) {
// System.out.printf("账户服务:订单[%s]已处理过退款,跳过%n", cancelOrderBO.getOrderNo());
// return;
// }// 2. 实际业务:调用账户DAO退款// accountDAO.refund(cancelOrderBO.getUserId(), cancelOrderBO.getAmount());// 3. 更新自身确认状态(标记退款服务已完成)
// confirmDAO.updateRefundStatus(bo.getOrderNo(), "REFUND", "COMPLETED");
//
// // 4. 检查是否所有消费者都已完成
// if (confirmDAO.isAllConfirmed(bo.getOrderNo())) {
// // 全量完成,更新订单整体状态
// orderFeign.updateOrderNotifyStatus(bo.getOrderNo(), "NOTIFIED");
// }}
}
核心优势与注意事项
-
优势:解耦彻底(服务间无直接依赖)、支持高并发(MQ 削峰填谷)、可靠性高(支持消息重试、死信队列);
-
注意事项:
消息幂等性:需确保观察者重复接收消息时,业务逻辑不会重复执行(如给订单加 “已处理” 标记,接收消息时先校验);
消息顺序:若需观察者按顺序执行(如先退款后恢复库存),需使用 MQ 的 “顺序消息” 功能(如 RocketMQ 的分区顺序);
事务一致性:若需保证 “订单取消” 与 “观察者执行” 的事务一致性(如订单取消失败,观察者不能执行),需使用 “事务消息”(如 RocketMQ 的事务消息机制)。
2. 基于 Spring Cloud Stream 的分布式观察者模式
核心原理
Spring Cloud Stream 是 Spring 官方的 “消息驱动开发框架”,封装了不同 MQ(RocketMQ、Kafka、RabbitMQ)的差异,提供统一的 “绑定器(Binder)” 接口,让开发者无需关注具体 MQ 实现,只需定义 “输入(Input)” 和 “输出(Output)” 通道,即可实现分布式观察者模式。
实战代码(简化版)
(1)依赖引入(pom.xml)
<!-- Spring Cloud Stream + RocketMQ Binder -->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId>
</dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rocketmq</artifactId><version>2.2.3.RELEASE</version>
</dependency>
(2)配置文件(application.yml)
spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876 # MQ服务地址bindings:# 输出通道(订单服务:发布消息到order.cancel主题)orderCancelOutput:destination: order.cancel # 对应MQ主题content-type: application/json # 消息格式# 输入通道(库存服务:订阅order.cancel主题)orderCancelInput:destination: order.cancelcontent-type: application/jsongroup: storage-consumer-group # 消费者组
(3)定义通道接口(公共模块)
package com.boke.common.stream;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface OrderStream {// 输出通道名称(与配置文件中orderCancelOutput对应)String ORDER_CANCEL_OUTPUT = "orderCancelOutput";// 输入通道名称(与配置文件中orderCancelInput对应)String ORDER_CANCEL_INPUT = "orderCancelInput";// 输出通道:发布消息@Output(ORDER_CANCEL_OUTPUT)MessageChannel orderCancelOutput();// 输入通道:订阅消息@Input(ORDER_CANCEL_INPUT)SubscribableChannel orderCancelInput();
}
(4)被观察者(订单服务:发布消息)
package com.boke.order.service;import com.boke.common.stream.OrderStream;
import com.boke.desginpattern.bo.CancelOrderBO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class OrderService {@Autowiredprivate OrderStream orderStream;public void cancelOrder(CancelOrderBO cancelOrderBO) {System.out.printf("订单服务:处理订单[%s]取消,通过Stream发布消息%n", cancelOrderBO.getOrderNo());// 通过输出通道发布消息orderStream.orderCancelOutput().send(MessageBuilder.withPayload(cancelOrderBO).build());}
}
(5)观察者(库存服务:订阅消息)
package com.boke.storage.service;import com.boke.common.stream.OrderStream;
import com.boke.desginpattern.bo.CancelOrderBO;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;@Service
public class StorageService {// 监听输入通道:接收消息@StreamListener(OrderStream.ORDER_CANCEL_INPUT)public void handleOrderCancel(CancelOrderBO cancelOrderBO) {System.out.printf("库存服务:收到订单[%s]取消消息,恢复库存%n", cancelOrderBO.getOrderNo());}
}
核心优势
-
屏蔽 MQ 差异:切换 MQ(如从 RocketMQ 改为 Kafka)只需修改依赖和配置,无需改业务代码;
-
Spring 生态集成:天然支持 Spring 的事务、异步、依赖注入等特性,开发体验一致;
-
弹性伸缩:支持消费者组的动态扩容,提高消息处理能力。
二、分布式场景:事务一致性解决方案
在分布式观察者模式中,“订单取消”与“跨服务观察者执行”的事务一致性是核心痛点(如订单取消成功,但库存服务未收到消息导致库存未恢复)。以下是两种主流解决方案:
1. 基于MQ事务消息(强一致性)
核心原理
以RocketMQ的事务消息为例,通过“半消息+本地事务+确认/回滚”三步确保事务一致性:
-
发送半消息:订单服务向MQ发送“订单取消”的半消息(半消息对消费者不可见);
-
执行本地事务:订单服务执行“修改订单状态为已取消”的本地事务;
-
确认/回滚消息:
-
若本地事务成功:向MQ发送“确认消息”,半消息变为可见,观察者(库存、账户服务)接收消息执行;
-
若本地事务失败:向MQ发送“回滚消息”,半消息被删除,观察者不会接收。
实战代码(RocketMQ事务消息)
(1)订单服务:事务消息发送
package com.boke.order.service;import com.boke.desginpattern.bo.CancelOrderBO;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
public class OrderTransactionService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate OrderDAO orderDAO; // 订单DAO,操作数据库// 执行订单取消+发送事务消息@Transactionalpublic void cancelOrderWithTransaction(CancelOrderBO cancelOrderBO) {// 1. 发送半消息(topic: order.cancel.transaction,事务组:order-transaction-group)Message<CancelOrderBO> message = MessageBuilder.withPayload(cancelOrderBO)// 设置事务ID(确保消息唯一性,避免重复处理).setHeader(RocketMQHeaders.TRANSACTION_ID, "TX_" + cancelOrderBO.getOrderNo()).build();// 2. 发送事务消息,指定事务监听器(OrderTransactionListener)rocketMQTemplate.sendMessageInTransaction("order.cancel.transaction", // 主题"order-transaction-group", // 事务组message,null // 额外参数(可传递本地事务需要的上下文));}// 3. 本地事务执行逻辑(由事务监听器调用)public boolean executeLocalTransaction(CancelOrderBO cancelOrderBO) {try {// 执行本地事务:修改订单状态为已取消int rows = orderDAO.updateOrderStatus(cancelOrderBO.getOrderNo(), "CANCELED");return rows > 0; // 成功返回true,失败返回false} catch (Exception e) {// 异常时回滚本地事务return false;}}// 4. 本地事务状态检查(MQ定时回调,防止网络异常导致的状态不一致)public boolean checkLocalTransaction(CancelOrderBO cancelOrderBO) {// 查询订单当前状态:若为CANCELED,说明本地事务成功;否则失败String status = orderDAO.getOrderStatus(cancelOrderBO.getOrderNo());return "CANCELED".equals(status);}
}
(2)订单服务:事务监听器
package com.boke.order.listener;import com.boke.desginpattern.bo.CancelOrderBO;
import com.boke.order.service.OrderTransactionService;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;@Component
public class OrderTransactionListener implements RocketMQLocalTransactionListener {@Autowiredprivate OrderTransactionService transactionService;// 执行本地事务@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {CancelOrderBO cancelOrderBO = (CancelOrderBO) msg.getPayload();boolean success = transactionService.executeLocalTransaction(cancelOrderBO);return success ? RocketMQLocalTransactionState.COMMIT : // 成功:确认消息RocketMQLocalTransactionState.ROLLBACK; // 失败:回滚消息}// 检查本地事务状态(MQ定时回调)@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {CancelOrderBO cancelOrderBO = (CancelOrderBO) msg.getPayload();boolean success = transactionService.checkLocalTransaction(cancelOrderBO);return success ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;}
}
(3)库存服务:订阅事务消息
package com.boke.storage.service;import com.boke.desginpattern.bo.CancelOrderBO;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
@RocketMQMessageListener(topic = "order.cancel.transaction",consumerGroup = "storage-transaction-consumer-group"
)
public class StorageTransactionConsumer implements RocketMQListener<CancelOrderBO> {@Autowiredprivate StorageDAO storageDAO;// 接收确认后的消息,执行库存恢复(本地事务)@Transactional@Overridepublic void onMessage(CancelOrderBO cancelOrderBO) {// 1. 幂等性校验:查询订单是否已处理过库存boolean hasProcessed = storageDAO.checkOrderStockProcessed(cancelOrderBO.getOrderNo());if (hasProcessed) {System.out.printf("库存服务:订单[%s]已处理过库存,跳过%n", cancelOrderBO.getOrderNo());return;}// 2. 恢复库存storageDAO.increaseStock(cancelOrderBO.getProductId(), cancelOrderBO.getQuantity());// 3. 标记订单已处理库存(幂等性记录)storageDAO.markOrderStockProcessed(cancelOrderBO.getOrderNo());System.out.printf("库存服务:订单[%s]库存恢复完成%n", cancelOrderBO.getOrderNo());}
}
2. 基于最终一致性(柔性事务)
若业务对一致性要求不高(如日志记录、通知推送),可采用 “最终一致性” 方案,通过 “重试 + 补偿” 确保观察者最终执行成功:
-
发送消息:订单服务取消订单后,直接向 MQ 发送消息(无需事务消息);
-
重试机制:观察者服务接收消息失败时,MQ 自动重试(如 RocketMQ 默认重试 16 次);
-
死信队列:重试多次失败的消息进入死信队列,后续通过人工干预或补偿任务处理;
-
定时补偿:定时任务查询 “未处理的订单取消记录”,重新发送消息触发观察者执行。
核心代码(定时补偿任务)
package com.boke.order.task;import com.boke.desginpattern.bo.CancelOrderBO;
import com.boke.order.dao.OrderDAO;
import com.boke.order.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;@Component
public class OrderCancelCompensateTask {@Autowiredprivate OrderDAO orderDAO;@Autowiredprivate OrderService orderService;// 每5分钟执行一次补偿任务@Scheduled(cron = "0 0/5 * * * ?")public void compensateUnprocessedOrders() {// 1. 查询“已取消但未通知观察者”的订单(状态:CANCELED,通知状态:UNNOTIFIED)List<CancelOrderBO> unprocessedOrders = orderDAO.listUnnotifiedCancelOrders();if (unprocessedOrders.isEmpty()) {return;}// 2. 重新发送消息,触发观察者执行for (CancelOrderBO order : unprocessedOrders) {try {orderService.cancelOrder(order); // 重新发送MQ消息System.out.printf("补偿任务:订单[%s]重新通知观察者成功%n", order.getOrderNo());} catch (Exception e) {System.err.printf("补偿任务:订单[%s]重新通知失败,下次重试%n", order.getOrderNo());}}}
}
cancelOrder这个方法调用的是(3)被观察者(订单服务:发布消息)中的方法,需要注意的是消费端所有消费者消费了消息后,才会更新通知状态:UNNOTIFIED。
这里有两种处理方案
方案 1:全量确认(所有消费者处理完成才标记)
适用于要求所有观察者必须处理完成的场景(如订单取消后,库存恢复和退款都必须成功)。以上介绍的就是这种方案。
方案 2:独立确认(每个消费者单独标记,补偿时按类型重试)
适用于消费者之间无依赖的场景(如日志服务和通知服务,某一个失败不影响其他)。
@Component
public class CompensateTask {@Autowiredprivate OrderDAO orderDAO;@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Scheduled(cron = "0 0/5 * * * ?")public void compensate() {// 查询所有已取消但有未完成消费者的订单List<CancelOrderBO> orders = orderDAO.listOrdersWithUncompletedConsumers();for (CancelOrderBO order : orders) {// 获取该订单未完成的消费者类型List<String> uncompletedTypes = orderDAO.listUncompletedConsumers(order.getOrderNo());for (String type : uncompletedTypes) {// 按类型发送补偿消息(如只给库存服务重发)rocketMQTemplate.convertAndSend("order.cancel." + type, // 按类型拆分主题order);}}}
}
两种方案对比与选择
方案 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
全量确认 | 消费者之间有强依赖(如库存恢复后才能退款) | 保证所有业务操作都完成 | 一个消费者失败会阻塞整体标记 |
独立确认 | 消费者之间无依赖(如日志、通知) | 单个消费者失败不影响其他 | 需要维护多类型状态,补偿逻辑较复杂 |
实际项目中,建议优先使用方案 1(全量确认),通过 “分布式锁” 避免多个消费者同时更新整体状态的冲突(如在isAllConfirmed查询后加锁再更新)。如果消费者之间确实无依赖,可采用方案 2 并按类型拆分消息主题,提高补偿效率。
三、观察者模式性能优化技巧
无论是单体还是分布式场景,观察者模式的性能优化都需关注 “减少阻塞”“避免重复执行”“资源合理分配” 三个核心点:
1. 单体应用性能优化
(1)异步执行观察者
对耗时的观察者(如调用第三方接口发送短信),采用异步执行避免阻塞被观察者:
// 方式1:Spring事件+@Async(前文已讲)
// 方式2:手动线程池(非Spring场景)
public class CancelOrderSubject {// 自定义线程池private final ExecutorService executor = Executors.newFixedThreadPool(5);public void notifyObservers(CancelOrderBO bo) {for (CancelOrderListener listener : listeners) {// 异步执行观察者executor.submit(() -> {try {listener.process(bo);} catch (Exception e) {System.err.printf("观察者执行失败:%s%n", e.getMessage());}});}}
}
(2)观察者分组执行
若观察者数量多,可按 “重要性” 分组(如 “核心观察者”:库存、账户;“非核心观察者”:日志、通知),核心观察者同步执行确保可靠性,非核心观察者异步执行减少阻塞:
@Component
public class OrderController {// 核心观察者(自动注入)@Resourceprivate List<CancelOrderCoreListener> coreListeners;// 非核心观察者(自动注入)@Resourceprivate List<CancelOrderNonCoreListener> nonCoreListeners;// 异步线程池@Autowiredprivate Executor observerAsyncExecutor;@PostMapping("cancel")public String cancel(@RequestBody CancelOrderBO bo) {// 1. 同步执行核心观察者(确保库存、账户必执行)for (CancelOrderCoreListener listener : coreListeners) {listener.process(bo);}// 2. 异步执行非核心观察者(日志、通知不阻塞响应)observerAsyncExecutor.execute(() -> {for (CancelOrderNonCoreListener listener : nonCoreListeners) {listener.process(bo);}});return "success";}
}
2. 分布式应用性能优化
(1)消息批量处理
若短时间内有大量订单取消(如秒杀活动结束后),观察者服务可开启 “批量消费”,减少 MQ 消息处理次数:
RocketMQ批量消费配置(application.yml)
rocketmq:consumer:group: storage-consumer-groupbatch-max-size: 32 # 每次拉取32条消息批量处理consume-message-batch-max-size: 16 # 每次消费16条消息
批量消费代码
@RocketMQMessageListener(topic = "order.cancel",consumerGroup = "storage-consumer-group",consumeMode = ConsumeMode.BATCH // 开启批量消费
)
public class StorageBatchConsumer implements RocketMQListener<List<CancelOrderBO>> {@Overridepublic void onMessage(List<CancelOrderBO> messages) {System.out.printf("库存服务:批量处理%d条订单取消消息%n", messages.size());// 批量恢复库存(减少数据库操作次数)storageDAO.batchIncreaseStock(messages);}
}
(2)消息过滤
观察者服务只消费自己关心的消息(如库存服务只处理 “实物商品订单”,虚拟商品订单无需处理),减少无效消费:
// RocketMQ消息过滤(Tag过滤)
@RocketMQMessageListener(topic = "order.cancel",consumerGroup = "storage-consumer-group",selectorExpression = "PHYSICAL_ORDER" // 只消费Tag为PHYSICAL_ORDER的消息
)public class StoragePhysicalConsumer implements RocketMQListener<CancelOrderBO> {@Overridepublic void onMessage(CancelOrderBO bo) {// 只处理实物商品订单库存if ("PHYSICAL".equals(bo.getProductType())) {storageDAO.increaseStock(bo.getProductId(), bo.getQuantity());}}
}// 订单服务发送消息时指定Tag
rocketMQTemplate.convertAndSend("order.cancel:PHYSICAL_ORDER", cancelOrderBO);
四、总结:观察者模式的全场景应用图谱
从单体到分布式,从同步到异步,观察者模式的应用需根据业务场景灵活选择方案,以下是全场景应用图谱,帮你快速定位适合的方案:
场景类型 | 核心需求 | 推荐方案 | 关键注意点 |
---|---|---|---|
单体同步 | 低延迟、强依赖 | 注入接口(@Order 控制顺序) | 避免观察者抛出异常中断执行 |
单体异步 | 不阻塞、耗时操作 | Spring 事件(@Async + 自定义线程池) | 异步线程安全、异常处理 |
分布式强一致 | 事务可靠、数据不丢失 | MQ 事务消息(RocketMQ/Kafka) | 幂等性校验、事务状态检查 |
分布式最终一致 | 低复杂度、非核心操作 | MQ 普通消息 + 重试 + 补偿任务 | 死信队列处理、定时补偿 |
高并发分布式 | 高吞吐、低延迟 | MQ 批量消费 + 消息过滤 | 批量大小调优、Tag/Filter 优化 |
观察者模式的核心价值在于 “解耦”,但 “解耦” 不是目的,而是手段 —— 最终是为了让系统更易维护、更易扩展。在实际项目中,无需盲目追求 “最复杂的方案”,而是根据业务的 “一致性要求”“并发量”“维护成本” 选择平衡点,这才是设计模式落地的关键。