当前位置: 首页 > news >正文

商品中心—16.库存分桶调配的技术文档

大纲

1.库存分桶上线

2.库存分桶下线

3.库存分桶扩容

4.库存预警

1.库存分桶上线

(1)使用入口

(2)具体实现

(1)使用入口

当库存充⾜后,可针对下线的分桶进⾏再次上线并分配库存。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate TairLock tairLock;...//分桶上线接口@Overridepublic void bucketOnline(InventorOnlineRequest request) {//1.验证入参必填checkInventorOnlineParams(request);//2.注意这里需要锁定中心桶库存String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + request.getSellerId() + request.getSkuId();String value = SnowflakeIdWorker.getCode();boolean lock = tairLock.tryLock(key, value);if (lock) {try {//3.获取中心桶的剩余库存,并校验是否可上线分桶Integer residueNum = checkBucketOnlineNum(key);//4.构建新的分桶元数据信息,并写入writeBucketCache(request, residueNum);} catch (Exception e) {log.error(e.getStackTrace().toString());} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后再重试!");}}...
}

(2)具体实现

步骤一:校验⼊参必填,即指定的商品SKU以及需要上线的分桶编号
步骤二:校验中⼼桶的剩余库存,否则上线⼀个空库存分桶毫⽆意义
步骤三:接着判断本地缓存列表⾥是否还存在下线的分桶可供上线
步骤四:当存在可上线的分桶,需要构建新的元数据信息
步骤五:写入数据到远程缓存中并更新本地缓存的分桶元数据信息

步骤一:校验⼊参必填,即指定的商品SKU以及需要上线的分桶编号。可上线的分桶编号由远程缓存保存⼀份,具体上线⼏个由调⽤⽅决定。

步骤二:校验中⼼桶的剩余库存,否则上线⼀个空库存分桶毫⽆意义。因为空库存分桶⼜会⻢上下线,这⾥需要先获取对应中⼼桶的剩余库存,验证是否可以上线分桶。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate TairCache tairCache;...//校验中心桶是否可上线分桶,并返回对应的可分配库存private Integer checkBucketOnlineNum(String key) {//验证缓存是否存在String residueNum = tairCache.get(key);if (StringUtils.isEmpty(residueNum)) {return 0;}//返回具体的库存return Integer.valueOf(residueNum);}...
}

步骤三:接着判断本地缓存列表⾥是否还存在下线的分桶可供上线。如果本地缓存列表⾥不存在下线的分桶,则不再处理。如果本地缓存列表⾥存在下线的分桶,则计算出中⼼桶可⽤库存可以分发⼏个分桶上线,以及分发的库存信息。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate InventoryBucketCache inventoryBucketCache;...//构建新的分桶元数据信息//@param request    分桶上线对象//@param residueNum 中心桶剩余库存private void writeBucketCache(InventorOnlineRequest request, Integer residueNum) {String key = request.getSellerId() + request.getSkuId();//5.获取本地存储的分桶元数据信息BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);if (!Objects.isNull(bucketLocalCache)) {//6.获取当前可上线的分桶列表信息以及具体上线库存List<BucketCacheBO> bucketCacheBOList = buildBucketList(request.getBucketNoList(),bucketLocalCache.getAvailableList(),bucketLocalCache.getUndistributedList(),bucketLocalCache.getInventoryBucketConfig(),residueNum);//当前可上线的分桶为空,直接返回if (CollectionUtils.isEmpty(bucketCacheBOList)) {return;}//7.构建返回新的分桶元数据模型返回buildBucketLocalCache(bucketLocalCache, bucketCacheBOList, residueNum);//8.写入数据到远程缓存中并更新本地缓存的分桶元数据信息writeBucketLocalCache(bucketLocalCache, bucketCacheBOList);}}...
}

步骤四:当存在可上线的分桶,需要构建新的元数据信息。这⾥因为是上线,先操作缓存的库存增加,再增加上线分桶可路由配置,暂时不考虑延迟处理。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//构建新的分桶元数据模型//@param bucketLocalCache  本地分桶元数据信息//@param bucketCacheBOList 上线的分桶数据列表//@param residueNum        中心桶剩余库存private void buildBucketLocalCache(BucketLocalCache bucketLocalCache, List<BucketCacheBO> bucketCacheBOList, Integer residueNum) {//获取本次上线的库存信息Integer inventoryNum = 0;for (BucketCacheBO bucketCacheBO : bucketCacheBOList) {inventoryNum = inventoryNum + bucketCacheBO.getBucketNum();}//填充中心桶剩余库存residueNum = residueNum - inventoryNum;bucketLocalCache.setResidueNum(residueNum);//添加新上线的分桶列表bucketLocalCache.getAvailableList().addAll(bucketCacheBOList);Map<String, BucketCacheBO> bucketCacheMap = bucketCacheBOList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList().stream().filter(bucketCacheBO ->//在上线的分桶列表,需要移除掉!bucketCacheMap.containsKey(bucketCacheBO.getBucketNo())).collect(Collectors.toList());//从不可用的分桶列表重移除bucketLocalCache.setUndistributedList(undistributedList);}...
}

步骤五:写入数据到远程缓存中并更新本地缓存的分桶元数据信息

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//写入数据到远程缓存中并更新本地缓存的分桶元数据信息//@param bucketLocalCache  分桶元数据信息//@param bucketCacheBOList 上线的分桶信息private void writeBucketLocalCache(BucketLocalCache bucketLocalCache, List<BucketCacheBO> bucketCacheBOList) {String key = bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();//中心桶被扣减掉的库存(上线的分桶库存总和)Integer centerInventoryNum = bucketCacheBOList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();//中心桶的库存扣减信息tairCache.decr(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, centerInventoryNum);//1.先更新分桶的上线缓存处理操作for (BucketCacheBO bucketCacheBO : bucketCacheBOList) {tairCache.incr(bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum());}//2.处理分桶列表的更新,待中心桶库存以及上线分桶库存更新完成,更新远程和本地的分桶列表inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);}...
}

2.库存分桶下线

(1)使用入口

(2)具体实现

(1)使用入口

当后台选择下线部分分桶或者分桶库存不⾜而触发下线阈值时会调⽤。即在进行异步分桶扩容时,发现中心桶剩余库存为0时,会检查是否下线。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate TairLock tairLock;...//分桶下线接口@Overridepublic void bucketOffline(InventorOfflineRequest request) {long start = System.currentTimeMillis();//1.验证入参必填checkInventorOfflineParams(request);//2.过滤只有一个分桶的无效请求Boolean isOffline = checkBucketOffline(request);if (isOffline) {//注意这里需要锁定中心桶库存String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + request.getSellerId() + request.getSkuId();String value = SnowflakeIdWorker.getCode();boolean lock = tairLock.tryLock(key, value);if (lock) {try {//3.先将准备下线的分桶库存从本地和远程列表中移除至不可用列表,避免新的请求进来updateBucket(request);} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}log.info("分桶下线处理时间,request:{}, lock:{}, time:{}", JSON.toJSONString(request), lock, System.currentTimeMillis() - start);}}...
}

(2)具体实现

步骤一:校验⼊参必填,指定的商品SKU以及需要下线的分桶编号。

步骤二:在竞争锁之前,先判断⼀下是否⽆需处理。过滤只有一个分桶的无效请求,避免⼤量⽆效请求竞争锁带来的开销。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate InventoryBucketCache inventoryBucketCache;...//判断待下线分桶是否可以下线private Boolean checkBucketOffline(InventorOfflineRequest request) {BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());//可用分桶缓存列表大于1if (!Objects.isNull(bucketLocalCache) && bucketLocalCache.getAvailableList().size() > 1) {//并且待下线的分桶在可用分桶列表中return bucketLocalCache.getAvailableList().stream().anyMatch(bucketCache -> request.getBucketNoList().contains(bucketCache.getBucketNo()));}return false;}...
}

步骤三:先把下线的分桶从可发分桶列表中移除,包括本地缓存的列表,从而可以避免后续请求再次路由到该分桶。

待改进:应该先扣减该分桶的远程缓存,再移除所在机器的本地缓存。因为如果先移除本机器的本地缓存后,但其他机器的本地缓存没有移除。这时会造成各机器本地缓存不一致,有些请求还是会到该分桶进行扣减。

先更新分桶库存缓存,再更新本地分桶元数据缓存及远程元数据缓存,可以避免不同机器的本地分桶元数据缓存不一致。比如更新了本地缓存的机器不会路由到该分桶,而没更新本地缓存的机器依然路由到了该分桶。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate InventoryBucketCache inventoryBucketCache;...//将准备下线的分桶列表,先从本地缓存以及远程缓存列表里面移除private void updateBucket(InventorOfflineRequest request) {//获取本地和远程的分桶列表BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());if (!Objects.isNull(bucketLocalCache)) {//过滤返回下线的分桶确实存在于存活的分桶列表上Map<String, BucketCacheBO> bucketCacheMap = bucketLocalCache.getAvailableList().stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));//过滤已不存在远程缓存的列表List<String> bucketCacheList = request.getBucketNoList().stream().filter(bucketCacheMap::containsKey).collect(Collectors.toList());//过滤后,有可下线的分桶缓存if (!CollectionUtils.isEmpty(bucketCacheList)) {//分桶最少也需要保留一个if (bucketLocalCache.getAvailableList().size() > 1) {//先移除缓存的分桶列表,避免新的请求访问影响真实库存updateBucketCache(bucketCacheList, bucketLocalCache);}}}}...
}

步骤四:触发库存回收,将当前下线的分桶还存在的库存都回退到中⼼桶。这⾥需要注意的是更新下线后的元数据,此时分桶还可能在被扣减库存。需要扣减请求路由不到该分桶才可以对该分桶内的库存进⾏回源中⼼桶。这⾥默认采取的是延迟1秒,具体最优时间可由压测得出。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate InventoryBucketCache inventoryBucketCache;...//移除本地分桶的对应分桶列表以及远程的分桶列表//@param bucketCacheList 下线的分桶列表//@param bucketCache     远程缓存元数据信息private void updateBucketCache(List<String> bucketCacheList, BucketLocalCache bucketCache) {String key = bucketCache.getSellerId() + bucketCache.getSkuId();//1.获取到本地的缓存列表BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);//2.填充下线的分桶到不可用列表中for (String bucketNo : bucketCacheList) {bucketLocalCache.getUndistributedList().add(new BucketCacheBO(bucketNo));}//过滤返还上线的分桶列表List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList().stream().filter(bucketCacheBO -> !bucketCacheList.contains(bucketCacheBO.getBucketNo())).collect(Collectors.toList());bucketLocalCache.setAvailableList(availableList);//3.从本地缓存里面更新inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);//4.覆盖远程的分桶元数据信息tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);log.info("下线分桶,分桶元数据信息:{}", JSONObject.toJSONString(bucketLocalCache));//发送清空下线分桶库存的消息bucketClearProducer.sendBucketClear(new BucketClearRequest(bucketCache.getSkuId(), bucketCache.getSellerId(), bucketCacheList, 0));}...
}//清空分桶库存的消息队列
@Component
public class BucketClearProducer {@Autowiredprivate DefaultProducer defaultProducer;//清空分桶库存的消息,MQ生产public void sendBucketClear(BucketClearRequest bucketClearRequest) {//发送清空分桶库存消息,延迟1秒,留给更多的时间给正在扣减该分桶的线程处理defaultProducer.sendMessage(RocketMqConstant.BUCKET_CLEAR_TOPIC, JSONObject.toJSONString(bucketClearRequest), 1, "清空分桶");}
}

步骤五:接收延迟消息,开始处理分桶下线。同时处理完成分桶下线时,还需检测⼀下库存是否触发预警机制。如触发了预警通知,则需要发出消息进行异步处理。

//处理清空分桶库存的消息
@Component
public class BucketClearListener implements MessageListenerConcurrently {@Autowiredprivate InventoryBucketService inventoryBucketService;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : list) {String msg = new String(messageExt.getBody());log.info("执行分桶下线清空库存,消息内容:{}", msg);BucketClearRequest bucketClearRequest = JsonUtil.json2Object(msg, BucketClearRequest.class);inventoryBucketService.bucketClear(bucketClearRequest);}} catch (Exception e) {log.error("consume error, 清空分桶库存失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//清空分桶库存,分桶库存放回中央库存@Overridepublic void bucketClear(BucketClearRequest request) {long start = System.currentTimeMillis();String key = TairInventoryConstant.SELLER_BUCKET_PREFIX + request.getSellerId() + request.getSkuId();String bucketCache = tairCache.get(key);if (!StringUtils.isEmpty(bucketCache)) {BucketLocalCache bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);updateBucketInventory(request.getBucketNoList(), bucketLocalCache);}log.info("清空下线分桶库存,request:{},时间:{}", JSON.toJSONString(request), System.currentTimeMillis() - start);//商品库存值预警warningProductInventory(bucketCache);}...
}

步骤六:清理下线分桶库存逻辑处理

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//将分桶的缓存库存返回给中心桶库存上private void updateBucketInventory(List<String> bucketCacheList, BucketLocalCache bucketLocalCache) {//中心桶的库存keyString key = TairInventoryConstant.SELLER_INVENTORY_PREFIX + bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();Integer inventoryNum = 0;//下线的分桶列表List<String> undistributedList = bucketLocalCache.getUndistributedList().stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());//只处理已经下线的分桶bucketCacheList = bucketCacheList.stream().filter(undistributedList::contains).collect(Collectors.toList());if (CollectionUtils.isEmpty(bucketCacheList)) {return;}for (String bucketNo : bucketCacheList) {//先获取下线的分桶实际剩余库存String bucketNum = tairCache.get(bucketNo);//当分桶的库存大于0的时候才处理if (!StringUtils.isEmpty(bucketNum) && Integer.valueOf(bucketNum) > 0) {//清理下线的分桶库存,设置为0Integer result = tairCache.decr(bucketNo, Integer.parseInt(bucketNum));if (result >= 0) {log.error("下线分桶,bucketNo:{},desc:{}", bucketNo, bucketNum);inventoryNum = inventoryNum + Integer.parseInt(bucketNum);} else {log.error("分桶已下线,bucketNo:{}", bucketNo);}}}if (inventoryNum > 0) {//将下线的剩余库存加至 中心桶库存上Integer incr = tairCache.incr(key, inventoryNum);log.error("回源中心桶,inventoryNum:{}, after value :{}", inventoryNum, incr);}}...
}

3.库存分桶扩容

(1)使用入口

(2)具体实现

(1)使用入口

每次对库存分桶缓存扣减库存时,都会检查是否需要进行分桶扩容。当分桶的剩余库存小于回源比例时,就会触发发送消息进行异步分桶扩容。

@Component
public class BucketCapacityListener implements MessageListenerConcurrently {@Autowiredprivate InventoryBucketService inventoryBucketService;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : list) {String msg = new String(messageExt.getBody());log.info("执行分桶库存扩容,消息内容:{}", msg);BucketCapacity bucketCapacity = JsonUtil.json2Object(msg, BucketCapacity.class);inventoryBucketService.bucketCapacity(bucketCapacity);}} catch (Exception e) {log.error("consume error, 分桶库存扩容失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {@Resourceprivate TairLock tairLock;...//分桶扩容接口@Overridepublic void bucketCapacity(BucketCapacity bucketCapacity) {//先锁住中心桶库存,避免此时库存发生变化String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();String value = SnowflakeIdWorker.getCode();//1.校验是否已经无需扩容了,如果是则快速结束BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (!bucketCapacityContext.getIsCapacity()) {return;}//获取分布式锁来进行扩容处理boolean lock = tairLock.tryLock(key, value);if (lock) {try {//再次校验是否需要扩容,此处不允许并发bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (bucketCapacityContext.getIsCapacity()) {//2.获取中心桶库存的库存Integer residueNum = getCenterStock(bucketCapacity);//3.可以扩容,计算出可回源的库存进行处理if (residueNum > 0) {backSourceInventory(residueNum, bucketCapacityContext);} else {//4.中心桶无库存,检查是否触发下线checkBucketOffline(bucketCapacity);}}} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}}...
}

(2)具体实现

步骤一:验证当前扩容分桶是否已经被处理
步骤二:获取中⼼桶缓存的剩余库存,校验是否存在库存
步骤三:对分桶进行扩容
步骤四:获取扩容后的预估库存深度
步骤五:刷新分桶元数据缓存
步骤六:将计算好的分桶库存和中⼼桶库存进⾏增加和扣减操作

步骤一:验证当前扩容分桶是否已经被处理。如果库存⽆需处理,则快速结束,避免因为争抢锁影响性能。只有需要扩容的才上锁进⾏后续校验,并且上锁后需要再次校验。Double Check可以避免部分⽆效请求。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//校验本次请求是否还需要执行扩容处理private BucketCapacityContext checkBucketCapacity(BucketCapacity bucketCapacity) {String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();//1.获取远程的分桶缓存Integer residueNum = getBucketInventoryNum(bucketCapacity.getBucketNo());//2.获取缓存元数据信息BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);//3.校验是否还需要执行扩容List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();for (BucketCacheBO bucketCacheBO : availableList) {//具体使用的是哪个分桶进行扣减库存if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {//触发回源比例的百分比Integer backSourceProportion = inventoryBucketConfig.getBackSourceProportion();//当前分桶的分配总库存Integer bucketNum = bucketCacheBO.getBucketNum();int backSourceNum = bucketNum * backSourceProportion / 100;//回源比例的库存 大于剩余的库存,触发异步扩容return new BucketCapacityContext(residueNum, backSourceNum > residueNum, bucketCapacity);}}//如果不在可用列表里面,则意味已下线,快速结束掉return new BucketCapacityContext(residueNum, false, bucketCapacity);}...
}

步骤二:获取中⼼桶缓存的剩余库存,校验是否存在库存。有库存才扩容,⽆库存则验证是否触发下线阈值。如果触发下线阈值,则发送消息进行通知,异步处理分桶下线。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//返回中心桶库存,如返回中心库存大于0则允许处理private Integer getCenterStock(BucketCapacity bucketCapacity) {String key = TairInventoryConstant.SELLER_INVENTORY_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();String centreCacheNum = tairCache.get(key);if (!StringUtils.isEmpty(centreCacheNum)) {return Integer.valueOf(centreCacheNum);}return 0;}...
}

步骤三:对分桶进行扩容。分桶需要扩容多少库存,需要注意尽量保证每个分桶的库存尽可能均匀。如果中心桶库存超过最大深度库存,则直接以配置的回源步长增长库存,否则汇总当前分桶的实际库存深度。也就是根据当前的可⽤分桶列表、中⼼桶库存、总的可⽤库存深度,计算出平均的⼀个可分配库存数量,从而避免每个分桶扩容的库存不均匀(最⼩值必须超过最⼩库存深度)。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//回源库存到分桶上//@param residueNum            中心桶库存//@param bucketCapacityContext 扩容上下文对象private void backSourceInventory(Integer residueNum, BucketCapacityContext bucketCapacityContext) {//首先需要当前分桶的库存,其次还需要获取目前分桶的可发库存深度(第一次初始化的时候分配的库存)//根据当初分配的库存深度以及最大库存深度以及中心桶库存,得出均匀到目前支持可用的分桶均匀分配库存大概数量//同时根据本次同步的库存数量刷新分桶的实际库存深度BucketCapacity bucketCapacity = bucketCapacityContext.getBucketCapacity();//先获取本地的分桶元数据信息,获取当前分桶的总发放上限String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();Integer inventoryNum = 0;//获取实际配置的最大可用库存深度Integer maxBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();BucketCacheBO bucketCache = null;for (BucketCacheBO bucketCacheBO : availableList) {if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {bucketCache = bucketCacheBO;break;}}//这里没有匹配到分桶,则该分桶已被下线,不处理后续流程if (Objects.isNull(bucketCache)) {return;}//3.中心桶库存超过最大深度库存(全部分桶总计),直接以配置的回源步长增长库存if (residueNum > maxBucketNum) {inventoryNum = inventoryBucketConfig.getBackSourceStep();} else {inventoryNum = calcEvenInventoryNum(maxBucketNum, inventoryBucketConfig, residueNum, bucketCache);}//4.获取扩容后的预估库存深度Integer maxDepthNum = getMaxDepthNum(inventoryNum, inventoryBucketConfig, bucketCache, bucketCapacityContext);//5.更新分桶元数据相关信息,注意需要判断当前分桶的库存深度是否真实发生变化,如无变化则不需要更新refreshBucketCache(maxDepthNum, bucketLocalCache, bucketCapacity.getBucketNo(), inventoryNum);log.info("本次分桶:{},回源库存:{}", bucketCapacity.getBucketNo(), inventoryNum);//6.回源分桶的库存Integer incr = tairCache.incr(bucketCapacity.getBucketNo(), inventoryNum);//7.扣减中心桶库存Integer decr = tairCache.decr(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, inventoryNum);log.info("本次分桶:{},回源库存:{}, 回源后分桶库存:{}, 中心桶剩余库存:{}", bucketCapacity.getBucketNo(), inventoryNum, incr, decr);}...//计算出均匀后的每个分桶实际分配的库存值//@param maxBucketNum          最大的库存深度//@param inventoryBucketConfig 分桶配置//@param residueNum            中心桶剩余库存//@param bucketCache           扩容分桶private Integer calcEvenInventoryNum(Integer maxBucketNum, InventoryBucketConfigDO inventoryBucketConfig, Integer residueNum, BucketCacheBO bucketCache) {//获取当前扩容的分桶深度Integer bucketDepthNum = bucketCache.getBucketNum();//得到扩容的分桶深度 和当前全部可用分桶的库存深度,计算占比//根据占比计算出回源的步长,注意最小深度,如果计算后的步长小于最小库存深度,则默认取最小库存深度BigDecimal proportion = new BigDecimal(bucketDepthNum).divide(new BigDecimal(maxBucketNum), 6, BigDecimal.ROUND_DOWN);//根据比例计算出可分配的库存BigDecimal allotNum = new BigDecimal(residueNum).multiply(proportion).setScale(0, BigDecimal.ROUND_DOWN);if (allotNum.compareTo(new BigDecimal(inventoryBucketConfig.getMinDepthNum())) < 0) {allotNum = new BigDecimal(inventoryBucketConfig.getMinDepthNum());}//当最小深度都已无法满足剩余库存,则以实际剩余库存扩容if (new BigDecimal(residueNum).compareTo(allotNum) < 0) {return residueNum;}//得到扩容的库存值return allotNum.intValue();}...
}

步骤四:获取扩容后的预估库存深度,此时分桶的库存深度发⽣变化,如果扩容的库存深度超过当时分配的库存深度,且未超过最⼤库存深度,则以当前分配的实际库存更新当前分桶库存深度。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//返回目前扩容后的库存深度,库存深度只允许增长不允许减少//@param inventoryNum          步长扩容库存//@param inventoryBucketConfig 分桶配置信息//@param bucketCache           分桶信息private Integer getMaxDepthNum(Integer inventoryNum, InventoryBucketConfigDO inventoryBucketConfig, BucketCacheBO bucketCache, BucketCapacityContext bucketCapacityContext) {//获取当前分桶的实际库存,实际库存和真实库存会有差异,但是这里只是计算一个大概库存深度,无需精确Integer residueNum = bucketCapacityContext.getResidueNum();//预估出实际库存深度,当前分桶库存 + 步长增长库存Integer maxBucketNum = residueNum + inventoryNum;if (bucketCache.getBucketNum() > maxBucketNum) {return bucketCache.getBucketNum();}log.info("前分桶的实际库存:{},预估的实际库存深度:{}", residueNum, maxBucketNum);//实际库存深度,不能超过配置的最大库存深度,同理,最小深度也不能小于最小的库存深度if (inventoryBucketConfig.getMaxDepthNum() < maxBucketNum) {return inventoryBucketConfig.getMaxDepthNum();}return maxBucketNum;}...
}

步骤五:刷新分桶元数据缓存

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//刷新分桶元数据缓存//@param maxDepthNum      分桶最大库存深度//@param bucketLocalCache 分桶元数据信息//@param bucketNo         分桶编号private void refreshBucketCache(Integer maxDepthNum, BucketLocalCache bucketLocalCache, String bucketNo, Integer inventoryNum) {List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();for (BucketCacheBO bucketCacheBO : availableList) {if (bucketCacheBO.getBucketNo().equals(bucketNo)) {//每次库存具体深度变化都要更细,否则很容易触发 回源的比例bucketCacheBO.setBucketNum(maxDepthNum);bucketCacheBO.setAllotNum(inventoryNum + (Objects.isNull(bucketCacheBO.getAllotNum()) ? 0 : bucketCacheBO.getAllotNum()));break;}}String key = bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();//1.刷新本地缓存inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);//2.刷新远程缓存tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);}...
}

步骤六:将计算好的分桶库存和中⼼桶库存进⾏增加和扣减操作。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//回源库存到分桶上//@param residueNum            中心桶库存//@param bucketCapacityContext 扩容上下文对象private void backSourceInventory(Integer residueNum, BucketCapacityContext bucketCapacityContext) {BucketCapacity bucketCapacity = bucketCapacityContext.getBucketCapacity();//先获取本地的分桶元数据信息,获取当前分桶的总发放上限String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();Integer inventoryNum = 0;//获取实际配置的最大可用库存深度Integer maxBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();BucketCacheBO bucketCache = null;for (BucketCacheBO bucketCacheBO : availableList) {if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {bucketCache = bucketCacheBO;break;}}//这里没有匹配到分桶,则该分桶已被下线,不处理后续流程if (Objects.isNull(bucketCache)) {return;}//3.中心桶库存超过最大深度库存(全部分桶总计),直接以配置的回源步长增长库存if (residueNum > maxBucketNum) {inventoryNum = inventoryBucketConfig.getBackSourceStep();} else {inventoryNum = calcEvenInventoryNum(maxBucketNum, inventoryBucketConfig, residueNum, bucketCache);}//4.获取扩容后的预估库存深度Integer maxDepthNum = getMaxDepthNum(inventoryNum, inventoryBucketConfig, bucketCache, bucketCapacityContext);//5.更新分桶元数据相关信息,注意需要判断当前分桶的库存深度是否真实发生变化,如无变化则不需要更新refreshBucketCache(maxDepthNum, bucketLocalCache, bucketCapacity.getBucketNo(), inventoryNum);log.info("本次分桶:{},回源库存:{}", bucketCapacity.getBucketNo(), inventoryNum);//6.回源分桶的库存Integer incr = tairCache.incr(bucketCapacity.getBucketNo(), inventoryNum);//7.扣减中心桶库存Integer decr = tairCache.decr(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, inventoryNum);log.info("本次分桶:{},回源库存:{}, 回源后分桶库存:{}, 中心桶剩余库存:{}", bucketCapacity.getBucketNo(), inventoryNum, incr, decr);}...
}

4.库存预警

(1)库存系统发送库存预警消息

(2)商品可采可补可售系统消费预警消息

(1)库存系统发送库存预警消息

进行分桶扩容时,如果发现中心桶没有库存,就会触发检查是否要下线。在对分桶库存进行下线时,会清空分桶库存,并且进行库存预警。

//处理清空分桶库存的消息
@Component
public class BucketClearListener implements MessageListenerConcurrently {@Autowiredprivate InventoryBucketService inventoryBucketService;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : list) {String msg = new String(messageExt.getBody());log.info("执行分桶下线清空库存,消息内容:{}", msg);BucketClearRequest bucketClearRequest = JsonUtil.json2Object(msg, BucketClearRequest.class);inventoryBucketService.bucketClear(bucketClearRequest);}} catch (Exception e) {log.error("consume error, 清空分桶库存失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//清空分桶库存,分桶库存放回中央库存@Overridepublic void bucketClear(BucketClearRequest request) {long start = System.currentTimeMillis();String key = TairInventoryConstant.SELLER_BUCKET_PREFIX + request.getSellerId() + request.getSkuId();String bucketCache = tairCache.get(key);if (!StringUtils.isEmpty(bucketCache)) {BucketLocalCache bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);updateBucketInventory(request.getBucketNoList(), bucketLocalCache);}log.info("清空下线分桶库存,request:{},时间:{}", JSON.toJSONString(request), System.currentTimeMillis() - start);//商品库存值预警warningProductInventory(bucketCache);}//对商品的库存发生变化进行预警处理private void warningProductInventory(String bucketCache) {//1.批量获取一下可用的缓存分桶列表编号BucketLocalCache bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();//2.批量获取汇总商品剩余库存(分桶下线代表中心桶库存已经没有了,不校验中心桶库存)List<String> cacheKeyList = availableList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());List<String> productInventoryList = tairCache.mget(cacheKeyList);//3.检测卖家单个商品的总库存 是否触发最小值预警或者百分比预警,是则异步消息通知供需服务Integer sumInventoryNum = 0;for (int i = 0; i < productInventoryList.size(); i++) {String inventoryNum = productInventoryList.get(i);if (StringUtils.isNotEmpty(inventoryNum)) {sumInventoryNum = sumInventoryNum + Integer.valueOf(inventoryNum);}}Boolean isWarning = false;//如果实际库存值,小于预警值,或者总库存触发比例阈值,异步消息通知if (sumInventoryNum < warningInventoryNum) {isWarning = true;}//未触发最小库存预警,检测是否触发最小比例预警if (!isWarning) {//总的库存深度,不仅仅要看可用分桶的库存深度,还要看下线的库存深度,从而计算出一个当时实际分配的库存深度,计算出一个预警值int sumBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList();if (!CollectionUtils.isEmpty(undistributedList)) {sumBucketNum = sumBucketNum + undistributedList.stream().mapToInt(cacheBO -> Objects.isNull(cacheBO.getBucketNum()) ? 0 : cacheBO.getBucketNum()).sum();}log.info("总的库存深度:{}", sumBucketNum);//预警比例BigDecimal warningProportion = new BigDecimal(proportion).divide(new BigDecimal(100), 3, BigDecimal.ROUND_DOWN);//库存占比BigDecimal inventoryProportion = new BigDecimal(sumInventoryNum).divide(new BigDecimal(sumBucketNum), 6, BigDecimal.ROUND_HALF_UP);//配置的预警比例,大于分配的实际库存深度和已剩的库存占比if (warningProportion.compareTo(inventoryProportion) > 0) {isWarning = true;}}//异步消息通知预警if (isWarning) {WarningInventoryDTO warningInventoryDTO = inventoryConverter.converterDTO(bucketLocalCache);warningInventoryProducer.sendWarningInventory(warningInventoryDTO);}}...
}@Component
public class WarningInventoryProducer {@Autowiredprivate DefaultProducer defaultProducer;//库存预警的消息 MQ生产public void sendWarningInventory(WarningInventoryDTO warningInventoryDTO) {//发送分库存预警消息defaultProducer.sendMessage(RocketMqConstant.WARNING_INVENTORY_TOPIC, JSONObject.toJSONString(warningInventoryDTO), "库存预警");}
}

(2)商品可采可补可售系统消费预警消息

@Component
public class WarningInventoryListener implements MessageListenerConcurrently {@Resourceprivate WarningInventoryService warningInventoryService;@Resourceprivate InventoryRemote inventoryRemote;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : list) {String msg = new String(messageExt.getBody());log.info("执行库存预警,消息内容:{}", msg);WarningInventoryDTO warningInventoryDTO = JsonUtil.json2Object(msg, WarningInventoryDTO.class);//1.入预警缓存JsonResult jsonResult = warningInventoryService.warningInventory(warningInventoryDTO);//2.通知库存消息预警,只有有效的才通知if (jsonResult.getSuccess()) {inventoryRemote.warningInventoryMessage(warningInventoryDTO);}}} catch (Exception e) {log.error("consume error, 库存预警失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}@Service
public class WarningInventoryServiceImpl implements WarningInventoryService {@Autowiredprivate RedisReadWriteManager redisReadWriteManager;@Autowiredprivate RedisManagerRepository redisManagerRepository;@Overridepublic JsonResult warningInventory(WarningInventoryDTO warningInventoryDTO) {List<Long> sellerIdList = Arrays.asList(warningInventoryDTO.getSellerId());//1.查询缓存,key为卖家ID,value为对应的sku列表Map<Long, List<String>> redisSetMap = redisReadWriteManager.getRedisSortedSet(sellerIdList, AbstractRedisKeyConstants::getWarningInventoryZsetKey);//2.获取这个卖家下的库存预警商品列表List<String> skuList = redisSetMap.get(warningInventoryDTO.getSellerId());//3.返回缓存操作的模型对象Map<String, RedisSortedSetCache> redisSortedSetCacheMap = redisManagerRepository.diffWarningInventory(skuList, warningInventoryDTO, AbstractRedisKeyConstants::getWarningInventoryZsetKey);if (redisSortedSetCacheMap.size() > 0) {//4.执行数据缓存更新redisReadWriteManager.flushIncrSortedSetMap(redisSortedSetCacheMap);return JsonResult.buildSuccess();}return JsonResult.buildError("重复库存预警通知");}
}

http://www.dtcms.com/a/263281.html

相关文章:

  • 【分布式】自定义统一状态机流转设计
  • Flowable01SpringBoot项目的引入--------------------每天都会更新,自学中
  • 组成原理精讲课--硬布线控制器和微程序控制器
  • STM32之火焰传感器模块(四针)
  • 11、类加载器
  • 项目:数据库应用系统开发:智能电商管理系统
  • 【Springai】项目实战进度和规划
  • 【FR801xH】富芮坤FR801xH之PMU GPIO
  • OpenCV CUDA模块设备层----- 正切(tangent)运算函数tan()
  • Python 数据分析与机器学习入门 (五):Matplotlib 数据可视化基础
  • R1-Searcher使用强化学习增强语言模型解决问题的搜索能力
  • WebSocket 的核心原理和工作流程
  • 前端Vue面试八股常考题(一)
  • 企业流程知识:《超越再造:以流程为中心的组织如何改变我们的工作和生活》读书笔记
  • 力扣面试150(7/150)
  • 【c/c++2】多线程,动静态库,信号,socket
  • 如何让宿主机完全看不到Wi-Fi?虚拟机独立联网隐匿上网实战!
  • 【知识图谱构建系列7】:结果评价(1)
  • 可编辑39页PPT | 数字化工厂蓝图整体框架建设举措建设路径实施路线规划建设方案
  • 从入门到精通:npm、npx、nvm 包管理工具详解及常用命令
  • Microsoft Edge 打开无反应、打开后显示兼容性问题、卸载重装 解决方案。一键卸载Microsoft Edge 。
  • 卫朋:华为流程体系拆解系列——IPD流程L1-L6分级导入实战演练
  • android BottomSheet及AlertDialog的几种material3 常见ui的用法
  • vue上传各种文件,并预览组件,(预览,下载),下载resources目录下文件
  • vmware 17 安装win11 24h2
  • 【解析】 微服务测试工具Parasoft SOAtest如何为响应式架构助力?
  • MongoDB 常见查询语法与命令详解
  • 设计模式精讲 Day 19:观察者模式(Observer Pattern)
  • 自由学习记录(64)
  • 傅里叶变换理解