当前位置: 首页 > news >正文

ABP VNext + Marten:事件溯源与 CQRS 全流程实战

ABP VNext + Marten:事件溯源与 CQRS 全流程实战


📚 目录

  • ABP VNext + Marten:事件溯源与 CQRS 全流程实战
    • 1. 引言
    • 2. 环境与依赖
    • 3. 系统架构概览
    • 4. 在 ABP 中注册 Marten
    • 5. 定义领域事件与聚合
      • 5.1 事件类
      • 5.2 聚合根
    • 6. 写入事件与快照
    • 7. 读模型投影(Projections)
      • 7.1 查询模型
      • 7.2 投影实现
    • 8. 全量回放与版本兼容
    • 9. 查询与 API 暴露
      • 9.1 Query Service
      • 9.2 Minimal API
    • 10. 性能与最佳实践
    • 11. 端到端示例结构


1. 引言

  • TL;DR

    • 在 ABP VNext 上集成 Marten,实现事件存储与文档数据库一体化
    • 写模型侧使用 SnapshotLifecycle.Inline 内联快照,大幅加速聚合重建
    • 读模型侧可选 ProjectionLifecycle.Inline 同步投影,或异步模式由 HotCold Daemon 驱动
    • 支持显式 StartStream<T>() 开启新聚合、AppendAsync 追加事件、全量回放与版本兼容迁移
  • 背景与动机

    • 事件溯源(Event Sourcing):将每次状态变更记录为不可变事件,实现可审计与可回放
    • CQRS:命令(写)与查询(读)分离,解耦并发与扩展
    • Marten:PostgreSQL 原生文档与事件存储扩展,天然支持 JSONB、事务、快照与投影

2. 环境与依赖

  • 平台:.NET 8 + ABP VNext 8.x

  • 数据库:PostgreSQL ≥ 13(启用 jsonb

  • NuGet 包

    dotnet add package Marten
    dotnet add package Marten.AspNetCore
    dotnet add package Marten.Events.Aggregation
    # 若需分布式 Saga 集成:
    dotnet add package Volo.Abp.EventBus.MassTransit
    

3. 系统架构概览

写模型 WriteSide
Publish Event
Persist & Snapshot
读模型 ReadSide
Fetch Events & Project
ReadModelDb
HotCold Daemon
EventStore
API/ApplicationService
mt_events / mt_snapshots
QueryService
  • 写模型侧:事件持久化与内联快照在同一事务中完成
  • 读模型侧:Daemon 异步消费事件并更新查询表
  • ApplicationService 与 QueryService 分别处理命令与查询

4. 在 ABP 中注册 Marten

using Marten;
using Marten.Events.Daemon;public override void ConfigureServices(ServiceConfigurationContext context)
{var services = context.Services;var cfg = services.GetConfiguration();services.AddMarten(opts =>{// 1. 连接与对象创建策略opts.Connection(cfg.GetConnectionString("Default"));opts.AutoCreateSchemaObjects = Weasel.Core.AutoCreate.All; // 生产:None// 2. 事件类型映射opts.Events.AddEventTypeMapping<OrderCreated>("OrderCreated");opts.Events.AddEventTypeMapping<OrderItemAdded>("OrderItemAdded");// 3. 写模型:内联快照(加速 AggregateStreamAsync)opts.Events.Snapshot<OrderAggregate>(SnapshotLifecycle.Inline);// 4. 读模型:投影注册opts.Projections.Add<OrderSummaryProjection>(ProjectionLifecycle.Inline);opts.Projections.Add<OrderItemAddedProjection>(ProjectionLifecycle.Inline);// 5. 为查询模型添加索引opts.Schema.For<OrderSummary>().Index(x => x.Id).Index(x => x.TotalQuantity);})// 6. 异步守护:HotCold 模式.AddAsyncDaemon(DaemonMode.HotCold);// 7. 健康检查:监控 Daemon 延迟services.AddHealthChecks().AddMartenAsyncDaemonHealthCheck(maxEventLag: 100);
}

说明:

  • SnapshotLifecycle.Inline 在每次 SaveChanges 时同步持久化最新聚合状态。
  • 若只需异步投影,可将投影生命周期改为 Async
  • 索引配置可显著提升查询性能。

5. 定义领域事件与聚合

5.1 事件类

public record OrderCreated(Guid OrderId, Guid CustomerId, DateTime CreatedAt);
public record OrderItemAdded(Guid OrderId, string Sku, int Quantity);

5.2 聚合根

public class OrderAggregate
{private readonly List<object> _pending = new();public Guid Id { get; private set; }public int TotalQuantity { get; private set; }public void Create(Guid id, Guid customerId) =>Enqueue(new OrderCreated(id, customerId, DateTime.UtcNow));public void AddItem(string sku, int qty) =>Enqueue(new OrderItemAdded(Id, sku, qty));private void Apply(OrderCreated e)   => Id = e.OrderId;private void Apply(OrderItemAdded e) => TotalQuantity += e.Quantity;private void Enqueue(object @evt){switch (@evt){case OrderCreated oc:    Apply(oc);    break;case OrderItemAdded oi:  Apply(oi);    break;}_pending.Add(@evt);}public IReadOnlyList<object> DequeueEvents(){var evts = _pending.ToList();_pending.Clear();return evts;}
}

最佳实践:

  • 使用 switch 替代反射,提升性能。
  • DequeueEvents 清空缓冲,避免重复写入。

6. 写入事件与快照

await using var session = documentStore.LightweightSession();// (1)新聚合:显式 StartStream
await session.Events.StartStream<OrderAggregate>(new OrderCreated(orderId, customerId, DateTime.UtcNow));
await session.SaveChangesAsync(ct);// (2)后续更新:重建 + 追加
var agg = await session.Events.AggregateStreamAsync<OrderAggregate>(orderId, ct)?? new OrderAggregate();agg.AddItem("SKU-001", 3);await session.Events.AppendAsync(orderId, agg.DequeueEvents());
await session.SaveChangesAsync(ct);
ApplicationServiceMartenEventStorePostgreSQLSnapshotStoreStartStream / AppendAsyncPersist mt_events & mt_snapshots (Inline)Commit TransactionApplicationServiceMartenEventStorePostgreSQLSnapshotStore

快照策略:

  • Inline:每次写入同步更新快照,平衡写入延迟与重建性能。
  • EventCount(100):按事件计数批量快照,适合超长流场景。

7. 读模型投影(Projections)

7.1 查询模型

public class OrderSummary
{public Guid Id { get; set; }public int TotalQuantity { get; set; }public DateTime CreatedAt { get; set; }
}

7.2 投影实现

public class OrderSummaryProjection : ViewProjection<OrderCreated>
{public override ValueTask ApplyAsync(IDocumentSession s, OrderCreated e, CancellationToken _) =>s.StoreAsync(new OrderSummary{Id = e.OrderId,TotalQuantity = 0,CreatedAt = e.CreatedAt}, _);
}public class OrderItemAddedProjection : ViewProjection<OrderItemAdded>
{public override async ValueTask ApplyAsync(IDocumentSession s, OrderItemAdded e, CancellationToken _){var sum = await s.LoadAsync<OrderSummary>(e.OrderId, _);if (sum is not null){sum.TotalQuantity += e.Quantity;await s.StoreAsync(sum, _);}}
}
HotCold DaemonMartenEventStoreReadModelDbFetch Unprocessed EventsDeliver EventStore Projection & Update HighWaterMarkloop[for each event]HotCold DaemonMartenEventStoreReadModelDb

一致性模式:

  • Inline:同步强一致,写事务内更新查询表。
  • Async:延迟一致,由 Daemon 背景消费,适合大规模并发。

8. 全量回放与版本兼容

var events = await session.Events.FetchStreamAsync(orderId, ct);
var replay = new OrderAggregate();foreach (var evt in events)
{switch (evt){case OrderCreated oc:// 若版本迁移需要,可以在此调用 Adaptervar migrated = OrderCreatedAdapter.Adapt(oc);replay.Create(migrated.OrderId, migrated.CustomerId);break;case OrderItemAdded oi:replay.AddItem(oi.Sku, oi.Quantity);break;}
}
Yes
No
Fetch mt_events
Iterate Events
Is Old Version?
Adapter -> New Event
Use Raw Event
Apply to Aggregate
  • 版本迁移:通过 事件 Adapter 补全字段或转换类型。
  • 不可变原则:历史事件永不修改,所有演进通过 Adapter 或新事件。

9. 查询与 API 暴露

9.1 Query Service

public interface IOrderQueryService
{Task<OrderSummary?> GetAsync(Guid id);
}public class OrderQueryService : IOrderQueryService
{private readonly IQuerySession _qs;public OrderQueryService(IQuerySession qs) => _qs = qs;public Task<OrderSummary?> GetAsync(Guid id) =>_qs.LoadAsync<OrderSummary>(id);
}

9.2 Minimal API

app.MapGet("/api/orders/{id:guid}", async (Guid id, IOrderQueryService svc) =>(await svc.GetAsync(id)) is OrderSummary s? Results.Ok(s): Results.NotFound());

10. 性能与最佳实践

  • 并发写分区:多租户 + 聚合 ID 混合 StreamId,避免热点集中过载

  • 幂等与事务:结合业务唯一键 + PostgreSQL 本地事务,保证写入原子性

  • 索引优化:为 OrderSummary.IdTotalQuantity 添加数据库索引

  • 监控可观测

    • 使用 Prometheus/Grafana 监控事件写入速率与投影延迟
    • 利用 Marten Async Daemon 专用健康检查监测滞后

11. 端到端示例结构

DemoProject/
├── modules/
│   └── Order/
│       ├── Domain/
│       │   ├── Aggregates/
│       │   └── Events/
│       ├── Application/
│       │   ├── Services/
│       │   └── Projections/
│       └── EntityFrameworkCore/  # 可选
└── Program.cs                   # Marten + AsyncDaemon + HealthChecks 配置

http://www.dtcms.com/a/271187.html

相关文章:

  • Amazon SageMaker 部署 AIGC 应用:训练 - 优化 - 部署 - Web 前端集成应用实践
  • 解决IDEA缺少Add Framework Support选项的可行性方案
  • ObjectClear - 图像处理新革命,一键“抹除”图像中任意物体与阴影 支持50系显卡 一键整合包下载
  • 响应式原理二:响应式依赖收集
  • 前端进阶之路-从传统前端到VUE-JS(第四期-VUE-JS页面布局与动态内容实现)(Element Plus方式)
  • Higress 上架 KubeSphere Marketplace,助力企业构建云原生流量入口
  • 海信IP501H_GK6323处理器免拆卡刷包和线刷救砖包_当贝纯净版
  • 类模板的语法
  • 计算机网络实验——网线的制作和测试
  • 网安-SSRF-pikachu
  • RNN及其变体的概念和案例
  • Vue响应式原理一:认识响应式逻辑
  • python 为什么推荐使用虚拟环境(如 venv)?它解决了什么问题?
  • doris2.1.8连接报错ERROR 1203 (42000): Reach limit of connections解决办法
  • 使用 Docker Compose 简化 INFINI Console 与 Easysearch 环境搭建
  • Oracle:使用ONLINE选项创建索引
  • 【内核基础精讲】I2C 子系统核心概念与结构全解析
  • 类与对象【下篇】-- 关于类的其它语法
  • 蓝凌EKP产品:属性转换器系统优化
  • c语言学习_函数递归2
  • 70、【OS】【Nuttx】【构建】配置 stm32 工程
  • STM32继电器万能控制设备
  • 【04】MFC入门到精通——MFC 自己手动新添加对话框模板 并 创建对话框类
  • SpringBoot集成文件 - 大文件的上传(异步,分片,断点续传和秒传)
  • 数据结构基础准备:包装类 泛型 泛型的上界 密封类
  • 零知开源——STM32F407VET6驱动SHT41温湿度传感器完整教程
  • 2023年全国青少年信息素养大赛Python编程小学组复赛真题+答案解析-北京赛区
  • idea 常用快捷键
  • Mysql中的日志-undo/redo/binlog详解
  • 学习open62541 --- [79] 在docker中运行open62541工程