Redis + ABP vNext 构建分布式高可用缓存架构
🚀 Redis + ABP vNext 构建分布式高可用缓存架构
🔧 环境准备
-
开发环境
- .NET 8.0 SDK
- Visual Studio 2022 / VS Code
- Docker & Docker Compose
-
NuGet 包
Volo.Abp.Caching.StackExchangeRedis v8.1.5
Volo.Abp.DistributedLocking.StackExchangeRedis v8.1.5
Volo.Abp.EventBus.Distributed.Redis v8.1.5
Polly v7.2.3
-
Global using
using System; using System.Threading.Tasks; using Microsoft.Extensions.Caching.Distributed; using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Polly; using StackExchange.Redis; using Volo.Abp; using Volo.Abp.Caching; using Volo.Abp.Caching.StackExchangeRedis; using Volo.Abp.DistributedLocking; using Volo.Abp.DistributedLocking.StackExchangeRedis; using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Distributed.Redis;
项目结构示例
src/ ├─ MyApp/ │ ├─ MyAppModule.cs │ ├─ Program.cs │ └─ Services/ │ └─ ProductService.cs └─ test/└─ MyApp.Tests/└─ CacheTests.cs
📚 目录
- 🚀 Redis + ABP vNext 构建分布式高可用缓存架构
- 🔧 环境准备
- 1️⃣ 背景与需求 🎯
- 2️⃣ 架构设计:ABP vNext + Redis 分布式缓存
- 3️⃣ 搭建 Redis 集群(Sentinel & Cluster)
- 🔹 Sentinel HA 示例(docker-compose.yml)
- 🔹 Cluster 分片示例
- 4️⃣ Program.cs 最小示例
- 🚀 启动流程示意图
- 5️⃣ ABP vNext 分布式部署与配置 🛠️
- 1. 模块依赖与预配置
- 2. 服务注册
- 6️⃣ 缓存设计与二级缓存策略 🔑
- 🔄 服务缓存调用流程
- 缓存 Key 常量
- GetOrAddAsync + 空值占位
- 7️⃣ 分布式锁与缓存击穿保护 🔒
- 🔒 分布式锁获取流程
- 8️⃣ 缓存失效同步:Redis Pub/Sub / 分布式事件总线 📣
- 事件定义
- 发布 & 处理
- 9️⃣ 性能调优与监控 📈
- OpenTelemetry + Grafana
- 🔟 单元测试示例 🧪
1️⃣ 背景与需求 🎯
分布式微服务场景下,缓存可显著提升性能和降低数据库压力,但需解决:
- 🔄 多节点更新 导致数据不一致
- 💥 热点缓存失效 引发击穿
- 🔄 节点扩缩容 后本地缓存未命中
量化场景:
- QPS:10,000 req/s
- 读写比:95% 读 / 5% 写
- 缓存命中率目标:>95%
需求:构建 高可用、高性能、可复现 的缓存系统。
2️⃣ 架构设计:ABP vNext + Redis 分布式缓存
- 🗝️ Redis 统一缓存源
- 📣 Pub/Sub / 分布式事件广播失效
- 🔁 本地
IMemoryCache
+ 分布式IDistributedCache
二级缓存
3️⃣ 搭建 Redis 集群(Sentinel & Cluster)
🔹 Sentinel HA 示例(docker-compose.yml)
version: '3'
services:redis-master:image: redis:6.2volumes:- ./redis-master.conf:/usr/local/etc/redis/redis.confcommand: redis-server /usr/local/etc/redis/redis.confports: ["6379:6379"]redis-slave:image: redis:6.2command: redis-server --slaveof redis-master 6379ports: ["6380:6379"]sentinel-1:image: redis:6.2command: redis-sentinel /usr/local/etc/redis/sentinel.confports: ["26379:26379"]
# 同理配置 sentinel-2、sentinel-3 …
sentinel.conf:
sentinel monitor mymaster redis-master 6379 2 sentinel auth-pass mymaster <password>
🔹 Cluster 分片示例
version: '3'
services:redis-node-1:image: bitnami/redis-cluster:6environment:- REDIS_PASSWORD=yourpasswordports: ["7000:7000","7001:7001"]# 至少 6 个节点...
集群创建:
docker exec -it <container> \redis-cli --cluster create \127.0.0.1:7000 127.0.0.1:7001 ... \--cluster-replicas 1 --cluster-yes
4️⃣ Program.cs 最小示例
🚀 启动流程示意图
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddApplication<MyAppModule>();var app = builder.Build();
app.InitializeApplication();
app.Run();
5️⃣ ABP vNext 分布式部署与配置 🛠️
1. 模块依赖与预配置
[DependsOn(typeof(AbpCachingStackExchangeRedisModule),typeof(AbpDistributedLockingStackExchangeRedisModule),typeof(AbpEventBusDistributedRedisModule)
)]
public class MyAppModule : AbpModule
{public override void PreConfigureServices(ServiceConfigurationContext ctx){PreConfigure<AbpDistributedCacheOptions>(o => o.KeyPrefix = "MyApp:");PreConfigure<AbpDistributedEventBusOptions>(o =>{o.IsEnabled = true;o.BusConfigurationOptions = ConfigurationOptions.Parse(ctx.Services.GetConfiguration()["Redis:Configuration"]);});}
}
2. 服务注册
public override void ConfigureServices(ServiceConfigurationContext ctx)
{var config = ctx.Services.GetConfiguration();ctx.Services.AddStackExchangeRedisCache(opts =>{opts.Configuration = config["Redis:Configuration"];opts.InstanceName = config["Redis:InstanceName"];});ctx.Services.AddAbpDistributedLocking(o =>o.UseRedis(config["Redis:Configuration"]));ctx.Services.AddAbpDistributedEventBusRedis();ctx.Services.AddMemoryCache();
}
6️⃣ 缓存设计与二级缓存策略 🔑
🔄 服务缓存调用流程
缓存 Key 常量
public static class CacheKeyConsts
{public const string Product = "product:";
}
GetOrAddAsync + 空值占位
public async Task<ProductDto> GetProductAsync(Guid id)
{var key = CacheKeyConsts.Product + id;var dto = await _distributedCache.GetOrAddAsync(key,async () =>{var r = await _repository.FindAsync(id);if (r == null){await _distributedCache.SetAsync(key, NullObject.Value,new DistributedCacheEntryOptions{ AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(30) });return NullObject.Value;}return r;},opts => opts.SetAbsoluteExpiration(TimeSpan.FromMinutes(5)));_memoryCache.Set(key, dto, TimeSpan.FromSeconds(30));return dto;
}
7️⃣ 分布式锁与缓存击穿保护 🔒
🔒 分布式锁获取流程
public class ProductService
{private readonly IDistributedLockProvider _lock;private readonly IDistributedCache _cache;private readonly IRepository<Product, Guid> _repo;private readonly ILogger<ProductService> _logger;public ProductService(IDistributedLockProvider lockProvider,IDistributedCache cache,IRepository<Product, Guid> repo,ILogger<ProductService> logger){_lock = lockProvider;_cache = cache;_repo = repo;_logger = logger;}public async Task<ProductDto> GetWithLockAsync(Guid id){var key = CacheKeyConsts.Product + id;using var scope = _logger.BeginScope("Lock:{Key}", key);await using var handle = await _lock.TryAcquireAsync("lock:" + key,TimeSpan.FromSeconds(3),TimeSpan.FromSeconds(5));if (handle == null){_logger.LogWarning("Lock failed for {Key}", key);return await _repo.FindAsync(id);}var result = await _repo.FindAsync(id);await Policy.Handle<RedisException>().RetryAsync(3).ExecuteAsync(() => _cache.SetAsync(key, result));return result;}
}
8️⃣ 缓存失效同步:Redis Pub/Sub / 分布式事件总线 📣
事件定义
public class CacheInvalidatedEvent : EventData
{public string Key { get; }public Guid Id { get; }public CacheInvalidatedEvent(string key, Guid id) { Key = key; Id = id; }
}
发布 & 处理
await _distributedEventBus.PublishAsync(new CacheInvalidatedEvent(CacheKeyConsts.Product, productId));public class CacheInvalidatedHandler : IDistributedEventHandler<CacheInvalidatedEvent>
{private readonly IDistributedCache _cache;public CacheInvalidatedHandler(IDistributedCache cache) => _cache = cache;public async Task HandleEventAsync(CacheInvalidatedEvent e){await _cache.RemoveAsync(e.Key + ":" + e.Id);}
}
ctx.Services.AddTransient<IDistributedEventHandler<CacheInvalidatedEvent>,CacheInvalidatedHandler>();
9️⃣ 性能调优与监控 📈
OpenTelemetry + Grafana
services.AddOpenTelemetry().WithTracing(b => b.AddAspNetCoreInstrumentation().AddRedisInstrumentation().AddConsoleExporter());
{"panels":[{"title":"Redis RTT","type":"graph","targets":[{"expr":"redis_command_duration_seconds_bucket"}]}]
}
埋点:缓存命中/未命中、锁竞争次数、异常重试日志。
🔟 单元测试示例 🧪
public class CacheTests : AbpIntegratedTest<MyAppModule>
{[Fact]public async Task Should_Remove_Cache_On_Invalidate_Event(){var bus = ServiceProvider.GetRequiredService<IDistributedEventBus>();var cache = ServiceProvider.GetRequiredService<IDistributedCache>();var id = Guid.NewGuid();var key = CacheKeyConsts.Product + id;await cache.SetAsync(key, new ProductDto { Id = id });await bus.PublishAsync(new CacheInvalidatedEvent(CacheKeyConsts.Product, id));var val = await cache.GetAsync<ProductDto>(key);Assert.Null(val);}
}
欢迎点赞 ⭐、讨论交流!