当前位置: 首页 > news >正文

Java 设计模式——观察者模式进阶:分布式场景扩展与实战配置

Java 设计模式——观察者模式进阶:分布式场景扩展与实战配置

前文已覆盖单体应用中观察者模式的核心写法与应用,但在实际项目中,随着业务发展,往往会从 “单体” 走向 “分布式”—— 此时传统的本地观察者模式已无法满足 “跨服务通知” 的需求。本节将补充分布式场景下的观察者模式扩展方案,以及单体应用中观察者模式的完整运行配置,让你不仅能在本地用,更能在分布式架构中落地。

文章目录

  • Java 设计模式——观察者模式进阶:分布式场景扩展与实战配置
    • 一、分布式场景:观察者模式的扩展方案
      • 1. 基于消息队列(MQ)的分布式观察者模式
        • 核心原理
        • 实战代码(基于 RocketMQ + SpringBoot)
          • (1)依赖引入(pom.xml)
          • (2)配置文件(application.yml)
          • (3)被观察者(订单服务:发布消息)
          • (4)观察者 1(库存服务:订阅消息)
          • (5)观察者 2(账户服务:订阅消息)
        • 核心优势与注意事项
      • 2. 基于 Spring Cloud Stream 的分布式观察者模式
        • 核心原理
        • 实战代码(简化版)
          • (1)依赖引入(pom.xml)
          • (2)配置文件(application.yml)
          • (3)定义通道接口(公共模块)
          • (4)被观察者(订单服务:发布消息)
          • (5)观察者(库存服务:订阅消息)
        • 核心优势
    • 二、分布式场景:事务一致性解决方案
      • 1. 基于MQ事务消息(强一致性)
        • 核心原理
        • 实战代码(RocketMQ事务消息)
          • (1)订单服务:事务消息发送
          • (2)订单服务:事务监听器
          • (3)库存服务:订阅事务消息
      • 2. 基于最终一致性(柔性事务)
        • 核心代码(定时补偿任务)
        • 方案 1:全量确认(所有消费者处理完成才标记)
        • 方案 2:独立确认(每个消费者单独标记,补偿时按类型重试)
        • 两种方案对比与选择
    • 三、观察者模式性能优化技巧
      • 1. 单体应用性能优化
        • (1)异步执行观察者
        • (2)观察者分组执行
      • 2. 分布式应用性能优化
        • (1)消息批量处理
        • (2)消息过滤
    • 四、总结:观察者模式的全场景应用图谱

一、分布式场景:观察者模式的扩展方案

分布式环境下,被观察者与观察者可能部署在不同服务节点(如订单服务取消订单后,需通知库存服务、账户服务、日志服务,且这些服务可能是独立部署的),此时需借助 “中间件” 实现跨服务的 “发布 - 订阅”,本质是观察者模式的 “远程化”。

1. 基于消息队列(MQ)的分布式观察者模式

核心原理

用 MQ 的 “主题(Topic)” 机制替代本地观察者列表:

  • 被观察者(发布者):订单服务取消订单后,向 MQ 的order.cancel主题发送消息(相当于 “发布事件”);

  • 观察者(订阅者):库存服务、账户服务订阅order.cancel主题,接收消息后执行本地业务(相当于 “接收通知”);

  • 中间件角色:MQ(如 RocketMQ、Kafka)负责消息的存储、转发,确保跨服务通知的可靠性(如重试、死信队列)。

实战代码(基于 RocketMQ + SpringBoot)
(1)依赖引入(pom.xml)
<!-- RocketMQ SpringBoot Starter -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>
(2)配置文件(application.yml)
rocketmq:name-server: 127.0.0.1:9876  # MQ服务地址producer:group: order-producer-group  # 生产者组(订单服务作为生产者)consumer:group: storage-consumer-group  # 消费者组(库存服务作为消费者,每个服务独立组)
(3)被观察者(订单服务:发布消息)
package com.boke.order.service;import com.boke.desginpattern.bo.CancelOrderBO;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class OrderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 取消订单:发布消息到MQ的order.cancel主题public void cancelOrder(CancelOrderBO cancelOrderBO) {System.out.printf("订单服务:处理订单[%s]取消,发布消息到MQ%n", cancelOrderBO.getOrderNo());// 发送消息(主题:order.cancel,消息体:cancelOrderBO)rocketMQTemplate.convertAndSend("order.cancel", cancelOrderBO);}
}
(4)观察者 1(库存服务:订阅消息)
package com.boke.storage.service;import com.boke.desginpattern.bo.CancelOrderBO;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
// 订阅order.cancel主题,消费者组为storage-consumer-group
@RocketMQMessageListener(topic = "order.cancel", consumerGroup = "storage-consumer-group")
public class StorageConsumer implements RocketMQListener<CancelOrderBO> {// 接收消息:执行库存恢复逻辑@Overridepublic void onMessage(CancelOrderBO cancelOrderBO) {System.out.printf("库存服务:收到订单[%s]取消消息,恢复商品库存%n", cancelOrderBO.getOrderNo());// ps 分布式锁:防止并发处理同一订单(可选,视并发量而定),记得最后释放锁
//       RLock lock = redissonClient.getLock("storage:confirm:" + orderNo + ":" + consumerType);// 1. 幂等性校验:查询订单是否已处理过库存
//       boolean hasProcessed = storageDAO.checkOrderStockProcessed(cancelOrderBO.getOrderNo());
//       if (hasProcessed) {
//           System.out.printf("库存服务:订单[%s]已处理过库存,跳过%n", cancelOrderBO.getOrderNo());
//           return;
//       }// 2. 实际业务:调用库存DAO修改库存// storageDAO.increaseStock(cancelOrderBO.getProductId(), cancelOrderBO.getQuantity());// 3. 更新自身确认状态(标记库存服务已完成)
//       confirmDAO.updateConsumerStatus(bo.getOrderNo(), "STORAGE", "COMPLETED");
//
//       // 4. 检查是否所有消费者都已完成
//       if (confirmDAO.isAllConfirmed(bo.getOrderNo())) {
//           // 全量完成,更新订单整体状态
//           orderFeign.updateOrderNotifyStatus(bo.getOrderNo(), "NOTIFIED");
//       }}
}
(5)观察者 2(账户服务:订阅消息)
package com.boke.account.service;import com.boke.desginpattern.bo.CancelOrderBO;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "order.cancel", consumerGroup = "account-consumer-group")
public class AccountConsumer implements RocketMQListener<CancelOrderBO> {@Overridepublic void onMessage(CancelOrderBO cancelOrderBO) {System.out.printf("账户服务:收到订单[%s]取消消息,给用户[%s]退款[%s]元%n", cancelOrderBO.getOrderNo(), cancelOrderBO.getUserId(), cancelOrderBO.getAmount());// ps 分布式锁:防止并发处理同一订单(可选,视并发量而定),记得最后释放锁
//       RLock lock = redissonClient.getLock("account:confirm:" + orderNo + ":" + consumerType);// 1. 幂等性校验:查询订单是否已处理过退款
//       boolean hasProcessed = accountDAO.checkOrderRefundProcessed(cancelOrderBO.getOrderNo());
//       if (hasProcessed) {
//           System.out.printf("账户服务:订单[%s]已处理过退款,跳过%n", cancelOrderBO.getOrderNo());
//           return;
//       }// 2. 实际业务:调用账户DAO退款// accountDAO.refund(cancelOrderBO.getUserId(), cancelOrderBO.getAmount());// 3. 更新自身确认状态(标记退款服务已完成)
//       confirmDAO.updateRefundStatus(bo.getOrderNo(), "REFUND", "COMPLETED");
//
//       // 4. 检查是否所有消费者都已完成
//       if (confirmDAO.isAllConfirmed(bo.getOrderNo())) {
//           // 全量完成,更新订单整体状态
//           orderFeign.updateOrderNotifyStatus(bo.getOrderNo(), "NOTIFIED");
//       }}
}
核心优势与注意事项
  • 优势:解耦彻底(服务间无直接依赖)、支持高并发(MQ 削峰填谷)、可靠性高(支持消息重试、死信队列);

  • 注意事项

消息幂等性:需确保观察者重复接收消息时,业务逻辑不会重复执行(如给订单加 “已处理” 标记,接收消息时先校验);

消息顺序:若需观察者按顺序执行(如先退款后恢复库存),需使用 MQ 的 “顺序消息” 功能(如 RocketMQ 的分区顺序);

事务一致性:若需保证 “订单取消” 与 “观察者执行” 的事务一致性(如订单取消失败,观察者不能执行),需使用 “事务消息”(如 RocketMQ 的事务消息机制)。

2. 基于 Spring Cloud Stream 的分布式观察者模式

核心原理

Spring Cloud Stream 是 Spring 官方的 “消息驱动开发框架”,封装了不同 MQ(RocketMQ、Kafka、RabbitMQ)的差异,提供统一的 “绑定器(Binder)” 接口,让开发者无需关注具体 MQ 实现,只需定义 “输入(Input)” 和 “输出(Output)” 通道,即可实现分布式观察者模式。

实战代码(简化版)
(1)依赖引入(pom.xml)
<!-- Spring Cloud Stream + RocketMQ Binder -->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId>
</dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rocketmq</artifactId><version>2.2.3.RELEASE</version>
</dependency>
(2)配置文件(application.yml)
spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876  # MQ服务地址bindings:# 输出通道(订单服务:发布消息到order.cancel主题)orderCancelOutput:destination: order.cancel  # 对应MQ主题content-type: application/json  # 消息格式# 输入通道(库存服务:订阅order.cancel主题)orderCancelInput:destination: order.cancelcontent-type: application/jsongroup: storage-consumer-group  # 消费者组
(3)定义通道接口(公共模块)
package com.boke.common.stream;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface OrderStream {// 输出通道名称(与配置文件中orderCancelOutput对应)String ORDER_CANCEL_OUTPUT = "orderCancelOutput";// 输入通道名称(与配置文件中orderCancelInput对应)String ORDER_CANCEL_INPUT = "orderCancelInput";// 输出通道:发布消息@Output(ORDER_CANCEL_OUTPUT)MessageChannel orderCancelOutput();// 输入通道:订阅消息@Input(ORDER_CANCEL_INPUT)SubscribableChannel orderCancelInput();
}
(4)被观察者(订单服务:发布消息)
package com.boke.order.service;import com.boke.common.stream.OrderStream;
import com.boke.desginpattern.bo.CancelOrderBO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class OrderService {@Autowiredprivate OrderStream orderStream;public void cancelOrder(CancelOrderBO cancelOrderBO) {System.out.printf("订单服务:处理订单[%s]取消,通过Stream发布消息%n", cancelOrderBO.getOrderNo());// 通过输出通道发布消息orderStream.orderCancelOutput().send(MessageBuilder.withPayload(cancelOrderBO).build());}
}
(5)观察者(库存服务:订阅消息)
package com.boke.storage.service;import com.boke.common.stream.OrderStream;
import com.boke.desginpattern.bo.CancelOrderBO;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;@Service
public class StorageService {// 监听输入通道:接收消息@StreamListener(OrderStream.ORDER_CANCEL_INPUT)public void handleOrderCancel(CancelOrderBO cancelOrderBO) {System.out.printf("库存服务:收到订单[%s]取消消息,恢复库存%n", cancelOrderBO.getOrderNo());}
}
核心优势
  • 屏蔽 MQ 差异:切换 MQ(如从 RocketMQ 改为 Kafka)只需修改依赖和配置,无需改业务代码;

  • Spring 生态集成:天然支持 Spring 的事务、异步、依赖注入等特性,开发体验一致;

  • 弹性伸缩:支持消费者组的动态扩容,提高消息处理能力。

二、分布式场景:事务一致性解决方案

在分布式观察者模式中,“订单取消”与“跨服务观察者执行”的事务一致性是核心痛点(如订单取消成功,但库存服务未收到消息导致库存未恢复)。以下是两种主流解决方案:

1. 基于MQ事务消息(强一致性)

核心原理

以RocketMQ的事务消息为例,通过“半消息+本地事务+确认/回滚”三步确保事务一致性:

  1. 发送半消息:订单服务向MQ发送“订单取消”的半消息(半消息对消费者不可见);

  2. 执行本地事务:订单服务执行“修改订单状态为已取消”的本地事务;

  3. 确认/回滚消息

  • 若本地事务成功:向MQ发送“确认消息”,半消息变为可见,观察者(库存、账户服务)接收消息执行;

  • 若本地事务失败:向MQ发送“回滚消息”,半消息被删除,观察者不会接收。

实战代码(RocketMQ事务消息)
(1)订单服务:事务消息发送
package com.boke.order.service;import com.boke.desginpattern.bo.CancelOrderBO;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
public class OrderTransactionService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate OrderDAO orderDAO; // 订单DAO,操作数据库// 执行订单取消+发送事务消息@Transactionalpublic void cancelOrderWithTransaction(CancelOrderBO cancelOrderBO) {// 1. 发送半消息(topic: order.cancel.transaction,事务组:order-transaction-group)Message<CancelOrderBO> message = MessageBuilder.withPayload(cancelOrderBO)// 设置事务ID(确保消息唯一性,避免重复处理).setHeader(RocketMQHeaders.TRANSACTION_ID, "TX_" + cancelOrderBO.getOrderNo()).build();// 2. 发送事务消息,指定事务监听器(OrderTransactionListener)rocketMQTemplate.sendMessageInTransaction("order.cancel.transaction", // 主题"order-transaction-group",   // 事务组message,null // 额外参数(可传递本地事务需要的上下文));}// 3. 本地事务执行逻辑(由事务监听器调用)public boolean executeLocalTransaction(CancelOrderBO cancelOrderBO) {try {// 执行本地事务:修改订单状态为已取消int rows = orderDAO.updateOrderStatus(cancelOrderBO.getOrderNo(), "CANCELED");return rows > 0; // 成功返回true,失败返回false} catch (Exception e) {// 异常时回滚本地事务return false;}}// 4. 本地事务状态检查(MQ定时回调,防止网络异常导致的状态不一致)public boolean checkLocalTransaction(CancelOrderBO cancelOrderBO) {// 查询订单当前状态:若为CANCELED,说明本地事务成功;否则失败String status = orderDAO.getOrderStatus(cancelOrderBO.getOrderNo());return "CANCELED".equals(status);}
}
(2)订单服务:事务监听器
package com.boke.order.listener;import com.boke.desginpattern.bo.CancelOrderBO;
import com.boke.order.service.OrderTransactionService;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;@Component
public class OrderTransactionListener implements RocketMQLocalTransactionListener {@Autowiredprivate OrderTransactionService transactionService;// 执行本地事务@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {CancelOrderBO cancelOrderBO = (CancelOrderBO) msg.getPayload();boolean success = transactionService.executeLocalTransaction(cancelOrderBO);return success ? RocketMQLocalTransactionState.COMMIT : // 成功:确认消息RocketMQLocalTransactionState.ROLLBACK; // 失败:回滚消息}// 检查本地事务状态(MQ定时回调)@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {CancelOrderBO cancelOrderBO = (CancelOrderBO) msg.getPayload();boolean success = transactionService.checkLocalTransaction(cancelOrderBO);return success ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;}
}
(3)库存服务:订阅事务消息
package com.boke.storage.service;import com.boke.desginpattern.bo.CancelOrderBO;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
@RocketMQMessageListener(topic = "order.cancel.transaction",consumerGroup = "storage-transaction-consumer-group"
)
public class StorageTransactionConsumer implements RocketMQListener<CancelOrderBO> {@Autowiredprivate StorageDAO storageDAO;// 接收确认后的消息,执行库存恢复(本地事务)@Transactional@Overridepublic void onMessage(CancelOrderBO cancelOrderBO) {// 1. 幂等性校验:查询订单是否已处理过库存boolean hasProcessed = storageDAO.checkOrderStockProcessed(cancelOrderBO.getOrderNo());if (hasProcessed) {System.out.printf("库存服务:订单[%s]已处理过库存,跳过%n", cancelOrderBO.getOrderNo());return;}// 2. 恢复库存storageDAO.increaseStock(cancelOrderBO.getProductId(), cancelOrderBO.getQuantity());// 3. 标记订单已处理库存(幂等性记录)storageDAO.markOrderStockProcessed(cancelOrderBO.getOrderNo());System.out.printf("库存服务:订单[%s]库存恢复完成%n", cancelOrderBO.getOrderNo());}
}

2. 基于最终一致性(柔性事务)

若业务对一致性要求不高(如日志记录、通知推送),可采用 “最终一致性” 方案,通过 “重试 + 补偿” 确保观察者最终执行成功:

  1. 发送消息:订单服务取消订单后,直接向 MQ 发送消息(无需事务消息);

  2. 重试机制:观察者服务接收消息失败时,MQ 自动重试(如 RocketMQ 默认重试 16 次);

  3. 死信队列:重试多次失败的消息进入死信队列,后续通过人工干预或补偿任务处理;

  4. 定时补偿:定时任务查询 “未处理的订单取消记录”,重新发送消息触发观察者执行。

核心代码(定时补偿任务)
package com.boke.order.task;import com.boke.desginpattern.bo.CancelOrderBO;
import com.boke.order.dao.OrderDAO;
import com.boke.order.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;@Component
public class OrderCancelCompensateTask {@Autowiredprivate OrderDAO orderDAO;@Autowiredprivate OrderService orderService;// 每5分钟执行一次补偿任务@Scheduled(cron = "0 0/5 * * * ?")public void compensateUnprocessedOrders() {// 1. 查询“已取消但未通知观察者”的订单(状态:CANCELED,通知状态:UNNOTIFIED)List<CancelOrderBO> unprocessedOrders = orderDAO.listUnnotifiedCancelOrders();if (unprocessedOrders.isEmpty()) {return;}// 2. 重新发送消息,触发观察者执行for (CancelOrderBO order : unprocessedOrders) {try {orderService.cancelOrder(order); // 重新发送MQ消息System.out.printf("补偿任务:订单[%s]重新通知观察者成功%n", order.getOrderNo());} catch (Exception e) {System.err.printf("补偿任务:订单[%s]重新通知失败,下次重试%n", order.getOrderNo());}}}
}

cancelOrder这个方法调用的是(3)被观察者(订单服务:发布消息)中的方法,需要注意的是消费端所有消费者消费了消息后,才会更新通知状态:UNNOTIFIED。
这里有两种处理方案

方案 1:全量确认(所有消费者处理完成才标记)

适用于要求所有观察者必须处理完成的场景(如订单取消后,库存恢复和退款都必须成功)。以上介绍的就是这种方案。

方案 2:独立确认(每个消费者单独标记,补偿时按类型重试)

适用于消费者之间无依赖的场景(如日志服务和通知服务,某一个失败不影响其他)。

@Component
public class CompensateTask {@Autowiredprivate OrderDAO orderDAO;@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Scheduled(cron = "0 0/5 * * * ?")public void compensate() {// 查询所有已取消但有未完成消费者的订单List<CancelOrderBO> orders = orderDAO.listOrdersWithUncompletedConsumers();for (CancelOrderBO order : orders) {// 获取该订单未完成的消费者类型List<String> uncompletedTypes = orderDAO.listUncompletedConsumers(order.getOrderNo());for (String type : uncompletedTypes) {// 按类型发送补偿消息(如只给库存服务重发)rocketMQTemplate.convertAndSend("order.cancel." + type,  // 按类型拆分主题order);}}}
}
两种方案对比与选择
方案适用场景优点缺点
全量确认消费者之间有强依赖(如库存恢复后才能退款)保证所有业务操作都完成一个消费者失败会阻塞整体标记
独立确认消费者之间无依赖(如日志、通知)单个消费者失败不影响其他需要维护多类型状态,补偿逻辑较复杂

实际项目中,建议优先使用方案 1(全量确认),通过 “分布式锁” 避免多个消费者同时更新整体状态的冲突(如在isAllConfirmed查询后加锁再更新)。如果消费者之间确实无依赖,可采用方案 2 并按类型拆分消息主题,提高补偿效率。

三、观察者模式性能优化技巧

无论是单体还是分布式场景,观察者模式的性能优化都需关注 “减少阻塞”“避免重复执行”“资源合理分配” 三个核心点:

1. 单体应用性能优化

(1)异步执行观察者

对耗时的观察者(如调用第三方接口发送短信),采用异步执行避免阻塞被观察者:

// 方式1:Spring事件+@Async(前文已讲)
// 方式2:手动线程池(非Spring场景)
public class CancelOrderSubject {// 自定义线程池private final ExecutorService executor = Executors.newFixedThreadPool(5);public void notifyObservers(CancelOrderBO bo) {for (CancelOrderListener listener : listeners) {// 异步执行观察者executor.submit(() -> {try {listener.process(bo);} catch (Exception e) {System.err.printf("观察者执行失败:%s%n", e.getMessage());}});}}
}
(2)观察者分组执行

若观察者数量多,可按 “重要性” 分组(如 “核心观察者”:库存、账户;“非核心观察者”:日志、通知),核心观察者同步执行确保可靠性,非核心观察者异步执行减少阻塞:

@Component
public class OrderController {// 核心观察者(自动注入)@Resourceprivate List<CancelOrderCoreListener> coreListeners;// 非核心观察者(自动注入)@Resourceprivate List<CancelOrderNonCoreListener> nonCoreListeners;// 异步线程池@Autowiredprivate Executor observerAsyncExecutor;@PostMapping("cancel")public String cancel(@RequestBody CancelOrderBO bo) {// 1. 同步执行核心观察者(确保库存、账户必执行)for (CancelOrderCoreListener listener : coreListeners) {listener.process(bo);}// 2. 异步执行非核心观察者(日志、通知不阻塞响应)observerAsyncExecutor.execute(() -> {for (CancelOrderNonCoreListener listener : nonCoreListeners) {listener.process(bo);}});return "success";}
}

2. 分布式应用性能优化

(1)消息批量处理

若短时间内有大量订单取消(如秒杀活动结束后),观察者服务可开启 “批量消费”,减少 MQ 消息处理次数:

RocketMQ批量消费配置(application.yml)

rocketmq:consumer:group: storage-consumer-groupbatch-max-size: 32  # 每次拉取32条消息批量处理consume-message-batch-max-size: 16  # 每次消费16条消息

批量消费代码

@RocketMQMessageListener(topic = "order.cancel",consumerGroup = "storage-consumer-group",consumeMode = ConsumeMode.BATCH // 开启批量消费
)
public class StorageBatchConsumer implements RocketMQListener<List<CancelOrderBO>> {@Overridepublic void onMessage(List<CancelOrderBO> messages) {System.out.printf("库存服务:批量处理%d条订单取消消息%n", messages.size());// 批量恢复库存(减少数据库操作次数)storageDAO.batchIncreaseStock(messages);}
}
(2)消息过滤

观察者服务只消费自己关心的消息(如库存服务只处理 “实物商品订单”,虚拟商品订单无需处理),减少无效消费:

// RocketMQ消息过滤(Tag过滤)
@RocketMQMessageListener(topic = "order.cancel",consumerGroup = "storage-consumer-group",selectorExpression = "PHYSICAL_ORDER" // 只消费Tag为PHYSICAL_ORDER的消息
)public class StoragePhysicalConsumer implements RocketMQListener<CancelOrderBO> {@Overridepublic void onMessage(CancelOrderBO bo) {// 只处理实物商品订单库存if ("PHYSICAL".equals(bo.getProductType())) {storageDAO.increaseStock(bo.getProductId(), bo.getQuantity());}}
}// 订单服务发送消息时指定Tag
rocketMQTemplate.convertAndSend("order.cancel:PHYSICAL_ORDER", cancelOrderBO);

四、总结:观察者模式的全场景应用图谱

从单体到分布式,从同步到异步,观察者模式的应用需根据业务场景灵活选择方案,以下是全场景应用图谱,帮你快速定位适合的方案:

场景类型核心需求推荐方案关键注意点
单体同步低延迟、强依赖注入接口(@Order 控制顺序)避免观察者抛出异常中断执行
单体异步不阻塞、耗时操作Spring 事件(@Async + 自定义线程池)异步线程安全、异常处理
分布式强一致事务可靠、数据不丢失MQ 事务消息(RocketMQ/Kafka)幂等性校验、事务状态检查
分布式最终一致低复杂度、非核心操作MQ 普通消息 + 重试 + 补偿任务死信队列处理、定时补偿
高并发分布式高吞吐、低延迟MQ 批量消费 + 消息过滤批量大小调优、Tag/Filter 优化

观察者模式的核心价值在于 “解耦”,但 “解耦” 不是目的,而是手段 —— 最终是为了让系统更易维护、更易扩展。在实际项目中,无需盲目追求 “最复杂的方案”,而是根据业务的 “一致性要求”“并发量”“维护成本” 选择平衡点,这才是设计模式落地的关键。

http://www.dtcms.com/a/387265.html

相关文章:

  • ​​[硬件电路-238]:电阻、电容、电感对数字电路中的作用
  • IPD驱动下的电源技术革命:华为数字能源模块化复用与降本增效实践
  • 线性回归与 Softmax 回归:深度学习基础模型解析
  • 安全迎国庆|假日期间,企业如何做好网络安全防护?
  • Product Hunt 每日热榜 | 2025-09-16
  • 告别静态图谱!TextSSL如何用「稀疏学习」实现更智能的文档分类?
  • centos Apache服务器安装与配置全攻略
  • centos配置hadoop环境变量并可启动hadoop集群
  • 告别“扁平化”UI:我用Substance Painter+glTF,构建空间感交互界面工作流
  • 【2026计算机毕业设计】基于Django的选课系统的设计与实现
  • 大文件传输软件选型指南:如何选择高效安全的企业级解决方案
  • 元宇宙与教育产业:沉浸式交互重构教育全流程生态
  • linux时间同步
  • Linux嵌入式自学笔记(基于野火EBF6ULL):3.连网、Linux文件目录
  • 【高并发内存池——项目】thread cache 讲解
  • InnoDB ACID实现:数据库可靠性的核心秘密
  • python ui框架
  • 【Linux手册】解决多线程共享资源访问冲突:互斥锁与条件变量的使用及底层机制
  • 基于微信小程序跑腿小程序设计与实现
  • 微信小程序-6-页面布局和事件绑定以及页面跳转
  • InnoDB多版本控制:揭秘MVCC核心机制
  • SpringMVC 系列博客(二):核心功能深入 —— 请求映射、返回值与参数绑定
  • HTTPS报文在SSL/TLS证书安全隧道传输的原理
  • 线性回归与 Softmax 回归技术报告
  • 不同团队如何选GIS软件?ArcGIS Pro、GISBox与SuperMap优劣势及适用方案
  • 静态标签云
  • AI解决企业内训之痛-智能企业内训平台解决方案
  • 容器化部署番外篇之docker网络通信06
  • Windows安装ES8.10流程及安装过程中出现的问题
  • 【工具代码】使用Python截取(切割)视频片段,截取视频中的音频,截取音频片段