go-zero(十八)结合Elasticsearch实现高效数据检索
go-zero结合Elasticsearch实现高效数据检索
1. Elasticsearch简单介绍
Elasticsearch(简称 ES) 是一个基于 Lucene 库 构建的 分布式、开源、实时搜索与分析引擎,采用 Apache 2.0 协议。它支持水平扩展,能高效处理大规模数据的存储、搜索、分析和可视化,是 ELK 栈(Elasticsearch、Logstash、Kibana) 和 EFK 栈(Elasticsearch、Fluentd、Kibana) 的核心组件。
1.1核心特点
1. 分布式架构,天生可扩展
- 自动分片(Sharding):数据自动分片到多个节点,单个索引可拆分为多个分片(Shard),支持水平扩展,轻松处理 PB 级数据。
- 高可用性(Replication):每个分片支持多个副本(Replica),节点故障时自动切换,保证服务不中断。
- 去中心化:无主节点依赖(7.x 后引入选举机制),节点自动发现和管理,简化集群运维。
2. 实时搜索与分析
- 近实时(Near Real-Time):文档写入后默认 1 秒内可被搜索到,满足实时数据查询需求。
- 全文搜索能力:基于 Lucene 实现高效的全文索引,支持分词、模糊搜索、短语匹配、高亮显示等。
3. 灵活的数据模型
- JSON 文档存储:数据以 JSON 格式存储,支持动态映射(Dynamic Mapping),无需预定义严格 Schema(但推荐显式定义以优化性能)。
- 多数据类型支持:处理文本、数值、日期、地理位置、二进制等数据类型,支持复杂嵌套结构。
4. 强大的查询与聚合 DSL
- Query DSL(Domain-Specific Language):通过 JSON 格式的查询语言,支持布尔逻辑、范围查询、正则匹配、地理空间查询(如附近搜索)等。
- 聚合分析(Aggregation):支持分组(Terms Aggregation)、统计(Avg/Max/Min)、桶分析(Bucket Aggregation)、管道聚合(Pipeline Aggregation),用于数据统计和可视化。
5. 生态系统丰富
- 数据摄入:支持 Logstash(ETL 管道)、Beats(轻量级数据采集器)、Kafka 等数据源,以及直接通过 REST API 写入。
- 可视化与分析:集成 Kibana 实现仪表盘、图表、日志分析;支持与 Grafana、Tableau 等工具对接。
- 插件扩展:支持分词器(如中文分词 IK Analyzer)、安防(X-Pack Security)、机器学习(X-Pack ML)等插件。
6. 高性能与高可靠
- 倒排索引优化:基于 Lucene 的倒排索引,查询速度随数据量增长保持稳定。
- 分布式协调:通过集群节点自动负载均衡,支持自动故障转移和恢复。
1.2 应用场景
1. 日志管理与分析(最经典场景)
- 场景:收集、存储、分析海量日志(如服务器日志、应用日志、微服务日志)。
- 实现:通过 Logstash/Beats 采集日志,写入 ES,用 Kibana 进行日志检索、统计(如错误日志高频分析)、异常检测。
- 优势:秒级查询亿级日志,支持按时间、服务名、错误码等多维度过滤和聚合。
2. 电商搜索与推荐
- 场景:商品搜索(如淘宝、京东的搜索栏)、智能补全、相关商品推荐。
- 功能:支持商品名称全文搜索、价格范围筛选、品牌 / 类目过滤、销量排序,结合地理位置搜索附近门店。
- 技术点:分词器优化(如中文分词)、拼音搜索(解决输入错误)、搜索相关性排序(BM25 算法)。
3. 企业级搜索(内部知识库、文档检索)
- 场景:企业内部文档搜索(如 Confluence、SharePoint 集成)、代码搜索、法律合同检索。
- 优势:支持多语言文本处理、文档元数据过滤(如作者、创建时间)、权限控制(通过 X-Pack Security)。
4. 实时数据分析与仪表盘
- 场景:业务指标监控(如网站 PV/UV、订单量实时统计)、用户行为分析(漏斗模型、留存率)。
- 实现:将业务数据实时写入 ES,通过 Kibana 生成动态仪表盘,支持下钻分析(Drill Down)和预警通知。
5. 地理空间分析
- 场景:物流轨迹追踪、共享单车位置查询、疫情传播热力图。
- 功能:支持地理坐标(Geo-point)、地理区域(Geo-rectangle)、地理距离(Distance Query)查询,结合聚合生成热力图。
6. 安全与 SIEM(安全信息与事件管理)
- 场景:网络安全日志分析、入侵检测、合规审计。
- 实现:采集防火墙、WAF、IDS 等设备的日志,通过 ES 进行关联分析(如多维度事件关联)、异常流量检测。
7. 监控与运维(APM、基础设施监控)
- 场景:应用性能监控(APM)、服务器指标(CPU / 内存 / 磁盘)监控、微服务链路追踪。
- 工具链:结合 Elastic APM 采集指标数据,用 ES 存储,Kibana 可视化服务调用链、慢查询定位。
2. 环境部署
2.1部署 elasticsearch 和kibana
注意:elasticsearch 和kibana 版本号尽量一致, elasticsearch用8.x版本,kibana 也要用8.x版本,不然无法一起使用
创建 docker-compose.yml
文件:
version: '3'
services:elasticsearch:container_name: elasticsearchimage: bitnami/elasticsearch:8.9.0environment:- TZ=Asia/Shanghai- discovery.type=single-node- "ES_JAVA_OPTS=-Xms512m -Xmx512m"privileged: trueports:- "9200:9200"restart: alwaysnetworks:- go_zero_netkibana:container_name: kibanaimage: bitnami/kibana:8.9.0restart: alwaysenvironment:- TZ=Asia/Shanghai- I18N_LOCALE=zh-CN- ELASTICSEARCH_HOSTS=http://elasticsearch:9200 # 修正服务名引用ports:- "5601:5601"depends_on:- elasticsearch # 显式声明依赖networks:- go_zero_netnetworks:go_zero_net:driver: bridgevolumes:esdata:driver: local
环境部署完成后,验证 Elasticsearch 是否启动成功:
curl http://localhost:9200
应该会返回类似以下的 JSON 响应:
{"name" : "d95962b2abfe","cluster_name" : "elasticsearch","cluster_uuid" : "TtO-vhldRGmlrZ6U1cIgQw","version" : {"number" : "8.9.0","build_flavor" : "default","build_type" : "tar","build_hash" : "8aa461beb06aa0417a231c345a1b8c38fb498a0d","build_date" : "2023-07-19T14:43:58.555259655Z","build_snapshot" : false,"lucene_version" : "9.7.0","minimum_wire_compatibility_version" : "7.17.0","minimum_index_compatibility_version" : "7.0.0"},"tagline" : "You Know, for Search"
}
使用浏览器访问 http://localhost:5601/
如果访问成功,说明kibana正常启动
3.项目构建
3.1 编写 API 文件
创建 api/search/search.api
文件,定义搜索服务 API:
syntax = "v1"type (SearchRequest {Keyword string `json:"keyword"`Page int `json:"page,optional,default=1"`PageSize int `json:"pageSize,optional,default=10"`Category string `json:"category,optional"`MinPrice float64 `json:"minPrice,optional"`MaxPrice float64 `json:"maxPrice,optional"`SortField string `json:"sortField,optional"`SortOrder string `json:"sortOrder,optional,options=asc|desc"`}ProductItem {ID string `json:"id"`Name string `json:"name"`Description string `json:"description"`Price float64 `json:"price"`Category string `json:"category"`Tags []string `json:"tags"`CreatedAt int64 `json:"createdAt"`}SearchResponse {Total int64 `json:"total"`Products []ProductItem `json:"products"`}IndexProductRequest {Product ProductItem `json:"product"`}IndexProductResponse {Success bool `json:"success"`Message string `json:"message,optional"`}deleteProductRequest {ID string `json:"id"`}
)service search-api {@handler SearchProductspost /api/search/products (SearchRequest) returns (SearchResponse)@handler IndexProductpost /api/products/index (IndexProductRequest) returns (IndexProductResponse)@handler DeleteProductpost /api/delete/products (deleteProductRequest) returns (IndexProductResponse)
}
这个 API 定义文件包含了三个主要 API :
- 搜索商品
- 索引(创建/更新)商品
- 删除商品
现在,切换到项目目录,使用 goctl 工具根据 API 定义生成代码:
# 生成代码
goctl api go -api api/search/search.api -dir .
3.2 封装 Elasticsearch 服务
接下来,我们需要将 Elasticsearch 集成到生成的代码中。
安装 Elasticsearch Go 客户端,因为我们环境部署的是8.X版本,所以这里go-elasticsearch也选择v8版本
go get github.com/elastic/go-elasticsearch/v8
添加 Elasticsearch 配置
修改 internal/config/config.go
文件,添加 Elasticsearch 配置:
package configimport ("github.com/zeromicro/go-zero/rest"
)type Config struct {rest.RestConfElasticsearch struct {Addresses []stringUsername stringPassword string}
}
同时,修改 etc/search-api.yaml
配置文件,添加 Elasticsearch 配置:
Name: search-api
Host: 0.0.0.0
Port: 8888Elasticsearch:Addresses:- http://localhost:9200Username: ""Password: ""
创建 internal/pkg/es/es.go
文件,实现 Elasticsearch 客户端的封装:
package esimport ("context""encoding/json""errors""log""strings""github.com/elastic/go-elasticsearch/v8""github.com/elastic/go-elasticsearch/v8/esapi"
)// ElasticsearchClient 封装ES客户端
type ElasticsearchClient struct {client *elasticsearch.Client
}// NewElasticsearchClient 创建新的ES客户端
func NewElasticsearchClient(addresses []string, username, password string) (*ElasticsearchClient, error) {cfg := elasticsearch.Config{Addresses: addresses,Username: username,Password: password,}client, err := elasticsearch.NewClient(cfg)if err != nil {return nil, err}// 测试连接res, err := client.Info()if err != nil {return nil, err}defer res.Body.Close()if res.IsError() {return nil, errors.New("Elasticsearch connection failed")}return &ElasticsearchClient{client: client,}, nil
}// CreateIndex 创建索引
func (e *ElasticsearchClient) CreateIndex(index string, mapping string) error {res, err := e.client.Indices.Create(index,e.client.Indices.Create.WithBody(strings.NewReader(mapping)),)if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to create index")}return nil
}// IndexExists 检查索引是否存在
func (e *ElasticsearchClient) IndexExists(index string) (bool, error) {res, err := e.client.Indices.Exists([]string{index})if err != nil {return false, err}defer res.Body.Close()return res.StatusCode == 200, nil
}// IndexDocument 索引单个文档
func (e *ElasticsearchClient) IndexDocument(index, id string, document interface{}) error {data, err := json.Marshal(document)if err != nil {return err}req := esapi.IndexRequest{Index: index,DocumentID: id,Body: strings.NewReader(string(data)),Refresh: "true",}res, err := req.Do(context.Background(), e.client)if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to index document")}return nil
}// Search 执行搜索请求
func (e *ElasticsearchClient) Search(index string, query map[string]interface{}) (map[string]interface{}, error) {var buf strings.Builderif err := json.NewEncoder(&buf).Encode(query); err != nil {return nil, err}res, err := e.client.Search(e.client.Search.WithContext(context.Background()),e.client.Search.WithIndex(index),e.client.Search.WithBody(strings.NewReader(buf.String())),e.client.Search.WithTrackTotalHits(true),)if err != nil {return nil, err}defer res.Body.Close()if res.IsError() {var e map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&e); err != nil {return nil, err}log.Printf("[%s] %s: %s", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["reason"])return nil, errors.New("search error")}var r map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&r); err != nil {return nil, err}return r, nil
}// DeleteDocument 删除文档
func (e *ElasticsearchClient) DeleteDocument(index, id string) error {req := esapi.DeleteRequest{Index: index,DocumentID: id,Refresh: "true",}res, err := req.Do(context.Background(), e.client)if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to delete document")}return nil
}// BulkIndex 批量索引文档
func (e *ElasticsearchClient) BulkIndex(index string, documents []map[string]interface{}) error {var buf strings.Builderfor _, doc := range documents {// 获取文档IDid, ok := doc["id"].(string)if !ok {id = ""}// 从文档中删除ID字段,避免重复delete(doc, "id")// 创建索引操作元数据meta := map[string]interface{}{"index": map[string]interface{}{"_index": index,"_id": id,},}// 将元数据写入缓冲区if err := json.NewEncoder(&buf).Encode(meta); err != nil {return err}// 将文档数据写入缓冲区if err := json.NewEncoder(&buf).Encode(doc); err != nil {return err}}// 执行批量请求res, err := e.client.Bulk(strings.NewReader(buf.String()), e.client.Bulk.WithIndex(index), e.client.Bulk.WithRefresh("true"))if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to bulk index documents")}return nil
}
3.3定义商品索引映射
创建 internal/model/product.go
文件,定义商品索引映射:
package modelconst (ProductIndex = "products"
)// ProductIndexMapping 商品索引映射
var ProductIndexMapping = `{"settings": {"number_of_shards": 1,"number_of_replicas": 0,"analysis": {"analyzer": {"text_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "asciifolding"]}}}},"mappings": {"properties": {"id": {"type": "keyword"},"name": {"type": "text","analyzer": "text_analyzer","fields": {"keyword": {"type": "keyword"}}},"description": {"type": "text","analyzer": "text_analyzer"},"price": {"type": "double"},"category": {"type": "keyword"},"tags": {"type": "keyword"},"createdAt": {"type": "date","format": "epoch_millis"}}}
}`// Product 商品模型
type Product struct {ID string `json:"id"`Name string `json:"name"`Description string `json:"description"`Price float64 `json:"price"`Category string `json:"category"`Tags []string `json:"tags"`CreatedAt int64 `json:"createdAt"`
}
Mapping介绍
- Settings(索引设置)
"settings": {"number_of_shards": 1,"number_of_replicas": 0,"analysis": { ... }
}
-
分片设置:
number_of_shards
: 1 个主分片(适用于小规模或开发环境)。number_of_replicas
: 0 个副本(无冗余,生产环境建议至少设为 1)。
-
分析器配置:
"analysis": {"analyzer": {"text_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "asciifolding"]}} }
- 文本分析器(
text_analyzer
):- 使用
standard
分词器(按词边界分词,适合大多数语言)。 - 应用
lowercase
过滤器(转为小写)和asciifolding
过滤器(将非 ASCII 字符转为等效 ASCII,如é
→e
)。
- 使用
- 文本分析器(
- Mappings(字段映射)
"mappings": {"properties": { ... }
}
- 字段类型与用途:
字段名 | 类型 | 用途与特点 |
---|---|---|
id | keyword | 精确匹配(如商品 ID),不分析,用于过滤、排序或聚合。 |
name | text | 全文搜索字段,使用 text_analyzer 处理(支持分词、小写和 ASCII 折叠)。 |
.keyword | 子字段,保留原始值,用于精确匹配(如聚合品牌名)。 | |
description | text | 长文本描述,同样使用 text_analyzer 进行全文搜索。 |
price | double | 浮点型数值,支持范围查询(如 price > 100 )。 |
category | keyword | 分类标签(如 “electronics”),用于过滤和聚合。 |
tags | keyword | 标签数组(如 ["popular", "new"] ),支持多值精确匹配。 |
createdAt | date | 日期类型(Unix 毫秒时间戳),支持范围查询(如按时间筛选)。 |
3.4 扩展 ServiceContext
修改 internal/svc/servicecontext.go
文件,添加 Elasticsearch 客户端:
package svcimport ("go-zero-es-demo/internal/config""go-zero-es-demo/internal/pkg/es"
)type ServiceContext struct {Config config.ConfigEsClient *es.ElasticsearchClient
}func NewServiceContext(c config.Config) *ServiceContext {esClient, err := es.NewElasticsearchClient(c.Elasticsearch.Addresses,c.Elasticsearch.Username,c.Elasticsearch.Password,)if err != nil {panic(err)}return &ServiceContext{Config: c,EsClient: esClient,}
}
//定义索引初始化函数
func InitElasticsearch(client *es.ElasticsearchClient) error {// 检查商品索引是否存在exists, err := client.IndexExists(model.ProductIndex)if err != nil {return err}// 如果索引不存在,则创建if !exists {err = client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)if err != nil {return err}}return nil
}
3.5 实现搜索逻辑
修改 internal/logic/searchproductslogic.go
文件,实现商品搜索功能:
package logicimport ("context""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type SearchProductsLogic struct {logx.Loggerctx context.ContextsvcCtx *svc.ServiceContext
}func NewSearchProductsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SearchProductsLogic {return &SearchProductsLogic{Logger: logx.WithContext(ctx),ctx: ctx,svcCtx: svcCtx,}
}func (l *SearchProductsLogic) SearchProducts(req *types.SearchRequest) (resp *types.SearchResponse, err error) {// 构建搜索查询from := (req.Page - 1) * req.PageSizesize := req.PageSize// 基本查询结构query := map[string]interface{}{"from": from,"size": size,}// 构建搜索条件boolQuery := map[string]interface{}{}mustClauses := []map[string]interface{}{}// 关键词搜索if req.Keyword != "" {mustClauses = append(mustClauses, map[string]interface{}{"multi_match": map[string]interface{}{"query": req.Keyword,"fields": []string{"name^3", "description", "tags"},},})}// 分类过滤if req.Category != "" {mustClauses = append(mustClauses, map[string]interface{}{"term": map[string]interface{}{"category": req.Category,},})}// 价格范围过滤if req.MinPrice > 0 || req.MaxPrice > 0 {rangeQuery := map[string]interface{}{}if req.MinPrice > 0 {rangeQuery["gte"] = req.MinPrice}if req.MaxPrice > 0 {rangeQuery["lte"] = req.MaxPrice}mustClauses = append(mustClauses, map[string]interface{}{"range": map[string]interface{}{"price": rangeQuery,},})}// 添加bool查询if len(mustClauses) > 0 {boolQuery["must"] = mustClausesquery["query"] = map[string]interface{}{"bool": boolQuery,}} else {query["query"] = map[string]interface{}{"match_all": map[string]interface{}{},}}// 排序if req.SortField != "" {order := "asc"if req.SortOrder == "desc" {order = "desc"}query["sort"] = []map[string]interface{}{{req.SortField: map[string]interface{}{"order": order,},},}}// 执行搜索请求result, err := l.svcCtx.EsClient.Search(model.ProductIndex, query)if err != nil {return nil, err}// 解析搜索结果total := int64(result["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))hits := result["hits"].(map[string]interface{})["hits"].([]interface{})products := make([]types.ProductItem, 0, len(hits))for _, hit := range hits {source := hit.(map[string]interface{})["_source"].(map[string]interface{})// 提取标签数组tags := []string{}if tagsRaw, ok := source["tags"].([]interface{}); ok {for _, tag := range tagsRaw {if tagStr, ok := tag.(string); ok {tags = append(tags, tagStr)}}}product := types.ProductItem{ID: source["id"].(string),Name: source["name"].(string),Description: source["description"].(string),Price: source["price"].(float64),Category: source["category"].(string),Tags: tags,CreatedAt: int64(source["createdAt"].(float64)),}products = append(products, product)}return &types.SearchResponse{Total: total,Products: products,}, nil
}
3.6 实现索引和删除逻辑
修改 internal/logic/indexproductlogic.go
文件,实现商品索引功能:
package logicimport ("context""time""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type IndexProductLogic struct {logx.Loggerctx context.ContextsvcCtx *svc.ServiceContext
}func NewIndexProductLogic(ctx context.Context, svcCtx *svc.ServiceContext) *IndexProductLogic {return &IndexProductLogic{Logger: logx.WithContext(ctx),ctx: ctx,svcCtx: svcCtx,}
}func (l *IndexProductLogic) IndexProduct(req *types.IndexProductRequest) (resp *types.IndexProductResponse, err error) {// 如果未提供创建时间,则使用当前时间if req.Product.CreatedAt == 0 {req.Product.CreatedAt = time.Now().UnixMilli()}// 索引文档err = l.svcCtx.EsClient.IndexDocument(model.ProductIndex, req.Product.ID, req.Product)if err != nil {return &types.IndexProductResponse{Success: false,Message: "Failed to index product: " + err.Error(),}, nil}return &types.IndexProductResponse{Success: true,Message: "Product indexed successfully",}, nil
}
修改 internal/logic/deleteproductlogic.go
文件,实现商品删除功能:
package logicimport ("context""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type DeleteProductLogic struct {logx.Loggerctx context.ContextsvcCtx *svc.ServiceContext
}func NewDeleteProductLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DeleteProductLogic {return &DeleteProductLogic{Logger: logx.WithContext(ctx),ctx: ctx,svcCtx: svcCtx,}
}func (l *DeleteProductLogic) DeleteProduct(req *types.DeleteProductRequest) (resp *types.IndexProductResponse, err error) {// todo: add your logic here and delete this line// 删除文档err = l.svcCtx.EsClient.DeleteDocument(model.ProductIndex, req.ID)if err != nil {return &types.IndexProductResponse{Success: false,Message: "Failed to delete product: " + err.Error(),}, nil}return &types.IndexProductResponse{Success: true,Message: "Product deleted successfully",}, nil
}
最后,修改主程序入口 search.go
文件,添加 Elasticsearch 索引初始化逻辑:
func main() {flag.Parse()var c config.Configconf.MustLoad(*configFile, &c)server := rest.MustNewServer(c.RestConf)defer server.Stop()ctx := svc.NewServiceContext(c)// 初始化 Elasticsearch 索引if err := svc.InitElasticsearch(ctx.EsClient); err != nil {panic(fmt.Sprintf("初始化 Elasticsearch 失败: %v", err))}handler.RegisterHandlers(server, ctx)fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)server.Start()
}
4. 项目测试
现在开始启动项目来测试
go run search.go
测试添加索引,我这里只粘贴了一个,其他数据自行补充:
curl --location '127.0.0.1:8888/api/products/index' \
--header 'Content-Type: application/json' \
--data '{"product":{"id": "1","name": "陶瓷马克杯","description": "简约北欧风格陶瓷马克杯,采用高温烧制,釉面光滑易清洗,适合日常咖啡或茶饮。","price": 39.9,"category": "家居","tags": ["厨房用品", "创意礼品", "陶瓷制品"],"createdAt": 1689321456}
}
运行结果:
{"success": true,"message": "Product indexed successfully"
}
测试索引搜索
curl --location '127.0.0.1:8888/api/search/products' \
--header 'Content-Type: application/json' \
--data '{"keyword" :"面包"
}
'
得到以下类似的结果:
{"total": 4,"products": [{"id": "5","name": "全麦面包","description": "无添加全麦面包,富含膳食纤维和蛋白质,口感松软,适合早餐搭配果酱或奶酪食用。","price": 19.8,"category": "食品","tags": ["健康食品","早餐必备","全麦谷物"],"createdAt": 1690001234},{"id": "4","name": "保湿面霜","description": "深层保湿面霜,含透明质酸和胶原蛋白成分,有效改善干燥肌肤,适合所有肤质日常护理。","price": 129,"category": "美妆","tags": ["护肤用品","保湿补水","温和配方"],"createdAt": 1687654321}]
}
测试索引删除
curl --location '127.0.0.1:8888/api/delete/products' \
--header 'Content-Type: application/json' \
--data '
{"id" :"666"
}'
5 功能拓展
完成基本功能后,我们可以添加一些高级功能,使搜索服务更加强大。
6.1 聚合查询
首先,在 API 定义文件 api/search/search.api
中添加新的类型和接口:
type (CategoryStat {Category string `json:"category"`Count int64 `json:"count"`AvgPrice float64 `json:"avgPrice"`MaxPrice float64 `json:"maxPrice"`MinPrice float64 `json:"minPrice"`}CategoryStatsResponse {CategoryStats []CategoryStat `json:"categoryStats"`}
)service search-api {// ...existing endpoints...@handler GetCategoryStatsget /api/stats/categories returns (CategoryStatsResponse)
}
使用 goctl 更新生成的代码:
goctl api go -api api/search/search.api -dir .
在 internal/pkg/es/es.go
中添加聚合查询方法:
// Aggregate 执行聚合查询
func (e *ElasticsearchClient) Aggregate(index string, query map[string]interface{}) (map[string]interface{}, error) {var buf strings.Builderif err := json.NewEncoder(&buf).Encode(query); err != nil {return nil, err}res, err := e.client.Search(e.client.Search.WithContext(context.Background()),e.client.Search.WithIndex(index),e.client.Search.WithBody(strings.NewReader(buf.String())),e.client.Search.WithSize(0), // 聚合查询通常不需要返回文档)if err != nil {return nil, err}defer res.Body.Close()if res.IsError() {var e map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&e); err != nil {return nil, err}return nil, errors.New("aggregate error")}var r map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&r); err != nil {return nil, err}return r, nil
}
实现统计逻辑,在 internal/logic/getcategorystatslogic.go
文件中:
package logicimport ("context""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type GetCategoryStatsLogic struct {logx.Loggerctx context.ContextsvcCtx *svc.ServiceContext
}func NewGetCategoryStatsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCategoryStatsLogic {return &GetCategoryStatsLogic{Logger: logx.WithContext(ctx),ctx: ctx,svcCtx: svcCtx,}
}func (l *GetCategoryStatsLogic) GetCategoryStats() (resp *types.CategoryStatsResponse, err error) {// 构建聚合查询query := map[string]interface{}{"aggs": map[string]interface{}{"categories": map[string]interface{}{"terms": map[string]interface{}{"field": "category","size": 20,},"aggs": map[string]interface{}{"avg_price": map[string]interface{}{"avg": map[string]interface{}{"field": "price",},},"max_price": map[string]interface{}{"max": map[string]interface{}{"field": "price",},},"min_price": map[string]interface{}{"min": map[string]interface{}{"field": "price",},},},},},}// 执行聚合查询result, err := l.svcCtx.EsClient.Aggregate(model.ProductIndex, query)if err != nil {return nil, err}// 解析结果aggregations := result["aggregations"].(map[string]interface{})categories := aggregations["categories"].(map[string]interface{})buckets := categories["buckets"].([]interface{})stats := make([]types.CategoryStat, 0, len(buckets))for _, bucket := range buckets {b := bucket.(map[string]interface{})key := b["key"].(string)docCount := int64(b["doc_count"].(float64))avgPrice := b["avg_price"].(map[string]interface{})["value"].(float64)maxPrice := b["max_price"].(map[string]interface{})["value"].(float64)minPrice := b["min_price"].(map[string]interface{})["value"].(float64)stats = append(stats, types.CategoryStat{Category: key,Count: docCount,AvgPrice: avgPrice,MaxPrice: maxPrice,MinPrice: minPrice,})}return &types.CategoryStatsResponse{CategoryStats: stats,}, nil
}
运行测试
curl --location '127.0.0.1:8888/api/stats/categories'
会得到如下的类似的结果:
{"categoryStats": [{"category": "家居","count": 2,"avgPrice": 26.4,"maxPrice": 39.9,"minPrice": 12.9},{"category": "服饰","count": 1,"avgPrice": 59.99,"maxPrice": 59.99,"minPrice": 59.99},{"category": "电子","count": 1,"avgPrice": 89.5,"maxPrice": 89.5,"minPrice": 89.5},{"category": "美妆","count": 1,"avgPrice": 129,"maxPrice": 129,"minPrice": 129},{"category": "运动","count": 1,"avgPrice": 79.5,"maxPrice": 79.5,"minPrice": 79.5},{"category": "食品","count": 1,"avgPrice": 19.8,"maxPrice": 19.8,"minPrice": 19.8}]
}
6.2 同义词搜索
要启用同义词搜索,需要修改索引映射。首先,修改 internal/model/product.go
文件中的索引映射:
// 修改 ProductIndexMapping 变量
var ProductIndexMapping = `{"settings": {"number_of_shards": 1,"number_of_replicas": 0,"analysis": {"filter": {"synonym_filter": {"type": "synonym","synonyms": ["音响, 音箱, 音像","衣服, 服装, 服饰","首饰, 手饰, 饰品"]}},"analyzer": {"text_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "asciifolding", "synonym_filter"]}}}},"mappings": {"properties": {"id": {"type": "keyword"},"name": {"type": "text","analyzer": "text_analyzer","fields": {"keyword": {"type": "keyword"}}},"description": {"type": "text","analyzer": "text_analyzer"},"price": {"type": "double"},"category": {"type": "keyword"},"tags": {"type": "keyword"},"createdAt": {"type": "date","format": "epoch_millis"}}}
}`
增加ynonym_filter
: 同义词过滤器,定义了3组同义词:
"音响, 音箱, 音像",
"衣服, 服装, 服饰",
"首饰, 手饰, 饰品"
当修改索引映射后,需要重建索引。因此在 internal/pkg/es/es.go
中添加删除索引的方法:
// DeleteIndex 删除索引
func (e *ElasticsearchClient) DeleteIndex(index string) error {res, err := e.client.Indices.Delete([]string{index})if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to delete index")}return nil
}
在 /svc/servicecontext.go
中添加更新映射的逻辑:
// UpdateElasticsearchIndex 更新索引映射(需要重建索引)
func UpdateElasticsearchIndex(client *es.ElasticsearchClient) error {// 检查索引是否存在exists, err := client.IndexExists(model.ProductIndex)if err != nil {return err}// 如果索引已存在,则删除并重建if exists {// 获取原索引的所有文档query := map[string]interface{}{"query": map[string]interface{}{"match_all": map[string]interface{}{},},"size": 10000, // 注意:实际应用中应使用滚动API处理大量数据}result, err := client.Search(model.ProductIndex, query)if err != nil {return err}// 提取文档hits := result["hits"].(map[string]interface{})["hits"].([]interface{})documents := make([]map[string]interface{}, 0, len(hits))for _, hit := range hits {hitMap := hit.(map[string]interface{})source := hitMap["_source"].(map[string]interface{})id := hitMap["_id"].(string)// 确保文档有IDsource["id"] = iddocuments = append(documents, source)}// 删除索引err = client.DeleteIndex(model.ProductIndex)if err != nil {return err}// 创建新索引err = client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)if err != nil {return err}// 如果有文档,重新索引它们if len(documents) > 0 {err = client.BulkIndex(model.ProductIndex, documents)if err != nil {return err}}return nil}// 如果索引不存在,则创建return client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)
}
修改mian函数:
func main() {flag.Parse()var c config.Configconf.MustLoad(*configFile, &c)server := rest.MustNewServer(c.RestConf)defer server.Stop()ctx := svc.NewServiceContext(c)//更新并初始化索引err := svc.UpdateElasticsearchIndex(ctx.EsClient)if err != nil {return}/*// 初始化 Elasticsearch 索引if err := svc.InitElasticsearch(ctx.EsClient); err != nil {panic(fmt.Sprintf("初始化 Elasticsearch 失败: %v", err))}*/handler.RegisterHandlers(server, ctx)fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)server.Start()
}
为了方便测试,我们增加几个新的数据:
[{"id": "1","name": "豪华音像","description": "","price": 999.99,"category": "数码","tags": ["smartphone", "popular", "new"],"createdAt": 1715788800000 // 2025-05-14},{"id": "2","name": "智能音箱","description": "续航长达18小时","price": 1199.99,"category": "数码","tags": ["laptop", "apple", "portable"],"createdAt": 1715702400000 // 2025-05-13},{"id": "3","name": "轻便音响","description": "","price": 1799.99,"category": "数码","tags": ["tv", "4k", "oled"],"createdAt": 1715616000000 // 2025-05-12}
]