当前位置: 首页 > news >正文

ElasticSearch:不停机更新索引类型(未验证)

文章目录

      • **一、前期准备**
        • **1. 集群健康检查**
        • **2. 备份数据**
        • **3. 监控系统准备**
      • **二、创建新索引并配置**
        • **1. 设计新索引映射**
        • **2. 创建读写别名**
      • **三、全量数据迁移**
        • **1. 执行初始 Reindex**
        • **2. 监控 Reindex 进度**
      • **四、增量数据同步**
        • **1. 方案选择**
      • **五、双写切换**
        • **1. 修改应用代码实现双写**
        • **2. 验证双写一致性**
      • **六、流量切换**
        • **1. 只读流量切换**
        • **2. 验证查询结果一致性**
        • **3. 写入流量切换**
      • **七、收尾工作**
        • **1. 恢复新索引配置**
        • **2. 验证性能和稳定性**
        • **3. 删除旧索引(可选)**
      • **八、回滚策略**
      • **九、优化建议**
      • **十、风险控制**

一、前期准备

1. 集群健康检查
GET /_cluster/health

确保:

  • statusgreen
  • number_of_nodes 符合预期
  • unassigned_shards 为 0
2. 备份数据
# 注册快照仓库
PUT /_snapshot/my_backup
{"type": "fs","settings": {"location": "/path/to/snapshots"}
}# 创建全量快照
PUT /_snapshot/my_backup/snapshot_1?wait_for_completion=true
3. 监控系统准备
  • 开启 ES 性能监控(如使用 Elastic APM、Prometheus + Grafana)
  • 设置关键指标告警(如集群负载、JVM 内存、磁盘使用率)

二、创建新索引并配置

1. 设计新索引映射
PUT /new_products
{"settings": {"index.number_of_shards": 5,  // 与旧索引保持一致"index.number_of_replicas": 1,"index.refresh_interval": "30s",  // 临时调大,提升写入性能"index.translog.durability": "async",  // 临时使用异步持久化"index.translog.sync_interval": "30s"},"mappings": {"properties": {"id": {"type": "keyword"},"name": {"type": "text"},"price": {"type": "double"},  // 假设原字段为 integer"create_time": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"},"tags": {"type": "keyword"}}}
}
2. 创建读写别名
POST /_aliases
{"actions": [{ "add": { "alias": "products_read", "index": "old_products" } },{ "add": { "alias": "products_write", "index": "old_products" } }]
}

三、全量数据迁移

1. 执行初始 Reindex
POST /_reindex?wait_for_completion=false
{"source": {"index": "old_products","size": 5000,  // 每次查询 5000 条"sort": ["_doc"]  // 按文档顺序处理,避免遗漏},"dest": {"index": "new_products","op_type": "create"},"script": {"source": """// 类型转换逻辑if (ctx._source.containsKey("price")) {ctx._source.price = Double.parseDouble(ctx._source.price.toString());}// 日期格式转换if (ctx._source.containsKey("create_time")) {try {ctx._source.create_time = new Date(ctx._source.create_time).getTime();} catch (Exception e) {// 处理异常日期格式ctx._source.create_time = System.currentTimeMillis();}}"""}
}
2. 监控 Reindex 进度
GET /_tasks?detailed=true&actions=*reindex

四、增量数据同步

1. 方案选择
  • 方案 A:基于时间戳的定时同步(适合有 update_time 字段的场景)
POST /_reindex?wait_for_completion=false
{"source": {"index": "old_products","query": {"range": {"update_time": {"gte": "{{last_sync_time}}",  // 上次同步时间"lt": "now"}}}},"dest": {"index": "new_products"}
}
  • 方案 B:基于 Canal 的 binlog 订阅(适合 ES 作为 MySQL 从库的场景)
# 部署 Canal 客户端
canal.deployer-1.1.5/bin/startup.sh# 配置 canal.instance.filter.regex=.*\\..* 监听全量变更

五、双写切换

1. 修改应用代码实现双写

在应用层同时写入新旧索引:

// 伪代码示例
public void indexProduct(Product product) {// 写入旧索引esClient.index("products_write", product);// 写入新索引(带类型转换)Product newProduct = convertProduct(product);esClient.index("new_products", newProduct);
}
2. 验证双写一致性
// 对比同一文档在新旧索引中的差异
GET old_products/_doc/1
GET new_products/_doc/1

六、流量切换

1. 只读流量切换
POST /_aliases
{"actions": [{ "remove": { "alias": "products_read", "index": "old_products" } },{ "add": { "alias": "products_read", "index": "new_products" } }]
}
2. 验证查询结果一致性
// 对比相同查询在新旧索引中的结果
GET products_read/_search?q=name:iphone
GET old_products/_search?q=name:iphone
3. 写入流量切换
POST /_aliases
{"actions": [{ "remove": { "alias": "products_write", "index": "old_products" } },{ "add": { "alias": "products_write", "index": "new_products" } }]
}

七、收尾工作

1. 恢复新索引配置
PUT /new_products/_settings
{"index.refresh_interval": "1s","index.translog.durability": "request","index.translog.sync_interval": "5s"
}
2. 验证性能和稳定性
  • 监控集群负载
  • 验证业务查询性能
  • 验证写入吞吐量
3. 删除旧索引(可选)
DELETE /old_products

八、回滚策略

若出现问题,可快速回滚:

POST /_aliases
{"actions": [{ "remove": { "alias": "products_read", "index": "new_products" } },{ "add": { "alias": "products_read", "index": "old_products" } },{ "remove": { "alias": "products_write", "index": "new_products" } },{ "add": { "alias": "products_write", "index": "old_products" } }]
}

九、优化建议

  1. 分批次迁移:对百万级数据,按时间或 ID 范围分批 Reindex,避免单次任务过大
  2. 限流控制
POST /_reindex?wait_for_completion=false
{"source": { "index": "old_products" },"dest": { "index": "new_products" },"requests_per_second": 100  // 每秒处理 100 个请求
}
  1. 临时扩容:迁移期间增加专用协调节点,减轻数据节点压力
  2. 预热缓存:迁移后对热点数据执行预热查询
  3. 自动化脚本:使用 Python 脚本编排整个流程:
import requests
import timedef reindex_with_progress():# 启动 reindexresponse = requests.post("http://localhost:9200/_reindex?wait_for_completion=false",json={"source": {"index": "old_products"},"dest": {"index": "new_products"}})task_id = response.json()["task"]# 监控进度while True:status = requests.get(f"http://localhost:9200/_tasks/{task_id}").json()completed = status["task"]["status"]["completed"]total = status["task"]["status"]["total"]print(f"进度: {completed}/{total} ({completed/total*100:.2f}%)")if status["completed"]:breaktime.sleep(5)reindex_with_progress()

十、风险控制

  1. 灰度发布:先迁移部分数据进行验证
  2. 熔断机制:设置错误率阈值,超过则自动停止迁移
  3. 预留资源:确保集群有 30% 以上的空闲资源
  4. 夜间执行:选择业务低峰期执行核心操作

http://www.dtcms.com/a/290137.html

相关文章:

  • git switch
  • (LeetCode 面试经典 150 题) 219. 存在重复元素 II (哈希表)
  • taro微信小程序的tsconfig.json文件说明
  • 自动化与安全 - 将 Terraform 集成到 CI/CD
  • 编译支持cuda硬件加速的ffmpeg
  • 数据库和数据仓库的区别
  • day27 力扣332.重新安排行程 力扣51. N皇后 力扣37. 解数独 力扣455.分发饼干 力扣376. 摆动序列 力扣53. 最大子序和
  • 云原生周刊:K8s 中的后量子密码学
  • OpenCV计算机视觉实战(16)——图像分割技术
  • 微服务的编程测评系统-身份认证-管理员登录前端
  • LeetCode|Day21|204. 计数质数|Python刷题笔记
  • 【黑马SpringCloud微服务开发与实战】(四)微服务02
  • 随笔20250721 PostgreSQL实体类生成器
  • 【TVM 教程】TVM 代码库实例讲解
  • Spring AI 集成阿里云百炼与 RAG 知识库,实现专属智能助手(框架思路)
  • 若依前后端部署
  • Linux进程核心机制:状态、优先级与上下文切换详解
  • 基于Python flask的电影数据分析及可视化系统的设计与实现,可视化内容很丰富
  • 信息整合注意力IIA,通过双方向注意力机制重构空间位置信息,动态增强目标关键特征并抑制噪声
  • 文本数据分析
  • 数据分析的尽头是什么?是洞察,而非数字!
  • Car Kit重构车机开发体验,让车载应用开发驶入快车道
  • 分布式定时任务系列13:死循环是任务触发的银弹?
  • Mac上安装Claude Code的步骤
  • Python Locust库详解:从入门到分布式压力测试实战
  • 【web自动化】-5- fixture集中管理和项目重构
  • 2025最新版PyCharm for Mac统一版安装使用指南
  • Q10900H6迷你电脑:集成双10G+四2.5G网口,支持多系统网络部署
  • Python高效入门指南
  • 详解 @property 装饰器与模型数据类型检测