多级缓存一致性矩阵:ABP vNext 下的旁路 / 写穿 / 写回组合实战
多级缓存一致性矩阵:ABP vNext 下的旁路 / 写穿 / 写回组合实战
📚 目录
- 多级缓存一致性矩阵:ABP vNext 下的旁路 / 写穿 / 写回组合实战
- 1. 概念速览与边界
- 2. 体系结构
- 3. Key 规范与租户维度 TTL
- 4. 三种写法的组合与适用
- 5. 雪崩 / 穿透 / 击穿 治理
- 6. SWR:Soft-Expire
- 7. 本地缓存(L1)与失效广播
- 8. 布隆过滤器与负缓存
- 依赖 & using
- 统一序列化(JSON,生产可换 MessagePack)
- TTL 抖动扩展(用 Random.Shared)
- 策略与缓存信封
- RedisBloom 轻量客户端封装
- 关键服务:布隆 + 负缓存 + 正常缓存回写
- 使用示例(Controller/Service 层)
- 可选增强
- 9. 写回(Write-Back)落地(选配)
- 10. 策略决策树
- 11. “缓存治理中间件包”——ABP 模块设计
- 12. 可观测性与 SLO
- 13. 压测与验收(k6)
- 14. 常见坑与注意
1. 概念速览与边界
- 多级缓存:L1 进程内
IMemoryCache
🔐;L2 分布式IDistributedCache/Redis
🧑💻;L3 边缘(CDN/反代) 🌐。 - 写策略:旁路(Cache-Aside) 🔄、写穿(Write-Through) 📝、写回/写后(Write-Back/Behind) 🔄。
- 一致性指标:读你所写(强一致性/同步) ⚖️、软实时(≤N 秒追平) ⏱️、最终一致 🔁。
- 本文关注策略组合与治理,讨论如何在多租户 SaaS 环境下,合理配置缓存,提升系统的可用性与性能 ⚡。
2. 体系结构
- 读路径:L1 命中 → 否则查 L2 → 再否回源并回写 L2/L1(带 SWR 与抖动)。SWR=先返回陈旧值,后台重验证。
- 写路径:根据业务在旁路/写穿/写回之间选择;多实例间靠 Redis Pub/Sub 做 L1 失效广播。
- 多租户:ABP 支持多租户隔离,建议采用 租户隔离的 Key 空间与 TTL 策略,保证不同租户缓存不受影响。
3. Key 规范与租户维度 TTL
Key 模板:{app}:{env}:{tenant}:{entity}:{id}
(ABP 统一管理缓存前缀)
TTL 计算:TTL = BaseTTL(tenant,entity) × Jitter(±10~25%)
(加 TTL 抖动 以减少雪崩效应)。
策略提供器接口(放入 ABP 模块中):
public interface ICachePolicyProvider {CachePolicy Get(string tenant, string entity); // 包含 hardTtl, softTtl, jitter, negativeTtl 等
}
public record CachePolicy(TimeSpan HardTtl, TimeSpan SoftTtl, double JitterPct,TimeSpan NegativeTtl, bool EnableSWR, bool EnableBloom);
4. 三种写法的组合与适用
写法 | 一致性 | 典型风险 | 适用场景 |
---|---|---|---|
旁路 Cache-Aside | 软实时/最终一致 | 击穿/回源洪峰 | 读多写少,允许轻微延迟 |
写穿 Write-Through | 较强一致性 | 写延迟变大 | 单体/中低写压、需强一致性 |
写回 Write-Back/Behind | 低延迟/高吞吐 | 数据丢失/回放复杂 | 高写入、可容忍短暂不一致 |
组合建议:
- 读多写少:旁路 + SWR + Single-Flight + 负缓存
- 中等写压、需读你所写:写穿 + L1 失效广播
- 写峰值极高:写回(Redis Streams/队列 + 持久化日志)+ 回放与审计
5. 雪崩 / 穿透 / 击穿 治理
- 雪崩:通过 TTL 抖动、SWR 软过期、热点预热、分布式锁保护回源。
- 穿透:RedisBloom(布隆过滤器)+ 负缓存(短 TTL)+ 参数白名单。
- 击穿:Single-Flight(同 key 回源去重)+ 后台刷新。
Single-Flight(L2 保护示例)
public sealed class SingleFlight {private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new();public async Task<T> RunAsync<T>(string key, Func<Task<T>> loader) {var gate = _locks.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));await gate.WaitAsync();try {return await loader();} finally {gate.Release();_locks.TryRemove(key, out _);}}
}
6. SWR:Soft-Expire
- 逻辑:当 SoftTTL 超过但 HardTTL 未到时,返回陈旧值并后台刷新数据。
- 优点:有效削峰、改善尾延迟;缺点:短时间内读到陈旧数据。
SWR 实现骨架(Redis + 单飞 + 失效广播)
public async Task<T?> GetWithSwrAsync<T>(string key, Func<Task<T>> dbLoader, CachePolicy p,IDistributedCache cache, SingleFlight sf, ISubscriber pub, string? invChannel = null)
{var bytes = await cache.GetAsync(key);var entry = bytes?.FromBytes<CacheEnvelope<T>>();// 新鲜if (entry is { } && entry.SoftExpireAt > DateTimeOffset.UtcNow)return entry.Value;// 软过期:先旧后新if (entry is { } && entry.HardExpireAt > DateTimeOffset.UtcNow) {_ = Task.Run(async () => {var v = await sf.RunAsync(key, dbLoader, CancellationToken.None);await cache.SetAsync(key, new CacheEnvelope<T>(v, p).ToBytes(), new DistributedCacheEntryOptions {AbsoluteExpirationRelativeToNow = p.HardTtl.WithJitter(p.JitterPct)});if (invChannel is not null) await pub.PublishAsync(invChannel, key);});return entry.Value;}// 完全过期:受单飞保护var fresh = await sf.RunAsync(key, dbLoader);await cache.SetAsync(key, new CacheEnvelope<T>(fresh, p).ToBytes(),new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = p.HardTtl.WithJitter(p.JitterPct) });return fresh;
}public readonly record struct CacheEnvelope<T>(T Value, DateTimeOffset SoftExpireAt, DateTimeOffset HardExpireAt) {public CacheEnvelope(T value, CachePolicy p) : this(value,DateTimeOffset.UtcNow + p.SoftTtl,DateTimeOffset.UtcNow + p.HardTtl) { }
}public static class TtlJitterExtensions {static readonly ThreadLocal<Random> R = new(() => new Random());public static TimeSpan WithJitter(this TimeSpan ttl, double pct) {var j = 1.0 + (R.Value!.NextDouble() * 2 - 1) * pct; // ±pctreturn TimeSpan.FromMilliseconds(ttl.TotalMilliseconds * j);}
}
7. 本地缓存(L1)与失效广播
- L1 使用
IMemoryCache
,TTL 应设置 短于 L2;写成功或回源后,通过 Redis Pub/Sub 广播失效。
订阅器(StackExchange.Redis)
public sealed class L1InvalidationSubscriber : IHostedService {private readonly IMemoryCache _l1;private readonly IConnectionMultiplexer _mux;private readonly string _channel;public L1InvalidationSubscriber(IMemoryCache l1, IConnectionMultiplexer mux, IOptions<AppCacheOptions> opt) {_l1 = l1; _mux = mux; _channel = opt.Value.InvalidationChannel ?? "inv:cache";}public Task StartAsync(CancellationToken ct) {_mux.GetSubscriber().Subscribe(_channel, (_, key) => _l1.Remove((string)key));return Task.CompletedTask;}public Task StopAsync(CancellationToken ct) => Task.CompletedTask;
}
8. 布隆过滤器与负缓存
- 布隆过滤器:按 租户/实体 维度构建,避免缓存穿透。
- 负缓存:对不存在的 Key 写入空标记(短 TTL)。
依赖 & using
// NuGet: StackExchange.Redis, Microsoft.Extensions.Caching.Abstractions
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
using StackExchange.Redis;
using System.Text.Json;
统一序列化(JSON,生产可换 MessagePack)
public static class CacheSerExtensions
{public static byte[] ToBytes<T>(this T value) =>JsonSerializer.SerializeToUtf8Bytes(value);public static T? FromBytes<T>(this byte[]? bytes) =>bytes is null ? default : JsonSerializer.Deserialize<T>(bytes);
}
TTL 抖动扩展(用 Random.Shared)
public static class TtlJitterExtensions
{public static TimeSpan WithJitter(this TimeSpan ttl, double pct){var j = 1.0 + (Random.Shared.NextDouble() * 2 - 1) * pct; // ±pctreturn TimeSpan.FromMilliseconds(ttl.TotalMilliseconds * j);}
}
策略与缓存信封
public readonly record struct CacheEnvelope<T>(T Value, DateTimeOffset SoftExpireAt, DateTimeOffset HardExpireAt)
{public CacheEnvelope(T value, CachePolicy p) : this(value,DateTimeOffset.UtcNow + p.SoftTtl,DateTimeOffset.UtcNow + p.HardTtl) { }
}public sealed record CachePolicy(TimeSpan HardTtl,TimeSpan SoftTtl,double JitterPct,TimeSpan NegativeTtl,bool EnableSWR,bool EnableBloom);
RedisBloom 轻量客户端封装
public sealed class RedisBloomClient
{private readonly IDatabase _db;public RedisBloomClient(IConnectionMultiplexer mux) => _db = mux.GetDatabase();public async Task EnsureFilterAsync(string key, double errorRate = 0.01, long capacity = 1_000_000, int expansion = 2){if (await _db.KeyExistsAsync(key)) return;try{// BF.RESERVE key error_rate capacity [EXPANSION expansion]await _db.ExecuteAsync("BF.RESERVE", key, errorRate, capacity, "EXPANSION", expansion);}catch (RedisServerException ex) when (ex.Message.Contains("exists", StringComparison.OrdinalIgnoreCase)){// 过滤器已存在,无需处理}}public async Task<bool> MayExistAsync(string key, string item){var res = (long)await _db.ExecuteAsync("BF.EXISTS", key, item);return res == 1;}public Task AddAsync(string key, string item) =>_db.ExecuteAsync("BF.ADD", key, item);
}
关键服务:布隆 + 负缓存 + 正常缓存回写
- 顺序:先 Bloom → 再查负缓存 → 再查正缓存 → 回源 → 设置正缓存 / 写负缓存 → (存在时)把 ID 加入 Bloom
- Key 规范:
{prefix}:{tenant}:{entity}:{id}
;负缓存:neg
后缀;Bloom Keybf:{prefix}:{tenant}:{entity}
- 返回
T?
;null
代表 NotFound(与 Controller 层/业务约定)
public sealed class BloomNegativeCacheService
{private static readonly byte[] NegMarker = new byte[] { 1 };private readonly IDistributedCache _cache;private readonly IConnectionMultiplexer _mux;private readonly RedisBloomClient _bloom;private readonly string _prefix;public BloomNegativeCacheService(IDistributedCache cache,IConnectionMultiplexer mux,string keyPrefix = "myapp:prod"){_cache = cache;_mux = mux;_bloom = new RedisBloomClient(mux);_prefix = keyPrefix;}private string CacheKey(string tenant, string entity, string id) => $"{_prefix}:{tenant}:{entity}:{id}";private string NegKey(string key) => $"{key}:neg";private string BloomKey(string tenant, string entity) => $"bf:{_prefix}:{tenant}:{entity}";/// <summary>/// 读取带 Bloom/负缓存 的统一入口。/// loader: 回源方法(如 DB 查找),返回 null 代表不存在。/// </summary>public async Task<T?> GetByIdAsync<T>(string tenant,string entity,string id,Func<Task<T?>> loader,CachePolicy p,CancellationToken ct = default){var key = CacheKey(tenant, entity, id);var negKey = NegKey(key);var bfKey = BloomKey(tenant, entity);// 1) Bloom 预判(可选)if (p.EnableBloom){await _bloom.EnsureFilterAsync(bfKey);var mayExist = await _bloom.MayExistAsync(bfKey, id);if (!mayExist){// 一定不存在(在误判率内),直接返回 NotFoundreturn default;}}// 2) 负缓存检查(短 TTL 的 NotFound 标记)var negBytes = await _cache.GetAsync(negKey, ct);if (negBytes is not null){return default; // 命中负缓存:NotFound}// 3) 正常缓存var hitBytes = await _cache.GetAsync(key, ct);if (hitBytes is not null){// 与 SWR 体系兼容:存的是 CacheEnvelope<T>var env = hitBytes.FromBytes<CacheEnvelope<T>>();if (env is not null){// 给上层选择:直接返回值(是否检查 SoftTTL 由 SWR 装饰器处理)return env.Value;}// 若历史数据不是 Envelope,可兼容反序列化为 Tvar v2 = hitBytes.FromBytes<T>();if (v2 is not null) return v2;}// 4) 回源var dbVal = await loader();if (dbVal is null){// 写负缓存(短 TTL)await _cache.SetAsync(negKey, NegMarker,new DistributedCacheEntryOptions{AbsoluteExpirationRelativeToNow = p.NegativeTtl}, ct);return default;}// 5) 写正缓存(与 SWR 一致:写入 Envelope)var envFresh = new CacheEnvelope<T>(dbVal, p).ToBytes();await _cache.SetAsync(key, envFresh,new DistributedCacheEntryOptions{AbsoluteExpirationRelativeToNow = p.HardTtl.WithJitter(p.JitterPct)}, ct);// 6) Bloom 加入(让后续查询更快拒绝不存在分支;存在则持续巩固)if (p.EnableBloom){await _bloom.AddAsync(bfKey, id);}return dbVal;}
}
使用示例(Controller/Service 层)
// 假设从 DI 获取
// IDistributedCache _distributedCache
// IConnectionMultiplexer _mux
// var svc = new BloomNegativeCacheService(_distributedCache, _mux);// 示例策略(可从 ICachePolicyProvider 获取)
var policy = new CachePolicy(HardTtl: TimeSpan.FromMinutes(10),SoftTtl: TimeSpan.FromMinutes(2),JitterPct: 0.15,NegativeTtl: TimeSpan.FromSeconds(30),EnableSWR: true,EnableBloom: true);// 业务回源
Task<MyPost?> Loader(string id) => _db.Posts.FindAsync(id).AsTask(); // 示例// 调用
var post = await svc.GetByIdAsync<MyPost>(tenant: "t-hot",entity: "post",id: "100001",loader: () => Loader("100001"),p: policy,ct: HttpContext.RequestAborted);if (post is null) return NotFound();
// return Ok(post);
注意
MyPost
是你的实体类型。- 若你在更上层已经使用了 SWR 装饰器 +
CacheEnvelope<T>
,本文方法写入 Envelope 与之兼容;如果你只想写入原始对象,可将envFresh
改为dbVal.ToBytes()
。- 负缓存 Key 使用
:neg
后缀,TTL 请保持很短(如 10–60 秒),并且包含租户维度,避免越权缓存。
可选增强
- Single-Flight 合并回源:在
loader()
外再包一层单飞(引用计数实现),避免热点并发穿透 DB。 - 分布式锁(RedLock)保护 Bloom 首次 Ensure/大型重建 与SWR 背景刷新。
- 指标埋点:
bloom_may_exist_false_total
、neg_cache_hits_total
、origin_load_total
等。 - Bloom 预热/重建:批量把已有 ID 导入 Bloom;按日/周重建降低误判累积。
- 兼容 Pub/Sub/Streams 失效:写成功后通知 L1 失效(关键路径建议用 Streams,Pub/Sub 作“快通知”、Streams 作“兜底”)。
9. 写回(Write-Back)落地(选配)
- 结构:写入 → 写日志/队列(Redis Streams/Kafka/RabbitMQ)→ 后台批量落库 → 成功后刷新/失效缓存。
- 风险控制:崩溃恢复回放(XREADGROUP),幂等键/批次 ID,审计与重试。
Redis Streams 持久日志示例
// produce
await db.StreamAddAsync("write_log", new NameValueEntry[]{ new("op","inc-like"), new("tenant",t), new("postId",id), new("delta","1") });
// consume
await db.StreamReadGroupAsync("write_log", "workers", "w1", ">", count: 100);
10. 策略决策树
- 必须“读你所写”?是→ 写穿 + L1 失效广播 + 短 TTL;否→ ②
- 写入峰值 很高?是→ 写回 + 批写 + 回放审计;否→ 旁路 + SWR + 单飞
- 明显热点?是→ 单飞 + 预热 + 更长 SoftTTL;否→ 常规 TTL + 抖动
- 存在穿透风险?是→ 布隆 + 负缓存 + 参数白名单
11. “缓存治理中间件包”——ABP 模块设计
职责清单
- Key 规范/租户 TTL 策略(
ICachePolicyProvider
) - SWR 包装器(
ICacheSWR
) - 分布式锁(RedLock.net)
- 布隆 & 负缓存拦截器
- L1 失效广播(Redis Pub/Sub/Streams)
- OTel 指标与追踪埋点
模块骨架
using Volo.Abp.Modularity;
using Volo.Abp.Caching;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Caching.Distributed;
using StackExchange.Redis;
using RedLockNet.SERedis;[DependsOn(typeof(AbpCachingModule))]
public sealed class CacheGovernanceModule : AbpModule {public override void ConfigureServices(ServiceConfigurationContext ctx) {var svcs = ctx.Services;// Redis 连接svcs.AddSingleton<IConnectionMultiplexer>(_ =>ConnectionMultiplexer.Connect("localhost:6379"));// Redlock 分布式锁svcs.AddSingleton<IDistributedLockFactory>(sp => {var mux = sp.GetRequiredService<IConnectionMultiplexer>();return RedLockFactory.Create(new List<RedLockMultiplexer> { mux });});// 核心组件svcs.AddSingleton<ICachePolicyProvider, DefaultCachePolicyProvider>();svcs.AddSingleton<SingleFlight>();svcs.AddHostedService<L1InvalidationSubscriber>();svcs.AddSingleton<ICacheFacade, CacheFacade>(); // ABP 分布式缓存前缀svcs.AddOptions<Volo.Abp.Caching.AbpDistributedCacheOptions>().Configure(o => o.KeyPrefix = "myapp");}
}
12. 可观测性与 SLO
指标建议
cache_l1_hit_ratio
/cache_l2_hit_ratio
cache_swr_return_ratio
(SWR 返回占比)cache_singleflight_merge_rate
(合并率)cache_lock_wait_ms
(分布式锁等待)cache_origin_qps
(回源 QPS)cache_avalanche_guard_hits
/cache_penetration_blocked
13. 压测与验收(k6)
目标分布:Zipf 热点(θ≈0.8),并发 200–1000
场景:① 冷启动;② 热点同时失效(SWR+单飞验证);③ 空 Key 穿透攻击(布隆/负缓存验证)
验收线:回源 QPS 降低 ≥ 80%(相对基线);命中 p95 < 50ms / 回源 p95 < 200ms;雪崩模拟下无级联超时
k6 脚本(可直接运行)
import http from 'k6/http';
import { check, sleep } from 'k6';export let options = {scenarios: {cold_start: { executor: 'constant-vus', vus: 200, duration: '60s', startTime: '0s' },hotspot_expire: { executor: 'ramping-vus', startVUs: 100, stages: [{duration:'30s', target:800},{duration:'30s', target:0}], startTime: '70s' },penetration: { executor: 'constant-arrival-rate', rate: 500, timeUnit: '1s', duration: '30s', preAllocatedVUs: 300, startTime: '140s' }},thresholds: {http_req_duration: ['p(95)<2000'],'http_req_failed{scenario:penetration}': ['rate<0.01']}
};const TENANTS = ['t-hot','t-mid','t-cold'];
function zipf(n) {const r = Math.random(); const s = 0.8;return Math.floor(Math.pow(r, -1/(1+s))) % n;
}export default function () {const t = TENANTS[Math.floor(Math.random()\*TENANTS.length)];
const id = 100000 + zipf(5000);
const res = http.get(`http://localhost:5000/api/post/${t}/${id}`);
check(res, { '200': r => r.status === 200 || r.status===404 });
sleep(Math.random()\*0.05);
}
14. 常见坑与注意
- TTL 不分租户 → 热租户拖累冷租户
- L1/L2 TTL 颠倒(应 L1 < L2)
- 负缓存未绑定租户/权限 → 越权命中
- 写回队列未持久化 → 崩溃丢数据(请用 Streams/消息队列)
- SWR 后台刷新无单飞/锁 → 反而雪崩
- Pub/Sub 至多一次 → 关键失效建议 Streams/补偿扫描。