当前位置: 首页 > news >正文

ABP VNext + Dapr Workflows:轻量级分布式工作流

🚀 ABP VNext + Dapr Workflows:轻量级分布式工作流


📚 目录

  • 🚀 ABP VNext + Dapr Workflows:轻量级分布式工作流
    • 一、引言 ✨
      • TL;DR 🔥
    • 二、环境与依赖 🛠️
    • 三、系统架构与流程图 🏗️
    • 四、在 ABP 模块中注册 Dapr Workflows 📦
    • 五、定义 Workflow 与 Activities 🎯
      • 5.1 定义活动(Activity)
      • 5.2 定义工作流(Workflow)
    • 六、触发与查询工作流 🔍
      • 6.1 启动 ABP 应用
      • 6.2 发起工作流
      • 6.3 暴露查询端点
    • 七、示例演示 🎬
    • 八、最佳实践与优化 💡


一、引言 ✨

TL;DR 🔥

  • 在 ABP VNext 应用中,只需一行 services.AddDaprWorkflow(...) 即可无侵入集成 Dapr Workflow SDK,开启长运行分布式工作流编排 🎉 (Dapr Docs)
  • 通过 state.redis 或 CosmosDB 等可插拔 State Store 实现跨服务状态持久化与恢复,支持 Saga 补偿模式 🔄 (Dapr Docs)
  • 定义继承自 Workflow<TInput, TOutput> 的工作流类与 WorkflowActivity<TArg, TResult> 的活动类,使用 context.CallActivityAsync 保证确定性重放 🛠️ (Diagrid)
  • 演示“下单—保留库存—扣款—失败补偿”全流程,涵盖高性能、高可用、易复现实践 ✅

背景
在微服务架构中,分布式事务难以扩展,“最终一致性”与 Saga 模式已成主流。Dapr Workflows 提供代码化工作流,基于 DurableTask 引擎在 State Store 中持久化状态,结合补偿与定时器,简化复杂业务的可靠编排。


二、环境与依赖 🛠️

  • .NET 平台:.NET 9,ABP vNext v9.x

  • Dapr 运行时:Dapr CLI ≥1.10;Workflow Runtime v1.15.4

  • NuGet 包

    dotnet add package Dapr.Workflow --version 1.15.4
    
  • State Store 组件 (components/statestore.yaml):

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:name: statestore
    spec:type: state.redisversion: v1metadata:- name: redisHostvalue: "localhost:6379"# 生产环境推荐使用支持事务的后端,如 Azure Cosmos DB 或 SQL Server
    

    (Dapr Docs)

  • 基础设施:Redis / Azure Cosmos DB;Dapr Sidecar


三、系统架构与流程图 🏗️

State_Store
Workflow_Runtime
ABP_App
ScheduleNewWorkflowAsync
Redis/CosmosDB
OrderWorkflow 实例
ReserveInventoryActivity
ChargePaymentActivity
RefundPaymentActivity
ReleaseInventoryActivity
Dapr Sidecar
OrderService API
  • OrderService API 通过 Dapr Sidecar 调用 Workflow 管理 API
  • Workflow Runtime 调度活动并将状态写入 State Store,支持断点重放
  • Saga 补偿:在失败场景通过补偿活动保证最终一致性

四、在 ABP 模块中注册 Dapr Workflows 📦

using Dapr.Client;
using Dapr.Workflow;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity;public class MyAppModule : AbpModule
{public override void ConfigureServices(ServiceConfigurationContext context){// 可选:显式注册 DaprClientcontext.Services.AddDaprClient();// 一行集成 Dapr Workflows,自动注册 Client 与 Workercontext.Services.AddDaprWorkflow(options =>{options.RegisterWorkflow<OrderWorkflow>();options.RegisterActivity<ReserveInventoryActivity>();options.RegisterActivity<ChargePaymentActivity>();options.RegisterActivity<RefundPaymentActivity>();options.RegisterActivity<ReleaseInventoryActivity>();});}
}

AddDaprWorkflow 会自动注册 DaprWorkflowClientDaprClient(若未注册)及后台 HostedService,无需额外中间件调用 (Dapr Docs)


五、定义 Workflow 与 Activities 🎯

using Dapr.Workflow;
using Dapr.Workflow.Models;

5.1 定义活动(Activity)

继承自 WorkflowActivity<TArg, TResult> 并重写 RunAsync,实现幂等逻辑:

public record PaymentInput(Guid OrderId, decimal Amount);public class ReserveInventoryActivity : WorkflowActivity<Guid, bool>
{public override Task<bool> RunAsync(WorkflowActivityContext context,Guid orderId){// 调用库存服务,保证幂等return Task.FromResult(true);}
}public class ChargePaymentActivity : WorkflowActivity<PaymentInput, bool>
{public override Task<bool> RunAsync(WorkflowActivityContext context,PaymentInput input){// 调用支付服务,保证幂等return Task.FromResult(true);}
}

(Diagrid)

5.2 定义工作流(Workflow)

继承自 Workflow<OrderDto, object>,在 RunAsync 中编排活动并处理补偿:

public class OrderWorkflow : Workflow<OrderDto, object>
{public override async Task<object> RunAsync(WorkflowContext context,OrderDto order){var logger = context.CreateReplaySafeLogger<OrderWorkflow>();logger.LogInformation("Order {OrderId} 开始", order.Id);try{await context.CallActivityAsync<bool>(nameof(ReserveInventoryActivity),order.Id);await context.CallActivityAsync<bool>(nameof(ChargePaymentActivity),new PaymentInput(order.Id, order.Amount));}catch (Exception ex){logger.LogWarning(ex, "执行失败,开始补偿");await context.CallActivityAsync<bool>(nameof(RefundPaymentActivity),order.Id);await context.CallActivityAsync<bool>(nameof(ReleaseInventoryActivity),order.Id);throw;}logger.LogInformation("Order {OrderId} 完成", order.Id);return null!;}
}

(Diagrid)


六、触发与查询工作流 🔍

6.1 启动 ABP 应用

dapr run \--app-id order-api \--app-port 5000 \--dapr-http-port 3500 \--components-path ./components \dotnet run

6.2 发起工作流

using Dapr.Workflow;public class OrderAppService : ApplicationService
{public async Task<string> CreateOrderAsync(CreateOrderDto dto){var client = ServiceProvider.GetRequiredService<DaprWorkflowClient>();string instanceId = Guid.NewGuid().ToString();await client.ScheduleNewWorkflowAsync(workflowName: nameof(OrderWorkflow),instanceId: instanceId,input: dto);return instanceId;}// 新增:查询工作流状态public async Task<WorkflowState> GetWorkflowStateAsync(string instanceId){var client = ServiceProvider.GetRequiredService<DaprWorkflowClient>();return await client.GetWorkflowStateAsync(instanceId, includeInputsAndOutputs: true);}
}

使用 ScheduleNewWorkflowAsync 启动实例 (Dapr Docs)

6.3 暴露查询端点

using Dapr.Workflow;
using Microsoft.AspNetCore.Mvc;[ApiController]
[Route("api/workflows")]
public class WorkflowController : ControllerBase
{private readonly DaprWorkflowClient _client;public WorkflowController(DaprWorkflowClient client) => _client = client;[HttpGet("{instanceId}")]public async Task<IActionResult> Get(string instanceId){var state = await _client.GetWorkflowStateAsync(instanceId, includeInputsAndOutputs: true);return Ok(state);}
}
curl http://localhost:5000/api/workflows/{instanceId}
  • 返回 JSON 包含 RuntimeStatus、输入输出、历史事件等信息。

七、示例演示 🎬

  1. 基础设施

    docker run -d --name redis -p 6379:6379 redis
    dapr init --runtime-version v1.10
    
  2. 运行应用并发起订单

    curl -X POST http://localhost:5000/api/orders \-H "Content-Type: application/json" \-d '{"productId":"123","quantity":1,"amount":100}'
    
  3. 查询状态

    curl http://localhost:5000/api/workflows/{instanceId}
    
  4. 模拟失败:在 ChargePaymentActivity 抛出异常,验证补偿活动自动执行 💥


八、最佳实践与优化 💡

  • 幂等性:活动内部调用尽量幂等,防止重试产生副作用。
  • 超时与重试:结合 Durable Timers 及 RetryOptions 控制超时与重试。
  • 并行与分支:可在工作流中使用 Task.WhenAll(...) 或动态 CallActivityAsync 实现并行。
  • 版本兼容:升级工作流时,通过前缀或迁移逻辑兼容老实例。
  • 生产环境:推荐使用支持事务回滚的 State Store(Cosmos DB、SQL Server)替代 Redis (Dapr Docs)

http://www.dtcms.com/a/309224.html

相关文章:

  • (AC)唐克的新游戏
  • Vue3中Markdown解析与渲染的完整解决方案:从安全到性能优化
  • PostgreSQL 中删除指定数据库下的所有表结构
  • 微服务的编程测评系统9-竞赛新增-竞赛编辑
  • 如何保护 Redis 实例的安全?
  • 快速排序算法详解与洛谷例题实战
  • 【PHP 构造函数与析构函数:从基础到高级的完整指南】
  • 直播平台中的美白滤镜实现:美颜SDK的核心架构与性能优化指南
  • Qt结合ffmpeg实现图片参数调节/明亮度对比度饱和度设置/滤镜的使用
  • Windows编译安装ffmpeg和sdl
  • CG--逻辑判断1
  • 实战指南:如何将Git仓库中的特定文件夹及其历史完整迁移到另一个仓库
  • Git 各场景使用方法总结
  • java8学习笔记-Stream流
  • 在uni-app中引入本地日志插件
  • 城市数字孪生之GISBox三维顶层重建白皮书
  • 操作系统:共享内存通信(Shared Memory Systems)
  • WAIC 2025再发AI十大展望
  • WaitForSingleObject 函数参数影响及信号处理分析
  • SpringAI智能客服Function Calling兼容性问题解决方案
  • 中国信通院/华为:智能体技术和应用研究报告(2025)(转载)
  • 充电桩与照明“联动”创新:智慧灯杆破解新能源基建难题
  • AntFlow 1.0.0 正式发布:企业级开源工作流引擎,历经一年打磨,全面上线!
  • Nginx配置优先级问题导致静态资源404
  • 新书速览|Python数据分析师成长之路
  • 实战指南|虚拟电厂管理平台搭建全流程解析(一)
  • 谷歌Firebase动态链接将失效:如何选择深度链接替代方案?
  • ccf接口测试实战
  • 机器学习sklearn:编码、哑变量、二值化和分段
  • Implement recovery based on PITR using dump file and binlog