Building a Highly Available, High-Performance Microservice System with ABP vNext + CQRS + MediatR: From Architecture Design to Hands-On Implementation


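Contents

  • 🧠 Building a Highly Available, High-Performance Microservice System with ABP vNext + CQRS + MediatR: From Architecture Design to Hands-On Implementation
    • 🧰 Module Structure Overview
    • 📦 Module Registration Code Examples
      • DomainModule
      • ApplicationModule
      • HttpApiModule
      • DbMigratorModule
    • 🧱 Architecture Diagram & Concepts
    • ✍️ Core Code in Practice
      • 1. Aggregate Root & Repository Interface
      • 2. CacheKeys Constants
      • 3. Command + Validator + Idempotent Handling
      • 4. Pipeline Behavior Diagram
      • 5. ValidationBehavior + LoggingBehavior
      • 6. Query Cache + Bloom Filter
    • 🚀 Deployment and Running (Program.cs)
      • Deployment Architecture Diagram
    • 🧪 Automated Test Example (Testcontainers)
      • Test Flow Diagram
    • 📦 Docker Compose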


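🧰 Module Structure Overview

Abp.CqrsDemo
├── src
│   ├── Abp.CqrsDemo.Domain
│   ├── Abp.CqrsDemo.Application
│   ├── Abp.CqrsDemo.HttpApi
│   └── Abp.CqrsDemo.DbMigrator
└── tests
    └── Abp.CqrsDemo.Tests

  • Domain: entities, aggregate roots, domain events, repository interfaces
  • Application: commands, queries, MediatR handlers, DTOs, pipeline behaviors, CacheKeys
  • HttpApi: controllers, binding of external configuration
  • DbMigrator: database initialization and migration tool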

📦 Module Registration Code Examples

DomainModule

[DependsOn(typeof(AbpDddDomainModule))]
public class CqrsDemoDomainModule : AbpModule
{
}

ApplicationModule

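[DependsOn(
    typeof(CqrsDemoDomainModule),
    typeof(AbpCachingModule))]
public class CqrsDemoApplicationModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var assembly = typeof(CqrsDemoApplicationModule).Assembly;

        // CQRS & validation. This is the MediatR 12+ registration style, which matches
        // the Handle(request, next, ct) behavior signature used later in this article.
        context.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(assembly));
        context.Services.AddValidatorsFromAssembly(assembly);

        // Pipeline behaviors run in registration order: Validation first, then Logging
        context.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>));
        context.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));

        // Distributed cache: the Application layer may depend on it directly.
        // IDistributedCache<OrderDto> is resolved through AbpCachingModule's
        // open-generic registration, so no per-type registration is needed.
    }
}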

HttpApiModule

[DependsOn(typeof(CqrsDemoApplicationModule))]
public class CqrsDemoHttpApiModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var configuration = context.Services.GetConfiguration();

        // Redis connection and cache
        context.Services.AddStackExchangeRedisCache(options =>
        {
            options.Configuration = configuration["Redis:Connection"];
        });

        // Controller hosting
        context.Services.AddControllers();
    }
}

DbMigratorModule

[DependsOn(typeof(AbpEntityFrameworkCorePostgreSqlModule))]
public class CqrsDemoDbMigratorModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        context.Services.AddAbpDbContext<CqrsDemoDbContext>(options =>
        {
            options.AddDefaultRepositories(includeAllEntities: true);
        });

        // Set the EF Core database provider. PostgreSQL matches the deployment and
        // test sections of this article; for SQL Server, depend on
        // AbpEntityFrameworkCoreSqlServerModule and call UseSqlServer() instead.
        Configure<AbpDbContextOptions>(options =>
        {
            options.UseNpgsql();
        });
    }
}

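🧱 Architecture Diagram & Concepts

API controller → (Command/Query) → MediatR dispatcher
MediatR dispatcher → CommandHandler → EF Core write database
MediatR dispatcher → QueryHandler → Redis query cache

  • Commands are responsible for state changes
  • Queries can be served from Redis or from a read database
  • Pipeline: Validation → Logging → Handler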
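
The Validation → Logging → Handler ordering can be illustrated without MediatR at all. The following is a minimal sketch, not MediatR's actual implementation: `HandlerDelegate`, `IBehavior`, `TraceBehavior`, and `Pipeline.Run` are hypothetical names introduced only for this illustration. It shows how a list of behaviors is wrapped around a handler so that the first behavior in the list runs outermost.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for MediatR's delegate and behavior abstractions.
public delegate Task<TResponse> HandlerDelegate<TResponse>();

public interface IBehavior<TRequest, TResponse>
{
    Task<TResponse> Handle(TRequest request, HandlerDelegate<TResponse> next);
}

// Records when it is entered and left, so the call order becomes visible.
public sealed class TraceBehavior<TRequest, TResponse> : IBehavior<TRequest, TResponse>
{
    private readonly string _name;
    private readonly List<string> _trace;

    public TraceBehavior(string name, List<string> trace)
    {
        _name = name;
        _trace = trace;
    }

    public async Task<TResponse> Handle(TRequest request, HandlerDelegate<TResponse> next)
    {
        _trace.Add(_name + ":before");
        var response = await next();
        _trace.Add(_name + ":after");
        return response;
    }
}

public static class Pipeline
{
    // Wraps the handler from the inside out, so behaviors[0] ends up outermost.
    public static Task<TResponse> Run<TRequest, TResponse>(
        TRequest request,
        IReadOnlyList<IBehavior<TRequest, TResponse>> behaviors,
        Func<TRequest, Task<TResponse>> handler)
    {
        HandlerDelegate<TResponse> next = () => handler(request);
        for (var i = behaviors.Count - 1; i >= 0; i--)
        {
            var behavior = behaviors[i];
            var inner = next; // capture the current chain before wrapping it
            next = () => behavior.Handle(request, inner);
        }
        return next();
    }
}
```

Running a "Validation" and a "Logging" `TraceBehavior` (in that order) around a handler yields the trace Validation:before, Logging:before, handler, Logging:after, Validation:after. This is why a validation failure thrown in the outer behavior never reaches the logging step or the handler.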

✍️ Core Code in Practice

1. Aggregate Root & Repository Interface

public class Order : AggregateRoot<Guid>
{
    public string ProductId { get; private set; }
    public int Quantity { get; private set; }

    protected Order() { } // Required by EF Core

    public Order(Guid id, string productId, int quantity)
    {
        Id = id;
        ProductId = productId;
        Quantity = quantity;
    }

    // Example business rule
    public void ChangeQuantity(int newQty)
    {
        // The first BusinessException parameter is an error code, so the message is passed by name
        if (newQty <= 0) throw new BusinessException(message: "Quantity must be positive");
        Quantity = newQty;
    }
}

public interface IOrderRepository : IRepository<Order, Guid>
{
}

2. CacheKeys Constants

public static class CacheKeys
{
    // Format string; {0} is replaced with the order ID
    public const string OrderById = "order:{0}";
}

3. Command + Validator + Idempotent Handling

public record CreateOrderCommand(Guid OrderId, string ProductId, int Quantity) : IRequest<Guid>;

public class CreateOrderValidator : AbstractValidator<CreateOrderCommand>
{
    public CreateOrderValidator()
    {
        RuleFor(x => x.ProductId).NotEmpty();
        RuleFor(x => x.Quantity).GreaterThan(0);
    }
}

public class CreateOrderHandler : IRequestHandler<CreateOrderCommand, Guid>
{
    private readonly IOrderRepository _repo;
    private readonly IDistributedIdempotencyService _idempotency;

    public CreateOrderHandler(IOrderRepository repo, IDistributedIdempotencyService idempotency)
    {
        _repo = repo;
        _idempotency = idempotency;
    }

    public async Task<Guid> Handle(CreateOrderCommand request, CancellationToken ct)
    {
        // The client-supplied OrderId doubles as the idempotency key,
        // so a retried request does not insert a second order
        return await _idempotency.ExecuteAsync(request.OrderId.ToString(), async () =>
        {
            var order = new Order(request.OrderId, request.ProductId, request.Quantity);
            await _repo.InsertAsync(order, cancellationToken: ct);
            return order.Id;
        });
    }
}
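
The article does not show `IDistributedIdempotencyService`, so its shape here is an assumption inferred from the `ExecuteAsync(key, action)` call above. The sketch below is a single-process, in-memory stand-in (`InMemoryIdempotencyService` is a hypothetical name) that is useful for tests and for seeing the contract: the action runs once per key, and later calls with the same key get the stored result. A real distributed version would keep keys and results in a shared store such as Redis and add expiry.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Assumed contract, inferred from how CreateOrderHandler calls the service.
public interface IDistributedIdempotencyService
{
    Task<T> ExecuteAsync<T>(string key, Func<Task<T>> action);
}

// In-memory stand-in: one process only, no expiry, not a distributed implementation.
public sealed class InMemoryIdempotencyService : IDistributedIdempotencyService
{
    private readonly ConcurrentDictionary<string, Lazy<Task<object?>>> _results = new();

    public async Task<T> ExecuteAsync<T>(string key, Func<Task<T>> action)
    {
        // Lazy guarantees the action starts at most once per stored entry,
        // even when several callers race on the same key.
        var lazy = _results.GetOrAdd(
            key,
            _ => new Lazy<Task<object?>>(async () => (object?)await action()));

        try
        {
            return (T)(await lazy.Value)!;
        }
        catch
        {
            // Forget this failed attempt (and only this one) so the key can be retried.
            _results.TryRemove(KeyValuePair.Create(key, lazy));
            throw;
        }
    }
}
```

Calling `ExecuteAsync` twice with the same key returns the first result both times and invokes the action only once, which is the behavior the command handler relies on when a client retries a request.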

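4. Pipeline Behavior Diagram

Sequence (participants: Client, PipelineBehaviors, Handler):
Client → PipelineBehaviors: Send Request
PipelineBehaviors: ValidationBehavior, then LoggingBehavior
PipelineBehaviors → Handler: Handle Command/Query
Handler → Client: Return Response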

5. ValidationBehavior + LoggingBehavior

public class ValidationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;

    public ValidationBehavior(IEnumerable<IValidator<TRequest>> validators) => _validators = validators;

    public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
    {
        // Run every registered validator for this request type and collect the failures
        var context = new ValidationContext<TRequest>(request);
        var results = await Task.WhenAll(_validators.Select(v => v.ValidateAsync(context, ct)));
        var failures = results.SelectMany(r => r.Errors).Where(f => f != null).ToList();

        // FluentValidation.ValidationException; the handler is never reached on failure
        if (failures.Any()) throw new ValidationException(failures);

        return await next();
    }
}

public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{
    private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;

    public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger) => _logger = logger;

    public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
    {
        _logger.LogInformation("Handling {Request} with payload: {@Payload}", typeof(TRequest).Name, request);

        var sw = Stopwatch.StartNew(); // requires using System.Diagnostics
        var response = await next();
        sw.Stop();

        _logger.LogInformation("Handled {Request} in {Elapsed}ms with response: {@Response}",
            typeof(TRequest).Name, sw.ElapsedMilliseconds, response);
        return response;
    }
}

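6. Query Cache + Bloom Filter

// The query and DTO are not shown in the original; their shapes are inferred from the handler below
public record GetOrderByIdQuery(Guid OrderId) : IRequest<OrderDto>;
public record OrderDto(Guid Id, string ProductId, int Quantity);

public class GetOrderByIdHandler : IRequestHandler<GetOrderByIdQuery, OrderDto>
{
    private readonly IOrderRepository _repo;
    private readonly IDistributedCache<OrderDto> _cache;
    private readonly IBloomFilter _bloomFilter;

    public GetOrderByIdHandler(
        IOrderRepository repo,
        IDistributedCache<OrderDto> cache,
        IBloomFilter bloomFilter)
    {
        _repo = repo;
        _cache = cache;
        _bloomFilter = bloomFilter;
    }

    public async Task<OrderDto> Handle(GetOrderByIdQuery request, CancellationToken ct)
    {
        // Cheap negative check that protects the cache and database from lookups of
        // nonexistent IDs. It assumes the filter is populated whenever an order is created.
        if (!_bloomFilter.Contains(request.OrderId))
            throw new EntityNotFoundException(typeof(Order), request.OrderId);

        var cacheKey = string.Format(CacheKeys.OrderById, request.OrderId);

        return await _cache.GetOrAddAsync(
            cacheKey,
            async () =>
            {
                // The second positional parameter of FindAsync is includeDetails,
                // so the token must be passed by name
                var order = await _repo.FindAsync(request.OrderId, cancellationToken: ct);
                if (order == null) throw new EntityNotFoundException(typeof(Order), request.OrderId);
                return new OrderDto(order.Id, order.ProductId, order.Quantity);
            },
            // ABP expects a factory that returns the entry options
            () => new DistributedCacheEntryOptions
            {
                SlidingExpiration = TimeSpan.FromMinutes(5),
                AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1)
            },
            token: ct);
    }
}

🚀 Deployment and Running (Program.cs)
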
var builder = WebApplication.CreateBuilder(args);
var config = builder.Configuration;

// Externalized configuration
builder.Host.ConfigureAppConfiguration((ctx, cfg) =>
{
    cfg.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
       .AddEnvironmentVariables();
});

// EF Core & DbContext
builder.Services.AddAbpDbContext<CqrsDemoDbContext>(options =>
{
    options.AddDefaultRepositories();
});
// ABP resolves ConnectionStrings:Default itself, so no connection string is passed here.
// PostgreSQL matches the deployment and test sections; use UseSqlServer() for SQL Server.
builder.Services.Configure<AbpDbContextOptions>(options => options.UseNpgsql());

// Redis cache & Bloom filter (IBloomFilter and BloomFilter are application-defined types)
builder.Services.AddStackExchangeRedisCache(opts =>
    opts.Configuration = config["Redis:Connection"]);
builder.Services.AddSingleton<IBloomFilter>(_ =>
    new BloomFilter(expectedItemCount: 10000, falsePositiveRate: 0.01));

// Health checks
builder.Services.AddHealthChecks()
    .AddDbContextCheck<CqrsDemoDbContext>("Database")
    .AddRedis(config["Redis:Connection"], name: "Redis");

// HTTP client with Polly: 3 retries with exponential backoff (2 s, 4 s, 8 s),
// plus a circuit breaker that opens for 30 s after 5 consecutive failures
builder.Services.AddHttpClient("order")
    .AddTransientHttpErrorPolicy(p => p.WaitAndRetryAsync(3, i => TimeSpan.FromSeconds(Math.Pow(2, i))))
    .AddTransientHttpErrorPolicy(p => p.CircuitBreakerAsync(5, TimeSpan.FromSeconds(30)));

// Load the ABP module graph (assumes CqrsDemoHttpApiModule is the startup module)
await builder.AddApplicationAsync<CqrsDemoHttpApiModule>();

var app = builder.Build();
await app.InitializeApplicationAsync();

app.MapHealthChecks("/health/live", new HealthCheckOptions { Predicate = _ => false }); // liveness: runs no dependency checks
app.MapHealthChecks("/health/ready"); // readiness: database + Redis
app.MapControllers();
app.Run();
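
Program.cs registers `new BloomFilter(expectedItemCount: 10000, falsePositiveRate: 0.01)` behind `IBloomFilter`, but the article defines neither type. The following is a minimal sketch of what they might look like, assuming an `Add(Guid)` / `Contains(Guid)` contract and the constructor parameters seen above; it is not a specific library's API. It uses the standard sizing formulas and double hashing over the GUID bytes, which assumes the IDs carry mostly random bits.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Assumed contract, matching the Contains(orderId) call in the query handler.
public interface IBloomFilter
{
    void Add(Guid id);
    bool Contains(Guid id);
}

public sealed class BloomFilter : IBloomFilter
{
    private readonly BitArray _bits;
    private readonly int _hashCount;
    private readonly object _gate = new();

    public BloomFilter(int expectedItemCount, double falsePositiveRate)
    {
        // Standard sizing: m = -n * ln(p) / (ln 2)^2 bits, k = (m / n) * ln 2 hash functions.
        var ln2 = Math.Log(2);
        var bitCount = (int)Math.Ceiling(-expectedItemCount * Math.Log(falsePositiveRate) / (ln2 * ln2));
        _bits = new BitArray(bitCount);
        _hashCount = Math.Max(1, (int)Math.Round((double)bitCount / expectedItemCount * ln2));
    }

    public void Add(Guid id)
    {
        lock (_gate)
        {
            foreach (var index in Indexes(id)) _bits[index] = true;
        }
    }

    // False means "definitely absent"; true means "possibly present".
    public bool Contains(Guid id)
    {
        lock (_gate)
        {
            foreach (var index in Indexes(id))
            {
                if (!_bits[index]) return false;
            }
            return true;
        }
    }

    // Double hashing: index_i = (h1 + i * h2) mod m, with h1 and h2 read from the GUID bytes.
    private IEnumerable<int> Indexes(Guid id)
    {
        var bytes = id.ToByteArray();
        var h1 = BitConverter.ToUInt64(bytes, 0);
        var h2 = BitConverter.ToUInt64(bytes, 8) | 1UL; // keep the stride non-zero
        var size = (ulong)_bits.Length;
        for (var i = 0; i < _hashCount; i++)
        {
            yield return (int)(unchecked(h1 + (ulong)i * h2) % size);
        }
    }
}
```

With 10,000 expected items and a 1% false-positive rate, the formulas give roughly 96,000 bits and 7 hash functions. Note that the query handler rejects any ID the filter has not seen, so the filter has to be filled on every order creation and rebuilt from the database at startup; an in-process filter like this one is also not shared between API pods.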

Deployment Architecture Diagram

Client → (HTTP) → Ingress/Nginx → (Route) → API Pods
API Pods → (Read/Write) → PostgreSQL
API Pods → (Cache) → Redis

🧪 Automated Test Example (Testcontainers)

// Testcontainers 2.x API; newer releases use PostgreSqlBuilder from the Testcontainers.PostgreSql package
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Configurations;
using DotNet.Testcontainers.Containers;

// WebApplicationFactory<Program> needs Program to be visible to the test project
public class CustomWebAppFactory : WebApplicationFactory<Program>, IAsyncLifetime
{
    private readonly PostgreSqlTestcontainer _postgres;

    public CustomWebAppFactory()
    {
        _postgres = new TestcontainersBuilder<PostgreSqlTestcontainer>()
            .WithDatabase(new PostgreSqlTestcontainerConfiguration
            {
                Database = "demo",
                Username = "sa",
                Password = "sa"
            })
            .Build();
    }

    public async Task InitializeAsync() => await _postgres.StartAsync();

    // "new" hides WebApplicationFactory.DisposeAsync, which returns ValueTask
    public new async Task DisposeAsync() => await _postgres.DisposeAsync();

    protected override void ConfigureWebHost(IWebHostBuilder builder)
    {
        // Point the application at the throwaway PostgreSQL container
        builder.ConfigureAppConfiguration((ctx, cfg) =>
        {
            cfg.AddInMemoryCollection(new Dictionary<string, string?>
            {
                ["ConnectionStrings:Default"] = _postgres.ConnectionString
            });
        });
    }
}

public class OrderApiTests : IClassFixture<CustomWebAppFactory>
{
    private readonly HttpClient _client;

    public OrderApiTests(CustomWebAppFactory factory) => _client = factory.CreateClient();

    [Fact]
    public async Task Should_Create_Order_Successfully()
    {
        var command = new { orderId = Guid.NewGuid(), productId = "A001", quantity = 1 };
        var response = await _client.PostAsJsonAsync("/api/orders", command);
        response.EnsureSuccessStatusCode();
    }
}

Test Flow Diagram

Test initialization → Start Postgres container → Configure WebHost → Run integration tests → Dispose container

📦 Docker Compose

version: "3.9"
services:
  api:
    build: .
    ports:
      - "5000:80"
  db:
    image: postgres
    environment:
      POSTGRES_DB: demo
      POSTGRES_USER: sa
      POSTGRES_PASSWORD: sa
  redis:
    image: redis

