ES常识8:ES8.X如何实现热词统计
文章目录
- 一、数据采集与存储设计
- 1. 确定需记录的字段
- 2. 设计搜索日志索引
- 二、数据写入与采集
- 三、热门搜索词统计(核心逻辑)
- 1. 基础版:近 7 天热门搜索词(按出现次数排序)
- 2. 进阶版:加权热门词(结合点击量与搜索时长)
- 3. 高基数优化:避免内存溢出
- 四、缓存与实时性优化
- 1. 定时预计算(推荐方案)
- 2. 实时查询(轻量场景)
- 五、推荐接口实现
- 六、监控与调优
- 1. 监控聚合性能
- 2. 调整索引配置
- 3. 处理异常词
- 总结
基于 Elasticsearch 8.x 实现热门搜索词推荐,需结合 数据采集、索引设计、聚合统计、缓存优化 等环节。以下是分步骤的详细实现方案:
一、数据采集与存储设计
1. 确定需记录的字段
为了统计热门搜索词,需采集用户搜索行为的基础信息。推荐记录以下字段:
字段名 | 类型 | 说明 |
---|---|---|
search_keyword | keyword | 用户输入的搜索词(必须使用 keyword 类型,避免分词影响统计)。 |
timestamp | date | 搜索时间(用于时间窗口统计,如“近 7 天热门词”)。 |
user_id | keyword | 用户唯一标识(可选,用于去重统计独立用户搜索量)。 |
is_clicked | boolean | 是否点击搜索结果(可选,作为热门度的加权指标)。 |
search_duration | integer | 搜索耗时(毫秒,可选,用于评估搜索质量)。 |
2. 设计搜索日志索引
在 ES 中创建索引 search_logs
,存储用户搜索行为数据。索引配置示例:
PUT /search_logs
{"settings": {"index": {"number_of_shards": 3, // 根据数据量调整分片数(建议单分片 10-50GB)"number_of_replicas": 1, // 1 副本保证高可用"refresh_interval": "1s" // 近实时搜索(可调整为 5s 平衡写入性能)}},"mappings": {"properties": {"search_keyword": { "type": "keyword", "doc_values": true }, // 启用 doc_values 加速聚合"timestamp": { "type": "date", "format": "epoch_millis" },"user_id": { "type": "keyword", "index": false }, // 不参与搜索,仅用于聚合"is_clicked": { "type": "boolean" },"search_duration": { "type": "integer" }}}
}
二、数据写入与采集
用户每次搜索时,将搜索行为数据写入 search_logs
索引。示例(使用 ES 客户端):
from elasticsearch import Elasticsearch
from datetime import datetimees = Elasticsearch(["http://es-node-1:9200"])def log_search(keyword: str, user_id: str, is_clicked: bool, duration: int):doc = {"search_keyword": keyword,"timestamp": int(datetime.now().timestamp() * 1000), # 毫秒级时间戳"user_id": user_id,"is_clicked": is_clicked,"search_duration": duration}es.index(index="search_logs", document=doc)# 模拟用户搜索行为(调用示例)
log_search("机器学习教程", "user_123", is_clicked=True, duration=850)
三、热门搜索词统计(核心逻辑)
使用 ES 的 terms
聚合统计搜索词的频率,并结合时间窗口、加权指标优化结果。
1. 基础版:近 7 天热门搜索词(按出现次数排序)
统计近 7 天搜索次数最多的前 10 个关键词:
GET /search_logs/_search
{"size": 0, // 不返回原始文档"query": {"range": {"timestamp": {"gte": "now-7d", // 近 7 天"lte": "now"}}},"aggs": {"top_keywords": {"terms": {"field": "search_keyword","size": 10, // 返回前 10 个词"order": { "_count": "desc" }, // 按次数降序"min_doc_count": 3 // 过滤出现次数少于 3 次的词(避免长尾词)}}}
}
2. 进阶版:加权热门词(结合点击量与搜索时长)
若需考虑搜索词的“质量”(如用户点击了结果或搜索耗时较短),可通过 sum
聚合加权:
GET /search_logs/_search
{"size": 0,"query": { "range": { "timestamp": { "gte": "now-7d" } } },"aggs": {"top_weighted_keywords": {"terms": {"field": "search_keyword","size": 10,"order": { "score": "desc" } // 按自定义分数排序},"aggs": {"click_count": { "sum": { "field": "is_clicked" } }, // 点击次数(true=1,false=0)"avg_duration": { "avg": { "field": "search_duration" } }, // 平均耗时"score": {"script": {"source": "params.click_count * 2 + (1000 - params.avg_duration) * 0.1", // 自定义分数公式(点击权重更高,耗时越短得分越高)"params": {"click_count": "sum(is_clicked)","avg_duration": "avg(search_duration)"}}}}}}
}
3. 高基数优化:避免内存溢出
当搜索词数量极大(如每天百万级不同词),terms
聚合可能消耗大量内存。此时可通过 shard_size
参数优化:
{"aggs": {"top_keywords": {"terms": {"field": "search_keyword","size": 10, // 最终返回 10 个词"shard_size": 100, // 每个分片预取 100 个词(提升准确性)"min_doc_count": 5 // 过滤低频词}}}
}
四、缓存与实时性优化
直接调用 ES 聚合接口可能因计算耗时影响推荐接口性能(尤其在高并发时)。推荐结合 缓存机制 优化:
1. 定时预计算(推荐方案)
- 逻辑:定期(如每分钟)执行聚合查询,将结果缓存到 Redis 或 ES 的
hot_keywords
索引中。 - 优势:减少实时聚合的计算压力,保证推荐接口的低延迟。
示例(Python 定时任务):
import redis
from elasticsearch import Elasticsearches = Elasticsearch(["http://es-node-1:9200"])
redis_client = redis.Redis(host="redis-host", port=6379)def update_hot_keywords():# 执行 ES 聚合查询response = es.search(index="search_logs",body={"size": 0,"query": {"range": {"timestamp": {"gte": "now-7d"}}},"aggs": {"top_keywords": {"terms": {"field": "search_keyword", "size": 10, "order": {"_count": "desc"}}}}})# 提取结果并缓存到 Redishot_keywords = [bucket["key"] for bucket in response["aggregations"]["top_keywords"]["buckets"]]redis_client.set("hot_keywords", str(hot_keywords), ex=60) # 缓存 60 秒# 每 60 秒执行一次预计算
update_hot_keywords()
2. 实时查询(轻量场景)
若数据量较小或实时性要求极高(如秒级更新),可直接调用 ES 聚合接口,但需限制 size
和 shard_size
以降低计算量。
五、推荐接口实现
提供一个 HTTP 接口,从缓存或 ES 读取热门词并返回。示例(使用 Flask):
from flask import Flask, jsonify
import redisapp = Flask(__name__)
redis_client = redis.Redis(host="redis-host", port=6379)@app.route("/hot_keywords")
def get_hot_keywords():# 从 Redis 读取缓存(若存在)hot_keywords = redis_client.get("hot_keywords")if hot_keywords:return jsonify(eval(hot_keywords.decode()))# 若缓存失效,直接查询 ES(备用逻辑)# (此处省略 ES 查询代码,实际应避免高频调用)return jsonify(["机器学习", "Python教程", "大数据分析"]) # 默认值if __name__ == "__main__":app.run(host="0.0.0.0", port=5000)
六、监控与调优
1. 监控聚合性能
通过 ES 的 _nodes/stats/indices/aggregations
监控聚合耗时,确保 top_keywords
聚合的执行时间在合理范围(如 < 500ms)。
2. 调整索引配置
refresh_interval
:若写入量极大,可调整为30s
减少段合并开销,但会降低实时性。index.codec
:使用best_compression
压缩存储,减少磁盘占用(适合日志类数据)。
3. 处理异常词
通过 exclude
参数过滤无意义词(如“测试”“null”):
{"terms": {"field": "search_keyword","exclude": ["测试", "null", "undefined"]}
}
总结
基于 ES8 实现热门搜索词推荐的核心步骤为:
- 数据采集:记录搜索词及关联信息(时间、用户、点击等)。
- 索引设计:使用
keyword
类型存储搜索词,优化聚合性能。 - 聚合统计:通过
terms
聚合计算词频,结合时间窗口和加权指标。 - 缓存优化:定时预计算结果并缓存,降低实时查询压力。
- 接口实现:提供 HTTP 接口返回缓存或实时聚合结果。
通过以上步骤,可高效实现一个兼顾实时性与性能的热门搜索词推荐系统。