当前位置: 首页 > news >正文

基于 Elasticsearch 解决分库分表查询难题

1.1 业务场景
某电商平台订单系统日均交易量达 50 万单,历史订单数据已超 1.6 亿条。为解决关系型数据库的性能瓶颈,系统采用分库分表架构,但随之而来的跨库跨表查询问题严重影响业务效率。

1.2 分库分表架构
分库策略:按用户 ID 哈希值分为 4 个数据库(db0-db3)
分表策略:每个数据库内按订单 ID 哈希分为 8 张表(order_0-order_7)
总表数量:4 库 ×8 表 = 32 张表
单表数据量:约 500 万条
数据增长:日均新增 50 万条订单记录

1.3 面临的查询挑战
跨库跨表查询需遍历多张表,性能极差(平均响应时间 10-30 秒)
复杂统计分析(如各省份订单量、金额分布)难以实现
历史数据与实时数据查询性能差异大
分库分表规则变更时,查询逻辑需同步修改

2.解决方案概述
本方案引入 Elasticsearch(ES)作为统一检索层,整合分散在分库分表中的数据,提供高效的跨表检索能力。整体架构如下:
描述:
用户请求 → 应用服务 → Elasticsearch → 响应结果;
分库分表数据通过 Canal→Kafka→Logstash 同步至 ES

核心流程:
分库分表中的订单数据通过同步管道实时写入 ES
应用系统通过 ES API 实现跨库跨表的高效查询
复杂统计分析通过 ES 的聚合功能直接完成

3. 技术栈与中间件介绍
3.1 Elasticsearch
简介:分布式搜索引擎,基于 Lucene 构建,提供近实时的全文检索、分析能力。
核心特性:
分布式架构:自动分片和副本机制,保证高可用和扩展性
实时搜索:数据写入后秒级可查
强大的聚合分析:支持复杂的统计分析和数据聚合
RESTful API:易于各种语言集成
水平扩展:可通过增加节点无缝扩展集群容量
在本方案中的作用
存储订单数据的索引,提供高效查询
支持跨库分表的复杂条件查询
提供聚合分析能力,满足业务统计需求
版本选择:7.14.0(稳定版,支持向量搜索和安全功能)

3.2 Canal
简介:阿里巴巴开源的数据库 binlog 同步工具,伪装成 MySQL 从节点,实时抓取 binlog 日志。
核心特性:
低延迟:毫秒级同步延迟
高可靠:支持断点续传
多数据源支持:主要支持 MySQL,也支持 MariaDB 等
灵活的过滤机制:可按库、表、字段过滤数据
在本方案中的作用:
实时监听分库分表的 binlog 日志
解析数据库变更事件(INSERT/UPDATE/DELETE)
将解析后的数据发送到 Kafka 消息队列
版本选择:1.1.5(稳定版,支持 MySQL 8.0)

3.3 Kafka
简介:分布式流处理平台,高吞吐量的消息队列系统。
核心特性:
高吞吐量:单机可处理每秒数十万条消息
持久化:消息持久化到磁盘,支持数据重放
分区机制:支持消息分区,提高并行处理能力
多消费者模式:支持多个消费者组并行消费
在本方案中的作用:
作为 Canal 和 Logstash 之间的缓冲层
削峰填谷,应对数据同步的流量波动
提供消息持久化,确保数据不丢失
版本选择:2.8.1(稳定版,社区活跃)

3.4 Logstash
简介:开源的数据收集、处理和传输工具,属于 Elastic Stack 的一部分。
核心特性:
丰富的输入输出插件:支持多种数据源和目标存储
强大的过滤功能:可对数据进行转换、过滤和 enrichment
管道处理:支持复杂的数据处理流程
在本方案中的作用:
从 Kafka 消费订单数据
对数据进行清洗、转换和格式调整
将处理后的数据写入 Elasticsearch
版本选择:7.14.0(与 ES 版本保持一致)

4.1 数据模型设计
4.1.1 MySQL 订单表结构

-- 每张订单表的结构(以order_0为例)
CREATE TABLE `order_0` (`order_id` bigint(20) NOT NULL COMMENT '订单ID',`user_id` bigint(20) NOT NULL COMMENT '用户ID',`amount` decimal(10,2) NOT NULL COMMENT '订单金额',`status` tinyint(4) NOT NULL COMMENT '状态(0-待支付,1-已支付,2-退货)',`province` varchar(20) NOT NULL COMMENT '省份',`city` varchar(20) NOT NULL COMMENT '城市',`product_ids` varchar(100) NOT NULL COMMENT '商品ID列表,逗号分隔',`create_time` datetime NOT NULL COMMENT '创建时间',`update_time` datetime NOT NULL COMMENT '更新时间',PRIMARY KEY (`order_id`),KEY `idx_user_id` (`user_id`),KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

4.1.2 Elasticsearch 索引设计
索引模板:

PUT _index_template/order_template
{"index_patterns": ["order_*"],  // 匹配所有order开头的索引"template": {"settings": {"number_of_shards": 3,       // 每个索引3个主分片"number_of_replicas": 1      // 每个分片1个副本},"mappings": {"properties": {"order_id": { "type": "keyword" },        // 订单ID精确匹配"user_id": { "type": "keyword" },         // 用户ID精确匹配"amount": { "type": "double" },           // 金额支持范围查询"status": { "type": "integer" },          // 订单状态"province": { "type": "keyword" },        // 省份(支持聚合分析)"city": { "type": "keyword" },            // 城市(支持聚合分析)"product_ids": { "type": "keyword" },     // 商品ID(支持聚合分析)"create_time": { "type": "date" },        // 时间(支持范围查询)"update_time": { "type": "date" }         // 更新时间}}},"priority": 500,"version": 1,"allow_auto_create": true
}

索引命名策略:按月份创建索引,如order_202310表示 2023 年 10 月的订单数据

4.2 数据同步实现
4.2.1 Canal 配置
canal.properties 核心配置:

# 配置 Canal 服务端口
canal.port = 11111# 配置 ZooKeeper 地址(集群模式)
canal.zkServers = zk1:2181,zk2:2181,zk3:2181# 配置 MQ 模式为 Kafka
canal.serverMode = kafka# 配置 Kafka 集群地址
kafka.bootstrap.servers = kafka1:9092,kafka2:9092,kafka3:9092

instance.properties 核心配置

# 配置 MySQL 主库地址
canal.instance.master.address = mysql-master:3306# 配置 MySQL 账号密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal@123# 配置监听的数据库
canal.instance.dbNames = db0,db1,db2,db3# 配置表过滤规则,只监听订单表
canal.instance.filter.regex = .*\\.order_.*# 配置 Kafka 主题
canal.mq.topic = order_data# 配置分区策略(按表名哈希)
canal.mq.partition = true
canal.mq.partitionsNum = 8
canal.mq.partitionHash = .*\\.order_.*\\..*=order_id

4.2.2 Kafka 配置
server.properties 核心配置

# 集群唯一标识
broker.id=0# 监听地址
listeners=PLAINTEXT://kafka1:9092# 日志存储路径
log.dirs=/data/kafka/logs# Zookeeper 连接地址
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181# 主题分区数
num.partitions=8# 副本数
default.replication.factor=2# 日志保留时间(7天)
log.retention.hours=168

创建订单数据主题:

kafka-topics.sh --create --bootstrap-server kafka1:9092 --topic order_data --partitions 8 --replication-factor 2

4.2.3 Logstash 配置

# order-sync.conf
input {kafka {bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"topics => ["order_data"]group_id => "logstash-order-group"consumer_threads => 4codec => "json"}
}filter {# 只处理订单数据if [table] =~ /^order_/ {# 解析create_time为日期格式date {match => ["[data][create_time]", "yyyy-MM-dd HH:mm:ss"]target => "[data][create_time]"timezone => "Asia/Shanghai"}# 解析update_time为日期格式date {match => ["[data][update_time]", "yyyy-MM-dd HH:mm:ss"]target => "[data][update_time]"timezone => "Asia/Shanghai"}# 提取年份和月份,用于动态生成索引名ruby {code => 'time = event.get("[data][create_time]")if timeevent.set("index_suffix", time.strftime("%Y%m"))else# 如果没有create_time,使用当前时间event.set("index_suffix", Time.now.strftime("%Y%m"))end'}# 根据操作类型设置ES的操作类型if [type] == "INSERT" or [type] == "UPDATE" {mutate {add_field => { "[@metadata][action]" => "index" }}} else if [type] == "DELETE" {mutate {add_field => { "[@metadata][action]" => "delete" }}}} else {# 非订单数据丢弃drop {}}
}output {# 输出到Elasticsearchelasticsearch {hosts => ["es1:9200", "es2:9200", "es3:9200"]index => "order_%{[index_suffix]}"document_id => "%{[data][order_id]}"action => "%{[@metadata][action]}"doc_as_upsert => trueretry_on_conflict => 3}# 调试日志输出stdout {codec => rubydebug { metadata => false }}
}

4.3 Elasticsearch 查询实现
4.3.1 基础查询示例:跨库查询最近 30 天的退货订单

public List<OrderDTO> queryRecentReturnedOrders() {// 构建查询请求SearchRequest searchRequest = new SearchRequest("order_*");SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();// 构建查询条件BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();// 时间范围:最近30天long thirtyDaysAgo = System.currentTimeMillis() - 30L * 24 * 60 * 60 * 1000;boolQuery.must(QueryBuilders.rangeQuery("create_time").gte(new Date(thirtyDaysAgo)).lte(new Date()));// 状态:退货(2)boolQuery.must(QueryBuilders.termQuery("status", 2));// 金额:大于1000boolQuery.must(QueryBuilders.rangeQuery("amount").gt(1000));sourceBuilder.query(boolQuery);// 排序:按创建时间倒序sourceBuilder.sort("create_time", SortOrder.DESC);// 分页:第1页,10条记录sourceBuilder.from(0);sourceBuilder.size(10);searchRequest.source(sourceBuilder);// 执行查询try {SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);// 解析结果List<OrderDTO> orders = new ArrayList<>();for (SearchHit hit : response.getHits().getHits()) {Map<String, Object> sourceMap = hit.getSourceAsMap();OrderDTO order = new OrderDTO();order.setOrderId(sourceMap.get("order_id").toString());order.setUserId(sourceMap.get("user_id").toString());order.setAmount(Double.parseDouble(sourceMap.get("amount").toString()));order.setStatus(Integer.parseInt(sourceMap.get("status").toString()));order.setProvince(sourceMap.get("province").toString());order.setCity(sourceMap.get("city").toString());order.setProductIds(sourceMap.get("product_ids").toString());order.setCreateTime((Date) sourceMap.get("create_time"));order.setUpdateTime((Date) sourceMap.get("update_time"));orders.add(order);}return orders;} catch (IOException e) {log.error("查询ES订单数据失败", e);throw new RuntimeException("查询订单失败", e);}
}

4.3.2 聚合查询示例:各省份订单统计

public List<ProvinceOrderStats> statsByProvince() {// 构建查询请求SearchRequest searchRequest = new SearchRequest("order_*");SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();// 不返回具体文档,只返回聚合结果sourceBuilder.size(0);// 构建查询条件:最近7天的数据BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();boolQuery.must(QueryBuilders.rangeQuery("create_time").gte("now-7d"));sourceBuilder.query(boolQuery);// 构建聚合:按省份分组统计TermsAggregationBuilder provinceAgg = AggregationBuilders.terms("group_by_province").field("province").size(34); // 中国34个省级行政区// 每个省份内统计订单数量ValueCountAggregationBuilder countAgg = AggregationBuilders.valueCount("total_count").field("order_id");// 每个省份内统计订单总金额SumAggregationBuilder amountAgg = AggregationBuilders.sum("total_amount").field("amount");// 添加子聚合provinceAgg.subAggregation(countAgg);provinceAgg.subAggregation(amountAgg);// 添加到查询sourceBuilder.aggregation(provinceAgg);searchRequest.source(sourceBuilder);// 执行查询try {SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);// 解析聚合结果Terms terms = response.getAggregations().get("group_by_province");List<ProvinceOrderStats> statsList = new ArrayList<>();for (Terms.Bucket bucket : terms.getBuckets()) {ProvinceOrderStats stats = new ProvinceOrderStats();stats.setProvince(bucket.getKeyAsString());stats.setOrderCount(((ValueCount) bucket.getAggregations().get("total_count")).getValue());stats.setTotalAmount(((Sum) bucket.getAggregations().get("total_amount")).getValue());statsList.add(stats);}// 按订单数量排序statsList.sort((s1, s2) -> Long.compare(s2.getOrderCount(), s1.getOrderCount()));return statsList;} catch (IOException e) {log.error("统计省份订单数据失败", e);throw new RuntimeException("统计订单失败", e);}
}

5. 索引生命周期管理
为优化存储和性能,使用 ES 的索引生命周期管理(ILM)自动管理订单索引的生命周期。

# 创建生命周期策略
PUT _ilm/policy/order_policy
{"policy": {"phases": {"hot": {"min_age": "0ms","actions": {"rollover": {"max_age": "30d","max_docs": 5000000}}},"warm": {"min_age": "30d","actions": {"shrink": {"number_of_shards": 1},"forcemerge": {"max_num_segments": 1}}},"cold": {"min_age": "90d","actions": {"freeze": {}}},"delete": {"min_age": "365d","actions": {"delete": {}}}}}
}# 应用生命周期策略到索引模板
PUT _index_template/order_template
{"index_patterns": ["order_*"],"template": {"settings": {"number_of_shards": 3,"number_of_replicas": 1,"index.lifecycle.name": "order_policy",  // 应用生命周期策略"index.lifecycle.rollover_alias": "order_current"  // 滚动别名},// 映射配置...}
}

6. 性能优化策略
ES 集群优化
合理设置分片数量:每个分片大小控制在 20-50GB
为不同阶段的索引设置不同的副本数:热数据 2 副本,冷数据 1 副本
配置专用的协调节点,分离查询和数据节点
查询优化
避免使用*通配符匹配过多索引,尽量指定时间范围
使用过滤上下文(filter)而非查询上下文(query),避免评分计算
对频繁查询的字段设置合适的类型和分词器
数据同步优化
调整 Canal 的批量提交参数,平衡延迟和吞吐量
为 Kafka 设置合适的分区数,提高并行处理能力
优化 Logstash 的 JVM 参数,避免 OOM

7. 监控与运维
监控指标
ES 集群健康状态、节点资源使用率
索引大小、文档数量、查询延迟
数据同步延迟、同步成功率
各中间件的吞吐量和错误率
告警配置
集群状态异常(red/yellow)告警
同步延迟超过 30 秒告警
磁盘使用率超过 85% 告警
查询延迟超过 500ms 告警
备份策略
ES 索引每日快照备份
备份数据保留 30 天
每周进行一次恢复测试

8. 注意事项与风险
数据一致性:
同步延迟可能导致 ES 数据与 MySQL 数据短暂不一致
建议关键业务采用 “查询主库 + 非关键业务查询 ES” 的混合策略
资源消耗:
ES 集群需要额外的服务器资源
建议至少 3 节点,每节点 8 核 16G 配置
数据安全:
配置 ES 的安全认证,防止未授权访问
敏感字段需在同步过程中脱敏处理
版本兼容性:
保持 Elastic Stack 各组件版本一致
升级前做好充分测试,避免兼容性问题

http://www.dtcms.com/a/351035.html

相关文章:

  • [Maven 基础课程]Maven 是什么
  • 【Linux操作系统】简学深悟启示录:环境变量进程地址
  • Java基础第5天总结(final关键字,枚举,抽象类)
  • Redis-数据类型与常用命令
  • Java数据结构——9.排序
  • 【OpenAI】ChatGPT-4o 全能AI-omni的详细介绍+API KET的使用教程!
  • Stream API 新玩法:从 teeing()到 mapMulti()
  • 多种“找不到vcruntime140.dll,无法继续执行代码”提示的解决方法:从原理到实操,轻松修复系统故障
  • 【Delphi】中通过索引动态定位并创建对应窗体类实例
  • CMake构建学习笔记20-iconv库的构建
  • MATLAB在生态环境数据处理与分析中的应用,生态系统模型构建与数值模拟等
  • 简述滚珠丝杆升降机的结构和原理
  • CSS 结构伪类选择器
  • 【BUG排查】调试瑞萨RH850F1KMS1时候随机出现进入到unused_isr
  • 一款基于 .NET 开源、功能强大的 Windows 搜索工具
  • GD32VW553-IOT开发板测评 搭建环境到电灯(QA分享)
  • 使用提供的 YAML 文件在 Conda 中创建环境
  • Conda的配置
  • 实时平台Flink热更新技术——实现不停机升级!
  • Caddy + CoreDNS 深度解析:从功能架构到性能优化实践(上)
  • webrtc音频QOS方法一.1(NetEQ之音频网络延时DelayManager计算补充)
  • 设计模式学习笔记-----抽象策略模式
  • 【Ansible】Ansible部署K8s集群--准备环境--配置网络
  • 主流的 AI Agent 开发框架
  • 论文阅读(四)| 软件运行时配置研究综述
  • 游戏玩家批量多开挂机如何选择:云手机还是模拟器
  • LabVIEW 场效应晶体管仿真实验平台
  • 工业自动化系统架构-(多动子磁悬浮生产流水线 规划调度执行与协调)
  • 从下载到运行:MySQL 详细安装配置完整教程
  • 【Vue3】Cesium实现卫星及无人机轨迹跟踪