当前位置: 首页 > news >正文

【ElasticSearch】数据同步

【ElasticSearch】数据同步

  • 【一】es数据同步方案
    • 【1】应用层双写(同步/异步)
    • 【2】使用数据库事务日志(CDC)
    • 【3】定时任务同步
    • 【4】使用Logstash
    • 【5】使用Data Integration工具
  • 【二】选型对比
    • 【1】选型建议
    • 【2】总结
    • 【3】性能瓶颈
    • 【4】注意事项
  • 【三】kafka实现es数据同步
    • 【1】需求说明
    • 【2】es文档CRUD方法
    • 【3】创建主题Topic
    • 【4】kafka生产者
    • 【5】商品业务层发送消息
    • 【6】kafka消费者
      • (1)创建商品
      • (2)编辑商品
      • (3)删除商品
    • 【7】测试案例
      • (1)编辑或者新增商品
      • (2)查询验证

【一】es数据同步方案

【1】应用层双写(同步/异步)

​原理​:在业务代码中,当数据写入数据库的同时,也写入ES。

(1)​同步双写​:在同一个事务中,先写数据库,再写ES。保证强一致性,但性能较低,且存在耦合。
(2)异步双写​:写数据库后,通过消息队列、线程池等方式异步写ES。提高性能,解耦,但可能数据不一致。

​适用场景​:
(1)数据量不大,对实时性要求高。
(2)业务逻辑简单,对性能要求不高(同步双写)。
(3)允许短暂的数据不一致(异步双写)。

实现案例
(1)异步双写

@Service
public class UserService {@Autowiredprivate UserRepository userRepository;@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;@Autowiredprivate TaskExecutor taskExecutor; // 线程池@Transactionalpublic User createUser(User user) {// 保存到数据库User savedUser = userRepository.save(user);// 异步同步到EStaskExecutor.execute(() -> {elasticsearchRestTemplate.save(savedUser);});return savedUser;}
}

优化技巧:​​
(1)使用AOP解耦同步逻辑
(2)添加重试机制
(3)异步写入ES(使用@Async)

【2】使用数据库事务日志(CDC)

原理​:通过捕获数据库的事务日志(如MySQL的binlog),解析日志并同步到ES。

常用工具:
(1)Debezium​:开源的CDC工具,支持多种数据库。
(2)Canal​:阿里巴巴开源的MySQL binlog解析工具。
(3)Maxwell​:另一个MySQL binlog解析工具。

​流程​:
(1)数据库开启binlog。
(2)CDC工具读取binlog,解析数据变更。
(3)将变更事件发送到消息队列(如Kafka)。
(4)消费者从消息队列获取事件,写入ES。

​适用场景​:
(1)数据量大,对实时性要求较高。
(2)需要解耦,不影响业务代码。
(3)支持多种数据源同步。

优势​:
(1)业务代码无侵入。
(2)高性能,不影响主业务。
(3)支持全量和增量同步。

实现步骤​:
(1)部署Debezium(或Canal)连接MySQL。
(2)配置Debezium将变更发送到Kafka。
(3)编写Spring Boot消费者,从Kafka读取消息并更新ES。

【3】定时任务同步

​原理​:通过定时任务(如Spring Scheduler)定期扫描数据库,将新增或修改的数据同步到ES。

​适用场景​:
(1)数据量不大,对实时性要求不高(如允许分钟级延迟)。
(2)作为兜底方案,与其他方案结合。

@Component
public class EsSyncScheduler {@Autowiredprivate UserRepository userRepository;@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;@Scheduled(fixedDelay = 60000) // 每分钟同步一次public void syncDataToEs() {// 查询最近更新的数据(根据更新时间戳)LocalDateTime lastSyncTime = getLastSyncTime(); // 从缓存/数据库获取上次同步时间List<User> users = userRepository.findByUpdatedAtAfter(lastSyncTime);// 同步到ESusers.forEach(user -> elasticsearchRestTemplate.save(user));// 更新同步时间updateLastSyncTime(LocalDateTime.now());}
}

【4】使用Logstash

​原理​:Logstash是一个数据处理管道,支持从多种来源(如JDBC)采集数据,处理后输出到ES。

​步骤​:
(1)配置Logstash的JDBC输入插件,定期查询数据库。
(2)配置输出插件为Elasticsearch。

适用场景​:
(1)数据量中等,对实时性要求不高(分钟级)。
(2)需要从多种数据源同步到ES。

input {jdbc {jdbc_driver_library => "mysql-connector-java-8.0.25.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"jdbc_user => "root"jdbc_password => "password"schedule => "* * * * *" # 每分钟执行一次statement => "SELECT * FROM users WHERE updated_at > :sql_last_value"use_column_value => truetracking_column => "updated_at"}
}
output {elasticsearch {hosts => ["http://localhost:9200"]index => "users"document_id => "%{id}"}
}

【5】使用Data Integration工具

​工具​:Apache Nifi、Flink、Spark等。

​原理​:通过大数据处理工具实现数据同步,支持复杂的数据转换和清洗。

​适用场景​:
(1)大数据量,需要复杂的ETL处理。
(2)需要将多个数据源的数据整合到ES。

【二】选型对比

【1】选型建议

在这里插入图片描述

【2】总结

(1)追求实时性和解耦​:选择CDC方案(如Debezium+Kafka),适合大型项目。
(2)简单业务,快速实现​:选择应用层异步双写或定时任务同步。
(3)已有大数据平台​:使用Flink、Spark等工具同步。
(4)需要简单ETL且实时性要求不高​:Logstash是较好选择。

【3】性能瓶颈

在这里插入图片描述

【4】注意事项

(1)数据一致性​:异步方案可能导致数据不一致,需根据业务容忍度选择。
(2)性能影响​:双写方案可能影响数据库性能,需评估。
(3)错误处理​:异步方案需考虑重试机制和错误处理,避免数据丢失。
(4)索引管理​:同步前需设计好ES索引的映射和分片策略。

【三】kafka实现es数据同步

【1】需求说明

业务层对商品进行增删改的时候,会通过kafka消息队列异步通知es业务层同步数据

【2】es文档CRUD方法

使用productESRepository实现文档的管理方法,简单易用

注意:文档操作之前要根据id判断是否存在,否则会报错

高版本的es客户端对接口的Response解析会报错,但是不影响执行效果,直接捕捉异常处理掉就行

package com.allen.study.application.elasticSearch.es_service;import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import com.allen.study.application.api.request.ProductInfoQueryRequest;
import com.allen.study.application.api.response.ProductInfoQueryResponse;
import com.allen.study.application.elasticSearch.ElasticsearchTemplateUtil;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.AdvanceAggreEsResult;
import com.allen.study.application.elasticSearch.es_entity.AdvanceHighLightEsResult;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_repository.ProductESRepository;
import com.allen.study.application.elasticSearch.es_request.ProductEsQueryRequest;
import com.allen.study.application.repository.IProductInfoReadModelRepo;
import com.allen.study.common.base.ApiPageResponse;
import com.allen.study.common.base.ApiResponse;
import com.allen.study.common.base.Pagination;
import com.allen.study.common.utils.redis.RedissonConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import java.util.*;
import java.util.concurrent.TimeUnit;/*** @ClassName: ProductSearchService* @Author: AllenSun* @Date: 2025/8/24 17:01*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductSearchService {private final ProductESRepository productESRepository;private final IProductInfoReadModelRepo productInfoReadModelRepo;private final ProductInfoEsEntityAssembler assembler;private final RedissonConfig redissonConfig;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate ElasticsearchTemplateUtil<ProductES> esUtil;public ApiResponse<Boolean> documentExists(ProductES productES) {boolean existsById = productESRepository.existsById(productES.getId());// boolean documentExists = esUtil.documentExists(productES.getId(), ProductES.class);return ApiResponse.ok(existsById);}public ApiResponse<Void> saveDocument(ProductES productES) {if(!documentExists(productES).getData()){// esUtil.saveDocument(productES);try {productESRepository.save(productES);} catch (Exception e) {log.info("文档保存异常:{}",e.getMessage());}} else {log.info("数据已存在");}return ApiResponse.ok();}public ApiResponse<Void> deleteDocument(ProductES productES) {if(documentExists(productES).getData()){// esUtil.deleteById(productES.getId(),ProductES.class);try {productESRepository.deleteById(productES.getId());} catch (Exception e) {log.info("文档删除异常:{}",e.getMessage());}} else {log.info("数据不存在");}return ApiResponse.ok();}}

【3】创建主题Topic

在这里插入图片描述

【4】kafka生产者

package com.allen.study.domain.utils.kafka.producer;import com.alibaba.fastjson.JSON;
import com.allen.study.domain.entity.ProductInfo;
import com.allen.study.domain.vo.EmployeeInfoVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;/*** MQ 消息发送服务* @MethodName: KafkaProducer* @Author: AllenSun* @Date: 2025/3/5 22:21*/
@Component
@Slf4j
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;public static final String ADD_PRODUCT = "add_product";public static final String DEL_PRODUCT = "del_product";public static final String EDIT_PRODUCT = "edit_product";public ListenableFuture<SendResult<String, Object>> sendAddProduct(ProductInfo productInfo) {String objJson = JSON.toJSONString(productInfo);log.info("发送MQ消息(创建商品信息) topic:{} name:{} message:{}", ADD_PRODUCT, productInfo.getProductName(), objJson);return kafkaTemplate.send(ADD_PRODUCT, objJson);}public ListenableFuture<SendResult<String, Object>> sendEditProduct(ProductInfo productInfo) {String objJson = JSON.toJSONString(productInfo);log.info("发送MQ消息(编辑商品信息) topic:{} name:{} message:{}", EDIT_PRODUCT, productInfo.getProductName(), objJson);return kafkaTemplate.send(EDIT_PRODUCT, objJson);}public ListenableFuture<SendResult<String, Object>> sendDelProduct(ProductInfo productInfo) {String objJson = JSON.toJSONString(productInfo);log.info("发送MQ消息(删除商品信息) topic:{} name:{} message:{}", DEL_PRODUCT, productInfo.getProductName(), objJson);return kafkaTemplate.send(DEL_PRODUCT, objJson);}}

【5】商品业务层发送消息

package com.allen.study.domain.service;import cn.hutool.core.util.IdUtil;
import com.allen.study.common.base.DomainResponse;
import com.allen.study.common.utils.kafka.KafkaUtils;
import com.allen.study.domain.entity.ProductInfo;
import com.allen.study.domain.repository.IProductInfoRepo;
import com.allen.study.domain.utils.kafka.producer.KafkaProducer;
import lombok.AllArgsConstructor;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;/*** 商品信息表领域服务** @author AllenSun* @since 2025-08-24 13:47*/
@Service
@AllArgsConstructor
public class ProductInfoService {/*** 商品信息表资源库*/private final IProductInfoRepo productInfoRepo;@Autowiredprivate KafkaProducer kafkaProducer;@Autowiredprivate KafkaUtils kafkaUtils;/*** 创建商品信息表** @param productInfo 商品信息表实体* @return 响应结果*/public DomainResponse<Void> create(@NotNull ProductInfo productInfo) {// 保存商品信息表productInfo.setId(IdUtil.getSnowflakeNextIdStr());productInfoRepo.create(productInfo);kafkaProducer.sendAddProduct(productInfo);return DomainResponse.ok();}/*** 创建商品信息表** @param productInfo 商品信息表实体* @return 响应结果*/public DomainResponse<Void> createBatch(@NotNull List<ProductInfo> productInfo) {// 保存商品信息表productInfoRepo.createBatch(productInfo);for (ProductInfo info : productInfo) {kafkaProducer.sendAddProduct(info);}return DomainResponse.ok();}/*** 根据主键删除商品信息表** @param productInfoId 商品信息表主键* @return 响应结果*/public DomainResponse<Void> deleteById(@NotNull String productInfoId) {// 根据主键查询商品信息表ProductInfo productInfo = productInfoRepo.queryById(productInfoId);// 商品信息表数据存在且租户正确,删除商品信息表数据if (ObjectUtils.isNotEmpty(productInfo) && productInfo.checkTenant()) {productInfoRepo.deleteById(productInfoId);kafkaProducer.sendDelProduct(productInfo);}return DomainResponse.ok();}/*** 根据主键更新商品信息表** @param productInfo 商品信息表实体* @return 响应结果*/public DomainResponse<Void> updateById(@NotNull ProductInfo productInfo) {// 根据主键查询商品信息表ProductInfo exists = productInfoRepo.queryById(productInfo.getId());// 商品信息表数据存在且租户正确,更新商品信息表数据if (ObjectUtils.isNotEmpty(exists) && exists.checkTenant()) {productInfoRepo.updateById(productInfo);kafkaProducer.sendEditProduct(productInfo);}return DomainResponse.ok();}/*** 根据主键查询商品信息表** @param productInfoId 商品信息表主键* @return 响应结果*/public DomainResponse<ProductInfo> queryById(@NotNull String productInfoId) {ProductInfo productInfo = productInfoRepo.queryById(productInfoId);return DomainResponse.ok(productInfo);}/*** 根据主键查询商品信息表** @param productInfoIds 商品信息表主键* @return 响应结果*/public DomainResponse<List<ProductInfo>> queryByIds(@NotNull List<String> productInfoIds) {List<ProductInfo> productInfo = productInfoRepo.queryByIds(productInfoIds);return DomainResponse.ok(productInfo);}/*** 根据主键集合查询商品信息表Map** @param productInfoIds 商品信息表主键* @return 响应结果*/public DomainResponse<Map<String,ProductInfo>> queryId2InfoMap(@NotNull List<String> productInfoIds) {List<ProductInfo> productInfos = productInfoRepo.queryByIds(productInfoIds);Map<String, ProductInfo> id2InfoMap = Optional.ofNullable(productInfos).orElse(Lists.newArrayList()).stream().collect(Collectors.toMap(ProductInfo::getId, Function.identity(), (a, b) -> a));return DomainResponse.ok(id2InfoMap);}}

【6】kafka消费者

(1)创建商品

package com.allen.study.application.elasticSearch.esSyncMq;import com.alibaba.fastjson.JSON;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_service.ProductSearchService;
import com.allen.study.domain.entity.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;@Component
@Slf4j
public class AddProductListener {private final ProductSearchService productSearchService;private final ProductInfoEsEntityAssembler assembler;public AddProductListener(ProductSearchService productSearchService, ProductInfoEsEntityAssembler assembler) {this.productSearchService = productSearchService;this.assembler = assembler;}@KafkaListener(topics = "add_product", groupId = "product_info")public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> message = Optional.ofNullable(record.value());// 1. 判断消息是否存在if (!message.isPresent()) {return;}// 2. 处理 MQ 消息try {// 1. 转化对象(或者你也可以重写Serializer<T>)ProductInfo info = JSON.parseObject((String) message.get(), ProductInfo.class);ProductES productES = assembler.info2EsEntity(info);// 2. 处理新建后的一系列操作productSearchService.saveDocument(productES);// 3. 打印日志log.info("消费MQ消息,完成 topic:{} name:{} 新建商品流程处理结果:{}", topic, info.getProductName(),JSON.toJSONString(info));// 4. 消息消费完成ack.acknowledge();} catch (Exception e) {// 处理失败,消息重试。log.error("消费MQ消息,失败 topic:{} message:{}", topic, message.get());throw e;}}}

(2)编辑商品

package com.allen.study.application.elasticSearch.esSyncMq;import com.alibaba.fastjson.JSON;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_service.ProductSearchService;
import com.allen.study.domain.entity.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;@Component
@Slf4j
public class EditProductListener {private final ProductSearchService productSearchService;private final ProductInfoEsEntityAssembler assembler;public EditProductListener(ProductSearchService productSearchService, ProductInfoEsEntityAssembler assembler) {this.productSearchService = productSearchService;this.assembler = assembler;}@KafkaListener(topics = "edit_product", groupId = "product_info")public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> message = Optional.ofNullable(record.value());// 1. 判断消息是否存在if (!message.isPresent()) {return;}// 2. 处理 MQ 消息try {// 1. 转化对象(或者你也可以重写Serializer<T>)ProductInfo info = JSON.parseObject((String) message.get(), ProductInfo.class);ProductES productES = assembler.info2EsEntity(info);// 2. 处理修改后的一系列操作productSearchService.deleteDocument(productES);productSearchService.saveDocument(productES);// 3. 打印日志log.info("消费MQ消息,完成 topic:{} name:{} 修改商品流程处理结果:{}", topic, info.getProductName(),JSON.toJSONString(info));// 4. 消息消费完成ack.acknowledge();} catch (Exception e) {// 处理失败,消息重试。log.error("消费MQ消息,失败 topic:{} message:{}", topic, message.get());throw e;}}}

(3)删除商品

package com.allen.study.application.elasticSearch.esSyncMq;import com.alibaba.fastjson.JSON;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_service.ProductSearchService;
import com.allen.study.domain.entity.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;@Component
@Slf4j
public class DelProductListener {private final ProductSearchService productSearchService;private final ProductInfoEsEntityAssembler assembler;public DelProductListener(ProductSearchService productSearchService, ProductInfoEsEntityAssembler assembler) {this.productSearchService = productSearchService;this.assembler = assembler;}@KafkaListener(topics = "del_product", groupId = "product_info")public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> message = Optional.ofNullable(record.value());// 1. 判断消息是否存在if (!message.isPresent()) {return;}// 2. 处理 MQ 消息try {// 1. 转化对象(或者你也可以重写Serializer<T>)ProductInfo info = JSON.parseObject((String) message.get(), ProductInfo.class);ProductES productES = assembler.info2EsEntity(info);// 2. 处理删除后的一系列操作productSearchService.deleteDocument(productES);// 3. 打印日志log.info("消费MQ消息,完成 topic:{} name:{} 删除商品流程处理结果:{}", topic, info.getProductName(),JSON.toJSONString(info));// 4. 消息消费完成ack.acknowledge();} catch (Exception e) {// 处理失败,消息重试。log.error("消费MQ消息,失败 topic:{} message:{}", topic, message.get());throw e;}}}

【7】测试案例

(1)编辑或者新增商品

新增、编辑、删除商品业务信息,应该会通过kafka同步到es文档中
在这里插入图片描述

(2)查询验证

在这里插入图片描述

http://www.dtcms.com/a/352639.html

相关文章:

  • 人形机器人的“奥运会“:宇树科技领跑,动捕技术成训练关键
  • git submodule的基本使用
  • 数据与端点安全 (Protect data and apps)
  • 利用 Python 爬虫按关键字搜索 1688 商品详情 API 返回值说明(代码示例)实战指南
  • 从零开始配置前端环境及必要软件安装
  • 技术总结:AArch64架构下Jenkins Agent(RPM容器编译节点)掉线问题分析与排查
  • 基于用户行为分析的精确营销系统
  • 【java并发编程】--cas和synchronized
  • openEuler Embedded 的 Yocto入门 : 2. 构建一个Hello,world!
  • PWM控制实现呼吸灯
  • 基于CentOS7:Linux服务器的初始化流程
  • 基于51单片机的指纹红外密码电子锁
  • 【Elasticsearch】k-NN 搜索深度解析:参数优化与分数过滤实践
  • Pascal使用TMediaPlayer播放MIDI文件时的错误
  • 红外遥控模块
  • 逻辑流图、作业图、执行图、物理图
  • 嵌入式软件移植
  • 【制作100个Unity游戏】从零开始构建类《月圆之夜》《杀戮尖塔》的卡牌游戏(附带项目源码)
  • Windows远程协助安全配置与使用限制
  • STM32G4 SVPWM VF开环强拖电机
  • 2026 届最新大数据专业毕设选题推荐,毕业设计题目汇总
  • 达索 Enovia 许可管理技术白皮书:机制解析与智能优化实践
  • 段式存储、页式存储、段页式存储:三种内存管理策略的演进与权衡
  • PyTorch生成式人工智能——PatchGAN详解与实现
  • Docker实战系列:使用Docker部署YouTrack项目管理系统
  • Linux修改bootflag(启动标签)到指定分区
  • RedHat 5.7升级为PAE内核,并更新yum源
  • 软件产品线过程模型全景解析:双生命周期、SEI 与三生命周期
  • 《数据之心》——鱼小妖全传
  • 数据结构07(Java)-- (堆,大根堆,堆排序)