当前位置: 首页 > news >正文

Saga 模式实战 Demo

以下是一个基于 Apache Camel 的完整 Saga 模式实战 Demo,包含订单创建、库存扣减、支付扣款和物流发送四个服务,结合补偿机制实现最终一致性。代码已充分注释,可直接运行。


环境准备

  1. 依赖添加
    pom.xml 中添加 Apache Camel 和 ActiveMQ 依赖:

    <dependencies>
        <!-- Apache Camel 核心 -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>3.14.0</version>
        </dependency>
        <!-- Camel Saga 组件 -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-saga</artifactId>
            <version>3.14.0</version>
        </dependency>
        <!-- ActiveMQ 集成 -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-activemq</artifactId>
            <version>3.14.0</version>
        </dependency>
        <!-- SLF4J 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
    </dependencies>
    
  2. ActiveMQ 配置
    application.yml 中配置 ActiveMQ 连接:

    camel:
      activemq:
        broker-url: tcp://localhost:61616
        connection-factory:
          pooling:
            enabled: true
            max-connections: 10
      components:
        activemq:
          component-name: activemq
          broker-url: tcp://localhost:61616
    

业务代码实现

1. 订单服务(OrderService)
@Service
public class OrderService {

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private LogisticsService logisticsService;

    @Autowired
    private CamelContext camelContext;

    // Saga 起始点:创建订单
    public void createOrder(OrderRequest request) {
        camelContext.createProducerTemplate().sendBody("direct:startOrder", request);
    }
}

2. 服务实现类
InventoryService(库存服务)
@Service
public class InventoryService {

    // 扣减库存(正向操作)
    public void deductStock(String sku) {
        // 模拟库存扣减(实际应查询数据库)
        System.out.println("扣减库存 SKU: " + sku + ",剩余库存: 99");
        // 故意模拟 30% 失败率
        if (Math.random() < 0.3) {
            throw new RuntimeException("库存不足,SKU: " + sku);
        }
    }

    // 恢复库存(补偿操作)
    public void restoreStock(String sku) {
        System.out.println("恢复库存 SKU: " + sku + ",剩余库存: 100");
    }
}
PaymentService(支付服务)
@Service
public class PaymentService {

    // 扣款(正向操作)
    public void chargePayment(BigDecimal amount) {
        System.out.println("扣款金额: " + amount + ",成功");
        // 模拟 20% 失败率
        if (Math.random() < 0.2) {
            throw new RuntimeException("支付失败,金额: " + amount);
        }
    }

    // 退款(补偿操作)
    public void refundPayment(BigDecimal amount) {
        System.out.println("退款金额: " + amount + ",成功");
    }
}
LogisticsService(物流服务)
@Service
public class LogisticsService {

    // 发送物流(正向操作)
    public void sendLogistics(Order order) {
        System.out.println("物流已发送,订单号: " + order.getId());
    }
}

3. Camel 路由配置

src/main/resources/routes.xml 中定义 Saga 路由:

<?xml version="1.0" encoding="UTF-8"?>
<routes xmlns="http://camel.apache.org/schema/spring">
    <!-- Saga 路由定义 -->
    <route id="orderSaga" errorHandler="errorHandler">
        <from uri="direct:startOrder"/>
        
        <!-- Step 1: 扣减库存 -->
        <Saga>
            <serviceCall>
                <method>inventoryService.deductStock</method>
                <arguments>
                    <argument>${header.sku}</argument>
                </arguments>
            </serviceCall>
            <compensationMethod>inventoryService.restoreStock</compensationMethod>
        </Saga>
        <to uri="direct:generateOrder"/>
        
        <!-- Step 2: 生成订单 -->
        <Saga>
            <serviceCall>
                <method>orderDAO.insertOrder</method>
                <arguments>
                    <argument>${body.order}</argument>
                </arguments>
            </serviceCall>
            <compensationMethod>orderDAO.deleteOrder</compensationMethod>
        </Saga>
        <to uri="direct:chargePayment"/>
        
        <!-- Step 3: 扣款 -->
        <Saga>
            <serviceCall>
                <method>paymentService.chargePayment</method>
                <arguments>
                    <argument>${header.amount}</argument>
                </arguments>
            </serviceCall>
            <compensationMethod>paymentService.refundPayment</compensationMethod>
        </Saga>
        <to uri="direct:sendLogistics"/>
        
        <!-- Step 4: 发送物流 -->
        <Saga>
            <serviceCall>
                <method>logisticsService.sendLogistics</method>
                <arguments>
                    <argument>${body.order}</argument>
                </arguments>
            </serviceCall>
            <!-- 无需补偿操作 -->
        </Saga>
    </route>

    <!-- 错误处理 -->
    <errorHandler id="errorHandler">
        <onException>
            <exception>java.lang.RuntimeException</exception>
            <handled>true</handled>
            <log message="发生错误: ${exception.message}"/>
            <!-- 触发全链路补偿 -->
            <redeliveryPolicy>
                <maximumRedeliveries>3</maximumRedeliveries>
                <delay>5000</delay>
            </redeliveryPolicy>
        </onException>
    </errorHandler>
</routes>

4. 配置与启动

启动类配置

在 Spring Boot 启动类中启用 Camel 路由:

@SpringBootApplication
public class SagaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SagaDemoApplication.class, args);
    }

    @Bean
    public RouteBuilder routeBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // 加载 XML 路由
                from("file:src/main/resources/routes.xml").to("camelContext:routes");
            }
        };
    }
}

5. 测试与验证

1. 正常流程

发送请求:

OrderRequest request = new OrderRequest(
    "user123",
    new BigDecimal("100.00"),
    "SKU_123"
);

orderService.createOrder(request);

输出日志

扣减库存 SKU: SKU_123,剩余库存: 99
扣款金额: 100.00,成功
物流已发送,订单号: 1
2. 模拟支付失败

修改 PaymentService.chargePayment 方法,强制抛出异常:

public void chargePayment(BigDecimal amount) {
    throw new RuntimeException("支付失败!");
}

再次发送请求,观察补偿流程:

扣减库存 SKU: SKU_123,剩余库存: 99
退款金额: 100.00,成功
恢复库存 SKU: SKU_123,剩余库存: 100

关键原理与优势

1. Saga 路由定义

<Saga> 标签:标记事务边界,自动绑定正向服务和补偿方法。
serviceCall:指定具体服务方法及参数(通过 ${header}${body} 传递上下文)。
compensationMethod:定义失败时的反向补偿操作。

2. 消息持久化

ActiveMQ 配置:通过 camel-activemq 的持久化配置,确保事件在服务重启后不丢失。

3. 错误重试

<redeliveryPolicy>:配置最大重试次数和延迟时间,避免无限重试。


常见问题与解决

1. 补偿操作未执行

检查点
• 确保补偿方法签名与正向方法完全一致。
• 验证服务类是否被 Spring 管理(添加 @Service 注解)。

2. 消息重复消费

解决方案
• 在事件中添加全局唯一 ID(如 UUID),通过 exchange.getMessage().getMessageId() 获取。
• 使用 Redis 或数据库记录已处理消息。

3. 服务调用超时

解决方案
• 在 serviceCall 中配置超时时间:
xml <serviceCall> <method>...</method> <timeout>5000</timeout> <!-- 5秒超时 --> </serviceCall>


总结

通过 Apache Camel 的 Saga 组件,我们无需手动编写复杂的补偿逻辑,即可实现分布式事务的最终一致性保障。其核心优势在于:
声明式路由:通过 XML 或 DSL 定义事务流程,清晰直观。
自动补偿:失败时自动触发逆向操作,减少开发成本。
高可用性:结合 ActiveMQ 实现事件持久化和重试机制。

适用场景
• 微服务架构下的长事务(如电商订单、金融交易)。
• 需要灵活补偿逻辑的复杂业务场景。

进阶方向
• 结合 Seata 框架实现更强大的分布式事务管理。
• 添加监控和报警(如 Prometheus + Grafana)实时跟踪 Saga 执行状态。

相关文章:

  • 人工智能:企业RAG方案
  • Java面试黄金宝典8
  • 算法1--两束求和
  • 【电源专题】锂电池保护IC/锂电池电压监测IC/锂电池电量计IC/锂电池充电控制IC常见封装一览表
  • MySQL连接较慢原因分析及解决措施
  • 基于Arm GNU Toolchain编译生成的.elf转hex/bin文件格式方法
  • 卷积神经网络 - 梯度和反向传播算法
  • 六十天前端强化训练之第二十五天之组件生命周期大师级详解(Vue3 Composition API 版)
  • 2020年全国职业院校技能大赛改革试点赛高职组“云计算”竞赛赛卷第二场次题目:容器云平台部署与运维
  • spring.datasource.filters = stat,wall配置解释
  • 深入解析 Java Stream API:从 List 到 Map 的优雅转换!!!
  • 计算机基础:编码03,根据十进制数,求其原码
  • 工业数据驱动智能维护的深度调研报告
  • SystemVerilog 数据类型
  • DeepSeek的崛起:2025新春国产AI模型的全球影响力
  • C# 集合(Collection)详解以及区别
  • 【C++网络编程】第2篇:简单的TCP服务器与客户端
  • 【华为Pura先锋盛典】华为Pura X“阔折叠”手机发布:首次全面搭载HarmonyOS 5
  • 常用的 MyBatis 标签及其作用
  • Java EE(14)——网络原理——UDPTCP数据报的结构
  • 刘元春在《光明日报》撰文:以法治护航民营经济高质量发展
  • 人民日报刊文:加快解放和发展新质战斗力
  • 人民日报钟声:平等对话是解决大国间问题的正确之道
  • 罕见沙尘再度入川,官方:沙尘传输高度达到平流层,远超以往
  • “降息潮”延续!存款利率全面迈向“1时代”
  • 101条关于减重的知识,其中一定有你不知道的