基于 PostgreSQL 的 ABP vNext + ShardingCore 分库分表实战
🚀 基于 PostgreSQL 的 ABP vNext + ShardingCore 分库分表实战
📑 目录
- 🚀 基于 PostgreSQL 的 ABP vNext + ShardingCore 分库分表实战
- ✨ 背景介绍
- 🧱 技术选型
- 🛠️ 环境准备
- ✅ Docker Compose(多库 & 读写分离演示)
- 🚀 环境与容器启动流程
- ✅ 基础表与分表初始化脚本
- 📦 分表初始化流程
- ✅ `appsettings.json` 样板
- ✅ NuGet 包安装
- 🧩 项目实现步骤
- 1. 定义分片键接口
- 2. 实体与 DbContext
- 3. SaveChanges 拦截器
- 4. 分表路由规则
- 5. `Program.cs` 配置
- 🛠️ 服务启动与配置流程
- 6. 分片引导器服务
- 7. 控制器示例:批量插入与异常处理
- 🔄 请求处理与分片路由流程
- 8. 集成测试示例(xUnit & WebApplicationFactory)
- ✅ 性能优化与最佳实践
- 🔍 监控与指标采集流程
- 🔗 总结
✨ 背景介绍
随着系统数据量的增长,单表性能瓶颈和数据库存储压力日益显著,分库分表逐渐成为解决大数据、高并发问题的有效手段。ABP vNext 提供模块化和扩展能力,ShardingCore 是基于 EF Core 的轻量级分库分表中间件。本文结合两者,并引入监控、健康检查与读写分离等生产级要素,基于 PostgreSQL 构建一套高性能、高可用、可复现的分库分表解决方案。
🧱 技术选型
- ABP vNext 8.1.5
- EF Core 8.0.4
- PostgreSQL 15
- ShardingCore 7.8.1.21
- .NET 8 SDK
- EFCore.BulkExtensions(批量插入)
- Polly(重试与熔断)
- OpenTelemetry & Prometheus(监控与指标)
🛠️ 环境准备
✅ Docker Compose(多库 & 读写分离演示)
🚀 环境与容器启动流程
version: '3.8'
services:pg0:image: postgres:15environment:POSTGRES_DB: shard_dbPOSTGRES_USER: postgresPOSTGRES_PASSWORD: passports:- "5432:5432"pg1:image: postgres:15environment:POSTGRES_DB: shard_db_1POSTGRES_USER: postgresPOSTGRES_PASSWORD: passports:- "5433:5432"pg-replica:image: postgres:15environment:POSTGRES_DB: shard_dbPOSTGRES_USER: postgresPOSTGRES_PASSWORD: pass# 可配置流复制,示例略ports:- "5434:5432"app:build: .depends_on:- pg0- pg1- pg-replicaenvironment:ConnectionStrings__Default: Host=pg0;Database=shard_db;Username=postgres;Password=passConnectionStrings__Shard1: Host=pg1;Database=shard_db_1;Username=postgres;Password=passConnectionStrings__ReadReplica: Host=pg-replica;Database=shard_db;Username=postgres;Password=passports:- "5000:80"
✅ 基础表与分表初始化脚本
-- 创建基础表
CREATE TABLE IF NOT EXISTS public."Order" ("Id" BIGSERIAL PRIMARY KEY,"OrderNo" VARCHAR(50) NOT NULL,"Amount" NUMERIC(18,2) NOT NULL,"CreationTime" TIMESTAMPTZ NOT NULL,"CreatorUserId" BIGINT NULL
);-- 自动创建近 12 个月的分表
DO $$
BEGINFOR i IN 0..11 LOOPEXECUTE format('CREATE TABLE IF NOT EXISTS public."Order_%s" (LIKE public."Order" INCLUDING ALL);',to_char(current_date - (i || '' month'')::interval, ''YYYYMM''));END LOOP;
END
$$;
📦 分表初始化流程
✅ appsettings.json
样板
{"ConnectionStrings": {"Default": "Host=pg0;Database=shard_db;Username=postgres;Password=pass","Shard1": "Host=pg1;Database=shard_db_1;Username=postgres;Password=pass","ReadReplica": "Host=pg-replica;Database=shard_db;Username=postgres;Password=pass"}
}
✅ NuGet 包安装
dotnet add package ShardingCore
dotnet add package Npgsql.EntityFrameworkCore.PostgreSQL
dotnet add package EFCore.BulkExtensions
dotnet add package Polly.Extensions.Http
dotnet add package OpenTelemetry.Extensions.Hosting
dotnet add package OpenTelemetry.Exporter.Console
dotnet add package prometheus-net.AspNetCore
🧩 项目实现步骤
1. 定义分片键接口
using System;
using ShardingCore.Core.EntityMetadatas;public interface IShardingKeyIsCreationTime
{[ShardingKey]DateTime CreationTime { get; set; }
}
2. 实体与 DbContext
using System;
using Volo.Abp.Domain.Entities.Auditing;
using Volo.Abp.Domain.Entities;public class Order : AggregateRoot<long>, ICreationAudited, IShardingKeyIsCreationTime
{public string OrderNo { get; set; }public decimal Amount { get; set; }public DateTime CreationTime { get; set; }public long? CreatorUserId { get; set; }
}using Microsoft.EntityFrameworkCore;
using Volo.Abp.EntityFrameworkCore;public class AppDbContext : AbpDbContext<AppDbContext>
{public DbSet<Order> Orders { get; set; }public AppDbContext(DbContextOptions<AppDbContext> options): base(options) { }protected override void OnModelCreating(ModelBuilder builder){base.OnModelCreating(builder);builder.Entity<Order>().Property(o => o.CreationTime).HasDefaultValueSql("NOW() at time zone 'utc'").ValueGeneratedOnAdd();}
}
3. SaveChanges 拦截器
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using ShardingCore.Core.EntityMetadatas;public class AuditInterceptor : SaveChangesInterceptor
{public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,InterceptionResult<int> result,CancellationToken ct = default){var context = eventData.Context;if (context != null){foreach (var entry in context.ChangeTracker.Entries<Order>().Where(e => e.State == EntityState.Added)){entry.Entity.CreationTime = DateTime.UtcNow;entry.Entity.OrderNo = SnowflakeIdGenerator.NewId();}}return base.SavingChangesAsync(eventData, result, ct);}
}
4. 分表路由规则
using System;
using ShardingCore.Core.VirtualRoutes.Modify;public class OrderMonthlyRoute : AbstractSimpleShardingMonthKeyDateTimeVirtualTableRoute<Order>
{public override bool AutoCreateTableByTime => true;public override string GetActualTableName(string tableName, DateTime shardingKey)=> $"{tableName}_{shardingKey:yyyyMM}";
}
5. Program.cs
配置
using Microsoft.AspNetCore.Builder;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Polly.Extensions.Http;
using Prometheus;
using ShardingCore.Core.VirtualRoutes.Modify;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;
var cfg = builder.Configuration;// 拦截器
services.AddSingleton<AuditInterceptor>();// EF + ShardingCore
services.AddShardingDbContext<AppDbContext>().UseRouteConfig(o => o.AddShardingTableRoute<OrderMonthlyRoute>()).UseConfig((sp, o) =>{o.UseShardingQuery((conn, opt) => opt.UseNpgsql(conn)).UseShardingTransaction((conn, opt) => opt.UseNpgsql(conn)).AddDefaultDataSource("ds0", cfg["ConnectionStrings:Default"]).AddDataSource("ds1", cfg["ConnectionStrings:Shard1"]).UseReadWriteSeparation(writeConn: cfg["ConnectionStrings:Default"],readConn: cfg["ConnectionStrings:ReadReplica"],readWeight: 5, writeWeight: 1);}).AddShardingCore().AddInterceptors(sp => new[] { sp.GetRequiredService<AuditInterceptor>() });// 分片引导后台服务
services.AddHostedService<ShardingBootstrapperService>();// 健康检查
services.AddHealthChecks().AddNpgSql(cfg["ConnectionStrings:Default"], name: "shard-ds0").AddNpgSql(cfg["ConnectionStrings:Shard1"], name: "shard-ds1");// OpenTelemetry Tracing
services.AddOpenTelemetryTracing(b =>b.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("ShardingDemo")).AddAspNetCoreInstrumentation().AddEntityFrameworkCoreInstrumentation().AddConsoleExporter()
);// Prometheus Metrics
services.AddMetricServer();
services.AddHttpMetrics();// HttpClient + Polly
services.AddHttpClient("defaultClient").AddPolicyHandler(HttpPolicyExtensions.HandleTransientHttpError().WaitAndRetryAsync(3, _ => TimeSpan.FromSeconds(2))).AddPolicyHandler(HttpPolicyExtensions.HandleTransientHttpError().CircuitBreakerAsync(5, TimeSpan.FromSeconds(30)));services.AddControllers();var app = builder.Build();
app.UseHttpMetrics();
app.MapHealthChecks("/health");
app.MapControllers();
app.Run();
🛠️ 服务启动与配置流程
6. 分片引导器服务
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using ShardingCore.Sharding.Abstractions;public class ShardingBootstrapperService : IHostedService
{private readonly IShardingBootstrapper _bootstrapper;public ShardingBootstrapperService(IShardingBootstrapper bootstrapper)=> _bootstrapper = bootstrapper;public Task StartAsync(CancellationToken ct) => _bootstrapper.StartAsync(ct);public Task StopAsync(CancellationToken ct) => _bootstrapper.DisposeAsync().AsTask();
}
7. 控制器示例:批量插入与异常处理
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Domain.Repositories;[ApiController]
[Route("api/orders")]
public class OrderController : ControllerBase
{private readonly AppDbContext _context;public OrderController(AppDbContext context) => _context = context;[HttpPost("batch")]public async Task<IActionResult> CreateBatchAsync(CancellationToken ct){try{var orders = Enumerable.Range(1, 10).Select(_ => new Order { Amount = 99.99m }).ToList();await _context.BulkInsertAsync(orders, cancellationToken: ct);return Ok(orders);}catch (DbUpdateException ex){return StatusCode(500, new { message = "数据库写入失败", error = ex.Message });}}
}
🔄 请求处理与分片路由流程
8. 集成测试示例(xUnit & WebApplicationFactory)
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc.Testing;
using Xunit;public class OrderControllerTests : IClassFixture<WebApplicationFactory<Program>>
{private readonly HttpClient _client;public OrderControllerTests(WebApplicationFactory<Program> factory)=> _client = factory.CreateClient();[Fact]public async Task CreateBatch_Returns10Orders(){var response = await _client.PostAsync("/api/orders/batch", null);response.EnsureSuccessStatusCode();var json = await response.Content.ReadAsStringAsync();var orders = JsonSerializer.Deserialize<List<Order>>(json, new JsonSerializerOptions { PropertyNameCaseInsensitive = true });Assert.Equal(10, orders.Count);}
}
✅ 性能优化与最佳实践
- 基础表 DDL:提供
Order
表完整建表语句,避免“表不存在”问题 - 一键分表:脚本自动生成近 N 个月分表,支持生产环境预建表
- 业务化 ID:雪花算法生成全局唯一
OrderNo
- 批量插入:
EFCore.BulkExtensions
+CancellationToken
提升写入吞吐 - 多副本读写分离:真实多库演示,读库权重 configurable
- Polly:重试 + 熔断保障网络抖动
- 健康检查:
AddNpgSql
实时监控各分片状态 - 链路追踪:
OpenTelemetry.Exporter.Console
+ EF Core Instrumentation - 指标采集:
prometheus-net
+UseMetricServer()
🔍 监控与指标采集流程
🔗 总结
本文覆盖了从多实例 Docker Compose、基础表 DDL、分表脚本、appsettings.json
样板,到实体设计、拦截器、路由规则、完整 Program.cs
配置、后台引导、控制器示例和集成测试的全链路。帮助读者快速落地“高性能、高可用、可复现”的 ABP vNext + ShardingCore 分库分表解决方案。