MySQL同步ES的6种方案!
大家好,我是苏三,又跟大家见面了。
引言
在分布式架构中,MySQL与Elasticsearch(ES)的协同已成为解决高并发查询与复杂检索的标配组合。
然而,如何实现两者间的高效数据同步,是架构设计中绕不开的难题。
这篇文章跟大家一起聊聊MySQL同步ES的6种主流方案,结合代码示例与场景案例,帮助开发者避开常见陷阱,做出最优技术选型。
方案一:同步双写
场景:适用于对数据实时性要求极高,且业务逻辑简单的场景,如金融交易记录同步。
在业务代码中同时写入MySQL与ES。
代码如下:
@Transactional
public void createOrder(Order order) { // 写入MySQL orderMapper.insert(order); // 同步写入ES IndexRequest request = new IndexRequest("orders") .id(order.getId()) .source(JSON.toJSONString(order), XContentType.JSON); client.index(request, RequestOptions.DEFAULT);
}
痛点:
-
硬编码侵入:所有涉及写操作的地方均需添加ES写入逻辑。
-
性能瓶颈:双写操作导致事务时间延长,TPS下降30%以上。
-
数据一致性风险:若ES写入失败,需引入补偿机制(如本地事务表+定时重试)。
方案二:异步双写
场景:电商订单状态更新后需同步至ES供客服系统检索。
我们可以使用MQ进行解耦。
架构图如下:
代码示例如下:
// 生产者端
public void updateProduct(Product product) { productMapper.update(product); kafkaTemplate.send("product-update", product.getId());
} // 消费者端
@KafkaListener(topics = "product-update")
public void syncToEs(String productId) { Product product = productMapper.selectById(productId); esClient.index(product);
}
优势:
-
吞吐量提升:通过MQ削峰填谷,可承载万级QPS。
-
故障隔离:ES宕机不影响主业务链路。
缺陷:
-
消息堆积:突发流量可能导致消费延迟(需监控Lag值)。
-
顺序性问题:需通过分区键保证同一数据的顺序消费。
方案三:Logstash定时拉取
场景:用户行为日志的T+1分析场景。
该方案低侵入但高延迟。
配置示例如下:
input {
jdbc{jdbc_driver=>"com.mysql.jdbc.Driver"jdbc_url=>"jdbc:mysql://localhost:3306/log_db"schedule=>"*/5 * * * *"# 每5分钟执行 statement=>"SELECT * FROM user_log WHERE update_time > :sql_last_value"
}
}
output{
elasticsearch{hosts=>["es-host:9200"]index=>"user_logs"
}
}
适用性分析:
-
优点:零代码改造,适合历史数据迁移。
-
致命伤:
-
分钟级延迟(无法满足实时搜索)
-
全表扫描压力大(需优化增量字段索引)
-
-
最近建了一些工作内推群,各大城市都有,欢迎各位HR和找工作的小伙伴进群交流,群里目前已经收集了不少的工作内推岗位。加苏三的微信:li_su223,备注:所在城市,即可进群。
方案四:Canal监听Binlog
场景:社交平台动态实时搜索(如微博热搜更新)。
技术栈:Canal + RocketMQ + ES
该方案高实时,并且低侵入。
架构流程如下:
关键配置:
# canal.properties
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=canal.es.sync
避坑指南:
-
数据漂移:需处理DDL变更(通过Schema Registry管理映射)。
-
幂等消费:通过
_id
唯一键避免重复写入。
方案五:DataX批量同步
场景:将历史订单数据从分库分表MySQL迁移至ES。
该方案是大数据迁移的首选。
配置文件如下:
{
"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"splitPk":"id","querySql":"SELECT * FROM orders"}},"writer":{"name":"elasticsearchwriter","parameter":{"endpoint":"http://es-host:9200","index":"orders"}}}]
}
}
性能调优:
-
调整
channel
数提升并发(建议与分片数对齐) -
启用
limit
分批查询避免OOM
方案六:Flink流处理
场景:商品价格变更时,需关联用户画像计算实时推荐评分。
该方案适合于复杂的ETL场景。
代码片段如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new CanalSource()) .map(record -> parseToPriceEvent(record)) .keyBy(event -> event.getProductId()) .connect(userProfileBroadcastStream) .process(new PriceRecommendationProcess()) .addSink(new ElasticsearchSink());
优势:
-
状态管理:精准处理乱序事件(Watermark机制)
-
维表关联:通过Broadcast State实现实时画像关联
总结:
对于文章上面给出的这6种技术方案,我们在实际工作中,该如何做选型呢?
下面用一张表格做对比:
方案 | 实时性 | 侵入性 | 复杂度 | 适用阶段 |
---|---|---|---|---|
同步双写 | 秒级 | 高 | 低 | 小型单体项目 |
MQ异步 | 秒级 | 中 | 中 | 中型分布式系统 |
Logstash | 分钟级 | 无 | 低 | 离线分析 |
Canal | 毫秒级 | 无 | 高 | 高并发生产环境 |
DataX | 小时级 | 无 | 中 | 历史数据迁移 |
Flink | 毫秒级 | 低 | 极高 | 实时数仓 |
苏三的建议:
-
若团队无运维中间件能力 → 选择Logstash或同步双写
-
需秒级延迟且允许改造 → MQ异步 + 本地事务表
-
追求极致实时且资源充足 → Canal + Flink双保险