当前位置: 首页 > news >正文

京东返利app的多数据源整合策略:分布式数据同步与一致性保障

京东返利app的多数据源整合策略:分布式数据同步与一致性保障

大家好,我是阿可,微赚淘客系统及省赚客APP创始人,是个冬天不穿秋裤,天冷也要风度的程序猿!

在京东返利app的业务场景中,需整合京东开放平台API数据、用户本地行为数据、第三方支付数据等多类数据源,这些数据分散在不同存储介质(MySQL、Redis、MongoDB)和物理节点中,面临数据格式不统一、同步延迟、一致性难保障等问题。基于此,我们设计了“分层同步+一致性校验”的多数据源整合方案,通过分布式同步组件与一致性算法,实现数据高效整合与可靠流转。以下从数据源分类、同步架构设计、核心代码实现三方面展开说明。
在这里插入图片描述

一、京东返利app数据源分类与核心诉求

1.1 数据源类型划分

根据数据来源与用途,将系统数据源分为三类:

  1. 业务核心数据源:用户账户数据(MySQL)、订单返利数据(MySQL分库分表);
  2. 外部依赖数据源:京东商品数据(通过开放平台API获取)、第三方支付流水(MongoDB);
  3. 缓存与日志数据源:热点商品缓存(Redis)、用户操作日志(Elasticsearch)。

1.2 核心业务诉求

  • 实时性:京东商品价格、库存变更需在10秒内同步至app前端;
  • 一致性:用户下单后,订单数据与返利计算结果需保证最终一致;
  • 可靠性:同步过程中出现网络波动或节点故障,需支持数据重试与断点续传。

二、多数据源分布式同步架构设计

采用“采集-转换-同步-校验”四层架构实现数据整合,各层职责与技术选型如下:

  1. 数据采集层:通过定时任务(Quartz)拉取京东API数据,基于Canal监听MySQL binlog获取本地数据变更;
  2. 数据转换层:使用Flink SQL统一数据格式,将不同数据源的JSON、XML格式转换为系统标准DTO;
  3. 数据同步层:基于RocketMQ实现跨节点数据投递,Redis Cluster用于缓存同步结果;
  4. 一致性校验层:通过TCC事务补偿与定时对账机制,保障数据最终一致性。

三、核心组件代码实现

3.1 京东API数据采集组件

基于Quartz定时拉取京东商品数据,封装API调用与数据解析逻辑,代码如下:

package cn.juwatech.jdrebate.collector;import cn.juwatech.jdrebate.config.JdApiConfig;
import cn.juwatech.jdrebate.dto.JdProductDTO;
import cn.juwatech.jdrebate.utils.HttpClientUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;/*** 京东商品数据采集任务*/
@Component
public class JdProductCollector implements Job {@Autowiredprivate JdApiConfig jdApiConfig;@Autowiredprivate DataTransferService dataTransferService;@Overridepublic void execute(JobExecutionContext context) throws JobExecutionException {try {// 1. 构建京东API请求参数Map<String, String> params = new HashMap<>();params.put("appKey", jdApiConfig.getAppKey());params.put("timestamp", String.valueOf(System.currentTimeMillis()));params.put("pageNum", "1");params.put("pageSize", "50");// 签名生成(省略具体签名逻辑)params.put("sign", generateSign(params));// 2. 调用京东商品列表APIString response = HttpClientUtils.doGet(jdApiConfig.getProductListUrl(), params);JSONObject result = JSON.parseObject(response);// 3. 解析API返回数据if ("0".equals(result.getString("code"))) {JSONArray productArray = result.getJSONArray("data");for (int i = 0; i < productArray.size(); i++) {JSONObject productJson = productArray.getJSONObject(i);// 4. 转换为系统标准DTOJdProductDTO productDTO = dataTransferService.convertJdProduct(productJson);// 5. 发送至同步队列sendToSyncQueue(productDTO);}}} catch (Exception e) {throw new JobExecutionException("京东商品数据采集失败", e);}}/*** 生成京东API签名*/private String generateSign(Map<String, String> params) {// 省略签名算法实现(按京东API文档要求拼接参数并加密)return "";}/*** 发送数据至同步队列*/private void sendToSyncQueue(JdProductDTO productDTO) {// 后续章节实现}
}

3.2 MySQL binlog数据监听组件

基于Canal监听用户订单表变更,实时捕获订单创建、修改事件,代码如下:

package cn.juwatech.jdrebate.collector;import cn.juwatech.jdrebate.dto.OrderDTO;
import cn.juwatech.jdrebate.service.SyncService;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;/*** MySQL binlog监听组件(监听订单表变更)*/
@Component
public class MysqlBinlogCollector {@Value("${canal.server.host}")private String canalHost;@Value("${canal.server.port}")private Integer canalPort;@Value("${canal.destination}")private String destination;@Value("${canal.username}")private String username;@Value("${canal.password}")private String password;@Autowiredprivate SyncService syncService;private CanalConnector canalConnector;@PostConstructpublic void init() {// 1. 创建Canal连接canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, canalPort),destination, username, password);// 2. 启动监听线程new Thread(this::startListen).start();}/*** 启动binlog监听*/private void startListen() {try {canalConnector.connect();// 订阅订单表(数据库.表名)canalConnector.subscribe("jd_rebate.order_info");while (true) {// 3. 获取binlog消息(每次拉取100条)Message message = canalConnector.getWithoutAck(100);long batchId = message.getId();if (batchId == -1 || message.getEntries().isEmpty()) {Thread.sleep(1000);continue;}// 4. 解析binlog条目parseBinlogEntries(message.getEntries());// 5. 确认消息消费canalConnector.ack(batchId);}} catch (Exception e) {e.printStackTrace();// 异常重试try {Thread.sleep(5000);startListen();} catch (InterruptedException ex) {ex.printStackTrace();}}}/*** 解析binlog条目,转换为订单DTO*/private void parseBinlogEntries(List<CanalEntry.Entry> entries) {for (CanalEntry.Entry entry : entries) {if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {continue;}CanalEntry.RowChange rowChange;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("解析binlog条目失败", e);}// 处理INSERT/UPDATE事件if (rowChange.getEventType() == CanalEntry.EventType.INSERT ||rowChange.getEventType() == CanalEntry.EventType.UPDATE) {for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {OrderDTO orderDTO = buildOrderDTO(rowData.getAfterColumnsList());// 发送至同步服务syncService.syncOrderData(orderDTO);}}}}/*** 从binlog列数据构建订单DTO*/private OrderDTO buildOrderDTO(List<CanalEntry.Column> columns) {OrderDTO orderDTO = new OrderDTO();for (CanalEntry.Column column : columns) {switch (column.getName()) {case "order_id":orderDTO.setOrderId(column.getValue());break;case "user_id":orderDTO.setUserId(column.getValue());break;case "product_id":orderDTO.setProductId(column.getValue());break;case "rebate_amount":orderDTO.setRebateAmount(new BigDecimal(column.getValue()));break;// 其他字段映射(省略)}}return orderDTO;}
}

3.3 分布式数据同步服务

基于RocketMQ实现跨数据源同步,封装同步逻辑与重试机制,代码如下:

package cn.juwatech.jdrebate.service;import cn.juwatech.jdrebate.dto.JdProductDTO;
import cn.juwatech.jdrebate.dto.OrderDTO;
import cn.juwatech.jdrebate.mapper.ProductMapper;
import cn.juwatech.jdrebate.mapper.OrderMapper;
import cn.juwatech.jdrebate.utils.RocketMQUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;/*** 分布式数据同步服务*/
@Service
public class SyncService {@Autowiredprivate ProductMapper productMapper;@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate RocketMQUtils rocketMQUtils;/*** 同步京东商品数据至本地MySQL*/public void syncProductData(JdProductDTO productDTO) {try {// 1. 检查商品是否已存在(幂等处理)if (productMapper.selectById(productDTO.getProductId()) != null) {// 存在则更新productMapper.updateById(productDTO);} else {// 不存在则插入productMapper.insert(productDTO);}// 2. 同步至Redis缓存syncProductToRedis(productDTO);} catch (Exception e) {// 同步失败,发送至重试队列rocketMQUtils.send("jd_product_retry_topic", productDTO);throw new RuntimeException("商品数据同步失败", e);}}/*** 同步订单数据(含TCC事务补偿)*/@Transactional(rollbackFor = Exception.class)public void syncOrderData(OrderDTO orderDTO) {// 1. 本地事务:保存订单数据orderMapper.insert(orderDTO);// 2. 发送事务消息,触发返利计算(TCC Try阶段)rocketMQUtils.sendTransactionMsg("jd_order_topic", orderDTO, orderDTO.getOrderId());}/*** 同步商品数据至Redis*/private void syncProductToRedis(JdProductDTO productDTO) {// 省略Redis SET逻辑(使用RedisTemplate)}
}

3.4 数据一致性校验组件

通过定时对账与TCC补偿机制保障一致性,代码如下:

package cn.juwatech.jdrebate.service;import cn.juwatech.jdrebate.mapper.OrderMapper;
import cn.juwatech.jdrebate.mapper.RebateMapper;
import cn.juwatech.jdrebate.dto.OrderDTO;
import cn.juwatech.jdrebate.dto.RebateDTO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;/*** 数据一致性校验组件(定时对账+补偿)*/
@Component
public class ConsistencyCheckService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate RebateMapper rebateMapper;@Autowiredprivate RebateService rebateService;/*** 每小时执行一次订单-返利对账(0 0 * * * ?)*/@Scheduled(cron = "0 0 * * * ?")public void checkOrderRebateConsistency() {// 1. 查询近1小时已完成但未返利的订单List<OrderDTO> unRebateOrders = orderMapper.selectUnRebateOrders(System.currentTimeMillis() - 3600000L,System.currentTimeMillis());// 2. 对账并补偿for (OrderDTO order : unRebateOrders) {RebateDTO rebate = rebateMapper.selectByOrderId(order.getOrderId());if (rebate == null) {// 返利记录缺失,触发补偿rebateService.compensateRebate(order);}}}
}

四、关键技术优化

  1. 分库分表路由:采用Sharding-JDBC对订单表按用户ID哈希分片,解决单表数据量过大问题;
  2. 缓存一致性保障:商品数据更新时,先更新MySQL再删除Redis缓存(避免缓存脏读);
  3. 失败重试策略:基于RocketMQ重试队列,设置3次重试,失败后存入死信队列人工处理。

本文著作权归聚娃科技省赚客app开发者团队,转载请注明出处!


文章转载自:

http://ILQDCacq.yskhj.cn
http://jC65pGuQ.yskhj.cn
http://J5KvuTnt.yskhj.cn
http://x2TE6SPB.yskhj.cn
http://oNpbAV5P.yskhj.cn
http://D56bm8L0.yskhj.cn
http://xnErv5Wp.yskhj.cn
http://DsAdwU5A.yskhj.cn
http://n7kQmqYr.yskhj.cn
http://ALQxKNAL.yskhj.cn
http://Tl6LC2o8.yskhj.cn
http://bsA9nsOT.yskhj.cn
http://hMUVmqiL.yskhj.cn
http://cjrzCmSk.yskhj.cn
http://GbxOJQOW.yskhj.cn
http://veIxicU5.yskhj.cn
http://e6l8SGIL.yskhj.cn
http://3rwY9Tbo.yskhj.cn
http://I7vCSG7p.yskhj.cn
http://IewAN20L.yskhj.cn
http://rNd1Qjpw.yskhj.cn
http://wdRSxnkP.yskhj.cn
http://irgNOOli.yskhj.cn
http://tcoSbF8E.yskhj.cn
http://NadIrAKh.yskhj.cn
http://NQlx3lKd.yskhj.cn
http://bD2ZUNjt.yskhj.cn
http://zcdvsuSm.yskhj.cn
http://L7WSn97A.yskhj.cn
http://udKBavxr.yskhj.cn
http://www.dtcms.com/a/380127.html

相关文章:

  • 提升复购为什么对品牌很重要?
  • 第三方软件测试机构【性能测试工具用LoadRunner还是JMeter?】
  • 适合工业用的笔记本电脑
  • 8卡直连,Turin加持!国鑫8U8卡服务器让生成式AI落地更近一步
  • SELinux安全上下文
  • 【项目】 :C++ - 仿mudou库one thread one loop式并发服务器实现(代码实现)
  • 主动性算法-解决点:新陈代谢
  • 从0开始开发app(AI助手版)-架构及环境搭建
  • 服务器内存不足会造成哪些影响?
  • 缓存三大劫攻防战:穿透、击穿、雪崩的Java实战防御体系(二)
  • MongoDB BI Connector 详细介绍与使用指南(手动安装方式,CentOS 7 + MongoDB 5.0.5)
  • 【计算机网络】HTTP协议(一)——超文本传输协议
  • 【国内电子数据取证厂商龙信科技】被格式化的手机如何恢复数据
  • 【项目】 :C++ - 仿mudou库one thread one loop式并发服务器实现(模块划分)
  • 采集集群外的k8s(prometheus监控)
  • AI 玩转网页自动化无压力:基于函数计算 FC 构建 Browser Tool Sandbox
  • Redisson原理与面试问题解析
  • ICCV 2025 | 首次引入Flash Attention,轻量SR窗口扩至32×32还不卡!
  • 关于线性子空间(Linear Subspace)的数学定义
  • OpenHarmony AVSession深度解析(二):从本地会话到分布式跨设备协同的完整生命周期管理
  • 12.NModbus4在C#上的部署与使用 C#例子 WPF例子
  • 迅为RK3568开发板Linux_NVR_SDK 系统开发-扩展根文件系统
  • OpenCV:特征提取
  • Zynq开发实践(FPGA之第一个vivado工程)
  • 数字人技术如何与数字孪生深度融合?
  • 如何生成 GitHub Token(用于 Hexo 部署):保姆级教程+避坑指南
  • Python uv常用命令及使用详解
  • MySQL主从同步参数调优案例
  • Python的uv包管理工具使用
  • 构建python3.11+uv+openssh环境的docker镜像