当前位置: 首页 > news >正文

go-zero(十八)结合Elasticsearch实现高效数据检索

go-zero结合Elasticsearch实现高效数据检索

1. Elasticsearch简单介绍

Elasticsearch(简称 ES) 是一个基于 Lucene 库 构建的 分布式、开源、实时搜索与分析引擎,采用 Apache 2.0 协议。它支持水平扩展,能高效处理大规模数据的存储、搜索、分析和可视化,是 ELK 栈(Elasticsearch、Logstash、Kibana)EFK 栈(Elasticsearch、Fluentd、Kibana) 的核心组件。

1.1核心特点

1. 分布式架构,天生可扩展
  • 自动分片(Sharding):数据自动分片到多个节点,单个索引可拆分为多个分片(Shard),支持水平扩展,轻松处理 PB 级数据。
  • 高可用性(Replication):每个分片支持多个副本(Replica),节点故障时自动切换,保证服务不中断。
  • 去中心化:无主节点依赖(7.x 后引入选举机制),节点自动发现和管理,简化集群运维。
2. 实时搜索与分析
  • 近实时(Near Real-Time):文档写入后默认 1 秒内可被搜索到,满足实时数据查询需求。
  • 全文搜索能力:基于 Lucene 实现高效的全文索引,支持分词、模糊搜索、短语匹配、高亮显示等。
3. 灵活的数据模型
  • JSON 文档存储:数据以 JSON 格式存储,支持动态映射(Dynamic Mapping),无需预定义严格 Schema(但推荐显式定义以优化性能)。
  • 多数据类型支持:处理文本、数值、日期、地理位置、二进制等数据类型,支持复杂嵌套结构。
4. 强大的查询与聚合 DSL
  • Query DSL(Domain-Specific Language):通过 JSON 格式的查询语言,支持布尔逻辑、范围查询、正则匹配、地理空间查询(如附近搜索)等。
  • 聚合分析(Aggregation):支持分组(Terms Aggregation)、统计(Avg/Max/Min)、桶分析(Bucket Aggregation)、管道聚合(Pipeline Aggregation),用于数据统计和可视化。
5. 生态系统丰富
  • 数据摄入:支持 Logstash(ETL 管道)、Beats(轻量级数据采集器)、Kafka 等数据源,以及直接通过 REST API 写入。
  • 可视化与分析:集成 Kibana 实现仪表盘、图表、日志分析;支持与 Grafana、Tableau 等工具对接。
  • 插件扩展:支持分词器(如中文分词 IK Analyzer)、安防(X-Pack Security)、机器学习(X-Pack ML)等插件。
6. 高性能与高可靠
  • 倒排索引优化:基于 Lucene 的倒排索引,查询速度随数据量增长保持稳定。
  • 分布式协调:通过集群节点自动负载均衡,支持自动故障转移和恢复。

1.2 应用场景

1. 日志管理与分析(最经典场景)
  • 场景:收集、存储、分析海量日志(如服务器日志、应用日志、微服务日志)。
  • 实现:通过 Logstash/Beats 采集日志,写入 ES,用 Kibana 进行日志检索、统计(如错误日志高频分析)、异常检测。
  • 优势:秒级查询亿级日志,支持按时间、服务名、错误码等多维度过滤和聚合。
2. 电商搜索与推荐
  • 场景:商品搜索(如淘宝、京东的搜索栏)、智能补全、相关商品推荐。
  • 功能:支持商品名称全文搜索、价格范围筛选、品牌 / 类目过滤、销量排序,结合地理位置搜索附近门店。
  • 技术点:分词器优化(如中文分词)、拼音搜索(解决输入错误)、搜索相关性排序(BM25 算法)。
3. 企业级搜索(内部知识库、文档检索)
  • 场景:企业内部文档搜索(如 Confluence、SharePoint 集成)、代码搜索、法律合同检索。
  • 优势:支持多语言文本处理、文档元数据过滤(如作者、创建时间)、权限控制(通过 X-Pack Security)。
4. 实时数据分析与仪表盘
  • 场景:业务指标监控(如网站 PV/UV、订单量实时统计)、用户行为分析(漏斗模型、留存率)。
  • 实现:将业务数据实时写入 ES,通过 Kibana 生成动态仪表盘,支持下钻分析(Drill Down)和预警通知。
5. 地理空间分析
  • 场景:物流轨迹追踪、共享单车位置查询、疫情传播热力图。
  • 功能:支持地理坐标(Geo-point)、地理区域(Geo-rectangle)、地理距离(Distance Query)查询,结合聚合生成热力图。
6. 安全与 SIEM(安全信息与事件管理)
  • 场景:网络安全日志分析、入侵检测、合规审计。
  • 实现:采集防火墙、WAF、IDS 等设备的日志,通过 ES 进行关联分析(如多维度事件关联)、异常流量检测。
7. 监控与运维(APM、基础设施监控)
  • 场景:应用性能监控(APM)、服务器指标(CPU / 内存 / 磁盘)监控、微服务链路追踪。
  • 工具链:结合 Elastic APM 采集指标数据,用 ES 存储,Kibana 可视化服务调用链、慢查询定位。

2. 环境部署

2.1部署 elasticsearch 和kibana

注意:elasticsearch 和kibana 版本号尽量一致, elasticsearch用8.x版本,kibana 也要用8.x版本,不然无法一起使用

创建 docker-compose.yml 文件:

version: '3'
services:elasticsearch:container_name: elasticsearchimage: bitnami/elasticsearch:8.9.0environment:- TZ=Asia/Shanghai- discovery.type=single-node- "ES_JAVA_OPTS=-Xms512m -Xmx512m"privileged: trueports:- "9200:9200"restart: alwaysnetworks:- go_zero_netkibana:container_name: kibanaimage: bitnami/kibana:8.9.0restart: alwaysenvironment:- TZ=Asia/Shanghai- I18N_LOCALE=zh-CN- ELASTICSEARCH_HOSTS=http://elasticsearch:9200  # 修正服务名引用ports:- "5601:5601"depends_on:- elasticsearch  # 显式声明依赖networks:- go_zero_netnetworks:go_zero_net:driver: bridgevolumes:esdata:driver: local

环境部署完成后,验证 Elasticsearch 是否启动成功:

curl http://localhost:9200

应该会返回类似以下的 JSON 响应:

{"name" : "d95962b2abfe","cluster_name" : "elasticsearch","cluster_uuid" : "TtO-vhldRGmlrZ6U1cIgQw","version" : {"number" : "8.9.0","build_flavor" : "default","build_type" : "tar","build_hash" : "8aa461beb06aa0417a231c345a1b8c38fb498a0d","build_date" : "2023-07-19T14:43:58.555259655Z","build_snapshot" : false,"lucene_version" : "9.7.0","minimum_wire_compatibility_version" : "7.17.0","minimum_index_compatibility_version" : "7.0.0"},"tagline" : "You Know, for Search"
}

使用浏览器访问 http://localhost:5601/ 如果访问成功,说明kibana正常启动

在这里插入图片描述

3.项目构建

3.1 编写 API 文件

创建 api/search/search.api 文件,定义搜索服务 API:

syntax = "v1"type (SearchRequest {Keyword   string  `json:"keyword"`Page      int     `json:"page,optional,default=1"`PageSize  int     `json:"pageSize,optional,default=10"`Category  string  `json:"category,optional"`MinPrice  float64 `json:"minPrice,optional"`MaxPrice  float64 `json:"maxPrice,optional"`SortField string  `json:"sortField,optional"`SortOrder string  `json:"sortOrder,optional,options=asc|desc"`}ProductItem {ID          string   `json:"id"`Name        string   `json:"name"`Description string   `json:"description"`Price       float64  `json:"price"`Category    string   `json:"category"`Tags        []string `json:"tags"`CreatedAt   int64    `json:"createdAt"`}SearchResponse {Total    int64         `json:"total"`Products []ProductItem `json:"products"`}IndexProductRequest {Product ProductItem `json:"product"`}IndexProductResponse {Success bool   `json:"success"`Message string `json:"message,optional"`}deleteProductRequest {ID string `json:"id"`}
)service search-api {@handler SearchProductspost /api/search/products (SearchRequest) returns (SearchResponse)@handler IndexProductpost /api/products/index (IndexProductRequest) returns (IndexProductResponse)@handler DeleteProductpost /api/delete/products (deleteProductRequest) returns (IndexProductResponse)
}

这个 API 定义文件包含了三个主要 API :

  • 搜索商品
  • 索引(创建/更新)商品
  • 删除商品

现在,切换到项目目录,使用 goctl 工具根据 API 定义生成代码:

# 生成代码
goctl api go -api api/search/search.api -dir .

3.2 封装 Elasticsearch 服务

接下来,我们需要将 Elasticsearch 集成到生成的代码中。

安装 Elasticsearch Go 客户端,因为我们环境部署的是8.X版本,所以这里go-elasticsearch也选择v8版本

go get github.com/elastic/go-elasticsearch/v8

添加 Elasticsearch 配置

修改 internal/config/config.go 文件,添加 Elasticsearch 配置:

package configimport ("github.com/zeromicro/go-zero/rest"
)type Config struct {rest.RestConfElasticsearch struct {Addresses []stringUsername  stringPassword  string}
}

同时,修改 etc/search-api.yaml 配置文件,添加 Elasticsearch 配置:

Name: search-api
Host: 0.0.0.0
Port: 8888Elasticsearch:Addresses:- http://localhost:9200Username: ""Password: ""

创建 internal/pkg/es/es.go 文件,实现 Elasticsearch 客户端的封装:

package esimport ("context""encoding/json""errors""log""strings""github.com/elastic/go-elasticsearch/v8""github.com/elastic/go-elasticsearch/v8/esapi"
)// ElasticsearchClient 封装ES客户端
type ElasticsearchClient struct {client *elasticsearch.Client
}// NewElasticsearchClient 创建新的ES客户端
func NewElasticsearchClient(addresses []string, username, password string) (*ElasticsearchClient, error) {cfg := elasticsearch.Config{Addresses: addresses,Username:  username,Password:  password,}client, err := elasticsearch.NewClient(cfg)if err != nil {return nil, err}// 测试连接res, err := client.Info()if err != nil {return nil, err}defer res.Body.Close()if res.IsError() {return nil, errors.New("Elasticsearch connection failed")}return &ElasticsearchClient{client: client,}, nil
}// CreateIndex 创建索引
func (e *ElasticsearchClient) CreateIndex(index string, mapping string) error {res, err := e.client.Indices.Create(index,e.client.Indices.Create.WithBody(strings.NewReader(mapping)),)if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to create index")}return nil
}// IndexExists 检查索引是否存在
func (e *ElasticsearchClient) IndexExists(index string) (bool, error) {res, err := e.client.Indices.Exists([]string{index})if err != nil {return false, err}defer res.Body.Close()return res.StatusCode == 200, nil
}// IndexDocument 索引单个文档
func (e *ElasticsearchClient) IndexDocument(index, id string, document interface{}) error {data, err := json.Marshal(document)if err != nil {return err}req := esapi.IndexRequest{Index:      index,DocumentID: id,Body:       strings.NewReader(string(data)),Refresh:    "true",}res, err := req.Do(context.Background(), e.client)if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to index document")}return nil
}// Search 执行搜索请求
func (e *ElasticsearchClient) Search(index string, query map[string]interface{}) (map[string]interface{}, error) {var buf strings.Builderif err := json.NewEncoder(&buf).Encode(query); err != nil {return nil, err}res, err := e.client.Search(e.client.Search.WithContext(context.Background()),e.client.Search.WithIndex(index),e.client.Search.WithBody(strings.NewReader(buf.String())),e.client.Search.WithTrackTotalHits(true),)if err != nil {return nil, err}defer res.Body.Close()if res.IsError() {var e map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&e); err != nil {return nil, err}log.Printf("[%s] %s: %s", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["reason"])return nil, errors.New("search error")}var r map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&r); err != nil {return nil, err}return r, nil
}// DeleteDocument 删除文档
func (e *ElasticsearchClient) DeleteDocument(index, id string) error {req := esapi.DeleteRequest{Index:      index,DocumentID: id,Refresh:    "true",}res, err := req.Do(context.Background(), e.client)if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to delete document")}return nil
}// BulkIndex 批量索引文档
func (e *ElasticsearchClient) BulkIndex(index string, documents []map[string]interface{}) error {var buf strings.Builderfor _, doc := range documents {// 获取文档IDid, ok := doc["id"].(string)if !ok {id = ""}// 从文档中删除ID字段,避免重复delete(doc, "id")// 创建索引操作元数据meta := map[string]interface{}{"index": map[string]interface{}{"_index": index,"_id":    id,},}// 将元数据写入缓冲区if err := json.NewEncoder(&buf).Encode(meta); err != nil {return err}// 将文档数据写入缓冲区if err := json.NewEncoder(&buf).Encode(doc); err != nil {return err}}// 执行批量请求res, err := e.client.Bulk(strings.NewReader(buf.String()), e.client.Bulk.WithIndex(index), e.client.Bulk.WithRefresh("true"))if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to bulk index documents")}return nil
}

3.3定义商品索引映射

创建 internal/model/product.go 文件,定义商品索引映射:

package modelconst (ProductIndex = "products"
)// ProductIndexMapping 商品索引映射
var ProductIndexMapping = `{"settings": {"number_of_shards": 1,"number_of_replicas": 0,"analysis": {"analyzer": {"text_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "asciifolding"]}}}},"mappings": {"properties": {"id": {"type": "keyword"},"name": {"type": "text","analyzer": "text_analyzer","fields": {"keyword": {"type": "keyword"}}},"description": {"type": "text","analyzer": "text_analyzer"},"price": {"type": "double"},"category": {"type": "keyword"},"tags": {"type": "keyword"},"createdAt": {"type": "date","format": "epoch_millis"}}}
}`// Product 商品模型
type Product struct {ID          string   `json:"id"`Name        string   `json:"name"`Description string   `json:"description"`Price       float64  `json:"price"`Category    string   `json:"category"`Tags        []string `json:"tags"`CreatedAt   int64    `json:"createdAt"`
}

Mapping介绍

  1. Settings(索引设置)
"settings": {"number_of_shards": 1,"number_of_replicas": 0,"analysis": { ... }
}
  • 分片设置

    • number_of_shards: 1 个主分片(适用于小规模或开发环境)。
    • number_of_replicas: 0 个副本(无冗余,生产环境建议至少设为 1)。
  • 分析器配置

    "analysis": {"analyzer": {"text_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "asciifolding"]}}
    }
    
    • 文本分析器text_analyzer):
      • 使用 standard 分词器(按词边界分词,适合大多数语言)。
      • 应用 lowercase 过滤器(转为小写)和 asciifolding 过滤器(将非 ASCII 字符转为等效 ASCII,如 ée)。
  1. Mappings(字段映射)
"mappings": {"properties": { ... }
}
  • 字段类型与用途
字段名类型用途与特点
idkeyword精确匹配(如商品 ID),不分析,用于过滤、排序或聚合。
nametext全文搜索字段,使用 text_analyzer 处理(支持分词、小写和 ASCII 折叠)。
.keyword子字段,保留原始值,用于精确匹配(如聚合品牌名)。
descriptiontext长文本描述,同样使用 text_analyzer 进行全文搜索。
pricedouble浮点型数值,支持范围查询(如 price > 100)。
categorykeyword分类标签(如 “electronics”),用于过滤和聚合。
tagskeyword标签数组(如 ["popular", "new"]),支持多值精确匹配。
createdAtdate日期类型(Unix 毫秒时间戳),支持范围查询(如按时间筛选)。

3.4 扩展 ServiceContext

修改 internal/svc/servicecontext.go 文件,添加 Elasticsearch 客户端:

package svcimport ("go-zero-es-demo/internal/config""go-zero-es-demo/internal/pkg/es"
)type ServiceContext struct {Config   config.ConfigEsClient *es.ElasticsearchClient
}func NewServiceContext(c config.Config) *ServiceContext {esClient, err := es.NewElasticsearchClient(c.Elasticsearch.Addresses,c.Elasticsearch.Username,c.Elasticsearch.Password,)if err != nil {panic(err)}return &ServiceContext{Config:   c,EsClient: esClient,}
}
//定义索引初始化函数
func InitElasticsearch(client *es.ElasticsearchClient) error {// 检查商品索引是否存在exists, err := client.IndexExists(model.ProductIndex)if err != nil {return err}// 如果索引不存在,则创建if !exists {err = client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)if err != nil {return err}}return nil
}

3.5 实现搜索逻辑

修改 internal/logic/searchproductslogic.go 文件,实现商品搜索功能:

package logicimport ("context""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type SearchProductsLogic struct {logx.Loggerctx    context.ContextsvcCtx *svc.ServiceContext
}func NewSearchProductsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SearchProductsLogic {return &SearchProductsLogic{Logger: logx.WithContext(ctx),ctx:    ctx,svcCtx: svcCtx,}
}func (l *SearchProductsLogic) SearchProducts(req *types.SearchRequest) (resp *types.SearchResponse, err error) {// 构建搜索查询from := (req.Page - 1) * req.PageSizesize := req.PageSize// 基本查询结构query := map[string]interface{}{"from": from,"size": size,}// 构建搜索条件boolQuery := map[string]interface{}{}mustClauses := []map[string]interface{}{}// 关键词搜索if req.Keyword != "" {mustClauses = append(mustClauses, map[string]interface{}{"multi_match": map[string]interface{}{"query":  req.Keyword,"fields": []string{"name^3", "description", "tags"},},})}// 分类过滤if req.Category != "" {mustClauses = append(mustClauses, map[string]interface{}{"term": map[string]interface{}{"category": req.Category,},})}// 价格范围过滤if req.MinPrice > 0 || req.MaxPrice > 0 {rangeQuery := map[string]interface{}{}if req.MinPrice > 0 {rangeQuery["gte"] = req.MinPrice}if req.MaxPrice > 0 {rangeQuery["lte"] = req.MaxPrice}mustClauses = append(mustClauses, map[string]interface{}{"range": map[string]interface{}{"price": rangeQuery,},})}// 添加bool查询if len(mustClauses) > 0 {boolQuery["must"] = mustClausesquery["query"] = map[string]interface{}{"bool": boolQuery,}} else {query["query"] = map[string]interface{}{"match_all": map[string]interface{}{},}}// 排序if req.SortField != "" {order := "asc"if req.SortOrder == "desc" {order = "desc"}query["sort"] = []map[string]interface{}{{req.SortField: map[string]interface{}{"order": order,},},}}// 执行搜索请求result, err := l.svcCtx.EsClient.Search(model.ProductIndex, query)if err != nil {return nil, err}// 解析搜索结果total := int64(result["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))hits := result["hits"].(map[string]interface{})["hits"].([]interface{})products := make([]types.ProductItem, 0, len(hits))for _, hit := range hits {source := hit.(map[string]interface{})["_source"].(map[string]interface{})// 提取标签数组tags := []string{}if tagsRaw, ok := source["tags"].([]interface{}); ok {for _, tag := range tagsRaw {if tagStr, ok := tag.(string); ok {tags = append(tags, tagStr)}}}product := types.ProductItem{ID:          source["id"].(string),Name:        source["name"].(string),Description: source["description"].(string),Price:       source["price"].(float64),Category:    source["category"].(string),Tags:        tags,CreatedAt:   int64(source["createdAt"].(float64)),}products = append(products, product)}return &types.SearchResponse{Total:    total,Products: products,}, nil
}

3.6 实现索引和删除逻辑

修改 internal/logic/indexproductlogic.go 文件,实现商品索引功能:

package logicimport ("context""time""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type IndexProductLogic struct {logx.Loggerctx    context.ContextsvcCtx *svc.ServiceContext
}func NewIndexProductLogic(ctx context.Context, svcCtx *svc.ServiceContext) *IndexProductLogic {return &IndexProductLogic{Logger: logx.WithContext(ctx),ctx:    ctx,svcCtx: svcCtx,}
}func (l *IndexProductLogic) IndexProduct(req *types.IndexProductRequest) (resp *types.IndexProductResponse, err error) {// 如果未提供创建时间,则使用当前时间if req.Product.CreatedAt == 0 {req.Product.CreatedAt = time.Now().UnixMilli()}// 索引文档err = l.svcCtx.EsClient.IndexDocument(model.ProductIndex, req.Product.ID, req.Product)if err != nil {return &types.IndexProductResponse{Success: false,Message: "Failed to index product: " + err.Error(),}, nil}return &types.IndexProductResponse{Success: true,Message: "Product indexed successfully",}, nil
}

修改 internal/logic/deleteproductlogic.go 文件,实现商品删除功能:

package logicimport ("context""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type DeleteProductLogic struct {logx.Loggerctx    context.ContextsvcCtx *svc.ServiceContext
}func NewDeleteProductLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DeleteProductLogic {return &DeleteProductLogic{Logger: logx.WithContext(ctx),ctx:    ctx,svcCtx: svcCtx,}
}func (l *DeleteProductLogic) DeleteProduct(req *types.DeleteProductRequest) (resp *types.IndexProductResponse, err error) {// todo: add your logic here and delete this line// 删除文档err = l.svcCtx.EsClient.DeleteDocument(model.ProductIndex, req.ID)if err != nil {return &types.IndexProductResponse{Success: false,Message: "Failed to delete product: " + err.Error(),}, nil}return &types.IndexProductResponse{Success: true,Message: "Product deleted successfully",}, nil
}

最后,修改主程序入口 search.go 文件,添加 Elasticsearch 索引初始化逻辑:


func main() {flag.Parse()var c config.Configconf.MustLoad(*configFile, &c)server := rest.MustNewServer(c.RestConf)defer server.Stop()ctx := svc.NewServiceContext(c)// 初始化 Elasticsearch 索引if err := svc.InitElasticsearch(ctx.EsClient); err != nil {panic(fmt.Sprintf("初始化 Elasticsearch 失败: %v", err))}handler.RegisterHandlers(server, ctx)fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)server.Start()
}

4. 项目测试

现在开始启动项目来测试

go run search.go

测试添加索引,我这里只粘贴了一个,其他数据自行补充:

curl --location '127.0.0.1:8888/api/products/index' \
--header 'Content-Type: application/json' \
--data '{"product":{"id": "1","name": "陶瓷马克杯","description": "简约北欧风格陶瓷马克杯,采用高温烧制,釉面光滑易清洗,适合日常咖啡或茶饮。","price": 39.9,"category": "家居","tags": ["厨房用品", "创意礼品", "陶瓷制品"],"createdAt": 1689321456}
}

运行结果:


{"success": true,"message": "Product indexed successfully"
}

测试索引搜索

curl --location '127.0.0.1:8888/api/search/products' \
--header 'Content-Type: application/json' \
--data '{"keyword" :"面包"
}
'

得到以下类似的结果:

{"total": 4,"products": [{"id": "5","name": "全麦面包","description": "无添加全麦面包,富含膳食纤维和蛋白质,口感松软,适合早餐搭配果酱或奶酪食用。","price": 19.8,"category": "食品","tags": ["健康食品","早餐必备","全麦谷物"],"createdAt": 1690001234},{"id": "4","name": "保湿面霜","description": "深层保湿面霜,含透明质酸和胶原蛋白成分,有效改善干燥肌肤,适合所有肤质日常护理。","price": 129,"category": "美妆","tags": ["护肤用品","保湿补水","温和配方"],"createdAt": 1687654321}]
}

测试索引删除

curl --location '127.0.0.1:8888/api/delete/products' \
--header 'Content-Type: application/json' \
--data '
{"id" :"666"
}'

5 功能拓展

完成基本功能后,我们可以添加一些高级功能,使搜索服务更加强大。

6.1 聚合查询

首先,在 API 定义文件 api/search/search.api 中添加新的类型和接口:

type (CategoryStat {Category  string  `json:"category"`Count     int64   `json:"count"`AvgPrice  float64 `json:"avgPrice"`MaxPrice  float64 `json:"maxPrice"`MinPrice  float64 `json:"minPrice"`}CategoryStatsResponse {CategoryStats []CategoryStat `json:"categoryStats"`}
)service search-api {// ...existing endpoints...@handler GetCategoryStatsget /api/stats/categories returns (CategoryStatsResponse)
}

使用 goctl 更新生成的代码:

goctl api go -api api/search/search.api -dir .

internal/pkg/es/es.go 中添加聚合查询方法:

// Aggregate 执行聚合查询
func (e *ElasticsearchClient) Aggregate(index string, query map[string]interface{}) (map[string]interface{}, error) {var buf strings.Builderif err := json.NewEncoder(&buf).Encode(query); err != nil {return nil, err}res, err := e.client.Search(e.client.Search.WithContext(context.Background()),e.client.Search.WithIndex(index),e.client.Search.WithBody(strings.NewReader(buf.String())),e.client.Search.WithSize(0), // 聚合查询通常不需要返回文档)if err != nil {return nil, err}defer res.Body.Close()if res.IsError() {var e map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&e); err != nil {return nil, err}return nil, errors.New("aggregate error")}var r map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&r); err != nil {return nil, err}return r, nil
}

实现统计逻辑,在 internal/logic/getcategorystatslogic.go 文件中:

package logicimport ("context""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type GetCategoryStatsLogic struct {logx.Loggerctx    context.ContextsvcCtx *svc.ServiceContext
}func NewGetCategoryStatsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCategoryStatsLogic {return &GetCategoryStatsLogic{Logger: logx.WithContext(ctx),ctx:    ctx,svcCtx: svcCtx,}
}func (l *GetCategoryStatsLogic) GetCategoryStats() (resp *types.CategoryStatsResponse, err error) {// 构建聚合查询query := map[string]interface{}{"aggs": map[string]interface{}{"categories": map[string]interface{}{"terms": map[string]interface{}{"field": "category","size":  20,},"aggs": map[string]interface{}{"avg_price": map[string]interface{}{"avg": map[string]interface{}{"field": "price",},},"max_price": map[string]interface{}{"max": map[string]interface{}{"field": "price",},},"min_price": map[string]interface{}{"min": map[string]interface{}{"field": "price",},},},},},}// 执行聚合查询result, err := l.svcCtx.EsClient.Aggregate(model.ProductIndex, query)if err != nil {return nil, err}// 解析结果aggregations := result["aggregations"].(map[string]interface{})categories := aggregations["categories"].(map[string]interface{})buckets := categories["buckets"].([]interface{})stats := make([]types.CategoryStat, 0, len(buckets))for _, bucket := range buckets {b := bucket.(map[string]interface{})key := b["key"].(string)docCount := int64(b["doc_count"].(float64))avgPrice := b["avg_price"].(map[string]interface{})["value"].(float64)maxPrice := b["max_price"].(map[string]interface{})["value"].(float64)minPrice := b["min_price"].(map[string]interface{})["value"].(float64)stats = append(stats, types.CategoryStat{Category:  key,Count:     docCount,AvgPrice:  avgPrice,MaxPrice:  maxPrice,MinPrice:  minPrice,})}return &types.CategoryStatsResponse{CategoryStats: stats,}, nil
}

运行测试


curl --location '127.0.0.1:8888/api/stats/categories'

会得到如下的类似的结果:

{"categoryStats": [{"category": "家居","count": 2,"avgPrice": 26.4,"maxPrice": 39.9,"minPrice": 12.9},{"category": "服饰","count": 1,"avgPrice": 59.99,"maxPrice": 59.99,"minPrice": 59.99},{"category": "电子","count": 1,"avgPrice": 89.5,"maxPrice": 89.5,"minPrice": 89.5},{"category": "美妆","count": 1,"avgPrice": 129,"maxPrice": 129,"minPrice": 129},{"category": "运动","count": 1,"avgPrice": 79.5,"maxPrice": 79.5,"minPrice": 79.5},{"category": "食品","count": 1,"avgPrice": 19.8,"maxPrice": 19.8,"minPrice": 19.8}]
}

6.2 同义词搜索

要启用同义词搜索,需要修改索引映射。首先,修改 internal/model/product.go 文件中的索引映射:

// 修改 ProductIndexMapping 变量
var ProductIndexMapping = `{"settings": {"number_of_shards": 1,"number_of_replicas": 0,"analysis": {"filter": {"synonym_filter": {"type": "synonym","synonyms": ["音响, 音箱, 音像","衣服, 服装, 服饰","首饰, 手饰, 饰品"]}},"analyzer": {"text_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "asciifolding", "synonym_filter"]}}}},"mappings": {"properties": {"id": {"type": "keyword"},"name": {"type": "text","analyzer": "text_analyzer","fields": {"keyword": {"type": "keyword"}}},"description": {"type": "text","analyzer": "text_analyzer"},"price": {"type": "double"},"category": {"type": "keyword"},"tags": {"type": "keyword"},"createdAt": {"type": "date","format": "epoch_millis"}}}
}`

增加ynonym_filter: 同义词过滤器,定义了3组同义词:

"音响, 音箱, 音像",
"衣服, 服装, 服饰",
"首饰, 手饰, 饰品"

当修改索引映射后,需要重建索引。因此在 internal/pkg/es/es.go 中添加删除索引的方法:

// DeleteIndex 删除索引
func (e *ElasticsearchClient) DeleteIndex(index string) error {res, err := e.client.Indices.Delete([]string{index})if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to delete index")}return nil
}

/svc/servicecontext.go 中添加更新映射的逻辑:

// UpdateElasticsearchIndex 更新索引映射(需要重建索引)
func UpdateElasticsearchIndex(client *es.ElasticsearchClient) error {// 检查索引是否存在exists, err := client.IndexExists(model.ProductIndex)if err != nil {return err}// 如果索引已存在,则删除并重建if exists {// 获取原索引的所有文档query := map[string]interface{}{"query": map[string]interface{}{"match_all": map[string]interface{}{},},"size": 10000, // 注意:实际应用中应使用滚动API处理大量数据}result, err := client.Search(model.ProductIndex, query)if err != nil {return err}// 提取文档hits := result["hits"].(map[string]interface{})["hits"].([]interface{})documents := make([]map[string]interface{}, 0, len(hits))for _, hit := range hits {hitMap := hit.(map[string]interface{})source := hitMap["_source"].(map[string]interface{})id := hitMap["_id"].(string)// 确保文档有IDsource["id"] = iddocuments = append(documents, source)}// 删除索引err = client.DeleteIndex(model.ProductIndex)if err != nil {return err}// 创建新索引err = client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)if err != nil {return err}// 如果有文档,重新索引它们if len(documents) > 0 {err = client.BulkIndex(model.ProductIndex, documents)if err != nil {return err}}return nil}// 如果索引不存在,则创建return client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)
}

修改mian函数:


func main() {flag.Parse()var c config.Configconf.MustLoad(*configFile, &c)server := rest.MustNewServer(c.RestConf)defer server.Stop()ctx := svc.NewServiceContext(c)//更新并初始化索引err := svc.UpdateElasticsearchIndex(ctx.EsClient)if err != nil {return}/*// 初始化 Elasticsearch 索引if err := svc.InitElasticsearch(ctx.EsClient); err != nil {panic(fmt.Sprintf("初始化 Elasticsearch 失败: %v", err))}*/handler.RegisterHandlers(server, ctx)fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)server.Start()
}

为了方便测试,我们增加几个新的数据:

[{"id": "1","name": "豪华音像","description": "","price": 999.99,"category": "数码","tags": ["smartphone", "popular", "new"],"createdAt": 1715788800000  // 2025-05-14},{"id": "2","name": "智能音箱","description": "续航长达18小时","price": 1199.99,"category": "数码","tags": ["laptop", "apple", "portable"],"createdAt": 1715702400000  // 2025-05-13},{"id": "3","name": "轻便音响","description": "","price": 1799.99,"category": "数码","tags": ["tv", "4k", "oled"],"createdAt": 1715616000000  // 2025-05-12}
]

相关文章:

  • 在线文档管理系统 spring boot➕vue|源码+数据库+部署教程
  • Git - 1( 14000 字详解 )
  • JVM方法区核心技术解析:从方法区到执行引擎
  • 雾锁王国开服联机教程-专用服务器
  • 以项目的方式学QT开发(三)——超详细讲解(120000多字详细讲解,涵盖qt大量知识)逐步更新!
  • PaddleClas 车辆属性模型vehicle_attribute_model转onnx并部署
  • VirtualiSurg使用SenseGlove触觉手套开发XR手术培训体验
  • 「彻底卸载 Quay 容器仓库」:干净移除服务、镜像与配置的全流程指南
  • 使用GoLang版MySQLDiff对比表结构
  • OpenSSH 漏洞-SSH 服务器面临 MitM 攻击和拒绝服务攻击的风险
  • vue插槽的实例详解
  • 使用PEFT库将原始模型与LoRA权重合并
  • C++ asio网络编程(6)利用C11模拟伪闭包实现连接的安全回收
  • web3 前端常见错误类型以及错误捕获处理
  • WPS 调整多级编号
  • 【漫话机器学习系列】260.在前向神经网络中初始权重(Initializing Weights In Feedforward Neural Networks)
  • 驱动-Linux定时-timer_list
  • 嵌软面试每日一阅----通信协议篇(二)之TCP
  • ProceedingJoinPoint的认识
  • free void* 指令
  • 马上评|重病老人取款身亡,如何避免类似悲剧?
  • 外交部部长助理兼礼宾司司长洪磊接受美国新任驻华大使递交国书副本
  • 广西壮族自治区政府主席蓝天立任上被查
  • 上海市重大工程一季度开局良好,多项生态类项目按计划实施
  • 新任国防部新闻发言人蒋斌正式亮相
  • 著名词作家陈哲逝世,代表作《让世界充满爱》《同一首歌》等