ABP VNext + Temporal:分布式工作流与 Saga
ABP VNext + Temporal:分布式工作流与 Saga 🚀
📚 目录
- ABP VNext + Temporal:分布式工作流与 Saga 🚀
- TL;DR
- 1. 环境与依赖 🛠️
- 2. 系统架构概览 📊
- 3. 接入 Temporal 客户端 & OpenTelemetry 🌐
- 4. 定义 Workflow 与 Activities ✍️
- 4.1 Workflow 接口
- 4.2 Activities 接口与实现
- 5. Worker 宿主托管 & DI 映射 🔧
- 6. Workflow 实现:补偿、重试/超时与 Continue-As-New ⏱️
- 7. 启动 Workflow:同步结果 & Fire-and-Forget 🔄
- 8. Patch API(版本管理)示例 🧩
- 9. 保持 Workflow 确定性 ⚠️
- 10. 本地单元测试示例 📦
- 11. 监控与可观测 👁️
TL;DR
- 使用 Temporal .NET SDK
TemporalClient.ConnectAsync
配合Temporalio.Extensions.Hosting
插件,在 ABP 中注册并注入客户端 🎯 - 在 Workflow 接口 上加
[Workflow]
,在入口方法上加[WorkflowRun]
,在活动方法上加[Activity]
,符合 SDK 要求 ✅ - 通过
ITemporalClient.ExecuteWorkflowAsync<bool>
启动并获取布尔结果,或用StartWorkflowAsync
Fire-and-Forget 🔄 - 在
ActivityOptions
中配置RetryOptions
与TimeoutOptions
,并示范 Patch API、OpenTelemetry 拦截器、Continue-As-New、子 Workflow 补偿及本地测试等生产级细节 🛠️
1. 环境与依赖 🛠️
-
平台:.NET 6 + ABP VNext 6.x
-
NuGet 包:
Temporalio.Client
、Temporalio.Worker
(核心 SDK)Temporalio.Extensions.Hosting
(DI 与 Worker 托管扩展)Temporalio.Extensions.OpenTelemetry
(OpenTelemetry 支持)Volo.Abp.BackgroundJobs.Abstractions
(ABP 后台作业)
确保在
Program.cs
或模块的ConfigureServices
中添加以上包与引用。
2. 系统架构概览 📊
3. 接入 Temporal 客户端 & OpenTelemetry 🌐
// Program.cs 或 Module.ConfigureServices
using Temporalio.Client;
using Temporalio.Extensions.Hosting;
using Temporalio.Extensions.OpenTelemetry;public override void ConfigureServices(ServiceConfigurationContext context)
{var conf = context.Services.GetConfiguration();// 注册 Temporal Client(Singleton)context.Services.AddTemporalClient(options =>{options.TargetHost = conf["Temporal:Host"]; // e.g. "localhost:7233"options.Namespace = conf["Temporal:Namespace"]; // e.g. "default"})// 注入 OpenTelemetry 拦截器.Configure<TemporalClientConnectOptions>(o =>{o.Interceptors = new[] { new TracingInterceptor() };});
}
4. 定义 Workflow 与 Activities ✍️
4.1 Workflow 接口
using Temporalio.Workflows;[Workflow] // 类型级标记
public interface IOrderWorkflow
{[WorkflowRun] // 入口方法Task<bool> RunAsync(Guid orderId, decimal amount);
}
- 仅在接口(或类)上使用
[Workflow]
,入口方法标注[WorkflowRun]
,返回Task<T>
。
4.2 Activities 接口与实现
using Temporalio.Activities;public interface IOrderActivities
{[Activity] Task ReserveInventory(Guid orderId);[Activity] Task ChargePayment(Guid orderId, decimal amount);[Activity] Task RefundPayment(Guid orderId);[Activity] Task ReleaseInventory(Guid orderId);
}public class OrderActivities : IOrderActivities
{private readonly IInventoryRepository _inv;private readonly IPaymentService _pay;public OrderActivities(IInventoryRepository inv, IPaymentService pay)=> (_inv, _pay) = (inv, pay);public Task ReserveInventory(Guid orderId)=> _inv.ReserveAsync(orderId);public Task ChargePayment(Guid orderId, decimal amount)=> _pay.ChargeAsync(orderId, amount);public Task RefundPayment(Guid orderId)=> _pay.RefundAsync(orderId);public Task ReleaseInventory(Guid orderId)=> _inv.ReleaseAsync(orderId);
}
5. Worker 宿主托管 & DI 映射 🔧
// Program.cs
using Temporalio.Extensions.Hosting;builder.Services.AddScoped<IOrderActivities, OrderActivities>(); // 明确映射接口
builder.Services.AddHostedTemporalWorker(builder.Configuration["Temporal:Host"],builder.Configuration["Temporal:Namespace"],"order-queue").AddScopedActivities<OrderActivities>() // 注册活动实现.AddWorkflow<IOrderWorkflow, OrderWorkflow>(); // 注册 Workflow
- 必须先
AddScoped<IOrderActivities, OrderActivities>()
,再AddScopedActivities<OrderActivities>()
。 AddHostedTemporalWorker
会将 Worker 与 ASP.NET 生命周期绑定。
6. Workflow 实现:补偿、重试/超时与 Continue-As-New ⏱️
using Temporalio.Workflows;public class OrderWorkflow : IOrderWorkflow
{private readonly IOrderActivities _act;public OrderWorkflow(IOrderActivities act) => _act = act;public async Task<bool> RunAsync(Guid orderId, decimal amount){// 活动选项(含超时与重试)var actOpts = new ActivityOptions{TaskQueue = "order-queue",StartToCloseTimeout = TimeSpan.FromMinutes(1),RetryOptions = new RetryOptions{MaximumAttempts = 3,InitialInterval = TimeSpan.FromSeconds(5),BackoffCoefficient = 2,MaximumInterval = TimeSpan.FromMinutes(1),}};// 工作流选项示例(在调用端传入)// var wfOpts = new WorkflowOptions// {// WorkflowId = orderId.ToString(),// TaskQueue = "order-queue",// WorkflowExecutionTimeout = TimeSpan.FromHours(24),// WorkflowRunTimeout = TimeSpan.FromHours(1),// WorkflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicateFailedOnly,// RetryPolicy = new() { MaximumAttempts = 1 }// };try{await Workflow.ExecuteActivityAsync(() => _act.ReserveInventory(orderId), actOpts);await Workflow.ExecuteActivityAsync(() => _act.ChargePayment(orderId, amount), actOpts);// 对于非常长流程,可 Continue-As-New 重置历史// if (needContinue)// await Workflow.ContinueAsNewAsync(orderId, amount);return true;}catch{// 逆序补偿(或使用子 Workflow)await Workflow.ExecuteActivityAsync(() => _act.RefundPayment(orderId),new() { TaskQueue = "order-queue" });await Workflow.ExecuteActivityAsync(() => _act.ReleaseInventory(orderId),new() { TaskQueue = "order-queue" });// 高级:也可执行子 Workflow// await Workflow.ExecuteChildWorkflowAsync<ICompensationWorkflow>(// cw => cw.RunAsync(orderId),// new() { TaskQueue = "order-queue" });return false;}}
}
7. 启动 Workflow:同步结果 & Fire-and-Forget 🔄
public class OrderAppService : ApplicationService
{private readonly ITemporalClient _client;public OrderAppService(ITemporalClient client) => _client = client;// 同步获取布尔结果public async Task<Guid> CreateOrderAsync(CreateOrderDto dto){var orderId = Guid.NewGuid();var success = await _client.ExecuteWorkflowAsync<bool>(wf => wf.RunAsync(orderId, dto.Amount),new WorkflowOptions{WorkflowId = orderId.ToString(),TaskQueue = "order-queue",WorkflowExecutionTimeout = TimeSpan.FromHours(24),WorkflowRunTimeout = TimeSpan.FromHours(1),WorkflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicateFailedOnly,RetryPolicy = new() { MaximumAttempts = 1 }});return orderId;}// Fire-and-Forgetpublic Task<Guid> CreateOrderFireAndForgetAsync(CreateOrderDto dto){var orderId = Guid.NewGuid();_ = _client.StartWorkflowAsync((IOrderWorkflow wf) => wf.RunAsync(orderId, dto.Amount),new WorkflowOptions{WorkflowId = orderId.ToString(),TaskQueue = "order-queue"});return Task.FromResult(orderId);}
}
8. Patch API(版本管理)示例 🧩
var handle = _client.GetWorkflowHandle(orderId.ToString());
await handle.PatchAsync(new WorkflowPatchingOptions(),patch =>{patch.MigrationCallback = () =>{// 新版本逻辑,例如增加新 Activity 调用};}
);
9. 保持 Workflow 确定性 ⚠️
- 禁止在 Workflow 代码中使用
Task.Run
、DateTime.Now
、Guid.NewGuid()
(用 Workflow 提供的 ID)、随机数、外部 I/O 等。 - 必须只依赖
Workflow
API、Workflow.Now
、参数、活动调用,确保重放时行为一致。
10. 本地单元测试示例 📦
using Temporalio.Testing;
using Xunit;public class OrderWorkflowTests
{[Fact]public async Task RunAsync_SuccessfulPath(){await using var env = await TestWorkflowEnvironment.CreateAsync();var worker = env.NewWorker("order-queue", w =>w.AddWorkflow<OrderWorkflow>().AddActivityImplementation(new OrderActivities(/* mocks */)));await worker.StartAsync();var client = env.GetTestWorkflowClient();var handle = client.GetWorkflowHandle("test-order", TaskQueue: "order-queue");var result = await handle.ExecuteAsync<bool>(wf => wf.RunAsync(Guid.Parse("test-order"), 100m));Assert.True(result);}
}
11. 监控与可观测 👁️
- Temporal Web UI:实时查看 Workflow 实例及历史事件
- OpenTelemetry:通过
TracingInterceptor
导出 Span 至 Jaeger/Prometheus - ABP 管理后台:可将 Workflow 状态同步到实体表并展示
参考资料
- Temporal .NET SDK 属性与 API 文档
- Temporalio.Extensions.Hosting 快速入门(NuGet)
- Temporalio.Extensions.OpenTelemetry 文档(NuGet)
- ABP.IO 背景作业概览