当前位置: 首页 > news >正文

基于 PostgreSQL 的 ABP vNext + ShardingCore 分库分表实战

🚀 基于 PostgreSQL 的 ABP vNext + ShardingCore 分库分表实战


📑 目录

  • 🚀 基于 PostgreSQL 的 ABP vNext + ShardingCore 分库分表实战
    • ✨ 背景介绍
    • 🧱 技术选型
    • 🛠️ 环境准备
      • ✅ Docker Compose(多库 & 读写分离演示)
        • 🚀 环境与容器启动流程
      • ✅ 基础表与分表初始化脚本
        • 📦 分表初始化流程
      • ✅ `appsettings.json` 样板
      • ✅ NuGet 包安装
    • 🧩 项目实现步骤
      • 1. 定义分片键接口
      • 2. 实体与 DbContext
      • 3. SaveChanges 拦截器
      • 4. 分表路由规则
      • 5. `Program.cs` 配置
        • 🛠️ 服务启动与配置流程
      • 6. 分片引导器服务
      • 7. 控制器示例:批量插入与异常处理
        • 🔄 请求处理与分片路由流程
      • 8. 集成测试示例(xUnit & WebApplicationFactory)
    • ✅ 性能优化与最佳实践
      • 🔍 监控与指标采集流程
    • 🔗 总结


✨ 背景介绍

随着系统数据量的增长,单表性能瓶颈和数据库存储压力日益显著,分库分表逐渐成为解决大数据、高并发问题的有效手段。ABP vNext 提供模块化和扩展能力,ShardingCore 是基于 EF Core 的轻量级分库分表中间件。本文结合两者,并引入监控、健康检查与读写分离等生产级要素,基于 PostgreSQL 构建一套高性能、高可用、可复现的分库分表解决方案。


🧱 技术选型

  • ABP vNext 8.1.5
  • EF Core 8.0.4
  • PostgreSQL 15
  • ShardingCore 7.8.1.21
  • .NET 8 SDK
  • EFCore.BulkExtensions(批量插入)
  • Polly(重试与熔断)
  • OpenTelemetry & Prometheus(监控与指标)

🛠️ 环境准备

✅ Docker Compose(多库 & 读写分离演示)

🚀 环境与容器启动流程
撰写 docker-compose.yml
docker-compose up
Postgres 主库 pg0 启动
Postgres 分库 pg1 启动
Postgres 只读副本 pg-replica 启动
应用容器 app 启动
version: '3.8'
services:pg0:image: postgres:15environment:POSTGRES_DB: shard_dbPOSTGRES_USER: postgresPOSTGRES_PASSWORD: passports:- "5432:5432"pg1:image: postgres:15environment:POSTGRES_DB: shard_db_1POSTGRES_USER: postgresPOSTGRES_PASSWORD: passports:- "5433:5432"pg-replica:image: postgres:15environment:POSTGRES_DB: shard_dbPOSTGRES_USER: postgresPOSTGRES_PASSWORD: pass# 可配置流复制,示例略ports:- "5434:5432"app:build: .depends_on:- pg0- pg1- pg-replicaenvironment:ConnectionStrings__Default: Host=pg0;Database=shard_db;Username=postgres;Password=passConnectionStrings__Shard1:    Host=pg1;Database=shard_db_1;Username=postgres;Password=passConnectionStrings__ReadReplica: Host=pg-replica;Database=shard_db;Username=postgres;Password=passports:- "5000:80"

✅ 基础表与分表初始化脚本

-- 创建基础表
CREATE TABLE IF NOT EXISTS public."Order" ("Id" BIGSERIAL PRIMARY KEY,"OrderNo" VARCHAR(50) NOT NULL,"Amount" NUMERIC(18,2) NOT NULL,"CreationTime" TIMESTAMPTZ NOT NULL,"CreatorUserId" BIGINT NULL
);-- 自动创建近 12 个月的分表
DO $$
BEGINFOR i IN 0..11 LOOPEXECUTE format('CREATE TABLE IF NOT EXISTS public."Order_%s" (LIKE public."Order" INCLUDING ALL);',to_char(current_date - (i || '' month'')::interval, ''YYYYMM''));END LOOP;
END
$$;
📦 分表初始化流程
连接到 shard_db
执行基础表 DDL
生成近 12 个月分表列表
循环创建 public.Order_YYYYMM 分表
完成分表初始化

appsettings.json 样板

{"ConnectionStrings": {"Default": "Host=pg0;Database=shard_db;Username=postgres;Password=pass","Shard1": "Host=pg1;Database=shard_db_1;Username=postgres;Password=pass","ReadReplica": "Host=pg-replica;Database=shard_db;Username=postgres;Password=pass"}
}

✅ NuGet 包安装

dotnet add package ShardingCore
dotnet add package Npgsql.EntityFrameworkCore.PostgreSQL
dotnet add package EFCore.BulkExtensions
dotnet add package Polly.Extensions.Http
dotnet add package OpenTelemetry.Extensions.Hosting
dotnet add package OpenTelemetry.Exporter.Console
dotnet add package prometheus-net.AspNetCore

🧩 项目实现步骤

1. 定义分片键接口

using System;
using ShardingCore.Core.EntityMetadatas;public interface IShardingKeyIsCreationTime
{[ShardingKey]DateTime CreationTime { get; set; }
}

2. 实体与 DbContext

using System;
using Volo.Abp.Domain.Entities.Auditing;
using Volo.Abp.Domain.Entities;public class Order : AggregateRoot<long>, ICreationAudited, IShardingKeyIsCreationTime
{public string OrderNo { get; set; }public decimal Amount { get; set; }public DateTime CreationTime { get; set; }public long? CreatorUserId { get; set; }
}using Microsoft.EntityFrameworkCore;
using Volo.Abp.EntityFrameworkCore;public class AppDbContext : AbpDbContext<AppDbContext>
{public DbSet<Order> Orders { get; set; }public AppDbContext(DbContextOptions<AppDbContext> options): base(options) { }protected override void OnModelCreating(ModelBuilder builder){base.OnModelCreating(builder);builder.Entity<Order>().Property(o => o.CreationTime).HasDefaultValueSql("NOW() at time zone 'utc'").ValueGeneratedOnAdd();}
}

3. SaveChanges 拦截器

using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using ShardingCore.Core.EntityMetadatas;public class AuditInterceptor : SaveChangesInterceptor
{public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,InterceptionResult<int> result,CancellationToken ct = default){var context = eventData.Context;if (context != null){foreach (var entry in context.ChangeTracker.Entries<Order>().Where(e => e.State == EntityState.Added)){entry.Entity.CreationTime = DateTime.UtcNow;entry.Entity.OrderNo = SnowflakeIdGenerator.NewId();}}return base.SavingChangesAsync(eventData, result, ct);}
}

4. 分表路由规则

using System;
using ShardingCore.Core.VirtualRoutes.Modify;public class OrderMonthlyRoute : AbstractSimpleShardingMonthKeyDateTimeVirtualTableRoute<Order>
{public override bool AutoCreateTableByTime => true;public override string GetActualTableName(string tableName, DateTime shardingKey)=> $"{tableName}_{shardingKey:yyyyMM}";
}

5. Program.cs 配置

using Microsoft.AspNetCore.Builder;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Polly.Extensions.Http;
using Prometheus;
using ShardingCore.Core.VirtualRoutes.Modify;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;
var cfg = builder.Configuration;// 拦截器
services.AddSingleton<AuditInterceptor>();// EF + ShardingCore
services.AddShardingDbContext<AppDbContext>().UseRouteConfig(o => o.AddShardingTableRoute<OrderMonthlyRoute>()).UseConfig((sp, o) =>{o.UseShardingQuery((conn, opt) => opt.UseNpgsql(conn)).UseShardingTransaction((conn, opt) => opt.UseNpgsql(conn)).AddDefaultDataSource("ds0", cfg["ConnectionStrings:Default"]).AddDataSource("ds1", cfg["ConnectionStrings:Shard1"]).UseReadWriteSeparation(writeConn: cfg["ConnectionStrings:Default"],readConn: cfg["ConnectionStrings:ReadReplica"],readWeight: 5, writeWeight: 1);}).AddShardingCore().AddInterceptors(sp => new[] { sp.GetRequiredService<AuditInterceptor>() });// 分片引导后台服务
services.AddHostedService<ShardingBootstrapperService>();// 健康检查
services.AddHealthChecks().AddNpgSql(cfg["ConnectionStrings:Default"], name: "shard-ds0").AddNpgSql(cfg["ConnectionStrings:Shard1"], name: "shard-ds1");// OpenTelemetry Tracing
services.AddOpenTelemetryTracing(b =>b.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("ShardingDemo")).AddAspNetCoreInstrumentation().AddEntityFrameworkCoreInstrumentation().AddConsoleExporter()
);// Prometheus Metrics
services.AddMetricServer();
services.AddHttpMetrics();// HttpClient + Polly
services.AddHttpClient("defaultClient").AddPolicyHandler(HttpPolicyExtensions.HandleTransientHttpError().WaitAndRetryAsync(3, _ => TimeSpan.FromSeconds(2))).AddPolicyHandler(HttpPolicyExtensions.HandleTransientHttpError().CircuitBreakerAsync(5, TimeSpan.FromSeconds(30)));services.AddControllers();var app = builder.Build();
app.UseHttpMetrics();
app.MapHealthChecks("/health");
app.MapControllers();
app.Run();
🛠️ 服务启动与配置流程
CreateBuilder(args)
ConfigureServices
AddShardingDbContext & 拦截器
AddHealthChecks & Monitoring
Build()
UseHttpMetrics()
MapHealthChecks & MapControllers()
Run()

6. 分片引导器服务

using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using ShardingCore.Sharding.Abstractions;public class ShardingBootstrapperService : IHostedService
{private readonly IShardingBootstrapper _bootstrapper;public ShardingBootstrapperService(IShardingBootstrapper bootstrapper)=> _bootstrapper = bootstrapper;public Task StartAsync(CancellationToken ct) => _bootstrapper.StartAsync(ct);public Task StopAsync(CancellationToken ct) => _bootstrapper.DisposeAsync().AsTask();
}

7. 控制器示例:批量插入与异常处理

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Domain.Repositories;[ApiController]
[Route("api/orders")]
public class OrderController : ControllerBase
{private readonly AppDbContext _context;public OrderController(AppDbContext context) => _context = context;[HttpPost("batch")]public async Task<IActionResult> CreateBatchAsync(CancellationToken ct){try{var orders = Enumerable.Range(1, 10).Select(_ => new Order { Amount = 99.99m }).ToList();await _context.BulkInsertAsync(orders, cancellationToken: ct);return Ok(orders);}catch (DbUpdateException ex){return StatusCode(500, new { message = "数据库写入失败", error = ex.Message });}}
}
🔄 请求处理与分片路由流程
HTTP POST /api/orders/batch
OrderController.CreateBatchAsync
BulkInsertAsync 调用
AuditInterceptor SavingChangesAsync
ShardingCore 路由到 Order_YYYYMM
实际写入数据库
返回 200 OK + 数据

8. 集成测试示例(xUnit & WebApplicationFactory)

using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc.Testing;
using Xunit;public class OrderControllerTests : IClassFixture<WebApplicationFactory<Program>>
{private readonly HttpClient _client;public OrderControllerTests(WebApplicationFactory<Program> factory)=> _client = factory.CreateClient();[Fact]public async Task CreateBatch_Returns10Orders(){var response = await _client.PostAsync("/api/orders/batch", null);response.EnsureSuccessStatusCode();var json = await response.Content.ReadAsStringAsync();var orders = JsonSerializer.Deserialize<List<Order>>(json, new JsonSerializerOptions { PropertyNameCaseInsensitive = true });Assert.Equal(10, orders.Count);}
}

✅ 性能优化与最佳实践

  • 基础表 DDL:提供 Order 表完整建表语句,避免“表不存在”问题
  • 一键分表:脚本自动生成近 N 个月分表,支持生产环境预建表
  • 业务化 ID:雪花算法生成全局唯一 OrderNo
  • 批量插入EFCore.BulkExtensions + CancellationToken 提升写入吞吐
  • 多副本读写分离:真实多库演示,读库权重 configurable
  • Polly:重试 + 熔断保障网络抖动
  • 健康检查AddNpgSql 实时监控各分片状态
  • 链路追踪OpenTelemetry.Exporter.Console + EF Core Instrumentation
  • 指标采集prometheus-net + UseMetricServer()

🔍 监控与指标采集流程

应用启动
OpenTelemetry Tracer 初始化
收集 ASP.NET Core & EF Core 追踪
ConsoleExporter 输出
Prometheus MetricServer 启动
暴露 /metrics 端点

🔗 总结

本文覆盖了从多实例 Docker Compose、基础表 DDL、分表脚本、appsettings.json 样板,到实体设计、拦截器、路由规则、完整 Program.cs 配置、后台引导、控制器示例和集成测试的全链路。帮助读者快速落地“高性能、高可用、可复现”的 ABP vNext + ShardingCore 分库分表解决方案。

相关文章:

  • 使用FastAPI和React以及MongoDB构建全栈Web应用05 FastAPI快速入门
  • 红黑树(C++)
  • A1062 PAT甲级JAVA题解 Talent and Virtue
  • 大语言模型通过MCP控制STM32-支持Ollama、DeepSeek、openai等
  • 【C++】内存管理 —— new 和 delete
  • D. Explorer Space(dfs+剪枝)
  • 深入理解深度Q网络DQN:基于python从零实现
  • 三、c语言练习四题
  • 前端项目打包部署流程j
  • 无人机空中物流优化:用 Python 打造高效配送模型
  • 华为IP(6)
  • 中空电机在安装垂直轴高速电机后无法动平衡的原因及解决方案
  • 【网络】:传输层协议 —— UDP、TCP协议
  • Compose笔记(二十二)--NavController
  • 嵌入式硬件篇---SPI
  • 嵌入式硬件篇---陀螺仪|PID
  • 验证码与登录过程逻辑学习总结
  • Go语言——kratos微服务框架使用
  • Linux 进程控制 基础IO
  • 关系数据库-关系运算
  • 沈阳一超市疑借领养名义烹食流浪狗,当地市监局:已收到多起投诉
  • A股高开高走:沪指涨0.82%,创指涨2.63%,超4100股收涨
  • 中国科学院院士徐春明不再担任山东石油化工学院校长
  • 重庆大学通报本科生发14篇SCI论文:涉事学生及其父亲被处理
  • 时代中国控股:前4个月销售额18.1亿元,境外债重组协议押后聆讯至5月底
  • 本周看啥|喜欢二次元的观众,去电影院吧