当前位置: 首页 > news >正文

商品中心—18.库存分桶的一致性改造文档

大纲

1.分布式库存扣减时序图和流程图

2.库存入桶分配改造

3.库存分桶上线改造

4.库存分桶扩容改造

5.库存分桶下线改造

6.执行库存分桶缓存操作的定时任务

7.分桶操作之初始化分配库存的处理策略

8.分桶操作之增加库存与分桶上线的处理策略

9.分桶操作之分桶扩容的处理策略

10.分桶操作之分桶下线的处理策略

11.清除执⾏成功的分桶操作的定时任务

1.分布式库存扣减时序图和流程图

(1)分布式库存扣减时序图

(2)分布式库存扣减流程图

(3)数据库设计

(1)分布式库存扣减时序图

(2)分布式库存扣减流程图

(3)数据库设计

一.库存分桶操作表

CREATE TABLE `inventory_bucket_operate` (`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,`operate_id` varchar(32) NOT NULL COMMENT '操作id',`seller_id` varchar(64) NOT NULL COMMENT '卖家id',`sku_id` varchar(64) NOT NULL COMMENT '商品sku',`operate_type` tinyint(3) NOT NULL COMMENT '操作类型:1-初始化,2-增加库存,3-分桶上线,4-分桶扩容,5-分桶下线',`bucket` text COMMENT '分桶变动信息',`inventory_num` int(11) DEFAULT NULL COMMENT '变动库存',`feature` text COMMENT '扩展信息',`operate_status` tinyint(4) DEFAULT '0' COMMENT '操作状态',`del_flag` tinyint(1) DEFAULT '1' COMMENT '删除标记',`create_user` int(11) DEFAULT NULL COMMENT '创建⼈',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`update_user` int(11) DEFAULT NULL COMMENT '更新⼈',`update_time` datetime DEFAULT NULL COMMENT '更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=58 DEFAULT CHARSET=utf8mb4 COMMENT='库存分桶操作表';

二.库存操作失败记录表

CREATE TABLE `inventory_operate_fail` (`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',`operate_id` varchar(32) NOT NULL COMMENT '操作id',`fail_type` varchar(32) DEFAULT NULL COMMENT '操作类型',`bucket_no` varchar(32) DEFAULT NULL COMMENT '分桶编号',`inventory_num` int(11) DEFAULT NULL COMMENT '变动库存数量',`del_flag` tinyint(1) DEFAULT NULL COMMENT '删除标识',`create_user` int(11) DEFAULT NULL COMMENT '创建⼈',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`update_user` int(11) DEFAULT NULL COMMENT '更新⼈',`update_time` datetime DEFAULT NULL COMMENT '更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存操作失败记录表';

三.库存分桶配置表

CREATE TABLE `inventory_bucket_config` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',`bucket_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '分桶数量',`max_depth_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '最⼤库存深度,即分桶的最大库存容量',`min_depth_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '最⼩库存深度,即分桶的最小库存容量',`threshold_value` int(10)  NOT NULL DEFAULT '0' COMMENT  '分桶下线阈值,当某个分桶的库存数小于阈值时就需要将该分桶下线了',`back_source_proportion` int(10)  NOT NULL DEFAULT '0' COMMENT  '回源⽐例,从1-100设定⽐例',`back_source_step` int(10)  NOT NULL DEFAULT '0' COMMENT  '回源步⻓,桶扩容的时候默认每次分配的库存⼤⼩',`template_name` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '模板名称',`is_default` tinyint(1)  NOT NULL DEFAULT '0' COMMENT '是否默认模板,只允许⼀个,1为默认模板',`version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',`del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',`create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',`update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存分桶配置模板表';

四.库存分配记录表

CREATE TABLE `inventory_allot_detail` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',`sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'skuId',`inventor_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '库存申请业务编号',`seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID',`inventor_num` int(10) NOT NULL DEFAULT '0' COMMENT '库存变更数量',`version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',`del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',`create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',`update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY `inde_unique_inventor_no` (`inventor_no`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=265 DEFAULT CHARSET=utf8 COMMENT='库存分配记录表';

五.库存扣减明细表

CREATE TABLE `inventory_deduction_detail` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',`order_id` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '订单id',`refund_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '退款编号',`inventory_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '扣减库存数量',`sku_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品skuId',`seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID',`bucket_no` int(10)  NOT NULL COMMENT  '扣减分桶编号',`deduction_type` int(2)  NOT NULL COMMENT  '库存操作类型(10库存扣减,20库存退货)',`del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',`create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',`update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存扣减明细表';

2.库存入桶分配改造

(1)库存分桶初始化入口

(2)库存分桶元数据计算以及分桶编号创建

(3)库存分桶操作记录写DB + 中心桶库存写缓存

(4)库存分桶元数据缓存更新使用自缓存一致性服务的DB + 消息双写方案

(5)库存分桶元数据作为热点数据更新到各机器节点的本地缓存

这里主要进行的是库存分桶初始化分配库存的改造。其中的缓存一致性方案是:首先将分桶元数据写入库存分桶操作表,然后再由定时任务扫描处理。

计算出元数据(待上线分桶、中⼼桶剩余库存、每个分桶分配库存)信息后,为了保证⼀致性,会先将计算出的分桶元数据信息⼊库。也就是先写⼊库存分桶操作表,然后在缓存中写⼊中⼼桶剩余库存信息。如果⼊库失败或缓存写⼊失败,会抛出异常,数据库回滚,操作不成功。只有⼊库成功和缓存写⼊成功之后,本次操作才成功。

关键环节与核心代码:

(1)库存分桶初始化入口

@RestController
@RequestMapping("/product/inventory")
public class InventoryController {@Autowiredprivate InventoryBucketService inventoryBucketService;@Autowiredprivate InventoryBucketCache cache;@Resourceprivate TairCache tairCache;...//初始化库存@RequestMapping("/init")public void inventoryInit(@RequestBody InventorBucketRequest request) {//清除本地缓存数据,cache.getCache()获取的是Guava Cachecache.getCache().invalidateAll();//清除Tair中的数据,扫描卖家ID+SKU的ID的key会比较耗时Set<String> keys = tairCache.getJedis().keys("*" + request.getSellerId() + request.getSkuId() + "*");if (!CollectionUtils.isEmpty(keys)) {tairCache.mdelete(Lists.newArrayList(keys));}//这里模拟指定本次的库存业务单号,实际接口需要外部传入request.setInventorCode(SnowflakeIdWorker.getCode());//初始化库存信息inventoryBucketService.inventorBucket(request);}...
}//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate TairLock tairLock;...//商品库存入桶分配@Override@Transactional(rollbackFor = Exception.class)public void inventorBucket(InventorBucketRequest request) {//1.验证入参必填项checkInventorParams(request);//锁key = 卖家ID + SKU的IDString key = buildBucketLockKey(request.getSellerId(), request.getSkuId());String value = SnowflakeIdWorker.getCode();//注意这里需要锁定中心桶库存boolean lock = tairLock.tryLock(key, value);//分配库存时,这个卖家的sku是不允许其他相关操作的if (lock) {try {//2.插入库存入库的记录信息//由于申请的库存业务编号是一个唯一key,所以可以避免重复请求//也就是会校验库存单号是否已经存在了,保证⼀次库存变更⾏为只能执⾏⼀次inventoryRepository.saveInventoryAllotDetail(request);//3.将库存数据写入缓存inventoryBucketCache(request);} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}}//将库存数据写入缓存private void inventoryBucketCache(InventorBucketRequest request) {//获取中心桶库存的keyString key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId());//1.先验证是否已缓存分桶元数据信息BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());try {//缓存不存在,则进行初始化操作if (Objects.isNull(bucketLocalCache)) {//2.获取库存分桶的配置模板InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId());//初始化分桶库存initInventoryBucket(request, inventoryBucketConfig);} else {//3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存Integer residueNum = tairCache.incr(key, request.getInventoryNum());if (residueNum < 0) {throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);}//4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去)InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request);//5.构建新的分桶元数据信息,并写入writeBucketCache(onlineRequest, residueNum);}} catch (Exception e) {log.error("分桶库存初始化出现失败", e);throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);} finally {inventoryBucketCache.threadLocalRemove();}}...
}

(2)库存分桶元数据计算以及分桶编号创建

//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate TairLock tairLock;@Resourceprivate InventoryRepository inventoryRepository;@Resourceprivate InventoryBucketCache inventoryBucketCache;@Resourceprivate TairCache tairCache;...//初始化分桶库存private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {//计算出分桶的元数据信息BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig);//库存分桶的元数据信息入库inventoryRepository.saveBucketDetail(null, bucketLocalCache, BucketOperateEnum.INIT.getCode(), bucketLocalCache.getAvailableList(), null);//写入中心桶的剩余库存信息log.info("中心桶剩余库存:{}", bucketLocalCache.getResidueNum());//获取中心桶剩余库存的keyString key = buildSellerInventoryKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId());//设置中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量boolean setFlag = tairCache.set(key, bucketLocalCache.getResidueNum().toString(), 0);if (!setFlag) {//中心桶剩余库存写入失败,回滚事务throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);}}//计算出本次库存入库的具体分桶信息private BucketLocalCache calcInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {//分桶配置模版中默认的分桶数量Integer bucketNum = inventoryBucketConfig.getBucketNum();//获取本次需要入库的库存数量Integer inventoryNum = request.getInventoryNum();//配置模版中所有分桶的最大库存容量Integer maxBucketNum = bucketNum * inventoryBucketConfig.getMaxDepthNum();//配置模版中所有分桶的最小库存容量//如果需要放入分桶的库存数量低于这个值,那么只会分配给部分分桶,此时就需要重新计算分桶Integer minBucketNum = bucketNum * inventoryBucketConfig.getMinDepthNum();//本次最多可以放入分桶的库存数量int countBucketNum = Math.min(inventoryNum, maxBucketNum);//当库存数量小于最小分桶深度*分桶数量,就需要减少分配的分桶数//此时要分配的分桶数量 bucketNum = 本次库存入库的数量 / 每个分桶的最小库存容量if (minBucketNum > countBucketNum) {bucketNum = countBucketNum / inventoryBucketConfig.getMinDepthNum();//如果库存数量不足一个分桶的最小深度,但是大于0,则上线一个分桶if (bucketNum == 0 && countBucketNum % inventoryBucketConfig.getMinDepthNum() > 0) {bucketNum++;}}//获取每个分桶分配的库存数量Integer bucketInventoryNum = countBucketNum / bucketNum;//剩余库存数量,可能为0或者大于0,补到最后一个分桶上Integer residueNum = countBucketNum - bucketInventoryNum * bucketNum;//构建缓存数据模型时,以卖家ID + 商品skuId为唯一标识String key = request.getSellerId() + request.getSkuId();//构建具体的缓存数据模型BucketLocalCache bucketLocalCache = buildBucketCache(key, bucketNum, bucketInventoryNum, residueNum, inventoryBucketConfig);//标记到具体的数据上bucketLocalCache.setSellerId(request.getSellerId());bucketLocalCache.setSkuId(request.getSkuId());bucketLocalCache.setInventoryNum(inventoryNum);//中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量bucketLocalCache.setResidueNum(inventoryNum - countBucketNum);bucketLocalCache.setInventoryBucketConfig(inventoryBucketConfig);bucketLocalCache.setOperationType(BucketStatusEnum.AVAILABLE_STATUS.getCode());return bucketLocalCache;}//构建缓存模型//@param bucketNum             分桶数量//@param inventoryNum          分桶分配的库存数量//@param residueNum            剩余的未分配均匀的库存//@param inventoryBucketConfig 分桶配置信息private BucketLocalCache buildBucketCache(String key, Integer bucketNum, Integer inventoryNum, Integer residueNum, InventoryBucketConfigDO inventoryBucketConfig) {BucketLocalCache bucketLocalCache = new BucketLocalCache();//先获取得到这个模板配置的对应可分槽位的均匀桶列表List<String> bucketNoList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum());List<BucketCacheBO> bucketCacheBOList = new ArrayList<>(bucketNum);List<BucketCacheBO> undistributedList = new ArrayList<>(bucketNum);//构建出多个分桶对象for (int i = 0; i < bucketNum; i++) {//生成对应的分桶编号,方便定义到具体的分桶上BucketCacheBO bucketCache = new BucketCacheBO();String bucketNo = bucketNoList.get(i);bucketCache.setBucketNo(bucketNo);//最后一个分桶,分配剩余未除尽的库存+平均库存if (i == bucketNum - 1) {bucketCache.setBucketNum(inventoryNum + residueNum);} else {bucketCache.setBucketNum(inventoryNum);}bucketCacheBOList.add(bucketCache);}//生成的分桶对象超过实际可分配的分桶对象,保留这批多余的分桶模型为不可用分桶,后续分桶上线可以选择使用if (bucketNoList.size() > bucketNum) {for (int i = bucketNum; i < bucketNoList.size(); i++) {BucketCacheBO bucketCache = new BucketCacheBO();String bucketNo = bucketNoList.get(i);bucketCache.setBucketNo(bucketNo);undistributedList.add(bucketCache);}}//生成缓存的明细keyList<String> bucketDetailKeyList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum(), "%07d");//设置分桶缓存明细的keybucketLocalCache.setBucketDetailKeyList(bucketDetailKeyList);//设置可用的分桶缓存列表bucketLocalCache.setAvailableList(bucketCacheBOList);//设置不可用或者已下线的分桶缓存列表bucketLocalCache.setUndistributedList(undistributedList);return bucketLocalCache;}...
}public class InventorBucketUtil {private static final int MAX_SIZE = 100000;//生成对应的槽位key,默认的//@param key       卖家Id+商品skuId//@param bucketNum 分桶配置数量//@return 预先保留的槽位集合public static List<String> createBucketNoList(String key, Integer bucketNum) {return createBucketNoList(key, bucketNum, "%06d");}//生成对应的槽位key,明细使用,多使用一位区分//@param key       卖家Id+商品skuId//@param bucketNum 分桶配置数量//@return 预先保留的槽位集合public static List<String> createBucketNoList(String key, Integer bucketNum, String format) {Map<Long, String> cacheKey = new HashMap<>(bucketNum);//bucketNoList用来存放每个桶对应的hashKeyList<String> bucketNoList = new ArrayList<>(bucketNum);//分配桶的编号for (int i = 1; i <= MAX_SIZE; i++) {String serialNum = String.format(format, i);//卖家ID + 商品SKU ID + 序号String hashKey = key + serialNum;//一致性哈希算法murmurlong hash = HashUtil.murMurHash(hashKey.getBytes());//对分桶数量进行取模运算long c = (hash %= bucketNum);//确保被选中的hashKey都能哈希到不同的分桶if (cacheKey.containsKey(c)) {continue;}cacheKey.put(c, hashKey);bucketNoList.add(hashKey);if (cacheKey.size() >= bucketNum) {break;}}return bucketNoList;}...
}

(3)库存分桶操作记录写DB + 中心桶库存写缓存

//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate TairLock tairLock;@Resourceprivate InventoryRepository inventoryRepository;@Resourceprivate InventoryBucketCache inventoryBucketCache;@Resourceprivate TairCache tairCache;...//初始化分桶库存private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {//计算出分桶的元数据信息BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig);//库存分桶的元数据信息入库inventoryRepository.saveBucketDetail(null, bucketLocalCache, BucketOperateEnum.INIT.getCode(), bucketLocalCache.getAvailableList(), null);//写入中心桶的剩余库存信息log.info("中心桶剩余库存:{}", bucketLocalCache.getResidueNum());//获取中心桶剩余库存的keyString key = buildSellerInventoryKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId());//设置中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量boolean setFlag = tairCache.set(key, bucketLocalCache.getResidueNum().toString(), 0);if (!setFlag) {//中心桶剩余库存写入失败,回滚事务throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);}}...
}@Repository
public class InventoryRepository {...//保存库存分桶的元数据信息//@param operateId         操作id//@param bucketLocalCache  变更之后的元数据信息//@param operateType       操作类型//@param bucketCacheBOList 变动的分桶列表//@param inventoryNum      变动的库存数量public void saveBucketDetail(String operateId, BucketLocalCache bucketLocalCache, Integer operateType,List<BucketCacheBO> bucketCacheBOList, Integer inventoryNum) {//变动的分桶为空,则不必要保存if (CollectionUtils.isEmpty(bucketCacheBOList)) {return;}if (!StringUtils.hasLength(operateId)) {operateId = SnowflakeIdWorker.getCode();}InventoryBucketOperateDO inventoryBucketOperateDO = InventoryBucketOperateDO.builder().operateId(operateId).sellerId(bucketLocalCache.getSellerId()).skuId(bucketLocalCache.getSkuId()).bucket(JSON.toJSONString(bucketCacheBOList)).operateType(operateType).feature(JSON.toJSONString(bucketLocalCache)).inventoryNum(inventoryNum).build();int count = inventoryBucketOperateMapper.insert(inventoryBucketOperateDO);if (count <= 0) {throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL);}}...
}

(4)库存分桶元数据缓存更新使用自缓存一致性服务的DB + 消息双写方案

InventoryBucketCache.setBucketLocalCache()方法设置库存分桶元数据到本地缓存时,就用到了缓存一致性框架。也就是使用了缓存一致性服务的@CacheRefresh注解,因为库存分桶元数据属于热点数据,对实时性要求比较高。在一台机器的本地缓存了库存分桶元数据后,其他机器也应缓存该数据。

@Component
@Data
public class InventoryBucketCache {@Autowiredprivate Cache cache;@Resourceprivate TairCache tairCache;private ThreadLocal<BucketLocalCache> bucketLocalCacheThreadLocal = new ThreadLocal<>();//本地存储关于分桶信息@CacheRefresh(cacheKey = "bucketKey", mqCacheKey = CacheConstant.INVENTORY_SKU_KEY, index = "1", messageType = CacheConstant.MESSAGE_TYPE_HOT, cacheType = CacheConstant.TAIR_CACHE_TYPE)public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) {String bucketLocalKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();synchronized (bucketLocalKey.intern()) {log.info("保存本地缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));BucketLocalCache bucketCache = getTairBucketCache(bucketKey);log.info("远程缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketCache));//如果本地缓存没有就直接写入if (Objects.isNull(bucketCache)) {setBucketCache(bucketKey, bucketLocalCache);cache.put(bucketKey, bucketLocalCache);return;}//本地缓存的元数据覆盖,考虑到是并发执行的,这里需要上内存级别的锁,并进行diff处理if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.AVAILABLE_STATUS.getCode())) {diffCacheOnline(bucketCache, bucketLocalCache);} else if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.OFFLINE_STATUS.getCode())) {diffCacheOffline(bucketCache, bucketLocalCache);}setBucketCache(bucketKey, bucketCache);cache.put(bucketKey, bucketCache);log.info("实际保存本地缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketCache));}}...
}//刷新缓存的自定义注解
@Aspect
@Component
public class CacheRefreshAspect {@Autowiredprivate DataRefreshProducer producer;@Autowiredprivate CacheRefreshConverter cacheRefreshConverter;@Autowiredprivate CacheQueue cacheQueue;//切入点,@CacheRefresh注解标注的@Pointcut("@annotation(com.demo.eshop.cache.annotation.CacheRefresh)")public void pointcut() {}//环绕通知,在方法执行前后//@param point 切入点//@return 结果@Around("pointcut() && @annotation(cacheRefresh)")public Object around(ProceedingJoinPoint point, CacheRefresh cacheRefresh) throws Throwable {//签名信息Signature signature = point.getSignature();//强转为方法信息MethodSignature methodSignature = (MethodSignature) signature;//参数名称String[] parameterNames = methodSignature.getParameterNames();//参数值Object[] parameterValues = point.getArgs();Object response;try {//先执行本地方法再执行异步的操作response = point.proceed();} catch (Throwable throwable) {log.error("执行方法: {}失败,异常信息: {}", methodSignature.getMethod().getName(), throwable);throw throwable;}try {MessageCache messageCache = new MessageCache();for (int i = 0; i < parameterValues.length; i++) {if (parameterNames[i].equals(cacheRefresh.cacheKey())) {messageCache.setCacheKey(String.valueOf(parameterValues[i]));}if (Integer.valueOf(cacheRefresh.index()) == i) {messageCache.setCacheJSON(JSONObject.toJSONString(parameterValues[i]));}}messageCache.setOperationType(Integer.valueOf(cacheRefresh.operationType()));//给定一个有序的版本号(默认统一的工作ID和数据中心ID)messageCache.setVersion(SnowflakeIdWorker.getVersion());messageCache.setMessageType(Integer.valueOf(cacheRefresh.messageType()));messageCache.setCacheType(Integer.valueOf(cacheRefresh.cacheType()));messageCache.setCreateDate(new Date());//将缓存数据写入读写队列//缓存数据写入读写队列后,会定时每秒批量写入数据库(缓存数据写入DB只用于兜底,所以偶尔出现丢失并不影响)DataRefreshDetailDO dataRefreshDetailDO = cacheRefreshConverter.converter(messageCache);cacheQueue.submit(dataRefreshDetailDO);//发送MQ消息去处理缓存数据,比如将缓存数据更新到缓存上//一般来说,热点缓存会比普通缓存少很多,所以普通缓存的更新会比较多,热点缓存的更新会比较少//此外,热点缓存的更新会对时效性要求比较高,通过消息去异步处理本来就已存在一定的延迟//所以这里将普通缓存和热点缓存的更新进行分开处理,减少时效性高的消息的处理延迟if (CacheConstant.MESSAGE_TYPE_HOT.equals(cacheRefresh.messageType())) {producer.sendMessage(RocketMqConstant.DATA_HOT_RADIO_TOPIC, JSONObject.toJSONString(messageCache), "热点缓存消息发送");} else {producer.sendMessage(RocketMqConstant.DATA_MESSAGE_CACHE_TOPIC, JSONObject.toJSONString(messageCache), cacheRefresh.mqCacheKey(), "通用缓存消息发送");}} catch (Exception e) {log.error("处理缓存同步:{}失败,异常信息:{}", methodSignature.getMethod().getName(), e);}return response;}
}

(5)库存分桶元数据作为热点数据更新到各机器节点的本地缓存

setBucketLocalCache()方法的@CacheRefresh注解描述缓存是热点类型。该方法被执行后,会被AOP切面切入,将需要缓存的数据发送到MQ。接着MQ会对这种热点类型的消息进行广播处理,也就是每台机器都会执行CacheRefreshListener的方法。

@Configuration
public class ConsumerBeanConfig {...//刷新本地缓存@Bean("cacheRefresTopic")public DefaultMQPushConsumer cacheRefresTopic(CacheRefreshListener cacheRefreshListener) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.REFRESH_CACHE_GROUP);//设置为广播模式consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setNamesrvAddr(rocketMQProperties.getNameServer());consumer.subscribe(RocketMqConstant.DATA_HOT_RADIO_TOPIC, "*");consumer.registerMessageListener(cacheRefreshListener);consumer.start();return consumer;}...
}//刷新分布式节点的本地缓存
@Component
public class CacheRefreshListener implements MessageListenerConcurrently {//本地缓存@Autowiredprivate Cache cache;@Resourceprivate InventoryBucketCache inventoryBucketCache;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : list) {String msg = new String(messageExt.getBody());log.info("刷新本地缓存,消息内容:{},消费时间:{}", msg, DateFormatUtil.formatDateTime(new Date()));MessageCache messageCache = JsonUtil.json2Object(msg, MessageCache.class);BucketLocalCache bucketLocalCache = JsonUtil.json2Object(messageCache.getCacheJSON(), BucketLocalCache.class);synchronized (messageCache.getCacheKey().intern()) {String bucketLocalKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();//获取远程缓存的分桶元数据信息BucketLocalCache bucketCache = inventoryBucketCache.getTairBucketCache(bucketLocalKey);if (Objects.isNull(bucketCache)) {cache.put(messageCache.getCacheKey(), JsonUtil.json2Object(messageCache.getCacheJSON(), BucketLocalCache.class));log.info("更新本地缓存,本次更新内容:{},更新时间:{}", messageCache.getCacheJSON(), DateFormatUtil.formatDateTime(new Date()));} else {//进行diff数据处理if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.AVAILABLE_STATUS.getCode())) {inventoryBucketCache.diffCacheOnline(bucketCache, bucketLocalCache);} else if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.OFFLINE_STATUS.getCode())) {inventoryBucketCache.diffCacheOffline(bucketCache, bucketLocalCache);}cache.put(messageCache.getCacheKey(), bucketCache);log.info("更新本地缓存,本次更新内容:{},更新时间:{}", JsonUtil.object2Json(bucketCache), DateFormatUtil.formatDateTime(new Date()));}}}} catch (Exception e) {//本地缓存只有参数转换会出错,这种错误重试也没什么作用log.error("consume error, 刷新本地缓存失败", e);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}

3.库存分桶上线改造

当向库存分桶增加库存时,会调用分桶上线接⼝,也就是会调⽤InventoryBucketServiceImpl的writeBucketCache()⽅法,writeBucketCache()⽅法会实现具体的分桶上线任务。

InventoryBucketServiceImpl的bucketOnline()方法,适⽤场景是在商品库存⼊桶时,分桶上线中存在上线失败的分桶。此时运营⼈员就可以通过bucketOnline()方法⼿动执⾏分桶的上线。从而防⽌上线的分桶过少,承担的并发压⼒⼤。

其中的缓存一致性方案是:首先将分桶元数据写入库存分桶操作表,然后再由定时任务扫描处理。

也是先把分桶元数据写⼊到数据库中,然后再操作中⼼桶缓存数据。数据库写⼊成功和缓存写⼊成功,则本次操作成功。数据库写⼊失败或者缓存写⼊失败,都会回滚数据库,本次操作失败。

//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate TairLock tairLock;@Resourceprivate InventoryRepository inventoryRepository;@Resourceprivate InventoryBucketCache inventoryBucketCache;@Resourceprivate TairCache tairCache;...//商品库存入桶分配@Override@Transactional(rollbackFor = Exception.class)public void inventorBucket(InventorBucketRequest request) {//1.验证入参必填项checkInventorParams(request);//锁key = 卖家ID + SKU的IDString key = buildBucketLockKey(request.getSellerId(), request.getSkuId());String value = SnowflakeIdWorker.getCode();//注意这里需要锁定中心桶库存boolean lock = tairLock.tryLock(key, value);//分配库存时,这个卖家的sku是不允许其他相关操作的if (lock) {try {//2.插入库存入库的记录信息//由于申请的库存业务编号是一个唯一key,所以可以避免重复请求//也就是会校验库存单号是否已经存在了,保证⼀次库存变更⾏为只能执⾏⼀次inventoryRepository.saveInventoryAllotDetail(request);//3.将库存数据写入缓存inventoryBucketCache(request);} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}}//将库存数据写入缓存private void inventoryBucketCache(InventorBucketRequest request) {//获取中心桶库存的keyString key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId());//1.先验证是否已缓存分桶元数据信息,先查本地缓存,再查远程缓存BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());try {//缓存不存在,则进行初始化操作if (Objects.isNull(bucketLocalCache)) {//2.获取库存分桶的配置模板InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId());//初始化分桶库存initInventoryBucket(request, inventoryBucketConfig);} else {//3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存Integer residueNum = tairCache.incr(key, request.getInventoryNum());if (residueNum < 0) {throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);}//4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去)InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request);//5.构建新的分桶元数据信息,并写入writeBucketCache(onlineRequest, residueNum);}} catch (Exception e) {log.error("分桶库存初始化出现失败", e);throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);} finally {inventoryBucketCache.threadLocalRemove();}}//分桶上线接口@Override@Transactional(rollbackFor = Exception.class)public void bucketOnline(InventorOnlineRequest request) {//1.验证入参必填checkInventorOnlineParams(request);//2.注意这里需要锁定中心桶库存String key = buildBucketLockKey(request.getSellerId(), request.getSkuId());String value = SnowflakeIdWorker.getCode();boolean lock = tairLock.tryLock(key, value);if (lock) {try {//3.获取中心桶的库存,并校验是否可上线分桶Integer residueNum = checkBucketOnlineNum(key);//4.构建新的分桶元数据信息,并写入writeBucketCache(request, residueNum);} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}}...//构建新的分桶元数据信息//@param request    分桶上线对象//@param residueNum 中心桶剩余库存private void writeBucketCache(InventorOnlineRequest request, Integer residueNum) {String key = request.getSellerId() + request.getSkuId();//获取到本地的缓存列表BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);try {if (!Objects.isNull(bucketLocalCache)) {//获取当前可上线的分桶列表信息List<BucketCacheBO> bucketCacheBOList = buildBucketList(request.getBucketNoList(), bucketLocalCache.getAvailableList(), bucketLocalCache.getUndistributedList(), bucketLocalCache.getInventoryBucketConfig(), residueNum);//当前可上线的分桶为空,直接返回if (CollectionUtils.isEmpty(bucketCacheBOList)) {return;}//中心桶被扣减掉的库存(上线的分桶库存总和)Integer descInventoryNum = bucketCacheBOList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();//构建返回新的元数据模型返回buildBucketLocalCache(bucketLocalCache, bucketCacheBOList, residueNum - descInventoryNum);//分桶信息入库inventoryRepository.saveBucketDetail(null, bucketLocalCache, BucketOperateEnum.ONLINE.getCode(), bucketCacheBOList, descInventoryNum);//扣减中心桶剩余库存,如果扣减失败了,直接抛异常Integer decr = tairCache.decr(buildSellerInventoryKey(request.getSellerId(), request.getSkuId()), descInventoryNum);if (decr < 0) {throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);}}} catch (Exception e) {log.error("分桶构建初始化失败", e);throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);} finally {inventoryBucketCache.threadLocalRemove();}}//获取可上线的分桶列表信息以及具体上线库存//@param bucketNoList            上线分桶编号列表//@param availableList           上线正在使用的分桶编号列表//@param undistributedList       下线或者未使用的分桶编号列表//@param inventoryBucketConfigDO 当前分桶的配置模板信息//@param residueNum              中心桶的剩余可分配库存//@return 可上线的分桶列表以及具体分桶库存private List<BucketCacheBO> buildBucketList(List<String> bucketNoList, List<BucketCacheBO> availableList, List<BucketCacheBO> undistributedList, InventoryBucketConfigDO inventoryBucketConfigDO, Integer residueNum) {//1.如果入参选择了上线的分桶编号列表,则从缓存中配置的未使用分桶列表进行比对处理List<String> bucketCacheList = null;if (!CollectionUtils.isEmpty(bucketNoList)) {Map<String, BucketCacheBO> bucketCacheMap = undistributedList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));//过滤返回可用的分桶编号bucketCacheList = bucketNoList.stream().filter(bucketNo -> bucketCacheMap.containsKey(bucketNo)).collect(Collectors.toList());} else {//直接返回下线的不可用分桶列表bucketCacheList = undistributedList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());}//可上线的分桶列表为空if (CollectionUtils.isEmpty(bucketCacheList)) {return Lists.newArrayList();}//2.根据中心桶的可分配库存,处理返回具体上线的分桶配置信息return calcOnlineBucket(availableList, bucketCacheList, residueNum, inventoryBucketConfigDO);}//构建新的元数据模型//@param bucketLocalCache  本地分桶元数据信息//@param bucketCacheBOList 上线的分桶列表//@param residueNum        中心桶剩余库存private void buildBucketLocalCache(BucketLocalCache bucketLocalCache, List<BucketCacheBO> bucketCacheBOList, Integer residueNum) {//填充中心桶剩余库存bucketLocalCache.setResidueNum(residueNum);//添加新上线的分桶列表bucketLocalCache.getAvailableList().addAll(bucketCacheBOList);Map<String, BucketCacheBO> bucketCacheMap = bucketCacheBOList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList().stream().filter(bucketCacheBO ->//在上线的分桶列表,需要移除掉!bucketCacheMap.containsKey(bucketCacheBO.getBucketNo())).collect(Collectors.toList());//从不可用的分桶列表重移除bucketLocalCache.setUndistributedList(undistributedList);bucketLocalCache.setOperationType(BucketStatusEnum.AVAILABLE_STATUS.getCode());}...
}@Repository
public class InventoryRepository {...//保存库存分桶的元数据信息//@param operateId         操作id//@param bucketLocalCache  变更之后的元数据信息//@param operateType       操作类型//@param bucketCacheBOList 变动的分桶列表//@param inventoryNum      变动的库存数量public void saveBucketDetail(String operateId, BucketLocalCache bucketLocalCache, Integer operateType,List<BucketCacheBO> bucketCacheBOList, Integer inventoryNum) {//变动的分桶为空,则不必要保存if (CollectionUtils.isEmpty(bucketCacheBOList)) {return;}if (!StringUtils.hasLength(operateId)) {operateId = SnowflakeIdWorker.getCode();}InventoryBucketOperateDO inventoryBucketOperateDO = InventoryBucketOperateDO.builder().operateId(operateId).sellerId(bucketLocalCache.getSellerId()).skuId(bucketLocalCache.getSkuId()).bucket(JSON.toJSONString(bucketCacheBOList)).operateType(operateType).feature(JSON.toJSONString(bucketLocalCache)).inventoryNum(inventoryNum).build();int count = inventoryBucketOperateMapper.insert(inventoryBucketOperateDO);if (count <= 0) {throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL);}}...
}

4.库存分桶扩容改造

在库存扣减接⼝执⾏完后,会检查分桶库存是否需要扩容。当分桶剩余库存⼩于分桶回源数量时,会执行扩容接⼝,即调⽤InventoryBucketServiceImpl的writeBucketCache()⽅法。其中,分桶回源数量是通过库存分桶配置模板表中的回源⽐例计算出的。

其中的缓存一致性方案是:首先将分桶元数据写入库存分桶操作表,然后再由定时任务扫描处理。

首先会判断当前要扩容的分桶是否已有存在的并且扩容失败的标识。为了避免⼀个分桶多次扩容失败频繁锁定中⼼桶的库存,所以当失败次数超过两次之后,剩下的扩容请求就直接拒绝掉。等待扩容的分桶后续流程执⾏完成后,再将失败的次数减少,从而防⽌后续的扩容请求⽆法正常处理。

因为扩容对实时性要求⽐较⾼,所以在处理扩容请求时,会直接尝试执⾏扩容流程。也就是首先尝试扣减中⼼桶,如果中⼼桶因为库存不足扣减失败,那么就直接返回。如果中⼼桶扣减成功,那么才继续执⾏后续分桶扩容操作。

只有在扩容失败时,才会将失败的扩容存储在数据库中,等待后续定时任务扫描兜底处理。

//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate TairLock tairLock;@Resourceprivate InventoryRepository inventoryRepository;@Resourceprivate InventoryBucketCache inventoryBucketCache;@Resourceprivate TairCache tairCache;...//分桶扩容接口@Overridepublic void bucketCapacity(BucketCapacity bucketCapacity) {long startTime = System.currentTimeMillis();//获取中心桶的剩余库存Integer residueNum = getCenterStock(bucketCapacity);if (residueNum <= 0) {//中心桶无剩余库存,检查是否触发下线checkBucketOffline(bucketCapacity);return;}//判断本次扩容的分桶,是否有多次扩容失败的情况String failNum = tairCache.get(TairInventoryConstant.BUCKET_CAPACITY_FAIL + bucketCapacity.getBucketNo());if (StringUtils.isNotBlank(failNum) && Integer.parseInt(failNum) >= 2) {//当前分桶扩容失败次数超过两次了,直接放弃这次扩容//因为失败太多并且还继续去尝试,会持续的扣减中心桶库存,可能会导致其他可以正常扩容的分桶,没有中心桶库存可以扣减return;}//1.校验是否已经无需扩容了,如果是则快速结束BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (!bucketCapacityContext.getIsCapacity()) {return;}//先锁住中心桶库存,避免此时库存发生变化String key = buildBucketLockKey(bucketCapacity.getSellerId(), bucketCapacity.getSkuId());String value = SnowflakeIdWorker.getCode();//获取分布式锁来进行扩容处理boolean lock = tairLock.tryLock(key, value);if (lock) {try {//再次校验是否需要扩容,此处不允许并发bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (bucketCapacityContext.getIsCapacity()) {//2.获取中心桶库存的库存residueNum = getCenterStock(bucketCapacity);//3.可以扩容,计算出可回源的库存进行处理if (residueNum > 0) {backSourceInventory(residueNum, bucketCapacityContext);log.info(bucketCapacity.getBucketNo() + "处理扩容消耗时间{}", System.currentTimeMillis() - startTime);} else {//4.中心桶无库存,检查是否触发下线checkBucketOffline(bucketCapacity);}}} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}}//回源库存到分桶上//@param residueNum            中心桶剩余库存//@param bucketCapacityContext 扩容上下文对象private void backSourceInventory(Integer residueNum, BucketCapacityContext bucketCapacityContext) {BucketCapacity bucketCapacity = bucketCapacityContext.getBucketCapacity();//先获取本地的分桶元数据信息,获取当前分桶的总发放上限String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();//中心桶库存keyString sellerInventoryKey = buildSellerInventoryKey(bucketCapacity.getSellerId(), bucketCapacity.getSkuId());BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);String bucketNo = "";try {InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();Integer inventoryNum = 0;//获取实际配置的最大可用库存深度Integer maxBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();BucketCacheBO bucketCache = null;for (BucketCacheBO bucketCacheBO : availableList) {if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {bucketCache = bucketCacheBO;bucketNo = bucketCache.getBucketNo();break;}}//这里没有匹配到分桶,则该分桶已被下线,不处理后续流程if (Objects.isNull(bucketCache)) {return;}//中心桶库存超过最大深度库存(全部分桶总计),直接以配置的回源步长增长库存if (residueNum > maxBucketNum) {inventoryNum = inventoryBucketConfig.getBackSourceStep();} else {inventoryNum = calcEvenInventoryNum(maxBucketNum, inventoryBucketConfig, residueNum, bucketCache);}//获取扩容后的预估库存深度Integer maxDepthNum = getMaxDepthNum(inventoryNum, inventoryBucketConfig, bucketCache, bucketCapacityContext);//更新分桶元数据相关信息refreshBucketCache(maxDepthNum, bucketLocalCache, bucketCapacity.getBucketNo(), inventoryNum);//扣减中心桶库存Integer decr = tairCache.decr(sellerInventoryKey, inventoryNum);if (decr < 0) {//中心桶扣减失败,直接返回throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);}//回源分桶的库存Integer incr = tairCache.retryIncr(bucketCache.getBucketNo(), inventoryNum, 3);if (incr < 0) {//这里扣减中心桶成功了,但是回源分桶库存失败了,记录失败的操作,重试,如果重试失败了,则库存返回中心桶//本次操作的唯一idString operateId = SnowflakeIdWorker.getCode();//分桶元数据信息入库inventoryRepository.saveBucketDetail(operateId, bucketLocalCache, BucketOperateEnum.CAPACITY.getCode(), Lists.newArrayList(bucketCache), inventoryNum);//考虑到这里同一个分桶,可能会有多个任务没处理完,value值需要用对象来存储inventoryRepository.saveFailOperate(operateId, TairInventoryConstant.OPERATE_INCR_FAIL, bucketCache.getBucketNo(), inventoryNum);tairCache.appendJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, bucketCache.getBucketNo(), operateId, inventoryNum);throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);}log.info("本次分桶:{},回源库存:{}, 回源后分桶库存:{}, 中心桶剩余库存:{}", bucketCache.getBucketNo(), inventoryNum, incr, decr);} catch (Exception e) {//增加分桶扩容失败次数,如果次数超过两次了,则不会尝试更新缓存,直接写DB,让DB去操作//DB操作成功,会减少失败次数tairCache.incr(TairInventoryConstant.BUCKET_CAPACITY_FAIL + bucketNo);throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);} finally {inventoryBucketCache.threadLocalRemove();}}//刷新分桶元数据缓存//@param maxDepthNum      分桶最大库存深度//@param bucketLocalCache 分桶元数据信息//@param bucketNo         分桶编号private void refreshBucketCache(Integer maxDepthNum, BucketLocalCache bucketLocalCache, String bucketNo, Integer inventoryNum) {List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();for (BucketCacheBO bucketCacheBO : availableList) {if (bucketCacheBO.getBucketNo().equals(bucketNo)) {//每次库存具体深度变化都要更细,否则很容易触发回源的比例bucketCacheBO.setBucketNum(maxDepthNum);bucketCacheBO.setAllotNum(inventoryNum + (Objects.isNull(bucketCacheBO.getAllotNum()) ? 0 : bucketCacheBO.getAllotNum()));break;}}String key = buildBucketCacheKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId());//刷新本地缓存inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);}...
}

5.库存分桶下线改造

在库存分桶扩容时,如果发现中心桶库存为0,那么会进行下线检查。如果分桶的剩余库存达到下线阈值,那么就会调⽤InventoryBucketServiceImpl的bucketOffline()⽅法进行分桶下线。

其中的缓存一致性方案是:首先将分桶元数据写入库存分桶操作表,然后再由定时任务扫描处理。

//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate TairLock tairLock;@Resourceprivate InventoryRepository inventoryRepository;@Resourceprivate InventoryBucketCache inventoryBucketCache;@Resourceprivate TairCache tairCache;...//分桶下线接口@Overridepublic void bucketOffline(InventorOfflineRequest request) {//1.验证入参必填checkInventorOfflineParams(request);//过滤只有一个分桶的无效请求Boolean isOffline = checkBucketOffline(request);if (isOffline) {long start = System.currentTimeMillis();//2.注意这里需要锁定 下线分桶的变更,这个接口默认一次只有一个分桶String key = buildBucketOfflineLockKey(request.getSellerId(), request.getSkuId(), request.getBucketNoList().get(0));String value = SnowflakeIdWorker.getCode();boolean lock = tairLock.tryLock(key, value);if (lock) {try {//3.先将准备下线的分桶库存从本地和远程列表中移除至不可用列表,避免新的请求进来updateBucketCache(request);log.info("分桶下线处理时间,下线分桶:{}, 当前时间:{}, 耗时:{}", JSON.toJSONString(request.getBucketNoList()), DateFormatUtil.formatDateTime(new Date()), System.currentTimeMillis() - start);} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}}}//移除本地分桶的对应分桶列表以及远程的分桶列表//@param request 下线的请求参数列表private void updateBucketCache(InventorOfflineRequest request) {//下线的分桶列表List<String> bucketCacheList = request.getBucketNoList();String key = buildBucketCacheKey(request.getSellerId(), request.getSkuId());//1.获取到本地的缓存列表BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());try {//2.填充下线的分桶到不可用列表中List<BucketCacheBO> offlineBucket = bucketLocalCache.getAvailableList().stream().filter(bucketCacheBO -> bucketCacheList.contains(bucketCacheBO.getBucketNo())).collect(Collectors.toList());if (CollectionUtils.isEmpty(offlineBucket)) {//如果下线的分桶过滤后,没有要下线的了,直接返回return;}boolean isOfflineBucket = true;for (BucketCacheBO bucketCacheBO : offlineBucket) {if (!StringUtils.isEmpty(bucketCacheBO.getBucketNo())) {bucketLocalCache.getUndistributedList().add(bucketCacheBO);isOfflineBucket = false;}}//有合法的下线分桶,才操作if (isOfflineBucket) {return;}//过滤返回,还上线的分桶列表List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList().stream().filter(bucketCacheBO -> !bucketCacheList.contains(bucketCacheBO.getBucketNo())).collect(Collectors.toList());bucketLocalCache.setAvailableList(availableList);//可用分桶列表大于等于1的时候,才允许分桶下线if (availableList.size() >= 1) {//元数据缓存更新,这里会切面处理,为避免出现延迟,先操作远程缓存的元数据覆盖bucketLocalCache.setOperationType(BucketStatusEnum.OFFLINE_STATUS.getCode());//发一个消息进行本地缓存的更新,同时切面会发一个消息更新缓存,本地缓存的更新涉及版本号inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);//分桶元数据信息入库,待跑批任务执行inventoryRepository.saveBucketDetail(null, bucketLocalCache, BucketOperateEnum.OFFLINE.getCode(), offlineBucket, null);}} catch (Exception e) {log.error("分桶下线出现失败", e);throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE);} finally {inventoryBucketCache.threadLocalRemove();}}...
}

6.执行库存分桶缓存操作的定时任务

(1)定时任务的入口

(2)定时任务的整体实现流程

(1)定时任务的入口

在库存入桶分配、分桶上线、分桶扩容、分桶下线四个接口中,都只更新了中⼼桶缓存,而分桶的缓存数据还没处理。

由于这四个接口都会将变更的分桶元数据存储到库存分桶操作表里,所以可以启动定时任务扫描库存分桶操作表中数据,更新分桶的缓存数据。

@Component
public class BucketOperateJobHandler {@DubboReference(version = "1.0.0")private InventoryServiceApi inventoryService;@XxlJob("processBucketOperate")public void processBucketOperate() {XxlJobHelper.log("process bucket operate job starting...");JsonResult result = inventoryService.processBucketOperate();XxlJobHelper.log("process bucket operate job end, result: {}", result);}...
}

(2)定时任务的整体实现流程

⾸先分⻚查询库存分桶操作表中的记录,然后将记录添加到操作队列中。程序启动时会创建⼀个线程池,并开启多个线程,分别去处理操作队列。

在OperateRunner线程中,会不断扫描对应操作队列的数据。只要有数据添加到操作队列中,OperateRunner线程就会去处理操作队列。

当OperateRunner线程获取到操作队列中的分桶操作数据后,⾸先会尝试将该分桶操作的状态设置为处理中。如果设置失败,表示有其他线程已在处理该分桶操作了,不需要继续处理。如果设置成功,则通过策略模式,根据不同的操作类型,执⾏对应的操作。接着设置本次分桶操作的状态为执⾏完成。

当操作队列中的所有分桶操作都执⾏完后,会将其数据状态修改为已完成。然后定时任务会通过while循环间断轮训查出来的所有分桶操作是否已处理。如果是,就会批量修改分桶操作记录的状态,释放锁,结束本次任务。

@Component
public class BucketOperateJobHandler {@DubboReference(version = "1.0.0")private InventoryServiceApi inventoryService;@XxlJob("processBucketOperate")public void processBucketOperate() {XxlJobHelper.log("process bucket operate job starting...");JsonResult result = inventoryService.processBucketOperate();XxlJobHelper.log("process bucket operate job end, result: {}", result);}...
}//库存服务
@DubboService(version = "1.0.0", interfaceClass = InventoryServiceApi.class, retries = 0)
public class InventoryServiceApiImpl implements InventoryServiceApi {@Resourceprivate InventoryBucketService inventoryBucketService;...//执行分桶操作@Overridepublic JsonResult processBucketOperate() {try {inventoryBucketService.processBucketOperate();return JsonResult.buildSuccess();} catch (ProductBizException e) {log.error("biz error", e);return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());} catch (Exception e) {log.error("system error", e);return JsonResult.buildError(e.getMessage());}}
}//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate TairLock tairLock;@Resourceprivate InventoryBucketCache inventoryBucketCache;@Resourceprivate OperateQueue operateQueue;@Resourceprivate InventoryRepository inventoryRepository;...//执行分桶操作@Overridepublic JsonResult processBucketOperate() {String code = SnowflakeIdWorker.getCode();boolean lock = tairLock.lock(TairInventoryConstant.BUCKET_OPERATE_PROCESS, code);if (lock) {try {int page = 1;//分页获取要执行的操作列表List<InventoryBucketOperateBO> inventoryBucketOperateBOS = inventoryRepository.queryBucketOperateList(BucketOperateStatusEnum.UN_PROCESS.getCode(), page, operateQueueNum);while (!CollectionUtils.isEmpty(inventoryBucketOperateBOS)) {//当前有多少个分桶操作,添加至缓存中inventoryBucketCache.incrOperateCount(TairInventoryConstant.BUCKET_OPERATE_PROCESS_COUNT, inventoryBucketOperateBOS.size());//遍历每个分桶操作,添加到操作队列中for (InventoryBucketOperateBO inventoryBucketOperateBO : inventoryBucketOperateBOS) {operateQueue.offerByRoundRobin(inventoryBucketOperateBO);}//分页获取下一页要执行的分桶操作列表inventoryBucketOperateBOS = inventoryRepository.queryBucketOperateList(BucketOperateStatusEnum.UN_PROCESS.getCode(), ++page, operateQueueNum);}//等待本次查询出的所有分桶操作执行完成,这里不能按照队列的大小来判断//因为可能操作队列中还有最后一个分桶操作没执行,刚好取出这个分桶操作后还没执行,此时队列大小也为0while (inventoryBucketCache.getOperateCount(TairInventoryConstant.BUCKET_OPERATE_PROCESS_COUNT) != 0) {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}}//所有分桶操作都执行过之后,获取idList<Long> ids = inventoryBucketCache.getOperateId(TairInventoryConstant.BUCKET_OPERATE_PROCESS);if (!CollectionUtils.isEmpty(ids)) {log.info("修改分桶操作记录的状态为已完成:{}", JSON.toJSONString(ids));inventoryRepository.bucketOperateSuccess(ids, BucketOperateStatusEnum.FINISH.getCode());}} finally {inventoryBucketCache.removeOperateId(TairInventoryConstant.BUCKET_OPERATE_PROCESS);tairLock.unlock(TairInventoryConstant.BUCKET_OPERATE_PROCESS, code);}}return JsonResult.buildSuccess();}...
}@Component
public class OperateQueue {//处理分桶操作的操作队列private final List<BlockingQueue> operateQueue = new ArrayList<>();//配置的操作队列数量@Value("${bucket.operate.queue-num:32}")private Integer operateQueueNum;//处理下一个分桶操作的操作队列在队列列表中的下标private AtomicInteger index = new AtomicInteger();@Resourceprivate TairCache tairCache;@Resourceprivate InventoryBucketCache inventoryBucketCache;@Resourceprivate BucketOperateStrategyFactory operateFactory;@PostConstructpublic void init() {ExecutorService executors = Executors.newFixedThreadPool(operateQueueNum);for (int i = 0; i < operateQueueNum; i++) {//设置操作队列的最大容纳数量BlockingQueue blockingQueue = new ArrayBlockingQueue(150000);operateQueue.add(blockingQueue);executors.execute(new OperateRunner(blockingQueue, tairCache, inventoryBucketCache, operateFactory));}}//轮询获取一个操作队列public boolean offerByRoundRobin(Object object) {index.compareAndSet(operateQueueNum * 10000, 0);boolean offer = operateQueue.get(index.getAndIncrement() % operateQueueNum).offer(object);return offer;}
}//多线程消费操作队列中的数据
public class OperateRunner implements Runnable {//处理分桶操作的队列private BlockingQueue blockingQueue;private TairCache tairCache;private InventoryBucketCache inventoryBucketCache;private BucketOperateStrategyFactory operateFactory;public OperateRunner(BlockingQueue blockingQueue, TairCache tairCache, InventoryBucketCache inventoryBucketCache, BucketOperateStrategyFactory operateFactory) {this.blockingQueue = blockingQueue;this.tairCache = tairCache;this.inventoryBucketCache = inventoryBucketCache;this.operateFactory = operateFactory;}//内部线程处理每个操作队列中的数据@Overridepublic void run() {while (true) {if (CollectionUtils.isEmpty(blockingQueue)) {try {Thread.sleep(500);continue;} catch (InterruptedException e) {e.printStackTrace();}}InventoryBucketOperateBO operate = null;try {operate = (InventoryBucketOperateBO) blockingQueue.take();} catch (InterruptedException e) {log.error("获取分桶任务异常", e);}if (operate == null) {continue;}try {//尝试将分桶操作设置成处理中,成功了则继续处理,失败了表示已经处理了,则直接跳过if (!tairCache.setNx(TairInventoryConstant.BUCKET_OPERATE_PREFIX + operate.getId(), BucketOperateStatusEnum.PROCESS.getCodeString(), 0)) {//分桶操作不需要执行,减少分桶操作的总数量inventoryBucketCache.decrOperateCount(TairInventoryConstant.BUCKET_OPERATE_PROCESS_COUNT);continue;}BucketOperateEnum operateEnum = BucketOperateEnum.getByCode(operate.getOperateType());if (operateEnum != null) {//通过策略模式去处理分桶操作operateFactory.getStrategy(operateEnum.getService()).process(operate);}//处理完成,则将分桶操作的状态改为成功tairCache.set(TairInventoryConstant.BUCKET_OPERATE_PREFIX + operate.getId(), BucketOperateStatusEnum.FINISH.getCodeString(), 0);//把成功的分桶操作记录的id放到缓存中inventoryBucketCache.addOperateId(TairInventoryConstant.BUCKET_OPERATE_PROCESS, operate.getId());//任务执行后,减少分桶操作的总数量inventoryBucketCache.decrOperateCount(TairInventoryConstant.BUCKET_OPERATE_PROCESS_COUNT);} catch (Exception e) {//处理分桶操作异常,将分桶操作的处理状态删除,不然后续再处理,setNx设置状态为处理中时会失败tairCache.delete(TairInventoryConstant.BUCKET_OPERATE_PREFIX + operate.getId());//分桶操作执行失败,也减少分桶操作的总数量inventoryBucketCache.decrOperateCount(TairInventoryConstant.BUCKET_OPERATE_PROCESS_COUNT);log.error("处理分桶操作异常", e);}}}
}

7.分桶操作之初始化分配库存的处理策略

初始化时,⼀般会上线多个分桶,此时可以通过线程池来处理。通过向线程池提交线程来操作缓存,也就是设置上线分桶的库存数量。

如果缓存操作失败,则需要做回滚操作。即在缓存中记录失败的操作,并把失败的操作写⼊库存操作失败记录表中,等待清除执⾏成功的分桶操作的定时任务来处理回滚操作。

如果有设置分桶库存失败的分桶,则需要从上线分桶列表中删除,防⽌上线失败的分桶被用来扣减库存。

当所有分桶库存设置流程执行完后,就可以将元数据信息写⼊,供库存扣减接⼝使⽤。

//分桶操作路由工厂
@Component
public class BucketOperateStrategyFactory {@Autowiredprivate Map<String, AbstractOperateStrategy> operateMap = new ConcurrentHashMap<>(16);//获取分桶操作处理策略的路由public AbstractOperateStrategy getStrategy(String operate) {return operateMap.get(operate);}
}//分桶操作的处理策略抽象类
public abstract class AbstractOperateStrategy {//处理分桶操作public abstract void process(InventoryBucketOperateBO inventoryBucketOperateBO);
}//分桶初始化操作的处理策略
@Service("initOperateProcess")
public class InitOperateProcessService extends AbstractOperateStrategy {@Resourceprivate InventoryBucketService inventoryBucketService;@Overridepublic void process(InventoryBucketOperateBO inventoryBucketOperateBO) {inventoryBucketService.processInitBucket(inventoryBucketOperateBO);}
}//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//将初始化的分桶数据写入远程缓存以及本地缓存@Overridepublic void processInitBucket(InventoryBucketOperateBO inventoryBucketOperateBO) {//从分桶操作记录中解析出分桶元数据信息BucketLocalCache bucketLocalCache = JSON.parseObject(inventoryBucketOperateBO.getFeature(), BucketLocalCache.class);log.info("执行库存初始化{}", JSONObject.toJSONString(inventoryBucketOperateBO));//1.获取卖家ID + 商品skuId标识String key = buildBucketCacheKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId());//2.写入数据到对应的缓存上List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();CountDownLatch latch = new CountDownLatch(availableList.size());List<String> failBucketNos = Collections.synchronizedList(new ArrayList<>());for (BucketCacheBO bucketCacheBO : availableList) {executors.execute(() -> {log.info("bucketNo:{}, inventoryNum:{}", bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum());boolean setFlag = tairCache.retrySet(bucketCacheBO.getBucketNo(), JSONObject.toJSONString(bucketCacheBO.getBucketNum()), 0, 3);log.info("set bucket inventory, bucketNo:{}, inventoryNum:{}, successful:{}", bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum(), setFlag);if (!setFlag) {failBucketNos.add(bucketCacheBO.getBucketNo());//记录失败,后续处理tairCache.appendJsonExhset(TairInventoryConstant.OPERATE_SET_FAIL, bucketCacheBO.getBucketNo(), inventoryBucketOperateBO.getOperateId(), bucketCacheBO.getBucketNum());inventoryRepository.saveFailOperate(inventoryBucketOperateBO.getOperateId(), TairInventoryConstant.OPERATE_SET_FAIL, bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum());}latch.countDown();});}try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}if (!CollectionUtils.isEmpty(failBucketNos)) {bucketLocalCache.getUndistributedList().addAll(failBucketNos.stream().map(BucketCacheBO::new).collect(Collectors.toList()));availableList = availableList.stream().filter(bucketCacheBO -> failBucketNos.contains(bucketCacheBO.getBucketNo())).collect(Collectors.toList());bucketLocalCache.setAvailableList(availableList);}//3.记录存储到本地缓存列表bucketLocalCache.setOperationType(BucketStatusEnum.AVAILABLE_STATUS.getCode());inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);log.info("元数据信息:{}", JSONObject.toJSONString(bucketLocalCache));}...
}

8.分桶操作之增加库存与分桶上线的处理策略

//分桶操作路由工厂
@Component
public class BucketOperateStrategyFactory {@Autowiredprivate Map<String, AbstractOperateStrategy> operateMap = new ConcurrentHashMap<>(16);//获取分桶操作处理策略的路由public AbstractOperateStrategy getStrategy(String operate) {return operateMap.get(operate);}
}//分桶操作的处理策略抽象类
public abstract class AbstractOperateStrategy {//处理分桶操作public abstract void process(InventoryBucketOperateBO inventoryBucketOperateBO);
}//分桶上线操作的处理策略
@Service("onlineOperateProcess")
public class OnlineOperateProcessService extends AbstractOperateStrategy {@Resourceprivate InventoryBucketService inventoryBucketService;@Overridepublic void process(InventoryBucketOperateBO inventoryBucketOperateBO) {inventoryBucketService.processOnlineOperate(inventoryBucketOperateBO);}
}//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//分桶上线数据写入远程缓存与本地缓存@Overridepublic void processOnlineOperate(InventoryBucketOperateBO inventoryBucketOperateBO) {log.info("执行分桶上线{}", JSONObject.toJSONString(inventoryBucketOperateBO));//从分桶操作记录中解析出分桶元数据信息BucketLocalCache bucketLocalCache = JSON.parseObject(inventoryBucketOperateBO.getFeature(), BucketLocalCache.class);//上线的分桶信息List<BucketCacheBO> bucketCacheBOList = JSON.parseArray(inventoryBucketOperateBO.getBucket(), BucketCacheBO.class);//本次操作的idString operateId = inventoryBucketOperateBO.getOperateId();String key = buildBucketCacheKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId());//上线失败的列表List<String> failBucketNos = Collections.synchronizedList(new ArrayList<>());CountDownLatch latch = new CountDownLatch(bucketCacheBOList.size());//1.先更新分桶的上线缓存处理操作for (BucketCacheBO bucketCacheBO : bucketCacheBOList) {executors.execute(() -> {boolean setFlag = tairCache.retrySet(bucketCacheBO.getBucketNo(), JSONObject.toJSONString(bucketCacheBO.getBucketNum()), 0, 3);if (!setFlag) {failBucketNos.add(bucketCacheBO.getBucketNo());//记录失败,后续处理inventoryRepository.saveFailOperate(operateId, TairInventoryConstant.OPERATE_INCR_FAIL, bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum());tairCache.appendJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, bucketCacheBO.getBucketNo(), operateId, bucketCacheBO.getBucketNum());}latch.countDown();});}try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}if (!CollectionUtils.isEmpty(failBucketNos)) {bucketLocalCache.getUndistributedList().addAll(failBucketNos.stream().map(BucketCacheBO::new).collect(Collectors.toList()));bucketLocalCache.setAvailableList(bucketLocalCache.getAvailableList().stream().filter(bucketCacheBO -> failBucketNos.contains(bucketCacheBO.getBucketNo())).collect(Collectors.toList()));}//2.处理分桶列表的更新,待中心桶库存以及上线分桶库存更新完成,更新远程和本地的分桶列表bucketLocalCache.setOperationType(BucketStatusEnum.AVAILABLE_STATUS.getCode());inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);}...
}

9.分桶操作之分桶扩容的处理策略

尝试执⾏缓存库存增加操作。如果执⾏成功, 则从失败操作记录中移除,并将失败次数减⼀。如果执⾏失败,不做处理,因为会将这次任务设置为执⾏完成,后续清除执⾏成功的分桶操作的定时任务会处理回滚操作。

//分桶操作路由工厂
@Component
public class BucketOperateStrategyFactory {@Autowiredprivate Map<String, AbstractOperateStrategy> operateMap = new ConcurrentHashMap<>(16);//获取分桶操作处理策略的路由public AbstractOperateStrategy getStrategy(String operate) {return operateMap.get(operate);}
}//分桶操作的处理策略抽象类
public abstract class AbstractOperateStrategy {//处理分桶操作public abstract void process(InventoryBucketOperateBO inventoryBucketOperateBO);
}//分桶扩容操作的处理策略
@Service("capacityOperateProcess")
public class CapacityOperateProcessService extends AbstractOperateStrategy {@Resourceprivate InventoryBucketService inventoryBucketService;@Overridepublic void process(InventoryBucketOperateBO inventoryBucketOperateBO) {inventoryBucketService.processCapacityOperate(inventoryBucketOperateBO);}
}//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//处理扩容的分桶操作@Overridepublic void processCapacityOperate(InventoryBucketOperateBO operate) {Long startTime = System.currentTimeMillis();List<BucketCacheBO> bucketCacheBOList = JSON.parseArray(operate.getBucket(), BucketCacheBO.class);//回源分桶的库存String bucketNo = bucketCacheBOList.get(0).getBucketNo();Integer incr = tairCache.retryIncr(bucketNo, operate.getInventoryNum(), 3);if (incr >= 0) {//执行成功,从失败列表中移除,移除数据库inventoryRepository.deleteBucketOperate(operate.getId());//移除缓存tairCache.removeJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, bucketNo, operate.getOperateId());//回源成功后将失败次数减一tairCache.decr(TairInventoryConstant.BUCKET_CAPACITY_FAIL + bucketCacheBOList.get(0).getBucketNo(), 1);}log.info("执行扩容操作{},本次耗时{},执行结果{}", JSONObject.toJSONString(operate), System.currentTimeMillis() - startTime, incr >= 0 ? true : false);}...
}

10.分桶操作之分桶下线的处理策略

这⾥只是发送⼀条清空分桶的延迟消息,等待后续延迟处理分桶的下线。将下线分桶中的剩余库存清空,并放回中⼼桶中。

//分桶操作路由工厂
@Component
public class BucketOperateStrategyFactory {@Autowiredprivate Map<String, AbstractOperateStrategy> operateMap = new ConcurrentHashMap<>(16);//获取分桶操作处理策略的路由public AbstractOperateStrategy getStrategy(String operate) {return operateMap.get(operate);}
}//分桶操作的处理策略抽象类
public abstract class AbstractOperateStrategy {//处理分桶操作public abstract void process(InventoryBucketOperateBO inventoryBucketOperateBO);
}//分桶下线操作的处理策略
@Service("offlineOperateProcess")
public class OfflineOperateProcessService extends AbstractOperateStrategy {@Resourceprivate InventoryBucketService inventoryBucketService;@Overridepublic void process(InventoryBucketOperateBO inventoryBucketOperateBO) {inventoryBucketService.processOfflineOperate(inventoryBucketOperateBO);}
}//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//处理下线的分桶操作@Overridepublic void processOfflineOperate(InventoryBucketOperateBO inventoryBucketOperateBO) {log.info("执行分桶下线{}", JSONObject.toJSONString(inventoryBucketOperateBO));log.info("发送分桶清空的消息:{}", inventoryBucketOperateBO.getBucket());BucketLocalCache bucketLocalCache = JSON.parseObject(inventoryBucketOperateBO.getFeature(), BucketLocalCache.class);List<BucketCacheBO> bucketCacheBOList = JSON.parseArray(inventoryBucketOperateBO.getBucket(), BucketCacheBO.class);List<String> bucketCacheList = bucketCacheBOList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());BucketClearRequest bucketClearRequest = new BucketClearRequest(bucketLocalCache.getSkuId(), bucketLocalCache.getSellerId(), bucketCacheList, 0);for (int i = 0; i < 3; i++) {try {//发送清空下线分桶库存的消息,默认这里不存在需要处理的中心桶库存bucketClearProducer.sendBucketClear(bucketClearRequest);//发送成功,直接返回,发送失败,继续处理return;} catch (Exception e) {//这里消息发送失败了,再次尝试发送,失败三次的话,就抛出异常}}//到这里就是消息多次发送失败,需要做其他的处理throw new ProductBizException(CommonErrorCodeEnum.SEND_MQ_FAILED);}...
}//清空分桶库存的消息队列
@Component
public class BucketClearProducer {@Autowiredprivate DefaultProducer defaultProducer;//清空分桶库存的消息 MQ生产public void sendBucketClear(BucketClearRequest bucketClearRequest) {//发送清空分桶库存消息,延迟1秒,留给更多的时间给正在扣减该分桶的线程处理//分布式本地缓存的消息覆盖通知有一定延迟性,为避免库存数据的错误必须保证分桶已下线才能准确扣减库存//实际清空库存的任务,也做了对应重试//获取当前下线分桶库存和回退如果不一致的下线分桶请求,会重新发延迟消息等待下次消费重试defaultProducer.sendMessage(RocketMqConstant.BUCKET_CLEAR_TOPIC, JSONObject.toJSONString(bucketClearRequest), 1, "清空分桶");}
}//处理清空分桶库存的消息
@Component
public class BucketClearListener implements MessageListenerConcurrently {@Autowiredprivate InventoryBucketService inventoryBucketService;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : list) {String msg = new String(messageExt.getBody());BucketClearRequest bucketClearRequest = JsonUtil.json2Object(msg, BucketClearRequest.class);log.info("执行分桶下线清空库存,消息内容:{}", bucketClearRequest.getBucketNoList());inventoryBucketService.bucketClear(bucketClearRequest);}} catch (Exception e) {log.error("consume error, 清空分桶库存失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}//库存分桶业务实现类
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//清空分桶库存,分桶库存放回中央库存@Overridepublic void bucketClear(BucketClearRequest request) {long start = System.currentTimeMillis();String key = buildBucketCacheKey(request.getSellerId(), request.getSkuId());String bucketCache = inventoryBucketCache.getBucketCache(key);if (!StringUtils.isEmpty(bucketCache)) {//引用缓存组件需要通过通用对象进行对应的缓存获取BucketLocalCache bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);updateBucketInventory(request, bucketLocalCache);}log.info("清空下线分桶库存:{},时间:{}", JSON.toJSONString(request.getBucketNoList()), System.currentTimeMillis() - start);//商品库存值预警warningProductInventory(bucketCache);}//将分桶的缓存库存返回给中心桶库存上private void updateBucketInventory(BucketClearRequest request, BucketLocalCache bucketLocalCache) {//中心桶的库存keyString key = buildSellerInventoryKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId());//中心桶需要回源的库存,默认为0Integer inventoryNum = request.getInventoryNum();//准备操作下线的分桶List<String> bucketCacheList = request.getBucketNoList();//下线的分桶列表List<String> undistributedList = bucketLocalCache.getUndistributedList().stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());//只处理已经下线的分桶bucketCacheList = bucketCacheList.stream().filter(undistributedList::contains).collect(Collectors.toList());//当分桶状态不是已下线的状态,验证这个分桶是否需要处理为下线状态if (CollectionUtils.isEmpty(bucketCacheList)) {BucketCapacity bucketCapacity = inventoryConverter.converter(request);for (String bucketNo : request.getBucketNoList()) {bucketCapacity.setBucketNo(bucketNo);//再次校验分桶是否触发下线checkBucketOffline(bucketCapacity);}//不处理分桶库存回退,但是如果有中心桶库存需要重试,这里进行重试bucketCacheList = new ArrayList<>();}//标记处理过程中失败的数据,如果是缓存没有库存这种是不会加入List<String> failureBucketCacheList = new ArrayList<>();for (String bucketNo : bucketCacheList) {//先获取下线的分桶实际剩余库存String bucketNum = tairCache.get(bucketNo);//当分桶的库存大于0的时候才处理if (!StringUtils.isEmpty(bucketNum) && Integer.valueOf(bucketNum) > 0) {//清理下线的分桶库存,设置为0Integer result = tairCache.decr(bucketNo, Integer.parseInt(bucketNum));if (result >= 0) {log.info("下线分桶,bucketNo:{},desc:{}", bucketNo, bucketNum);inventoryNum = inventoryNum + Integer.parseInt(bucketNum);} else {log.info("分桶已下线,bucketNo:{}", bucketNo);failureBucketCacheList.add(bucketNo);}}}if (inventoryNum > 0) {//将下线的剩余库存加至 中心桶库存上Integer incr = tairCache.retryIncr(key, inventoryNum, 3);//当返回的值大于0,则意味本次操作回源中心桶库存成功,标记为0,当存在失败的分桶下线不会累计添加上次任务处理的中心桶库存log.info("回源中心桶,inventoryNum:{}, after value :{}", inventoryNum, incr);if (incr >= 0) {inventoryNum = 0;}}//对本次库存操作失败的分桶信息,重新写入MQ进行重试//这里只有回退库存失败的或者中心桶库存没有回源成功的才会再次发送//如果是已经被扣减掉库存的查询的时候会过滤掉if (!CollectionUtils.isEmpty(failureBucketCacheList) || inventoryNum > 0) {//发送清空下线分桶库存的消息bucketClearProducer.sendBucketClear(new BucketClearRequest(bucketLocalCache.getSkuId(), bucketLocalCache.getSellerId(), failureBucketCacheList, inventoryNum));}}...
}

11.清除执⾏成功的分桶操作的定时任务

(1)定时任务概述

(2)具体处理流程

(1)定时任务概述

这个定时任务主要处理执⾏分桶操作定时任务中处理完成的数据,也就是将库存分桶操作表中状态为已完成的数据查询出来。如果有失败的操作,则回滚该操作,然后删除该任务。

这个定时任务与执⾏分桶操作的定时任务类似,它会分⻚查询执⾏完成的分桶操作,然后放⼊队列中。按照队列处理完成后,整个流程结束,删除表里的分桶操作记录。

(2)具体处理流程

一.处理缓存中有记录的失败的分桶操作

先根据分桶操作记录,处理缓存中有记录的失败的分桶操作。根据记录分桶操作的失败类型,进行对应的回退处理。

如果是初始化分桶库存或分桶上线,则将分配给该分桶的库存放回中⼼桶中,并删除失败的操作记录。

如果是分桶扩容,则将对该分桶进行扩容的库存放回中⼼桶中,并删除当时失败的操作记录,并且将分桶失败的次数减⼀。此时缓存中有记录的、失败的分桶操作就已经处理了。

二.处理缓存中没有但数据库中有的分桶操作记录

当回退任务处理完成之后,删除失败的操作记录,以及分桶操作任务记录。

@Component
public class BucketOperateJobHandler {@DubboReference(version = "1.0.0")private InventoryServiceApi inventoryService;...@XxlJob("bucketOperateFinishedClear")public void bucketOperateFinishedClear() {XxlJobHelper.log("bucket operate finished clear job starting...");JsonResult result = inventoryService.bucketOperateFinishedClear();XxlJobHelper.log("bucket operate finished clear job end, result: {}", result);}
}@DubboService(version = "1.0.0", interfaceClass = InventoryServiceApi.class, retries = 0)
public class InventoryServiceApiImpl implements InventoryServiceApi {@Resourceprivate InventoryBucketService inventoryBucketService;...@Overridepublic JsonResult bucketOperateFinishedClear() {try {//处理执行成功的inventoryBucketService.bucketOperateFinishedClear();return JsonResult.buildSuccess();} catch (ProductBizException e) {log.error("biz error", e);return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());} catch (Exception e) {log.error("system error", e);return JsonResult.buildError(e.getMessage());}}...
}@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate OperateClearQueue operateClearQueue;...//清除执行成功的分桶操作记录@Overridepublic JsonResult bucketOperateFinishedClear() {String code = SnowflakeIdWorker.getCode();boolean lock = tairLock.lock(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS, code);if (lock) {try {int page = 1;List<InventoryBucketOperateBO> inventoryBucketOperateBOS = inventoryRepository.queryBucketOperateList(BucketOperateStatusEnum.FINISH.getCode(), page, operateQueueNum);while (!CollectionUtils.isEmpty(inventoryBucketOperateBOS)) {//当前有多少个分桶操作,添加至缓存中inventoryBucketCache.incrOperateCount(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS_COUNT, inventoryBucketOperateBOS.size());//将分桶操作轮询添加到操作队列中for (InventoryBucketOperateBO inventoryBucketOperateBO : inventoryBucketOperateBOS) {operateClearQueue.offerByRoundRobin(inventoryBucketOperateBO);}//分页获取下一页要执行的分桶操作列表inventoryBucketOperateBOS = inventoryRepository.queryBucketOperateList(BucketOperateStatusEnum.FINISH.getCode(), ++page, operateQueueNum);}//等待本次所有分桶操作都处理完成while (inventoryBucketCache.getOperateCount(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS_COUNT) != 0) {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}}//删除已经执行完成的分桶操作记录List<Long> ids = inventoryBucketCache.getOperateId(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS);if (!CollectionUtils.isEmpty(ids)) {log.info("清除已完成的任务:{}", JSON.toJSONString(ids));inventoryRepository.deleteBatchBucketOperate(ids);}} finally {inventoryBucketCache.removeOperateId(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS);tairLock.unlock(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS, code);}}return JsonResult.buildSuccess();}...
}@Component
public class OperateClearQueue {//处理分桶操作的操作队列列表private final List<BlockingQueue> operateQueue = new ArrayList<>();//配置的操作队列数量@Value("${bucket.operate.queue-num:32}")private Integer queueNum;//处理下一个分桶操作的操作队列在队列列表中的下标private AtomicInteger index = new AtomicInteger();@Resourceprivate InventoryBucketService inventoryBucketService;@PostConstructpublic void init() {ExecutorService executors = Executors.newFixedThreadPool(queueNum);for (int i = 0; i < queueNum; i++) {//设置操作队列的最大容纳数量BlockingQueue blockingQueue = new ArrayBlockingQueue(150000);operateQueue.add(blockingQueue);executors.execute(new OperateClearRunner(blockingQueue, inventoryBucketService));}}//轮询获取一个操作队列public boolean offerByRoundRobin(Object object) {index.compareAndSet(queueNum * 10000, 0);boolean offer = operateQueue.get(index.getAndIncrement() % queueNum).offer(object);return offer;}
}//多线程消费操作队列中的数据
public class OperateClearRunner implements Runnable {//处理分桶操作的操作队列private BlockingQueue blockingQueue;private InventoryBucketService inventoryBucketService;public OperateClearRunner(BlockingQueue blockingQueue, InventoryBucketService inventoryBucketService) {this.blockingQueue = blockingQueue;this.inventoryBucketService = inventoryBucketService;}//内部线程处理每个操作队列的数据@Overridepublic void run() {while (true) {try {if (CollectionUtils.isEmpty(blockingQueue)) {Thread.sleep(500);continue;}InventoryBucketOperateBO operate = (InventoryBucketOperateBO) blockingQueue.take();inventoryBucketService.clearBucketOperate(operate);} catch (Exception e) {log.error("处理分桶操作异常", e);}}}
}@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//清除执行完的分桶操作@Overridepublic void clearBucketOperate(InventoryBucketOperateBO operate) {Long startTime = System.currentTimeMillis();List<InventoryOperateFailBO> inventoryOperateFailBOS = inventoryRepository.queryFailOperateList(operate.getOperateId());Map<String, InventoryOperateFailBO> operateFailBOMap = inventoryOperateFailBOS.stream().collect(Collectors.toMap(InventoryOperateFailBO::getBucketNo, Function.identity()));//先处理缓存中存储的失败记录processCacheRollback(operate, operateFailBOMap);//再处理缓存中没有存储的但是DB中存储的失败记录processDbRollback(operate, operateFailBOMap);if (!CollectionUtils.isEmpty(inventoryOperateFailBOS)) {//所有的都执行完成了,删除处理完成的数据库记录,缓存记录在处理过程中已经执行了inventoryRepository.deleteBatchOperateFail(inventoryOperateFailBOS);}//执行成功的,删除记录tairCache.delete(TairInventoryConstant.BUCKET_OPERATE_PREFIX + operate.getId());inventoryBucketCache.addOperateId(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS, operate.getId());//任务执行后,减少任务数量inventoryBucketCache.decrOperateCount(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS_COUNT);log.info("本次分桶清除任务耗时{}, 任务对象{}", System.currentTimeMillis() - startTime, JSONObject.toJSONString(operate));}//处理缓存中的失败回退private void processCacheRollback(InventoryBucketOperateBO operate, Map<String, InventoryOperateFailBO> operateFailBOMap) {List<BucketCacheBO> bucketCacheBOS = JSON.parseArray(operate.getBucket(), BucketCacheBO.class);//获取中心桶库存的keyString key = buildSellerInventoryKey(operate.getSellerId(), operate.getSkuId());for (BucketCacheBO bucketCacheBO : bucketCacheBOS) {//分桶的缓存操作有两个,一个是初始化设置库存,一个是回源增加库存//获取初始化库存失败的操作Object bucketNum = tairCache.getJsonExhset(TairInventoryConstant.OPERATE_SET_FAIL, bucketCacheBO.getBucketNo(), operate.getOperateId());if (!Objects.isNull(bucketNum)) {//这里是分桶库存初始化失败了,需要将数据放回中心桶tairCache.incr(key, (Integer) bucketNum);//处理成功,将失败列表中的数据删除tairCache.removeJsonExhset(TairInventoryConstant.OPERATE_SET_FAIL, bucketCacheBO.getBucketNo(), operate.getOperateId());operateFailBOMap.remove(bucketCacheBO.getBucketNo());}//获取增加库存失败的操作bucketNum = tairCache.getJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, bucketCacheBO.getBucketNo(), operate.getOperateId());if (!Objects.isNull(bucketNum)) {//这里是分桶库存增加失败了,需要将数据放回中心桶tairCache.incr(key, (Integer) bucketNum);//处理成功,将失败列表中的数据删除tairCache.removeJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, bucketCacheBO.getBucketNo(), operate.getOperateId());operateFailBOMap.remove(bucketCacheBO.getBucketNo());//处理完回退之后,如果是扩容的,需要将扩容失败次数减一if (BucketOperateEnum.CAPACITY.getCode().equals(operate.getOperateType())) {tairCache.decr(TairInventoryConstant.BUCKET_CAPACITY_FAIL + bucketCacheBO.getBucketNo(), 1);}}}}//处理数据库中的失败回退private void processDbRollback(InventoryBucketOperateBO operate, Map<String, InventoryOperateFailBO> operateFailBOMap) {//获取中心桶库存的keyString key = buildSellerInventoryKey(operate.getSellerId(), operate.getSkuId());//如果缓存中数据操作完,数据库中还有未处理的,需要处理数据库中的if (!CollectionUtils.isEmpty(operateFailBOMap)) {for (Map.Entry<String, InventoryOperateFailBO> entry : operateFailBOMap.entrySet()) {InventoryOperateFailBO operateFailBO = entry.getValue();if (TairInventoryConstant.OPERATE_SET_FAIL.equals(operateFailBO.getFailType())) {//这里是分桶库存初始化失败了,需要将数据放回中心桶tairCache.incr(key, operateFailBO.getInventoryNum());//处理成功,将失败列表中的数据删除tairCache.removeJsonExhset(TairInventoryConstant.OPERATE_SET_FAIL, operateFailBO.getBucketNo(), operate.getOperateId());} else if (TairInventoryConstant.OPERATE_INCR_FAIL.equals(operateFailBO.getFailType())) {//这里是分桶库存增加失败了,需要将数据放回中心桶tairCache.incr(key, operateFailBO.getInventoryNum());//处理成功,将失败列表中的数据删除tairCache.removeJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, operateFailBO.getBucketNo(), operate.getOperateId());//处理完回退之后,如果是扩容的,需要将扩容失败次数减一if (BucketOperateEnum.CAPACITY.getCode().equals(operate.getOperateType())) {//扩容时,一次操作只有一个分桶数据变更//所以这里跟缓存中获取的失败记录,最多执行一次,不会导致这次失败的任务,将其他任务的次数减去了tairCache.decr(TairInventoryConstant.BUCKET_CAPACITY_FAIL + operateFailBO.getBucketNo(), 1);}}}}}...
}

http://www.dtcms.com/a/265317.html

相关文章:

  • RedisCluster不可用的6大隐患
  • 通俗理解JVM细节-面试篇
  • 配置tcp的https协议证书
  • [云上玩转Qwen3系列之四]PAI-LangStudio x AI搜索开放平台 x ElasticSearch: 构建AI Search RAG全栈应用
  • JSON 安装使用教程
  • 新版本没有docker-desktop-data分发 | docker desktop 镜像迁移
  • 用Python实现两种爱心效果❤️
  • 人机协同的智能体开发范式(ADS)
  • HCIA-实现VLAN间通信
  • nrf52840蓝牙学习(定时器的应用)
  • Python 数据分析:numpy,说人话,说说数组维度。听故事学知识点怎么这么容易?
  • 从暴力穷举到智能导航,PC本地搜索被腾讯电脑管家“拯救”
  • 【Vue入门学习笔记】Vue核心语法
  • 百度文心 ERNIE 4.5 开源:开启中国多模态大模型开源新时代
  • MYSQL基础内容
  • 读VJEPA 2
  • Linux Mem -- Slub内存分配器基础
  • 08_Excel 导入 - 用户信息批量导入
  • [ linux-系统 ] 软硬链接与动静态库
  • 基于Java+SpringBoot的图书管理系统
  • 【字节跳动】数据挖掘面试题0002:从转发数据中求原视频用户以及转发的最长深度和二叉排序树指定值
  • Scala 安装使用教程
  • windows系统下将Docker Desktop安装到除了C盘的其它盘中
  • 前端可视化——Canvas实战篇
  • Docker Compose 基础——AI教你学Docker
  • 《寻北技术的全面剖析与应用前景研究报告》
  • 【4】 Deployment深入简出实战演练
  • 无代码自动化测试工具介绍
  • Java中创建线程方法以及线程池参数配置
  • (LeetCode ) 13. 罗马数字转整数 (哈希表)