当前位置: 首页 > news >正文

公司网站模板下载网站建设大体包含

公司网站模板下载,网站建设大体包含,拼多多跨境电商怎么样,wordpress外贸主题用哪个好【ElasticSearch】数据同步【一】es数据同步方案【1】应用层双写(同步/异步)【2】使用数据库事务日志(CDC)【3】定时任务同步【4】使用Logstash【5】使用Data Integration工具【二】选型对比【1】选型建议【2】总结【3】性能瓶颈【…

【ElasticSearch】数据同步

  • 【一】es数据同步方案
    • 【1】应用层双写(同步/异步)
    • 【2】使用数据库事务日志(CDC)
    • 【3】定时任务同步
    • 【4】使用Logstash
    • 【5】使用Data Integration工具
  • 【二】选型对比
    • 【1】选型建议
    • 【2】总结
    • 【3】性能瓶颈
    • 【4】注意事项
  • 【三】kafka实现es数据同步
    • 【1】需求说明
    • 【2】es文档CRUD方法
    • 【3】创建主题Topic
    • 【4】kafka生产者
    • 【5】商品业务层发送消息
    • 【6】kafka消费者
      • (1)创建商品
      • (2)编辑商品
      • (3)删除商品
    • 【7】测试案例
      • (1)编辑或者新增商品
      • (2)查询验证

【一】es数据同步方案

【1】应用层双写(同步/异步)

​原理​:在业务代码中,当数据写入数据库的同时,也写入ES。

(1)​同步双写​:在同一个事务中,先写数据库,再写ES。保证强一致性,但性能较低,且存在耦合。
(2)异步双写​:写数据库后,通过消息队列、线程池等方式异步写ES。提高性能,解耦,但可能数据不一致。

​适用场景​:
(1)数据量不大,对实时性要求高。
(2)业务逻辑简单,对性能要求不高(同步双写)。
(3)允许短暂的数据不一致(异步双写)。

实现案例
(1)异步双写

@Service
public class UserService {@Autowiredprivate UserRepository userRepository;@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;@Autowiredprivate TaskExecutor taskExecutor; // 线程池@Transactionalpublic User createUser(User user) {// 保存到数据库User savedUser = userRepository.save(user);// 异步同步到EStaskExecutor.execute(() -> {elasticsearchRestTemplate.save(savedUser);});return savedUser;}
}

优化技巧:​​
(1)使用AOP解耦同步逻辑
(2)添加重试机制
(3)异步写入ES(使用@Async)

【2】使用数据库事务日志(CDC)

原理​:通过捕获数据库的事务日志(如MySQL的binlog),解析日志并同步到ES。

常用工具:
(1)Debezium​:开源的CDC工具,支持多种数据库。
(2)Canal​:阿里巴巴开源的MySQL binlog解析工具。
(3)Maxwell​:另一个MySQL binlog解析工具。

​流程​:
(1)数据库开启binlog。
(2)CDC工具读取binlog,解析数据变更。
(3)将变更事件发送到消息队列(如Kafka)。
(4)消费者从消息队列获取事件,写入ES。

​适用场景​:
(1)数据量大,对实时性要求较高。
(2)需要解耦,不影响业务代码。
(3)支持多种数据源同步。

优势​:
(1)业务代码无侵入。
(2)高性能,不影响主业务。
(3)支持全量和增量同步。

实现步骤​:
(1)部署Debezium(或Canal)连接MySQL。
(2)配置Debezium将变更发送到Kafka。
(3)编写Spring Boot消费者,从Kafka读取消息并更新ES。

【3】定时任务同步

​原理​:通过定时任务(如Spring Scheduler)定期扫描数据库,将新增或修改的数据同步到ES。

​适用场景​:
(1)数据量不大,对实时性要求不高(如允许分钟级延迟)。
(2)作为兜底方案,与其他方案结合。

@Component
public class EsSyncScheduler {@Autowiredprivate UserRepository userRepository;@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;@Scheduled(fixedDelay = 60000) // 每分钟同步一次public void syncDataToEs() {// 查询最近更新的数据(根据更新时间戳)LocalDateTime lastSyncTime = getLastSyncTime(); // 从缓存/数据库获取上次同步时间List<User> users = userRepository.findByUpdatedAtAfter(lastSyncTime);// 同步到ESusers.forEach(user -> elasticsearchRestTemplate.save(user));// 更新同步时间updateLastSyncTime(LocalDateTime.now());}
}

【4】使用Logstash

​原理​:Logstash是一个数据处理管道,支持从多种来源(如JDBC)采集数据,处理后输出到ES。

​步骤​:
(1)配置Logstash的JDBC输入插件,定期查询数据库。
(2)配置输出插件为Elasticsearch。

适用场景​:
(1)数据量中等,对实时性要求不高(分钟级)。
(2)需要从多种数据源同步到ES。

input {jdbc {jdbc_driver_library => "mysql-connector-java-8.0.25.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"jdbc_user => "root"jdbc_password => "password"schedule => "* * * * *" # 每分钟执行一次statement => "SELECT * FROM users WHERE updated_at > :sql_last_value"use_column_value => truetracking_column => "updated_at"}
}
output {elasticsearch {hosts => ["http://localhost:9200"]index => "users"document_id => "%{id}"}
}

【5】使用Data Integration工具

​工具​:Apache Nifi、Flink、Spark等。

​原理​:通过大数据处理工具实现数据同步,支持复杂的数据转换和清洗。

​适用场景​:
(1)大数据量,需要复杂的ETL处理。
(2)需要将多个数据源的数据整合到ES。

【二】选型对比

【1】选型建议

在这里插入图片描述

【2】总结

(1)追求实时性和解耦​:选择CDC方案(如Debezium+Kafka),适合大型项目。
(2)简单业务,快速实现​:选择应用层异步双写或定时任务同步。
(3)已有大数据平台​:使用Flink、Spark等工具同步。
(4)需要简单ETL且实时性要求不高​:Logstash是较好选择。

【3】性能瓶颈

在这里插入图片描述

【4】注意事项

(1)数据一致性​:异步方案可能导致数据不一致,需根据业务容忍度选择。
(2)性能影响​:双写方案可能影响数据库性能,需评估。
(3)错误处理​:异步方案需考虑重试机制和错误处理,避免数据丢失。
(4)索引管理​:同步前需设计好ES索引的映射和分片策略。

【三】kafka实现es数据同步

【1】需求说明

业务层对商品进行增删改的时候,会通过kafka消息队列异步通知es业务层同步数据

【2】es文档CRUD方法

使用productESRepository实现文档的管理方法,简单易用

注意:文档操作之前要根据id判断是否存在,否则会报错

高版本的es客户端对接口的Response解析会报错,但是不影响执行效果,直接捕捉异常处理掉就行

package com.allen.study.application.elasticSearch.es_service;import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import com.allen.study.application.api.request.ProductInfoQueryRequest;
import com.allen.study.application.api.response.ProductInfoQueryResponse;
import com.allen.study.application.elasticSearch.ElasticsearchTemplateUtil;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.AdvanceAggreEsResult;
import com.allen.study.application.elasticSearch.es_entity.AdvanceHighLightEsResult;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_repository.ProductESRepository;
import com.allen.study.application.elasticSearch.es_request.ProductEsQueryRequest;
import com.allen.study.application.repository.IProductInfoReadModelRepo;
import com.allen.study.common.base.ApiPageResponse;
import com.allen.study.common.base.ApiResponse;
import com.allen.study.common.base.Pagination;
import com.allen.study.common.utils.redis.RedissonConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import java.util.*;
import java.util.concurrent.TimeUnit;/*** @ClassName: ProductSearchService* @Author: AllenSun* @Date: 2025/8/24 17:01*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductSearchService {private final ProductESRepository productESRepository;private final IProductInfoReadModelRepo productInfoReadModelRepo;private final ProductInfoEsEntityAssembler assembler;private final RedissonConfig redissonConfig;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate ElasticsearchTemplateUtil<ProductES> esUtil;public ApiResponse<Boolean> documentExists(ProductES productES) {boolean existsById = productESRepository.existsById(productES.getId());// boolean documentExists = esUtil.documentExists(productES.getId(), ProductES.class);return ApiResponse.ok(existsById);}public ApiResponse<Void> saveDocument(ProductES productES) {if(!documentExists(productES).getData()){// esUtil.saveDocument(productES);try {productESRepository.save(productES);} catch (Exception e) {log.info("文档保存异常:{}",e.getMessage());}} else {log.info("数据已存在");}return ApiResponse.ok();}public ApiResponse<Void> deleteDocument(ProductES productES) {if(documentExists(productES).getData()){// esUtil.deleteById(productES.getId(),ProductES.class);try {productESRepository.deleteById(productES.getId());} catch (Exception e) {log.info("文档删除异常:{}",e.getMessage());}} else {log.info("数据不存在");}return ApiResponse.ok();}}

【3】创建主题Topic

在这里插入图片描述

【4】kafka生产者

package com.allen.study.domain.utils.kafka.producer;import com.alibaba.fastjson.JSON;
import com.allen.study.domain.entity.ProductInfo;
import com.allen.study.domain.vo.EmployeeInfoVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;/*** MQ 消息发送服务* @MethodName: KafkaProducer* @Author: AllenSun* @Date: 2025/3/5 22:21*/
@Component
@Slf4j
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;public static final String ADD_PRODUCT = "add_product";public static final String DEL_PRODUCT = "del_product";public static final String EDIT_PRODUCT = "edit_product";public ListenableFuture<SendResult<String, Object>> sendAddProduct(ProductInfo productInfo) {String objJson = JSON.toJSONString(productInfo);log.info("发送MQ消息(创建商品信息) topic:{} name:{} message:{}", ADD_PRODUCT, productInfo.getProductName(), objJson);return kafkaTemplate.send(ADD_PRODUCT, objJson);}public ListenableFuture<SendResult<String, Object>> sendEditProduct(ProductInfo productInfo) {String objJson = JSON.toJSONString(productInfo);log.info("发送MQ消息(编辑商品信息) topic:{} name:{} message:{}", EDIT_PRODUCT, productInfo.getProductName(), objJson);return kafkaTemplate.send(EDIT_PRODUCT, objJson);}public ListenableFuture<SendResult<String, Object>> sendDelProduct(ProductInfo productInfo) {String objJson = JSON.toJSONString(productInfo);log.info("发送MQ消息(删除商品信息) topic:{} name:{} message:{}", DEL_PRODUCT, productInfo.getProductName(), objJson);return kafkaTemplate.send(DEL_PRODUCT, objJson);}}

【5】商品业务层发送消息

package com.allen.study.domain.service;import cn.hutool.core.util.IdUtil;
import com.allen.study.common.base.DomainResponse;
import com.allen.study.common.utils.kafka.KafkaUtils;
import com.allen.study.domain.entity.ProductInfo;
import com.allen.study.domain.repository.IProductInfoRepo;
import com.allen.study.domain.utils.kafka.producer.KafkaProducer;
import lombok.AllArgsConstructor;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;/*** 商品信息表领域服务** @author AllenSun* @since 2025-08-24 13:47*/
@Service
@AllArgsConstructor
public class ProductInfoService {/*** 商品信息表资源库*/private final IProductInfoRepo productInfoRepo;@Autowiredprivate KafkaProducer kafkaProducer;@Autowiredprivate KafkaUtils kafkaUtils;/*** 创建商品信息表** @param productInfo 商品信息表实体* @return 响应结果*/public DomainResponse<Void> create(@NotNull ProductInfo productInfo) {// 保存商品信息表productInfo.setId(IdUtil.getSnowflakeNextIdStr());productInfoRepo.create(productInfo);kafkaProducer.sendAddProduct(productInfo);return DomainResponse.ok();}/*** 创建商品信息表** @param productInfo 商品信息表实体* @return 响应结果*/public DomainResponse<Void> createBatch(@NotNull List<ProductInfo> productInfo) {// 保存商品信息表productInfoRepo.createBatch(productInfo);for (ProductInfo info : productInfo) {kafkaProducer.sendAddProduct(info);}return DomainResponse.ok();}/*** 根据主键删除商品信息表** @param productInfoId 商品信息表主键* @return 响应结果*/public DomainResponse<Void> deleteById(@NotNull String productInfoId) {// 根据主键查询商品信息表ProductInfo productInfo = productInfoRepo.queryById(productInfoId);// 商品信息表数据存在且租户正确,删除商品信息表数据if (ObjectUtils.isNotEmpty(productInfo) && productInfo.checkTenant()) {productInfoRepo.deleteById(productInfoId);kafkaProducer.sendDelProduct(productInfo);}return DomainResponse.ok();}/*** 根据主键更新商品信息表** @param productInfo 商品信息表实体* @return 响应结果*/public DomainResponse<Void> updateById(@NotNull ProductInfo productInfo) {// 根据主键查询商品信息表ProductInfo exists = productInfoRepo.queryById(productInfo.getId());// 商品信息表数据存在且租户正确,更新商品信息表数据if (ObjectUtils.isNotEmpty(exists) && exists.checkTenant()) {productInfoRepo.updateById(productInfo);kafkaProducer.sendEditProduct(productInfo);}return DomainResponse.ok();}/*** 根据主键查询商品信息表** @param productInfoId 商品信息表主键* @return 响应结果*/public DomainResponse<ProductInfo> queryById(@NotNull String productInfoId) {ProductInfo productInfo = productInfoRepo.queryById(productInfoId);return DomainResponse.ok(productInfo);}/*** 根据主键查询商品信息表** @param productInfoIds 商品信息表主键* @return 响应结果*/public DomainResponse<List<ProductInfo>> queryByIds(@NotNull List<String> productInfoIds) {List<ProductInfo> productInfo = productInfoRepo.queryByIds(productInfoIds);return DomainResponse.ok(productInfo);}/*** 根据主键集合查询商品信息表Map** @param productInfoIds 商品信息表主键* @return 响应结果*/public DomainResponse<Map<String,ProductInfo>> queryId2InfoMap(@NotNull List<String> productInfoIds) {List<ProductInfo> productInfos = productInfoRepo.queryByIds(productInfoIds);Map<String, ProductInfo> id2InfoMap = Optional.ofNullable(productInfos).orElse(Lists.newArrayList()).stream().collect(Collectors.toMap(ProductInfo::getId, Function.identity(), (a, b) -> a));return DomainResponse.ok(id2InfoMap);}}

【6】kafka消费者

(1)创建商品

package com.allen.study.application.elasticSearch.esSyncMq;import com.alibaba.fastjson.JSON;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_service.ProductSearchService;
import com.allen.study.domain.entity.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;@Component
@Slf4j
public class AddProductListener {private final ProductSearchService productSearchService;private final ProductInfoEsEntityAssembler assembler;public AddProductListener(ProductSearchService productSearchService, ProductInfoEsEntityAssembler assembler) {this.productSearchService = productSearchService;this.assembler = assembler;}@KafkaListener(topics = "add_product", groupId = "product_info")public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> message = Optional.ofNullable(record.value());// 1. 判断消息是否存在if (!message.isPresent()) {return;}// 2. 处理 MQ 消息try {// 1. 转化对象(或者你也可以重写Serializer<T>)ProductInfo info = JSON.parseObject((String) message.get(), ProductInfo.class);ProductES productES = assembler.info2EsEntity(info);// 2. 处理新建后的一系列操作productSearchService.saveDocument(productES);// 3. 打印日志log.info("消费MQ消息,完成 topic:{} name:{} 新建商品流程处理结果:{}", topic, info.getProductName(),JSON.toJSONString(info));// 4. 消息消费完成ack.acknowledge();} catch (Exception e) {// 处理失败,消息重试。log.error("消费MQ消息,失败 topic:{} message:{}", topic, message.get());throw e;}}}

(2)编辑商品

package com.allen.study.application.elasticSearch.esSyncMq;import com.alibaba.fastjson.JSON;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_service.ProductSearchService;
import com.allen.study.domain.entity.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;@Component
@Slf4j
public class EditProductListener {private final ProductSearchService productSearchService;private final ProductInfoEsEntityAssembler assembler;public EditProductListener(ProductSearchService productSearchService, ProductInfoEsEntityAssembler assembler) {this.productSearchService = productSearchService;this.assembler = assembler;}@KafkaListener(topics = "edit_product", groupId = "product_info")public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> message = Optional.ofNullable(record.value());// 1. 判断消息是否存在if (!message.isPresent()) {return;}// 2. 处理 MQ 消息try {// 1. 转化对象(或者你也可以重写Serializer<T>)ProductInfo info = JSON.parseObject((String) message.get(), ProductInfo.class);ProductES productES = assembler.info2EsEntity(info);// 2. 处理修改后的一系列操作productSearchService.deleteDocument(productES);productSearchService.saveDocument(productES);// 3. 打印日志log.info("消费MQ消息,完成 topic:{} name:{} 修改商品流程处理结果:{}", topic, info.getProductName(),JSON.toJSONString(info));// 4. 消息消费完成ack.acknowledge();} catch (Exception e) {// 处理失败,消息重试。log.error("消费MQ消息,失败 topic:{} message:{}", topic, message.get());throw e;}}}

(3)删除商品

package com.allen.study.application.elasticSearch.esSyncMq;import com.alibaba.fastjson.JSON;
import com.allen.study.application.elasticSearch.es_assembler.ProductInfoEsEntityAssembler;
import com.allen.study.application.elasticSearch.es_entity.ProductES;
import com.allen.study.application.elasticSearch.es_service.ProductSearchService;
import com.allen.study.domain.entity.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;@Component
@Slf4j
public class DelProductListener {private final ProductSearchService productSearchService;private final ProductInfoEsEntityAssembler assembler;public DelProductListener(ProductSearchService productSearchService, ProductInfoEsEntityAssembler assembler) {this.productSearchService = productSearchService;this.assembler = assembler;}@KafkaListener(topics = "del_product", groupId = "product_info")public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> message = Optional.ofNullable(record.value());// 1. 判断消息是否存在if (!message.isPresent()) {return;}// 2. 处理 MQ 消息try {// 1. 转化对象(或者你也可以重写Serializer<T>)ProductInfo info = JSON.parseObject((String) message.get(), ProductInfo.class);ProductES productES = assembler.info2EsEntity(info);// 2. 处理删除后的一系列操作productSearchService.deleteDocument(productES);// 3. 打印日志log.info("消费MQ消息,完成 topic:{} name:{} 删除商品流程处理结果:{}", topic, info.getProductName(),JSON.toJSONString(info));// 4. 消息消费完成ack.acknowledge();} catch (Exception e) {// 处理失败,消息重试。log.error("消费MQ消息,失败 topic:{} message:{}", topic, message.get());throw e;}}}

【7】测试案例

(1)编辑或者新增商品

新增、编辑、删除商品业务信息,应该会通过kafka同步到es文档中
在这里插入图片描述

(2)查询验证

在这里插入图片描述

http://www.dtcms.com/a/413630.html

相关文章:

  • 一级做受网站网站制作价格上海
  • 学校网站用途重庆市工程建设信息网新网站
  • 名校建设专题网站自己的电脑怎么做网站
  • 杭州建设信用网新网站百度标注平台怎么加入
  • 怎样可以开网站招聘网站开发源码
  • 如何用dedecms做网站平邑县住房和城乡建设局网站
  • 大连模板建站哪家好常用wordpress搭建环境
  • 怎样下载建设银行信用卡网站网站可以免费建设吗
  • html5 网站源码娱乐彩票网站建设制作
  • 杭州网站程序开发公司全网精准获客营销
  • 郑州做网站软件导航类网站模板
  • 阿里巴巴开通诚信通后网站怎么做中山半江红网站建设
  • 网站怎么做等级保护建设通破解vip
  • 同泰公司网站公司查询松北建设局网站
  • 建站都需要什么怎么做文学动漫网站
  • 如何在电脑里做网站宁晋网站建设
  • 做国外网站有哪些临沂外贸网站建设
  • 爱做的小说网站吗wordpress编辑模板
  • 基于android的app开发用什么软件英文网站seo发展前景
  • 湖北做网站平台哪家好s9视频直播
  • 用flash做的网站平面设计好的网站
  • 域名备案企业网站内容一个公司完整的组织架构
  • 建设网站呼叫中心有什么好处郑州机械网站制作
  • 整站seo网络科技有限公司简介范文
  • 平阳县建设局网站太原seo网站建设
  • 贵州省水利建设管理总站网站深圳网站建设分期付
  • 在哪个网站可以做二建的题网站开发合同 下载
  • 廊坊seo网站排名python自学免费教程
  • 网站访问慢 分析工具做网络技术方案叫什么
  • 网站动态背景怎么做什么是网盟推广