当前位置: 首页 > news >正文

Elasticsearch 索引多长时间刷新一次(根据文档数量智能调整刷新间隔)

本示例使用的版本为:Spring Boot 2.5.10 + JDK 1.8 + Elasticsearch 7.12。

1. 修正的 AdaptiveRefreshService

java

package com.example.esdemo.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.cluster.ClusterOperations;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;@Slf4j
@Service
public class AdaptiveRefreshService {@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;/*** 获取索引文档数量 - 修正版本*/public long getIndexDocumentCount(String indexName) {try {// 创建索引坐标IndexCoordinates indexCoordinates = IndexCoordinates.of(indexName);// 构建计数查询NativeSearchQuery countQuery = new NativeSearchQueryBuilder().withQuery(QueryBuilders.matchAllQuery()).build();// 设置查询选项,只获取总数countQuery.setMaxResults(0); // 不返回实际文档,只计数// 执行查询SearchHits<Document> searchHits = elasticsearchRestTemplate.search(countQuery, Document.class, indexCoordinates);return searchHits.getTotalHits();} catch (Exception e) {log.error("获取索引 {} 文档数量失败: {}", indexName, e.getMessage());// 返回默认值return getIndexDocumentCountByAlternative(indexName);}}/*** 备选方法获取文档数量*/private long getIndexDocumentCountByAlternative(String indexName) {try {// 方法2: 使用 repository 计数(如果配置了对应的 Repository)// 这里简化处理,返回估计值log.warn("使用备选方法获取索引 {} 文档数量", indexName);return 1000L; // 默认值} catch (Exception e) {log.error("备选方法也失败: {}", e.getMessage());return 0L;}}/*** 获取索引存储大小(估算)*/public long getIndexStorageSize(String indexName) {try {long docCount = getIndexDocumentCount(indexName);// 简单估算:假设平均每个文档 1KBreturn docCount * 1024;} catch (Exception e) {log.warn("获取索引存储大小失败: {}", e.getMessage());return 0L;}}/*** 获取推荐的刷新间隔(毫秒)*/public long getRecommendedRefreshInterval(String indexName) {try {long docCount = getIndexDocumentCount(indexName);log.info("索引 {} 文档数量: {}", indexName, docCount);// 根据文档数量推荐刷新间隔if (docCount > 10_000_000L) { // 超过1000万文档log.info("索引 {} 文档数超过1000万,推荐刷新间隔: 30秒", indexName);return 30000L; // 30秒} else if (docCount > 1_000_000L) { // 超过100万文档log.info("索引 {} 文档数超过100万,推荐刷新间隔: 15秒", indexName);return 15000L; // 15秒} else if (docCount > 100_000L) { // 超过10万文档log.info("索引 {} 文档数超过10万,推荐刷新间隔: 10秒", indexName);return 10000L; // 10秒} else if (docCount > 10_000L) { // 超过1万文档log.info("索引 {} 文档数超过1万,推荐刷新间隔: 5秒", indexName);return 5000L; // 5秒} else if (docCount > 1_000L) { // 超过1000文档log.info("索引 {} 
文档数超过1000,推荐刷新间隔: 3秒", indexName);return 3000L; // 3秒} else {log.info("索引 {} 文档数较少,推荐刷新间隔: 1秒", indexName);return 1000L; // 1秒}} catch (Exception e) {log.warn("获取推荐刷新间隔失败,使用默认值: {}", e.getMessage());return 10000L; // 默认10秒}}/*** 获取索引是否存在*/public boolean indexExists(String indexName) {try {IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName));return indexOperations.exists();} catch (Exception e) {log.error("检查索引是否存在失败: {}", indexName, e);return false;}}/*** 获取详细的索引统计信息*/public Map<String, Object> getIndexStats(String indexName) {Map<String, Object> stats = new HashMap<>();try {// 检查索引是否存在if (!indexExists(indexName)) {stats.put("error", "索引不存在: " + indexName);return stats;}long docCount = getIndexDocumentCount(indexName);long storageSize = getIndexStorageSize(indexName);long recommendedInterval = getRecommendedRefreshInterval(indexName);stats.put("indexName", indexName);stats.put("documentCount", docCount);stats.put("estimatedStorageSize", storageSize);stats.put("formattedStorageSize", formatBytes(storageSize));stats.put("recommendedRefreshInterval", recommendedInterval);stats.put("formattedRefreshInterval", formatInterval(recommendedInterval));stats.put("timestamp", System.currentTimeMillis());stats.put("exists", true);} catch (Exception e) {log.error("获取索引统计信息失败: {}", e.getMessage());stats.put("error", e.getMessage());stats.put("exists", false);}return stats;}/*** 格式化字节大小*/private String formatBytes(long bytes) {if (bytes < 1024) return bytes + " B";if (bytes < 1024 * 1024) return String.format("%.2f KB", bytes / 1024.0);if (bytes < 1024 * 1024 * 1024) return String.format("%.2f MB", bytes / (1024.0 * 1024));return String.format("%.2f GB", bytes / (1024.0 * 1024 * 1024));}/*** 格式化时间间隔*/private String formatInterval(long milliseconds) {if (milliseconds < 1000) return milliseconds + "ms";if (milliseconds < 60000) return String.format("%.1f秒", milliseconds / 1000.0);return String.format("%.1f分钟", milliseconds / 60000.0);}
}

2. 修正的 EnhancedIndexRefreshService

java

package com.example.esdemo.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Service
public class EnhancedIndexRefreshService {@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;@Autowiredprivate AdaptiveRefreshService adaptiveRefreshService;// 记录刷新历史private final Map<String, RefreshHistory> refreshHistory = new ConcurrentHashMap<>();/*** 智能刷新 - 根据索引大小自适应调整*/public boolean smartRefresh(String indexName) {try {// 检查索引是否存在if (!adaptiveRefreshService.indexExists(indexName)) {log.warn("索引 {} 不存在,跳过刷新", indexName);return false;}long recommendedInterval = adaptiveRefreshService.getRecommendedRefreshInterval(indexName);RefreshHistory history = refreshHistory.computeIfAbsent(indexName, k -> new RefreshHistory());long currentTime = System.currentTimeMillis();long timeSinceLastRefresh = currentTime - history.getLastRefreshTime();// 检查是否应该刷新if (timeSinceLastRefresh < recommendedInterval) {log.debug("索引 {} 跳过刷新,推荐间隔: {}ms,距离上次刷新: {}ms", indexName, recommendedInterval, timeSinceLastRefresh);return false;}// 执行刷新long startTime = System.currentTimeMillis();IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName));indexOperations.refresh();long duration = System.currentTimeMillis() - startTime;// 记录刷新历史history.recordRefresh(currentTime, duration);log.info("索引 {} 刷新完成,耗时: {}ms,文档数: {}", indexName, duration, adaptiveRefreshService.getIndexDocumentCount(indexName));return true;} catch (Exception e) {log.error("智能刷新索引失败: {}", indexName, e);return false;}}/*** 强制刷新索引(不受间隔限制)*/public boolean forceRefresh(String indexName) {try {if (!adaptiveRefreshService.indexExists(indexName)) {log.warn("索引 {} 不存在,跳过强制刷新", indexName);return false;}long startTime = System.currentTimeMillis();IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName));indexOperations.refresh();long duration = System.currentTimeMillis() - startTime;// 记录刷新历史RefreshHistory history = refreshHistory.computeIfAbsent(indexName, k -> new RefreshHistory());history.recordRefresh(System.currentTimeMillis(), 
duration);log.info("索引 {} 强制刷新完成,耗时: {}ms", indexName, duration);return true;} catch (Exception e) {log.error("强制刷新索引失败: {}", indexName, e);return false;}}/*** 定时刷新任务 - 每30秒检查一次*/@Scheduled(fixedRate = 30000)public void scheduledSmartRefresh() {try {log.debug("执行定时智能刷新检查");// 这里可以配置需要自动刷新的索引列表String[] targetIndices = {"user_es_index", "product_index", "order_index"};for (String indexName : targetIndices) {if (adaptiveRefreshService.indexExists(indexName)) {smartRefresh(indexName);}}} catch (Exception e) {log.error("定时刷新任务执行失败", e);}}/*** 获取刷新历史统计*/public Map<String, Object> getRefreshStats(String indexName) {RefreshHistory history = refreshHistory.get(indexName);Map<String, Object> stats = new HashMap<>();if (history != null) {stats.put("lastRefreshTime", history.getLastRefreshTime());stats.put("lastRefreshTimeFormatted", formatTimestamp(history.getLastRefreshTime()));stats.put("averageRefreshDuration", history.getAverageDuration());stats.put("refreshCount", history.getRefreshCount());stats.put("totalRefreshDuration", history.getTotalDuration());} else {stats.put("lastRefreshTime", 0);stats.put("lastRefreshTimeFormatted", "从未刷新");stats.put("averageRefreshDuration", 0);stats.put("refreshCount", 0);stats.put("totalRefreshDuration", 0);}return stats;}/*** 格式化时间戳*/private String formatTimestamp(long timestamp) {if (timestamp == 0) return "从未刷新";return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(timestamp));}/*** 刷新历史记录类*/private static class RefreshHistory {private long lastRefreshTime = 0;private final List<Long> refreshDurations = new ArrayList<>();private int refreshCount = 0;public void recordRefresh(long timestamp, long duration) {this.lastRefreshTime = timestamp;this.refreshDurations.add(duration);this.refreshCount++;// 只保留最近50次记录if (refreshDurations.size() > 50) {refreshDurations.remove(0);}}public long getLastRefreshTime() {return lastRefreshTime;}public double getAverageDuration() {if (refreshDurations.isEmpty()) return 
0.0;return refreshDurations.stream().mapToLong(Long::longValue).average().orElse(0.0);}public long getTotalDuration() {return refreshDurations.stream().mapToLong(Long::longValue).sum();}public int getRefreshCount() {return refreshCount;}}
}

3. 更新 Controller 接口

java

/**
 * Triggers an interval-aware ("smart") refresh of an index and returns its refresh statistics.
 *
 * <p>A {@code false} result from {@code smartRefresh} is reported with the "skipped" message.
 * NOTE(review): {@code smartRefresh} also returns {@code false} when the index is missing or the
 * refresh throws, so that message can be misleading.
 *
 * @param indexName index to refresh; defaults to {@code user_es_index}
 * @return 200 with the result map, or 500 with an error map if an exception escapes
 */
@PostMapping("/index/refresh/smart")
public ResponseEntity<Map<String, Object>> smartRefreshIndex(
        @RequestParam(defaultValue = "user_es_index") String indexName) {
    log.info("请求智能刷新索引: {}", indexName);
    try {
        final boolean refreshed = enhancedIndexRefreshService.smartRefresh(indexName);
        final Map<String, Object> stats = enhancedIndexRefreshService.getRefreshStats(indexName);

        final String message;
        if (refreshed) {
            message = "索引刷新成功";
        } else {
            message = "索引刷新跳过(根据推荐间隔)";
        }

        Map<String, Object> body = createSuccessResult(refreshed, message);
        body.put("indexName", indexName);
        body.put("refreshStats", stats);
        return ResponseEntity.ok(body);
    } catch (Exception ex) {
        log.error("智能刷新索引异常", ex);
        String errorMessage = "智能刷新索引失败: " + ex.getMessage();
        return ResponseEntity.status(500).body(createErrorResult(errorMessage));
    }
}
/** Gets index statistics. */
@GetMapping("/index/stats")
public ResponseEntity<Map<String, Object>> getIndexStats(
        @RequestParam(defaultValue = "user_es_index") String indexName) {
    // Combines the index statistics (count, size estimate, recommended interval)
    // with the recorded refresh statistics into a single response.
    log.info("请求获取索引统计: {}", indexName);
    ResponseEntity<Map<String, Object>> response;
    try {
        Map<String, Object> index = adaptiveRefreshService.getIndexStats(indexName);
        Map<String, Object> refresh = enhancedIndexRefreshService.getRefreshStats(indexName);

        Map<String, Object> payload = createSuccessResult(true, "获取索引统计成功");
        payload.put("indexStats", index);
        payload.put("refreshStats", refresh);
        response = ResponseEntity.ok(payload);
    } catch (Exception ex) {
        log.error("获取索引统计异常", ex);
        response = ResponseEntity.status(500)
                .body(createErrorResult("获取索引统计失败: " + ex.getMessage()));
    }
    return response;
}

4. 使用示例

测试智能刷新:

bash

# 智能刷新索引
curl -X POST "http://localhost:8080/api/es/users/index/refresh/smart?indexName=user_es_index"
# 获取索引统计
curl -X GET "http://localhost:8080/api/es/users/index/stats?indexName=user_es_index"

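响应示例(注意:按照上面的代码,智能刷新接口返回的 refreshStats 只包含 lastRefreshTime、lastRefreshTimeFormatted、averageRefreshDuration、refreshCount、totalRefreshDuration;下例中的 documentCount、estimatedStorageSize、formattedStorageSize、recommendedRefreshInterval、formattedRefreshInterval 实际由 /index/stats 接口放在 indexStats 中返回):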

json

{"success": true,"message": "索引刷新成功","indexName": "user_es_index","refreshStats": {"lastRefreshTime": 1696060800000,"averageRefreshDuration": 45.2,"refreshCount": 15,"documentCount": 12500,"estimatedStorageSize": 12800000,"formattedStorageSize": "12.21 MB","recommendedRefreshInterval": 5000,"formattedRefreshInterval": "5.0秒"}
}

现在 getIndexDocumentCount、getIndexStorageSize 等方法已经完整实现:服务会根据索引的实际文档数量智能推荐刷新间隔,避免频繁刷新带来的性能开销。

http://www.dtcms.com/a/465757.html

相关文章:

  • 脑电模型实战系列(二):PyTorch实现简单DNN模型
  • 脑电模型实战系列(二):为什么从简单DNN开始脑电情绪识别?
  • 哪个网站做h5比较好看金华手机建站模板
  • 制作网站源码电子商务网站建设课后习题答案
  • Google 智能体设计模式:模型上下文协议 (MCP)
  • 智能 DAG 编辑器:从基础功能到创新应用的全方位探索
  • 多语言建站系统深圳做网站比较好的公司有哪些
  • 基于OpenCV的智能疲劳检测系统:原理、实现与创新
  • Google 智能体设计模式:多智能体协作
  • 建设企业网站目的杭州网站建设q479185700惠
  • 自己建网站百度到吗网站建设与维护功能意义
  • Oracle 数据库多实例配置
  • 任天堂3DS模拟器最新版 Azahar Emulator 2123.3 开源游戏模拟器
  • 深圳福田网站建设公司共享ip网站 排名影响
  • 【AI安全】Anthropic推出AI安全工具Petri:通过自主Agent研究大模型行为
  • 云南做网站哪家便宜wordpress单页下载
  • 深度掌握 Git 分支体系:从基础操作到高级策略
  • CTF — ZIP 文件密码恢复
  • AI编程 | 基于即梦AI-Seedream 4.0模型,搭建人脸生成系统
  • 找设计案例的网站网站 设计
  • 医院项目:IBMS 集成系统 + 楼宇自控系统 + 智能照明系统协同解决方案
  • JavaEE初阶5.0
  • 一个企业做网站推广的优势手机网站怎么制作内容
  • 有代码怎么做网站做网站用源码
  • linux 环境下mysql 数据库自动备份和清库 通过crontab 创建定时任务实现mysql数据库备份
  • 每天一个设计模式——开闭原则
  • C++协程版本网络框架:快速构建一个高效极致简洁的HTTP服务器
  • 福州台江区网站建设网页怎么做链接
  • 单片机图形化编程:课程目录介绍 总纲
  • Redis-集合(Set)类型