【ElasticSearch】数据同步
【ElasticSearch】数据同步
- 【一】es数据同步方案
- 【1】应用层双写(同步/异步)
- 【2】使用数据库事务日志(CDC)
- 【3】定时任务同步
- 【4】使用Logstash
- 【5】使用Data Integration工具
- 【二】选型对比
- 【1】选型建议
- 【2】总结
- 【3】性能瓶颈
- 【4】注意事项
- 【三】kafka实现es数据同步
- 【1】需求说明
- 【2】es文档CRUD方法
- 【3】创建主题Topic
- 【4】kafka生产者
- 【5】商品业务层发送消息
- 【6】kafka消费者
- (1)创建商品
- (2)编辑商品
- (3)删除商品
- 【7】测试案例
- (1)编辑或者新增商品
- (2)查询验证
【一】es数据同步方案
【1】应用层双写(同步/异步)
原理:在业务代码中,当数据写入数据库的同时,也写入ES。
(1)同步双写:在同一个事务中,先写数据库,再写ES。保证强一致性,但性能较低,且存在耦合。
(2)异步双写:写数据库后,通过消息队列、线程池等方式异步写ES。提高性能,解耦,但可能数据不一致。
适用场景:
(1)数据量不大,对实时性要求高。
(2)业务逻辑简单,对性能要求不高(同步双写)。
(3)允许短暂的数据不一致(异步双写)。
实现案例
(1)异步双写
@Service
public class UserService {@Autowiredprivate UserRepository userRepository;@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;@Autowiredprivate TaskExecutor taskExecutor; // 线程池@Transactionalpublic User createUser(User user) {// 保存到数据库User savedUser = userRepository.save(user);// 异步同步到EStaskExecutor.execute(() -> {elasticsearchRestTemplate.save(savedUser);});return savedUser;}
}
优化技巧:
(1)使用AOP解耦同步逻辑
(2)添加重试机制
(3)异步写入ES(使用@Async)
【2】使用数据库事务日志(CDC)
原理:通过捕获数据库的事务日志(如MySQL的binlog),解析日志并同步到ES。
常用工具:
(1)Debezium:开源的CDC工具,支持多种数据库。
(2)Canal:阿里巴巴开源的MySQL binlog解析工具。
(3)Maxwell:另一个MySQL binlog解析工具。
流程:
(1)数据库开启binlog。
(2)CDC工具读取binlog,解析数据变更。
(3)将变更事件发送到消息队列(如Kafka)。
(4)消费者从消息队列获取事件,写入ES。
适用场景:
(1)数据量大,对实时性要求较高。
(2)需要解耦,不影响业务代码。
(3)支持多种数据源同步。
优势:
(1)业务代码无侵入。
(2)高性能,不影响主业务。
(3)支持全量和增量同步。
实现步骤:
(1)部署Debezium(或Canal)连接MySQL。
(2)配置Debezium将变更发送到Kafka。
(3)编写Spring Boot消费者,从Kafka读取消息并更新ES。
【3】定时任务同步
原理:通过定时任务(如Spring Scheduler)定期扫描数据库,将新增或修改的数据同步到ES。
适用场景:
(1)数据量不大,对实时性要求不高(如允许分钟级延迟)。
(2)作为兜底方案,与其他方案结合。
@Component
public class EsSyncScheduler {@Autowiredprivate UserRepository userRepository;@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;@Scheduled(fixedDelay = 60000) // 每分钟同步一次public void syncDataToEs() {// 查询最近更新的数据(根据更新时间戳)LocalDateTime lastSyncTime = getLastSyncTime(); // 从缓存/数据库获取上次同步时间List<User> users = userRepository.findByUpdatedAtAfter(lastSyncTime);// 同步到ESusers.forEach(user -> elasticsearchRestTemplate.save(user));// 更新同步时间updateLastSyncTime(LocalDateTime.now());}
}
【4】使用Logstash
原理:Logstash是一个数据处理管道,支持从多种来源(如JDBC)采集数据,处理后输出到ES。
步骤:
(1)配置Logstash的JDBC输入插件,定期查询数据库。
(2)配置输出插件为Elasticsearch。
适用场景:
(1)数据量中等,对实时性要求不高(分钟级)。
(2)需要从多种数据源同步到ES。
input {jdbc {jdbc_driver_library => "mysql-connector-java-8.0.25.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"jdbc_user => "root"jdbc_password => "password"schedule => "* * * * *" # 每分钟执行一次statement => "SELECT * FROM users WHERE updated_at > :sql_last_value"use_column_value => truetracking_column => "updated_at"}
}
output {elasticsearch {hosts => ["http://localhost:9200"]index => "users"document_id => "%{id}"}
}
【5】使用Data Integration工具
工具:Apache Nifi、Flink、Spark等。
原理:通过大数据处理工具实现数据同步,支持复杂的数据转换和清洗。
适用场景:
(1)大数据量,需要复杂的ETL处理。
(2)需要将多个数据源的数据整合到ES。
【二】选型对比
【1】选型建议
【2】总结
(1)追求实时性和解耦:选择CDC方案(如Debezium+Kafka),适合大型项目。
(2)简单业务,快速实现:选择应用层异步双写或定时任务同步。
(3)已有大数据平台:使用Flink、Spark等工具同步。
(4)需要简单ETL且实时性要求不高:Logstash是较好选择。
【3】性能瓶颈
【4】注意事项
(1)数据一致性:异步方案可能导致数据不一致,需根据业务容忍度选择。
(2)性能影响:双写方案可能影响数据库性能,需评估。
(3)错误处理:异步方案需考虑重试机制和错误处理,避免数据丢失。
(4)索引管理:同步前需设计好ES索引的映射和分片策略。
【三】kafka实现es数据同步
【1】需求说明
业务层对商品进行增删改的时候,会通过kafka消息队列异步通知es业务层同步数据
【2】es文档CRUD方法
使用productESRepository实现文档的管理方法,简单易用
注意:文档操作之前要根据id判断是否存在,否则会报错
高版本的es客户端对接口的Response解析会报错,但是不影响执行效果,直接捕捉异常处理掉就行
package com.allen.study.application.elasticSearch.es_service;import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import com.allen.study.application.api.request.ProductInfoQueryRequest;
import com.allen.study.application.api.response.ProductInfoQueryResponse;
import com.allen.study.application.elasticSearch.ElasticsearchTemplateUtil;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.AdvanceAggreEsResult;
import com.allen.study.application.elasticSearch.es_entity.AdvanceHighLightEsResult;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_repository.ProductESRepository;
import com.allen.study.application.elasticSearch.es_request.ProductEsQueryRequest;
import com.allen.study.application.repository.IProductInfoReadModelRepo;
import com.allen.study.common.base.ApiPageResponse;
import com.allen.study.common.base.ApiResponse;
import com.allen.study.common.base.Pagination;
import com.allen.study.common.utils.redis.RedissonConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import java.util.*;
import java.util.concurrent.TimeUnit;/*** @ClassName: ProductSearchService* @Author: AllenSun* @Date: 2025/8/24 17:01*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductSearchService {private final ProductESRepository productESRepository;private final IProductInfoReadModelRepo productInfoReadModelRepo;private final ProductInfoEsEntityAssembler assembler;private final RedissonConfig redissonConfig;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate ElasticsearchTemplateUtil<ProductES> esUtil;public ApiResponse<Boolean> documentExists(ProductES productES) {boolean existsById = productESRepository.existsById(productES.getId());// boolean documentExists = esUtil.documentExists(productES.getId(), ProductES.class);return ApiResponse.ok(existsById);}public ApiResponse<Void> saveDocument(ProductES productES) {if(!documentExists(productES).getData()){// esUtil.saveDocument(productES);try {productESRepository.save(productES);} catch (Exception e) {log.info("文档保存异常:{}",e.getMessage());}} else {log.info("数据已存在");}return ApiResponse.ok();}public ApiResponse<Void> deleteDocument(ProductES productES) {if(documentExists(productES).getData()){// esUtil.deleteById(productES.getId(),ProductES.class);try {productESRepository.deleteById(productES.getId());} catch (Exception e) {log.info("文档删除异常:{}",e.getMessage());}} else {log.info("数据不存在");}return ApiResponse.ok();}}
【3】创建主题Topic
【4】kafka生产者
package com.allen.study.domain.utils.kafka.producer;import com.alibaba.fastjson.JSON;
import com.allen.study.domain.entity.ProductInfo;
import com.allen.study.domain.vo.EmployeeInfoVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;/*** MQ 消息发送服务* @MethodName: KafkaProducer* @Author: AllenSun* @Date: 2025/3/5 22:21*/
@Component
@Slf4j
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;public static final String ADD_PRODUCT = "add_product";public static final String DEL_PRODUCT = "del_product";public static final String EDIT_PRODUCT = "edit_product";public ListenableFuture<SendResult<String, Object>> sendAddProduct(ProductInfo productInfo) {String objJson = JSON.toJSONString(productInfo);log.info("发送MQ消息(创建商品信息) topic:{} name:{} message:{}", ADD_PRODUCT, productInfo.getProductName(), objJson);return kafkaTemplate.send(ADD_PRODUCT, objJson);}public ListenableFuture<SendResult<String, Object>> sendEditProduct(ProductInfo productInfo) {String objJson = JSON.toJSONString(productInfo);log.info("发送MQ消息(编辑商品信息) topic:{} name:{} message:{}", EDIT_PRODUCT, productInfo.getProductName(), objJson);return kafkaTemplate.send(EDIT_PRODUCT, objJson);}public ListenableFuture<SendResult<String, Object>> sendDelProduct(ProductInfo productInfo) {String objJson = JSON.toJSONString(productInfo);log.info("发送MQ消息(删除商品信息) topic:{} name:{} message:{}", DEL_PRODUCT, productInfo.getProductName(), objJson);return kafkaTemplate.send(DEL_PRODUCT, objJson);}}
【5】商品业务层发送消息
package com.allen.study.domain.service;import cn.hutool.core.util.IdUtil;
import com.allen.study.common.base.DomainResponse;
import com.allen.study.common.utils.kafka.KafkaUtils;
import com.allen.study.domain.entity.ProductInfo;
import com.allen.study.domain.repository.IProductInfoRepo;
import com.allen.study.domain.utils.kafka.producer.KafkaProducer;
import lombok.AllArgsConstructor;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;/*** 商品信息表领域服务** @author AllenSun* @since 2025-08-24 13:47*/
@Service
@AllArgsConstructor
public class ProductInfoService {/*** 商品信息表资源库*/private final IProductInfoRepo productInfoRepo;@Autowiredprivate KafkaProducer kafkaProducer;@Autowiredprivate KafkaUtils kafkaUtils;/*** 创建商品信息表** @param productInfo 商品信息表实体* @return 响应结果*/public DomainResponse<Void> create(@NotNull ProductInfo productInfo) {// 保存商品信息表productInfo.setId(IdUtil.getSnowflakeNextIdStr());productInfoRepo.create(productInfo);kafkaProducer.sendAddProduct(productInfo);return DomainResponse.ok();}/*** 创建商品信息表** @param productInfo 商品信息表实体* @return 响应结果*/public DomainResponse<Void> createBatch(@NotNull List<ProductInfo> productInfo) {// 保存商品信息表productInfoRepo.createBatch(productInfo);for (ProductInfo info : productInfo) {kafkaProducer.sendAddProduct(info);}return DomainResponse.ok();}/*** 根据主键删除商品信息表** @param productInfoId 商品信息表主键* @return 响应结果*/public DomainResponse<Void> deleteById(@NotNull String productInfoId) {// 根据主键查询商品信息表ProductInfo productInfo = productInfoRepo.queryById(productInfoId);// 商品信息表数据存在且租户正确,删除商品信息表数据if (ObjectUtils.isNotEmpty(productInfo) && productInfo.checkTenant()) {productInfoRepo.deleteById(productInfoId);kafkaProducer.sendDelProduct(productInfo);}return DomainResponse.ok();}/*** 根据主键更新商品信息表** @param productInfo 商品信息表实体* @return 响应结果*/public DomainResponse<Void> updateById(@NotNull ProductInfo productInfo) {// 根据主键查询商品信息表ProductInfo exists = productInfoRepo.queryById(productInfo.getId());// 商品信息表数据存在且租户正确,更新商品信息表数据if (ObjectUtils.isNotEmpty(exists) && exists.checkTenant()) {productInfoRepo.updateById(productInfo);kafkaProducer.sendEditProduct(productInfo);}return DomainResponse.ok();}/*** 根据主键查询商品信息表** @param productInfoId 商品信息表主键* @return 响应结果*/public DomainResponse<ProductInfo> queryById(@NotNull String productInfoId) {ProductInfo productInfo = productInfoRepo.queryById(productInfoId);return DomainResponse.ok(productInfo);}/*** 根据主键查询商品信息表** @param productInfoIds 商品信息表主键* @return 响应结果*/public DomainResponse<List<ProductInfo>> queryByIds(@NotNull List<String> productInfoIds) {List<ProductInfo> productInfo = productInfoRepo.queryByIds(productInfoIds);return DomainResponse.ok(productInfo);}/*** 根据主键集合查询商品信息表Map** @param productInfoIds 商品信息表主键* @return 响应结果*/public DomainResponse<Map<String,ProductInfo>> queryId2InfoMap(@NotNull List<String> productInfoIds) {List<ProductInfo> productInfos = productInfoRepo.queryByIds(productInfoIds);Map<String, ProductInfo> id2InfoMap = Optional.ofNullable(productInfos).orElse(Lists.newArrayList()).stream().collect(Collectors.toMap(ProductInfo::getId, Function.identity(), (a, b) -> a));return DomainResponse.ok(id2InfoMap);}}
【6】kafka消费者
(1)创建商品
package com.allen.study.application.elasticSearch.esSyncMq;import com.alibaba.fastjson.JSON;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_service.ProductSearchService;
import com.allen.study.domain.entity.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;@Component
@Slf4j
public class AddProductListener {private final ProductSearchService productSearchService;private final ProductInfoEsEntityAssembler assembler;public AddProductListener(ProductSearchService productSearchService, ProductInfoEsEntityAssembler assembler) {this.productSearchService = productSearchService;this.assembler = assembler;}@KafkaListener(topics = "add_product", groupId = "product_info")public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> message = Optional.ofNullable(record.value());// 1. 判断消息是否存在if (!message.isPresent()) {return;}// 2. 处理 MQ 消息try {// 1. 转化对象(或者你也可以重写Serializer<T>)ProductInfo info = JSON.parseObject((String) message.get(), ProductInfo.class);ProductES productES = assembler.info2EsEntity(info);// 2. 处理新建后的一系列操作productSearchService.saveDocument(productES);// 3. 打印日志log.info("消费MQ消息,完成 topic:{} name:{} 新建商品流程处理结果:{}", topic, info.getProductName(),JSON.toJSONString(info));// 4. 消息消费完成ack.acknowledge();} catch (Exception e) {// 处理失败,消息重试。log.error("消费MQ消息,失败 topic:{} message:{}", topic, message.get());throw e;}}}
(2)编辑商品
package com.allen.study.application.elasticSearch.esSyncMq;import com.alibaba.fastjson.JSON;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_service.ProductSearchService;
import com.allen.study.domain.entity.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;@Component
@Slf4j
public class EditProductListener {private final ProductSearchService productSearchService;private final ProductInfoEsEntityAssembler assembler;public EditProductListener(ProductSearchService productSearchService, ProductInfoEsEntityAssembler assembler) {this.productSearchService = productSearchService;this.assembler = assembler;}@KafkaListener(topics = "edit_product", groupId = "product_info")public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> message = Optional.ofNullable(record.value());// 1. 判断消息是否存在if (!message.isPresent()) {return;}// 2. 处理 MQ 消息try {// 1. 转化对象(或者你也可以重写Serializer<T>)ProductInfo info = JSON.parseObject((String) message.get(), ProductInfo.class);ProductES productES = assembler.info2EsEntity(info);// 2. 处理修改后的一系列操作productSearchService.deleteDocument(productES);productSearchService.saveDocument(productES);// 3. 打印日志log.info("消费MQ消息,完成 topic:{} name:{} 修改商品流程处理结果:{}", topic, info.getProductName(),JSON.toJSONString(info));// 4. 消息消费完成ack.acknowledge();} catch (Exception e) {// 处理失败,消息重试。log.error("消费MQ消息,失败 topic:{} message:{}", topic, message.get());throw e;}}}
(3)删除商品
package com.allen.study.application.elasticSearch.esSyncMq;import com.alibaba.fastjson.JSON;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_service.ProductSearchService;
import com.allen.study.domain.entity.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;@Component
@Slf4j
public class DelProductListener {private final ProductSearchService productSearchService;private final ProductInfoEsEntityAssembler assembler;public DelProductListener(ProductSearchService productSearchService, ProductInfoEsEntityAssembler assembler) {this.productSearchService = productSearchService;this.assembler = assembler;}@KafkaListener(topics = "del_product", groupId = "product_info")public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> message = Optional.ofNullable(record.value());// 1. 判断消息是否存在if (!message.isPresent()) {return;}// 2. 处理 MQ 消息try {// 1. 转化对象(或者你也可以重写Serializer<T>)ProductInfo info = JSON.parseObject((String) message.get(), ProductInfo.class);ProductES productES = assembler.info2EsEntity(info);// 2. 处理删除后的一系列操作productSearchService.deleteDocument(productES);// 3. 打印日志log.info("消费MQ消息,完成 topic:{} name:{} 删除商品流程处理结果:{}", topic, info.getProductName(),JSON.toJSONString(info));// 4. 消息消费完成ack.acknowledge();} catch (Exception e) {// 处理失败,消息重试。log.error("消费MQ消息,失败 topic:{} message:{}", topic, message.get());throw e;}}}
【7】测试案例
(1)编辑或者新增商品
新增、编辑、删除商品业务信息,应该会通过kafka同步到es文档中