当前位置: 首页 > news >正文

ABP VNext + RediSearch:微服务级全文检索

ABP VNext + RediSearch:微服务级全文检索 🚀


📚 目录

  • ABP VNext + RediSearch:微服务级全文检索 🚀
    • 📚 一、背景与动机 🚀
    • 🛠️ 二、环境与依赖 🐳
      • 2.1 Docker Compose 启动 Redis Stack
      • 2.2 Kubernetes 部署(示例 Manifest)
      • 2.3 ABP VNext & NuGet 包
    • 🏗️ 三、架构与流程图 🏗️
    • 🔧 四、索引模型与依赖注入 🔧
      • 4.1 模型定义
      • 4.2 服务注册
    • 🛠️ 五、IndexService & SearchService 实现 🛠️
      • 接口
      • `RedisOmIndexService`
      • `RedisOmSearchService`
    • ⚙️ 六、数据同步策略 🔄
      • 6.1 EF Core 批量扩展
      • 6.2 实时新增/更新/删除
      • 6.3 批量重建:`RebuildIndexJob`
    • 📄 七、复杂查询示例 🔍
    • 📊 八、性能对比测试示例脚本 📈
    • 🚦 九、生产最佳实践 & 陷阱提示 ⚠️
    • 📂 参考资料 📚


✨ TL;DR

  • 🚀 利用 Redis Stack(内置 RediSearch)+ Redis.OM,在 ABP VNext 微服务中实现毫秒级全文检索
  • 🐳 Docker Compose & 🎯 Kubernetes Manifest:持久化、ACL 认证、RedisInsight 可视化
  • 🏷️ 全局 Prefix + 动态 IndexName,完美隔离多租户索引与数据
  • 🔄 完整功能:索引创建/删除/写入/批量、实时/删除同步、批量重建、Polly 重试
  • 🔍 支持全文、Tag、数值、地理、Facet 聚合;📈 性能对比 PostgreSQL LIKE/FTS vs. RediSearch
  • 🔒 生产建议:AOF/RDB、ACL、Pre-commit/SAST、监控 & 慢查询、Testcontainers 集成测试

📚 一、背景与动机 🚀

传统关系型数据库全文检索(LIKE '%关键词%' 或 FTS)在微服务、高并发场景下常遇:

  • 性能瓶颈:百万级文档延时 100+ ms;
  • 功能受限:地理半径、Facet 聚合需自研;
  • 扩展复杂:分片与高可用运维成本高。

RediSearch 基于内存倒排索引,支持次毫秒级响应实时更新地理 & 聚合,完美契合高吞吐、低延迟检索需求。


🛠️ 二、环境与依赖 🐳

2.1 Docker Compose 启动 Redis Stack

version: "3.8"
services:redis:image: redis/redis-stack:latestcontainer_name: redis-stackports:- "6379:6379"- "8001:8001"volumes:- redis-data:/datacommand:- redis-server- --requirepass YourStrong!Pass- --appendonly yes
volumes:redis-data:
  • 🔐 安全--requirepass 强制认证
  • 💾 持久化--appendonly yes 开启 AOF
  • 🔍 GUI:访问 http://localhost:8001 使用 RedisInsight

提示:Docker Compose v3 下资源限制字段无效,如需限内存请用 Swarm 或 CLI 参数 --memory

docker-compose up -d

2.2 Kubernetes 部署(示例 Manifest)

apiVersion: apps/v1
kind: Deployment
metadata:name: redis-stack
spec:replicas: 1selector: { matchLabels: { app: redis-stack } }template:metadata: { labels: { app: redis-stack } }spec:containers:- name: redisimage: redis/redis-stack:latestargs: ["redis-server", "--requirepass", "YourStrong!Pass", "--appendonly", "yes"]ports:- containerPort: 6379- containerPort: 8001volumeMounts:- mountPath: /dataname: redis-datavolumes:- name: redis-datapersistentVolumeClaim:claimName: redis-pvc
---
apiVersion: v1
kind: Service
metadata:name: redis-stack
spec:type: ClusterIPports:- port: 6379- port: 8001selector:app: redis-stack

2.3 ABP VNext & NuGet 包

dotnet add package Redis.OM
dotnet add package StackExchange.Redis
dotnet add package Volo.Abp.Caching.StackExchangeRedis
dotnet add package Polly

appsettings.json

{"Abp": {"DistributedCache": {"Redis": {"Configuration": "localhost:6379,password=YourStrong!Pass,allowAdmin=true","InstanceName": "MyApp:"}}}
}

🏗️ 三、架构与流程图 🏗️

服务端
SaveChanges
领域事件
Upsert/Delete
BulkInsert
SearchAsync
FT.SEARCH
API/ApplicationService
EF Core → PostgreSQL
RediSearch 索引 ← Redis.OM
DataSyncHandler
RebuildIndexJob
前端

🔧 四、索引模型与依赖注入 🔧

4.1 模型定义

using Redis.OM.Modeling;[Document(IndexName = "product-idx")]  // 基础索引名
public class ProductIndex
{[RedisIdField]                   // 主键public string Id { get; set; }[Searchable]                     // 全文public string Name { get; set; }[Indexed(IsTag = true)]          // Tagpublic string Category { get; set; }[Indexed(IsSortable = true)]     // 数值/排序public decimal Price { get; set; }[Indexed(IsGeo = true)]          // 地理public GeoLoc Location { get; set; }
}

4.2 服务注册

public override void ConfigureServices(ServiceConfigurationContext context)
{// 1. ABP Redis 缓存context.Services.AddStackExchangeRedisCache(options => {});// 2. ConnectionMultiplexercontext.Services.AddSingleton(sp =>ConnectionMultiplexer.Connect(sp.GetRequiredService<IConfiguration>().GetSection("Abp:DistributedCache:Redis:Configuration").Value));// 3. Redis.OM Providercontext.Services.AddSingleton(sp =>{var mux      = sp.GetRequiredService<ConnectionMultiplexer>();var tenantId = sp.GetService<ICurrentTenant>()?.GetId()?.ToString() ?? "global";return new RedisConnectionProvider(new RedisConnectionProviderOptions{RedisConnection = mux,Prefix          = $"tenant:{tenantId}:"});});// 4. 注入索引/搜索服务context.Services.AddTransient<IIndexService, RedisOmIndexService>();context.Services.AddTransient<ISearchService, RedisOmSearchService>();
}

注意Prefix 仅对文档 HashKey 生效,不会自动修改 FT.CREATE 的索引名。若需隔离多租户索引,需在 CreateIndexAsync/DropIndexAsync 中手动拼接:

var indexName = $"{prefix}product-idx";

🛠️ 五、IndexService & SearchService 实现 🛠️

接口

public interface IIndexService
{Task CreateIndexAsync<T>() where T : class;Task DropIndexAsync<T>()   where T : class;Task UpsertAsync<T>(T doc) where T : class;Task DeleteAsync<T>(string id) where T : class;Task BulkInsertAsync<T>(IEnumerable<T> docs) where T : class;
}public interface ISearchService
{Task<SearchResult<T>> SearchAsync<T>(string query, int skip = 0, int take = 20) where T : class;Task<SearchResult<T>> SearchAsync<T>(SearchDefinition def) where T : class;
}

RedisOmIndexService

public class RedisOmIndexService : IIndexService
{private readonly RedisConnectionProvider _prov;private readonly IDatabase _db;private readonly string _prefix;public RedisOmIndexService(RedisConnectionProvider prov,ConnectionMultiplexer mux){_prov   = prov;_db     = mux.GetDatabase();_prefix = prov.Prefix;  // 如 "tenant:1:"}public Task CreateIndexAsync<T>() where T : class{var baseIdx = _prov.RedisCollection<T>().IndexName;var idxName = $"{_prefix}{baseIdx}";// 使用 Redis.OM 默认 schemareturn _db.ExecuteAsync("FT.CREATE",idxName, "ON", "HASH","PREFIX", "1", $"{_prefix}{typeof(T).Name.ToLowerInvariant()}:","SCHEMA", /* ... schema args ... */);}public async Task DropIndexAsync<T>() where T : class{var idxName = $"{_prefix}{_prov.RedisCollection<T>().IndexName}";var rl      = (RedisResult[])await _db.ExecuteAsync("FT._LIST");var list    = rl.Select(r => (string)r).ToArray();if (list.Contains(idxName))await _db.ExecuteAsync("FT.DROPINDEX", idxName, "DD");}public Task UpsertAsync<T>(T doc) where T : class=> _prov.RedisCollection<T>().InsertAsync(doc);public Task DeleteAsync<T>(string id) where T : class=> _prov.RedisCollection<T>().DeleteAsync(id);public async Task BulkInsertAsync<T>(IEnumerable<T> docs) where T : class{// 限制并发,防止瞬时打垮 Redisusing var sem = new SemaphoreSlim(50);var tasks = docs.Select(async d =>{await sem.WaitAsync();try { await _prov.RedisCollection<T>().InsertAsync(d); }finally { sem.Release(); }});await Task.WhenAll(tasks);}
}

RedisOmSearchService

public class RedisOmSearchService : ISearchService
{private readonly RedisConnectionProvider _prov;public RedisOmSearchService(RedisConnectionProvider prov) => _prov = prov;public async Task<SearchResult<T>> SearchAsync<T>(string query, int skip = 0, int take = 20) where T : class{var col = _prov.RedisCollection<T>();var res = await col.SearchAsync(new SearchDefinition(query).Limit(skip, take));return new SearchResult<T>{Items = res.Documents.Select(d => d.Object).ToList(),Total = res.TotalResults};}public async Task<SearchResult<T>> SearchAsync<T>(SearchDefinition def) where T : class{var col = _prov.RedisCollection<T>();var res = await col.SearchAsync(def);return new SearchResult<T>{Items = res.Documents.Select(d => d.Object).ToList(),Total = res.TotalResults};}
}

⚙️ 六、数据同步策略 🔄

6.1 EF Core 批量扩展

public static class IQueryableExtensions
{public static async IAsyncEnumerable<List<T>> BatchAsync<T>(this IQueryable<T> source, int size){var total = await source.CountAsync();for (int i = 0; i < total; i += size)yield return await source.Skip(i).Take(size).ToListAsync();}
}

6.2 实时新增/更新/删除

// 新增/更新
public class ProductChangedHandler: ILocalEventHandler<EntityChangedEventData<Product>>
{private readonly IIndexService _idx;private readonly AsyncPolicy _retry = Policy.Handle<Exception>().WaitAndRetryAsync(new[]{TimeSpan.FromMilliseconds(50),TimeSpan.FromMilliseconds(100)});public ProductChangedHandler(IIndexService idx) => _idx = idx;public async Task HandleEventAsync(EntityChangedEventData<Product> e){var doc = new ProductIndex {Id       = e.Entity.Id.ToString(),Name     = e.Entity.Name,Category = e.Entity.Category,Price    = e.Entity.Price,Location = new GeoLoc(e.Entity.Lat, e.Entity.Lng)};await _retry.ExecuteAsync(() => _idx.UpsertAsync(doc));}
}// 删除
public class ProductDeletedHandler: ILocalEventHandler<EntityDeletedEventData<Product>>
{private readonly IIndexService _idx;public ProductDeletedHandler(IIndexService idx) => _idx = idx;public Task HandleEventAsync(EntityDeletedEventData<Product> e)=> _idx.DeleteAsync<ProductIndex>(e.EntityId.ToString());
}

6.3 批量重建:RebuildIndexJob

public class RebuildIndexJob : IBackgroundJob
{private readonly IRepository<Product, Guid> _repo;private readonly IIndexService _idx;public RebuildIndexJob(IRepository<Product, Guid> repo, IIndexService idx){_repo = repo; _idx = idx;}public async Task ExecuteAsync(){await _idx.DropIndexAsync<ProductIndex>();await _idx.CreateIndexAsync<ProductIndex>();var q = _repo.WithDetails().Select(p => new ProductIndex {Id       = p.Id.ToString(),Name     = p.Name,Category = p.Category,Price    = p.Price,Location = new GeoLoc(p.Lat, p.Lng)});await foreach (var batch in q.BatchAsync(500))await _idx.BulkInsertAsync(batch);}
}

📄 七、复杂查询示例 🔍

// 1. 简单全文
var r1 = await _search.SearchAsync<ProductIndex>("\"wireless headphones\"", 0, 20);// 2. Tag + Range + Geo + 排序
var def = new SearchDefinition().FilterByTag(nameof(ProductIndex.Category), "Audio").FilterByRange(nameof(ProductIndex.Price), 50, 200).FilterByGeo(nameof(ProductIndex.Location), lat, lng, 10).OrderByDescending(nameof(ProductIndex.Price)).Limit(0, 20);
var r2 = await _search.SearchAsync<ProductIndex>(def);// 3. Facet 聚合
var fdef = new SearchDefinition("headphones").AddFacet(nameof(ProductIndex.Category));
var agg = await _search.SearchAsync<ProductIndex>(fdef);

📊 八、性能对比测试示例脚本 📈

public async Task TestPerformanceAsync()
{var db  = new MyAppDbContext();var sw  = new Stopwatch();var idx = _search;sw.Start();await db.Products.Where(p => EF.Functions.Like(p.Name, "%headphones%")).ToListAsync();Console.WriteLine($"SQL LIKE: {sw.ElapsedMilliseconds} ms");sw.Restart();await db.Products.Where(p => EF.Functions.ToTsVector("english", p.Name).Matches(EF.Functions.PlainToTsQuery("english", "headphones"))).ToListAsync();Console.WriteLine($"PostgreSQL FTS: {sw.ElapsedMilliseconds} ms");sw.Restart();await idx.SearchAsync<ProductIndex>("headphones");Console.WriteLine($"RediSearch: {sw.ElapsedMilliseconds} ms");
}

🚦 九、生产最佳实践 & 陷阱提示 ⚠️

  1. 持久化 & 安全

    • --appendonly yes + 挂载 /data
    • ACL/requirepass + 客户端配置密码;
  2. 多租户索引隔离

    • Prefix 仅对文档 Key 生效;

    • 手动拼接索引名:

      var idxName = $"{prefix}product-idx";
      
  3. 异常 & 重试

    • Polly 重试 + CancellationToken 超时;
  4. 监控 & 告警

    • FT.SLOWLOG、Redis slowlog;
    • RedisInsight/Prometheus Exporter;
  5. 安全扫描 & 质量

    • Pre-commitdotnet-format、StyleCop;
    • 依赖扫描:OWASP Dependency-Check;
    • SAST:GitHub CodeQL/SonarQube;
  6. 集成测试

    • Testcontainers 启动 Redis Stack,覆盖 CRUD/Search;

📂 参考资料 📚

  • Redis Stack
  • Redis OM .NET
  • ABP 文档
http://www.dtcms.com/a/266625.html

相关文章:

  • Java项目:基于SSM框架实现的在线投稿管理系统【ssm+B/S架构+源码+数据库+毕业论文】
  • 供应链管理:指标评估方式分类与详解
  • JVM 简介与作用
  • Unity HDRP + Azure IoT 的 Python 后端实现与集成方案
  • git教程-pycharm使用tag打标签
  • 云渲染时,电脑能关机吗?关键阶段操作指南
  • Android课程前言
  • Vue-19-前端框架Vue之应用基础组件通信(二)
  • Linux基本命令篇 —— uname命令
  • HarmonyOS学习记录3
  • 【技术架构解析】国产化双复旦微FPGA+飞腾D2000核心板架构
  • 「源力觉醒 创作者计划」_文心 4.5 开源模型玩出花——教育场景下 Scratch 积木自动化生成的部署实践与优化
  • 【算法刷题记录001】整型数组合并(java代码实现)
  • 转Go学习笔记
  • RT‑DETRv2 详解:Real‑Time DETR 的 Bag‑of‑Freebies 与部署优化
  • PNG图像压缩优化工具
  • 钉钉小程序开发技巧:getSystemInfo 系统信息获取全解析
  • IRIV算法详解 | 变量选择的迭代保留法
  • 全星稽核管理软件系统——企业智能化稽核管理的最佳解决方案
  • zxing去白边
  • 督皇口粮酱酒 平价不平质
  • 第十五节:第三部分:特殊文件:XML概述、解析
  • C语言中的输入输出函数:构建程序交互的基石
  • Linux的压缩与解压缩
  • WPF 右键菜单 MenuItem 绑定图片时只显示最后一个 Icon
  • OpenCV 相机标定中的畸变系数及调试硬件
  • 前端渲染大量图片的首屏加载优化方案
  • 刷题笔记--串联所有单词的子串
  • [附源码+数据库+毕业论文]基于Spring+MyBatis+MySQL+Maven+jsp实现的个人财务管理系统,推荐!
  • [附源码+数据库+毕业论文]基于Spring+MyBatis+MySQL+Maven+jsp实现的电影小说网站管理系统,推荐!