当前位置: 首页 > news >正文

ABP VNext + Temporal:分布式工作流与 Saga

ABP VNext + Temporal:分布式工作流与 Saga 🚀


📚 目录

  • ABP VNext + Temporal:分布式工作流与 Saga 🚀
    • TL;DR
    • 1. 环境与依赖 🛠️
    • 2. 系统架构概览 📊
    • 3. 接入 Temporal 客户端 & OpenTelemetry 🌐
    • 4. 定义 Workflow 与 Activities ✍️
      • 4.1 Workflow 接口
      • 4.2 Activities 接口与实现
    • 5. Worker 宿主托管 & DI 映射 🔧
    • 6. Workflow 实现:补偿、重试/超时与 Continue-As-New ⏱️
    • 7. 启动 Workflow:同步结果 & Fire-and-Forget 🔄
    • 8. Patch API(版本管理)示例 🧩
    • 9. 保持 Workflow 确定性 ⚠️
    • 10. 本地单元测试示例 📦
    • 11. 监控与可观测 👁️


TL;DR

  • 使用 Temporal .NET SDK TemporalClient.ConnectAsync 配合 Temporalio.Extensions.Hosting 插件,在 ABP 中注册并注入客户端 🎯
  • Workflow 接口 上加 [Workflow],在入口方法上加 [WorkflowRun],在活动方法上加 [Activity],符合 SDK 要求 ✅
  • 通过 ITemporalClient.ExecuteWorkflowAsync<bool> 启动并获取布尔结果,或用 StartWorkflowAsync Fire-and-Forget 🔄
  • ActivityOptions 中配置 RetryOptionsTimeoutOptions,并示范 Patch API、OpenTelemetry 拦截器、Continue-As-New、子 Workflow 补偿及本地测试等生产级细节 🛠️

1. 环境与依赖 🛠️

  • 平台:.NET 6 + ABP VNext 6.x

  • NuGet 包

    • Temporalio.ClientTemporalio.Worker(核心 SDK)
    • Temporalio.Extensions.Hosting(DI 与 Worker 托管扩展)
    • Temporalio.Extensions.OpenTelemetry(OpenTelemetry 支持)
    • Volo.Abp.BackgroundJobs.Abstractions(ABP 后台作业)

确保在 Program.cs 或模块的 ConfigureServices 中添加以上包与引用。


2. 系统架构概览 📊

Worker 宿主
Temporal 平台
ABP 应用
启动 Workflow
状态同步/补偿
失败触发
失败触发
拉取并执行
监控
Temporal Web UI
ABP 管理后台
Temporal Worker
Workflow 实例
Activity: ReserveInventory
Activity: ChargePayment
Activity: RefundPayment
Activity: ReleaseInventory
Temporal 客户端
OrderAppService
自定义作业
ABP 后台作业

3. 接入 Temporal 客户端 & OpenTelemetry 🌐

// Program.cs 或 Module.ConfigureServices
using Temporalio.Client;
using Temporalio.Extensions.Hosting;
using Temporalio.Extensions.OpenTelemetry;public override void ConfigureServices(ServiceConfigurationContext context)
{var conf = context.Services.GetConfiguration();// 注册 Temporal Client(Singleton)context.Services.AddTemporalClient(options =>{options.TargetHost = conf["Temporal:Host"];      // e.g. "localhost:7233"options.Namespace  = conf["Temporal:Namespace"]; // e.g. "default"})// 注入 OpenTelemetry 拦截器.Configure<TemporalClientConnectOptions>(o =>{o.Interceptors = new[] { new TracingInterceptor() };});
}

4. 定义 Workflow 与 Activities ✍️

4.1 Workflow 接口

using Temporalio.Workflows;[Workflow]  // 类型级标记
public interface IOrderWorkflow
{[WorkflowRun]  // 入口方法Task<bool> RunAsync(Guid orderId, decimal amount);
}
  • 仅在接口(或类)上使用 [Workflow],入口方法标注 [WorkflowRun],返回 Task<T>

4.2 Activities 接口与实现

using Temporalio.Activities;public interface IOrderActivities
{[Activity] Task ReserveInventory(Guid orderId);[Activity] Task ChargePayment(Guid orderId, decimal amount);[Activity] Task RefundPayment(Guid orderId);[Activity] Task ReleaseInventory(Guid orderId);
}public class OrderActivities : IOrderActivities
{private readonly IInventoryRepository _inv;private readonly IPaymentService      _pay;public OrderActivities(IInventoryRepository inv, IPaymentService pay)=> (_inv, _pay) = (inv, pay);public Task ReserveInventory(Guid orderId)=> _inv.ReserveAsync(orderId);public Task ChargePayment(Guid orderId, decimal amount)=> _pay.ChargeAsync(orderId, amount);public Task RefundPayment(Guid orderId)=> _pay.RefundAsync(orderId);public Task ReleaseInventory(Guid orderId)=> _inv.ReleaseAsync(orderId);
}

5. Worker 宿主托管 & DI 映射 🔧

// Program.cs
using Temporalio.Extensions.Hosting;builder.Services.AddScoped<IOrderActivities, OrderActivities>();  // 明确映射接口
builder.Services.AddHostedTemporalWorker(builder.Configuration["Temporal:Host"],builder.Configuration["Temporal:Namespace"],"order-queue").AddScopedActivities<OrderActivities>()           // 注册活动实现.AddWorkflow<IOrderWorkflow, OrderWorkflow>();     // 注册 Workflow
  • 必须AddScoped<IOrderActivities, OrderActivities>(),再 AddScopedActivities<OrderActivities>()
  • AddHostedTemporalWorker 会将 Worker 与 ASP.NET 生命周期绑定。

6. Workflow 实现:补偿、重试/超时与 Continue-As-New ⏱️

using Temporalio.Workflows;public class OrderWorkflow : IOrderWorkflow
{private readonly IOrderActivities _act;public OrderWorkflow(IOrderActivities act) => _act = act;public async Task<bool> RunAsync(Guid orderId, decimal amount){// 活动选项(含超时与重试)var actOpts = new ActivityOptions{TaskQueue           = "order-queue",StartToCloseTimeout = TimeSpan.FromMinutes(1),RetryOptions = new RetryOptions{MaximumAttempts    = 3,InitialInterval    = TimeSpan.FromSeconds(5),BackoffCoefficient = 2,MaximumInterval    = TimeSpan.FromMinutes(1),}};// 工作流选项示例(在调用端传入)// var wfOpts = new WorkflowOptions// {//     WorkflowId               = orderId.ToString(),//     TaskQueue                = "order-queue",//     WorkflowExecutionTimeout = TimeSpan.FromHours(24),//     WorkflowRunTimeout       = TimeSpan.FromHours(1),//     WorkflowIdReusePolicy    = WorkflowIdReusePolicy.AllowDuplicateFailedOnly,//     RetryPolicy              = new() { MaximumAttempts = 1 }// };try{await Workflow.ExecuteActivityAsync(() => _act.ReserveInventory(orderId), actOpts);await Workflow.ExecuteActivityAsync(() => _act.ChargePayment(orderId, amount), actOpts);// 对于非常长流程,可 Continue-As-New 重置历史// if (needContinue)//     await Workflow.ContinueAsNewAsync(orderId, amount);return true;}catch{// 逆序补偿(或使用子 Workflow)await Workflow.ExecuteActivityAsync(() => _act.RefundPayment(orderId),new() { TaskQueue = "order-queue" });await Workflow.ExecuteActivityAsync(() => _act.ReleaseInventory(orderId),new() { TaskQueue = "order-queue" });// 高级:也可执行子 Workflow// await Workflow.ExecuteChildWorkflowAsync<ICompensationWorkflow>(//     cw => cw.RunAsync(orderId),//     new() { TaskQueue = "order-queue" });return false;}}
}

7. 启动 Workflow:同步结果 & Fire-and-Forget 🔄

public class OrderAppService : ApplicationService
{private readonly ITemporalClient _client;public OrderAppService(ITemporalClient client) => _client = client;// 同步获取布尔结果public async Task<Guid> CreateOrderAsync(CreateOrderDto dto){var orderId = Guid.NewGuid();var success = await _client.ExecuteWorkflowAsync<bool>(wf => wf.RunAsync(orderId, dto.Amount),new WorkflowOptions{WorkflowId               = orderId.ToString(),TaskQueue                = "order-queue",WorkflowExecutionTimeout = TimeSpan.FromHours(24),WorkflowRunTimeout       = TimeSpan.FromHours(1),WorkflowIdReusePolicy    = WorkflowIdReusePolicy.AllowDuplicateFailedOnly,RetryPolicy              = new() { MaximumAttempts = 1 }});return orderId;}// Fire-and-Forgetpublic Task<Guid> CreateOrderFireAndForgetAsync(CreateOrderDto dto){var orderId = Guid.NewGuid();_ = _client.StartWorkflowAsync((IOrderWorkflow wf) => wf.RunAsync(orderId, dto.Amount),new WorkflowOptions{WorkflowId = orderId.ToString(),TaskQueue  = "order-queue"});return Task.FromResult(orderId);}
}

8. Patch API(版本管理)示例 🧩

var handle = _client.GetWorkflowHandle(orderId.ToString());
await handle.PatchAsync(new WorkflowPatchingOptions(),patch =>{patch.MigrationCallback = () =>{// 新版本逻辑,例如增加新 Activity 调用};}
);

9. 保持 Workflow 确定性 ⚠️

  • 禁止在 Workflow 代码中使用 Task.RunDateTime.NowGuid.NewGuid()(用 Workflow 提供的 ID)、随机数、外部 I/O 等。
  • 必须只依赖 Workflow API、Workflow.Now、参数、活动调用,确保重放时行为一致。

10. 本地单元测试示例 📦

using Temporalio.Testing;
using Xunit;public class OrderWorkflowTests
{[Fact]public async Task RunAsync_SuccessfulPath(){await using var env = await TestWorkflowEnvironment.CreateAsync();var worker = env.NewWorker("order-queue", w =>w.AddWorkflow<OrderWorkflow>().AddActivityImplementation(new OrderActivities(/* mocks */)));await worker.StartAsync();var client = env.GetTestWorkflowClient();var handle = client.GetWorkflowHandle("test-order", TaskQueue: "order-queue");var result = await handle.ExecuteAsync<bool>(wf => wf.RunAsync(Guid.Parse("test-order"), 100m));Assert.True(result);}
}

11. 监控与可观测 👁️

  • Temporal Web UI:实时查看 Workflow 实例及历史事件
  • OpenTelemetry:通过 TracingInterceptor 导出 Span 至 Jaeger/Prometheus
  • ABP 管理后台:可将 Workflow 状态同步到实体表并展示

参考资料

  • Temporal .NET SDK 属性与 API 文档
  • Temporalio.Extensions.Hosting 快速入门(NuGet)
  • Temporalio.Extensions.OpenTelemetry 文档(NuGet)
  • ABP.IO 背景作业概览
http://www.dtcms.com/a/287246.html

相关文章:

  • 当OT遇见IT:Apache IoTDB如何用“时序空间一体化“破解工业物联网数据孤岛困局
  • 时序数据库选型实战:Apache IoTDB技术深度解析
  • Bicep入门篇
  • 如何解决pip安装报错ModuleNotFoundError: No module named ‘pillow’问题
  • C/C++---文件读取
  • kotlin部分常用特性总结
  • Node.js net.Socket.destroy()深入解析
  • 海思3516cv610 NPU学习
  • 【C语言进阶】题目练习(3)
  • kafka--基础知识点--6.1--LEO、HW、LW
  • Validation - Spring Boot项目中参数检验的利器
  • web.m3u8流媒体视频处理
  • Flutter基础(前端教程①③-单例)
  • 定时器与间歇函数
  • Web3.0与元宇宙:区块链驱动的数字新生态解析
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - snowNLP库实现中文情感分析
  • 如何增强LLM(大语言模型)的“置信度”和“自信心” :LLM的“自信”不是“什么都能答”,而是“该答的答得准,不该答的敢说不”。
  • 【unity游戏开发入门到精通——3D篇】3D光源之——unity使用Lens Flare (SRP) 组件实现太阳耀斑镜头光晕效果
  • 《Origin画百图》之多分类矩阵散点图
  • 2025最新版 Go语言Goland 专业安装及配置(超详细)
  • 华为仓颉编程语言语法简介与示例
  • 从0开始学习R语言--Day51--PH检验
  • 操作系统-分布式同步
  • 【REACT18.x】creat-react-app在添加eslint时报错Environment key “jest/globals“ is unknown
  • Spring AI 项目实战(十九):Spring Boot + AI + Vue3 + OSS + DashScope 构建多模态视觉理解平台(附完整源码)
  • 在 .NET Core 中创建 Web Socket API
  • Redis 如何保证高并发与高可用
  • Elasticsearch 重命名索引
  • OllyDbg技巧学习
  • Go-Redis × 向量检索实战用 HNSW 在 Redis 中索引与查询文本 Embedding(Hash JSON 双版本)