ABP VNext + Marten:事件溯源与 CQRS 全流程实战
ABP VNext + Marten:事件溯源与 CQRS 全流程实战
📚 目录
- ABP VNext + Marten:事件溯源与 CQRS 全流程实战
- 1. 引言
- 2. 环境与依赖
- 3. 系统架构概览
- 4. 在 ABP 中注册 Marten
- 5. 定义领域事件与聚合
- 5.1 事件类
- 5.2 聚合根
- 6. 写入事件与快照
- 7. 读模型投影(Projections)
- 7.1 查询模型
- 7.2 投影实现
- 8. 全量回放与版本兼容
- 9. 查询与 API 暴露
- 9.1 Query Service
- 9.2 Minimal API
- 10. 性能与最佳实践
- 11. 端到端示例结构
1. 引言
-
TL;DR
- 在 ABP VNext 上集成 Marten,实现事件存储与文档数据库一体化
- 写模型侧使用
SnapshotLifecycle.Inline
内联快照,大幅加速聚合重建 - 读模型侧可选
ProjectionLifecycle.Inline
同步投影,或异步模式由 HotCold Daemon 驱动 - 支持显式
StartStream<T>()
开启新聚合、AppendAsync
追加事件、全量回放与版本兼容迁移
-
背景与动机
- 事件溯源(Event Sourcing):将每次状态变更记录为不可变事件,实现可审计与可回放
- CQRS:命令(写)与查询(读)分离,解耦并发与扩展
- Marten:PostgreSQL 原生文档与事件存储扩展,天然支持 JSONB、事务、快照与投影
2. 环境与依赖
-
平台:.NET 8 + ABP VNext 8.x
-
数据库:PostgreSQL ≥ 13(启用
jsonb
) -
NuGet 包:
dotnet add package Marten dotnet add package Marten.AspNetCore dotnet add package Marten.Events.Aggregation # 若需分布式 Saga 集成: dotnet add package Volo.Abp.EventBus.MassTransit
3. 系统架构概览
- 写模型侧:事件持久化与内联快照在同一事务中完成
- 读模型侧:Daemon 异步消费事件并更新查询表
- ApplicationService 与 QueryService 分别处理命令与查询
4. 在 ABP 中注册 Marten
using Marten;
using Marten.Events.Daemon;public override void ConfigureServices(ServiceConfigurationContext context)
{var services = context.Services;var cfg = services.GetConfiguration();services.AddMarten(opts =>{// 1. 连接与对象创建策略opts.Connection(cfg.GetConnectionString("Default"));opts.AutoCreateSchemaObjects = Weasel.Core.AutoCreate.All; // 生产:None// 2. 事件类型映射opts.Events.AddEventTypeMapping<OrderCreated>("OrderCreated");opts.Events.AddEventTypeMapping<OrderItemAdded>("OrderItemAdded");// 3. 写模型:内联快照(加速 AggregateStreamAsync)opts.Events.Snapshot<OrderAggregate>(SnapshotLifecycle.Inline);// 4. 读模型:投影注册opts.Projections.Add<OrderSummaryProjection>(ProjectionLifecycle.Inline);opts.Projections.Add<OrderItemAddedProjection>(ProjectionLifecycle.Inline);// 5. 为查询模型添加索引opts.Schema.For<OrderSummary>().Index(x => x.Id).Index(x => x.TotalQuantity);})// 6. 异步守护:HotCold 模式.AddAsyncDaemon(DaemonMode.HotCold);// 7. 健康检查:监控 Daemon 延迟services.AddHealthChecks().AddMartenAsyncDaemonHealthCheck(maxEventLag: 100);
}
说明:
SnapshotLifecycle.Inline
在每次SaveChanges
时同步持久化最新聚合状态。- 若只需异步投影,可将投影生命周期改为
Async
。 - 索引配置可显著提升查询性能。
5. 定义领域事件与聚合
5.1 事件类
public record OrderCreated(Guid OrderId, Guid CustomerId, DateTime CreatedAt);
public record OrderItemAdded(Guid OrderId, string Sku, int Quantity);
5.2 聚合根
public class OrderAggregate
{private readonly List<object> _pending = new();public Guid Id { get; private set; }public int TotalQuantity { get; private set; }public void Create(Guid id, Guid customerId) =>Enqueue(new OrderCreated(id, customerId, DateTime.UtcNow));public void AddItem(string sku, int qty) =>Enqueue(new OrderItemAdded(Id, sku, qty));private void Apply(OrderCreated e) => Id = e.OrderId;private void Apply(OrderItemAdded e) => TotalQuantity += e.Quantity;private void Enqueue(object @evt){switch (@evt){case OrderCreated oc: Apply(oc); break;case OrderItemAdded oi: Apply(oi); break;}_pending.Add(@evt);}public IReadOnlyList<object> DequeueEvents(){var evts = _pending.ToList();_pending.Clear();return evts;}
}
最佳实践:
- 使用
switch
替代反射,提升性能。 DequeueEvents
清空缓冲,避免重复写入。
6. 写入事件与快照
await using var session = documentStore.LightweightSession();// (1)新聚合:显式 StartStream
await session.Events.StartStream<OrderAggregate>(new OrderCreated(orderId, customerId, DateTime.UtcNow));
await session.SaveChangesAsync(ct);// (2)后续更新:重建 + 追加
var agg = await session.Events.AggregateStreamAsync<OrderAggregate>(orderId, ct)?? new OrderAggregate();agg.AddItem("SKU-001", 3);await session.Events.AppendAsync(orderId, agg.DequeueEvents());
await session.SaveChangesAsync(ct);
快照策略:
Inline
:每次写入同步更新快照,平衡写入延迟与重建性能。EventCount(100)
:按事件计数批量快照,适合超长流场景。
7. 读模型投影(Projections)
7.1 查询模型
public class OrderSummary
{public Guid Id { get; set; }public int TotalQuantity { get; set; }public DateTime CreatedAt { get; set; }
}
7.2 投影实现
public class OrderSummaryProjection : ViewProjection<OrderCreated>
{public override ValueTask ApplyAsync(IDocumentSession s, OrderCreated e, CancellationToken _) =>s.StoreAsync(new OrderSummary{Id = e.OrderId,TotalQuantity = 0,CreatedAt = e.CreatedAt}, _);
}public class OrderItemAddedProjection : ViewProjection<OrderItemAdded>
{public override async ValueTask ApplyAsync(IDocumentSession s, OrderItemAdded e, CancellationToken _){var sum = await s.LoadAsync<OrderSummary>(e.OrderId, _);if (sum is not null){sum.TotalQuantity += e.Quantity;await s.StoreAsync(sum, _);}}
}
一致性模式:
- Inline:同步强一致,写事务内更新查询表。
- Async:延迟一致,由 Daemon 背景消费,适合大规模并发。
8. 全量回放与版本兼容
var events = await session.Events.FetchStreamAsync(orderId, ct);
var replay = new OrderAggregate();foreach (var evt in events)
{switch (evt){case OrderCreated oc:// 若版本迁移需要,可以在此调用 Adaptervar migrated = OrderCreatedAdapter.Adapt(oc);replay.Create(migrated.OrderId, migrated.CustomerId);break;case OrderItemAdded oi:replay.AddItem(oi.Sku, oi.Quantity);break;}
}
- 版本迁移:通过 事件 Adapter 补全字段或转换类型。
- 不可变原则:历史事件永不修改,所有演进通过 Adapter 或新事件。
9. 查询与 API 暴露
9.1 Query Service
public interface IOrderQueryService
{Task<OrderSummary?> GetAsync(Guid id);
}public class OrderQueryService : IOrderQueryService
{private readonly IQuerySession _qs;public OrderQueryService(IQuerySession qs) => _qs = qs;public Task<OrderSummary?> GetAsync(Guid id) =>_qs.LoadAsync<OrderSummary>(id);
}
9.2 Minimal API
app.MapGet("/api/orders/{id:guid}", async (Guid id, IOrderQueryService svc) =>(await svc.GetAsync(id)) is OrderSummary s? Results.Ok(s): Results.NotFound());
10. 性能与最佳实践
-
并发写分区:多租户 + 聚合 ID 混合 StreamId,避免热点集中过载
-
幂等与事务:结合业务唯一键 + PostgreSQL 本地事务,保证写入原子性
-
索引优化:为
OrderSummary.Id
、TotalQuantity
添加数据库索引 -
监控可观测:
- 使用 Prometheus/Grafana 监控事件写入速率与投影延迟
- 利用 Marten Async Daemon 专用健康检查监测滞后
11. 端到端示例结构
DemoProject/
├── modules/
│ └── Order/
│ ├── Domain/
│ │ ├── Aggregates/
│ │ └── Events/
│ ├── Application/
│ │ ├── Services/
│ │ └── Projections/
│ └── EntityFrameworkCore/ # 可选
└── Program.cs # Marten + AsyncDaemon + HealthChecks 配置