
Product Center, Part 19: High-Concurrency Optimizations for Inventory Bucketing

Outline

1. Optimizing bucket metadata retrieval during stock deduction

2. Optimizing the auto-increment sequence used for bucket routing in stock deduction

3. Sending stock deduction detail messages to MQ asynchronously

4. Spreading the hot cache key for stock deduction details

5. Quickly taking a bucket offline when it cannot be refilled

6. Handling multiple buckets triggering offline concurrently

7. Recovering when all buckets are taken offline under high concurrency

8. The optimized inventory SQL

9. Other optimizations

1. Optimizing Bucket Metadata Retrieval During Stock Deduction

(1) Why optimize

(2) Solution

(1) Why optimize

During stock deduction, a large number of requests read the bucket metadata held in the local cache. When populating the available buckets into the deduction context, the code calls read methods on the cached bucket metadata, for example getAvailableList() on the locally cached metadata object bucketLocalCache. Meanwhile, adding stock or taking buckets online/offline mutates that same bucketLocalCache object.

So when a burst of deduction requests coincides with a mutation of the locally cached metadata object, reads and writes race on the same object, and the read path occasionally stalls.

Sample logs before the optimization:

...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- fetching backup buckets took: 179 ms
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- fetching backup buckets took: 0 ms
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- fetching backup buckets took: 161 ms
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- fetching backup buckets took: 71 ms
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- fetching backup buckets took: 620 ms
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- fetching backup buckets took: 74 ms
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- fetching backup buckets took: 0 ms
...(dozens of further lines at 0-1 ms, with occasional spikes of 9, 28, 89, and 134 ms)

Code before the optimization:

//Inventory deduction service implementation
@Service
public class InventoryServiceImpl implements InventoryService {
    ...
    //Build the model object used for the actual stock deduction
    private BucketContext buildDeductProductStock(InventoryRequest request) {
        //1. Populate the deduction detail
        InventoryDetail inventoryDetail = inventoryConverter.converterRequest(request);
        //2. Populate the bucket configuration for this deduction
        BucketContext bucketContext = buildDeductBucketList(request);
        bucketContext.setInventoryDetail(inventoryDetail);
        return bucketContext;
    }

    //Populate the bucket-related info for the deduction
    private BucketContext buildDeductBucketList(InventoryRequest request) {
        BucketContext context = new BucketContext();
        //Fetch the cached bucket metadata
        BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
        //Fetch the locally cached bucket list
        List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
        //Get the sequence number of this deduction request, used to locate the bucket to deduct from
        Integer incrementCount = getIncrementCount(request);
        //Locate the bucket index via modulo
        int index = incrementCount % availableList.size();
        log.info("available buckets this round: {}, deduction index: {}", availableList.size(), index);
        //Pick the bucket for this deduction; keep a few backups in case deduction fails (bucket offline or out of stock)
        BucketCacheBO bucketCacheBO = availableList.get(index);
        context.getAvailableList().add(bucketCacheBO);
        context.getBucketNoList().add(bucketCacheBO.getBucketNo());
        context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());
        //If every other bucket served as a backup, this would amount to merged deduction across buckets
        for (int i = 0; i < 2; i++) {
            //Randomly pick 2 buckets as backups
            Random random = new Random();
            int num = random.nextInt(availableList.size());
            BucketCacheBO bucketCache = availableList.get(num);
            //Skip duplicate buckets
            if (context.getBucketNoList().contains(bucketCache.getBucketNo())) {
                i--;
                continue;
            }
            context.getAvailableList().add(bucketCache);
            context.getBucketNoList().add(bucketCache.getBucketNo());
        }
        return context;
    }
    ...
}

@Component
@Data
public class InventoryBucketCache {
    @Autowired
    private Cache cache;
    @Autowired
    private TairCache tairCache;

    //Store bucket metadata locally; adding stock, refilling buckets, or taking buckets on/offline all call this to mutate the local cache object
    public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) {
        log.info("local cache set key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));
        cache.put(bucketKey, bucketLocalCache);
    }

    //Fetch the local bucket metadata
    public BucketLocalCache getBucketLocalCache(String bucketKey) {
        //Check the local cache first
        BucketLocalCache bucketLocalCache = (BucketLocalCache) cache.getIfPresent(bucketKey);
        log.info("local cache get key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));
        if (Objects.isNull(bucketLocalCache)) {
            //Fall back to the remote cache
            synchronized (bucketKey.intern()) {
                String bucketCache = tairCache.get(TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketKey);
                if (!StringUtils.isEmpty(bucketCache)) {
                    bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);
                    cache.put(bucketKey, bucketLocalCache);
                }
            }
        }
        return bucketLocalCache;
    }
}

(2) Solution

Because the bucket metadata object is shared between stock deduction requests and stock rebalancing requests, the variable can be handed to a ThreadLocal that manages a per-thread copy.

Note: only reads of the bucket metadata object need to go through the ThreadLocal copy; writes to the object do not.

When the cached bucket metadata variable is kept in a ThreadLocal, ThreadLocal gives each thread that uses the variable its own independent copy, so each thread can work with its own copy without interfering with the copies held by other threads.

After the optimization, fetching the available bucket list from the locally cached metadata consistently takes 0 ms; the spikes of tens or hundreds of milliseconds no longer appear.

@Component
@Data
public class InventoryBucketCache {
    //Each read of the locally cached bucket metadata goes through a ThreadLocal to avoid contention between threads
    private ThreadLocal<BucketLocalCache> bucketLocalCacheThreadLocal = new ThreadLocal<>();
    ...
    //Fetch the locally cached bucket metadata
    public BucketLocalCache getBucketLocalCache(String bucketKey) {
        bucketKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketKey;
        //Check the local cache first
        BucketLocalCache bucketLocalCache = (BucketLocalCache) cache.getIfPresent(bucketKey);
        log.info("local cache get key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));
        if (Objects.isNull(bucketLocalCache)) {
            //Fall back to the remote cache
            Long startTime = System.currentTimeMillis();
            synchronized (bucketKey.intern()) {
                String bucketCache = getBucketCache(bucketKey);
                if (!StringUtils.isEmpty(bucketCache)) {
                    bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);
                    cache.put(bucketKey, bucketLocalCache);
                }
                log.error("local cache miss, remote reload took {} ms", System.currentTimeMillis() - startTime);
            }
        }
        bucketLocalCacheThreadLocal.set(bucketLocalCache);
        return bucketLocalCacheThreadLocal.get();
    }

    public void threadLocalRemove() {
        bucketLocalCacheThreadLocal.remove();
    }
    ...
}

@Service
public class InventoryServiceImpl implements InventoryService {
    ...
    //Populate the bucket-related info for the deduction
    private BucketContext buildDeductBucketList(InventoryRequest request) {
        BucketContext context = new BucketContext();
        //Fetch the locally cached bucket metadata
        BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
        //Fetch the locally cached bucket list
        List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
        if (!CollectionUtils.isEmpty(availableList)) {
            //Get the deduction counter for this request, used to locate the bucket to deduct from
            Integer incrementCount = getIncrementCount(request);
            //Locate the bucket index for this request
            int index = incrementCount % availableList.size();
            //Pick the bucket for this deduction
            BucketCacheBO bucketCacheBO = availableList.get(index);
            context.getAvailableList().add(bucketCacheBO);
            //Keep a few backup buckets in case deduction fails (bucket offline or out of stock)
            //Using all buckets as backups would amount to merged deduction
            for (int i = 0; i < 2; i++) {
                //Add 2 backups; wrap around to index 0 past the end of the list
                int num = index + i;
                if (num >= availableList.size()) {
                    num = 0;
                }
                BucketCacheBO bucketCache = availableList.get(num);
                context.getAvailableList().add(bucketCache);
            }
        } else {
            //Under concurrency all buckets may be offline; in that case deduct from the center bucket
            //(stock from offline buckets flows back into the center bucket)
            BucketCacheBO bucketCacheBO = new BucketCacheBO();
            bucketCacheBO.setBucketNo(buildSellerInventoryKey(request.getSellerId(), request.getSkuId()));
            //The center bucket never needs refilling; reaching this branch means all buckets were taken offline under high concurrency.
            //Local and remote bucket metadata must stay consistent; for performance, taking a bucket offline does not hold a coarse-grained lock,
            //so whenever the center bucket is used, trigger another sync between the remote and local caches,
            //and make sure the remote cache keeps at least one available bucket
            bucketCacheBO.setBucketNum(0);
            context.getAvailableList().add(bucketCacheBO);
            //Send an async message to refresh the local cache
            bucketRefreshProducer.sendBucketOffline(request);
        }
        Long index = InventorBucketUtil.createDetailBucketKey(request.getOrderId(), bucketLocalCache.getInventoryBucketConfig().getBucketNum());
        String inventoryDetailKey = bucketLocalCache.getBucketDetailKeyList().get(Integer.valueOf(index + ""));
        context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());
        context.setInventoryDetailKey(inventoryDetailKey);
        inventoryBucketCache.threadLocalRemove();
        return context;
    }
    ...
}

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
    ...
    //Refresh the bucket metadata cache
    //@param maxDepthNum      maximum stock depth of the bucket
    //@param bucketLocalCache bucket metadata
    //@param bucketNo         bucket number
    private void refreshBucketCache(Integer maxDepthNum, BucketLocalCache bucketLocalCache, String bucketNo, Integer inventoryNum) {
        List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
        for (BucketCacheBO bucketCacheBO : availableList) {
            if (bucketCacheBO.getBucketNo().equals(bucketNo)) {
                //Update on every change of the actual stock depth, otherwise the back-to-source ratio triggers too easily
                bucketCacheBO.setBucketNum(maxDepthNum);
                bucketCacheBO.setAllotNum(inventoryNum + (Objects.isNull(bucketCacheBO.getAllotNum()) ? 0 : bucketCacheBO.getAllotNum()));
                break;
            }
        }
        String key = buildBucketCacheKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId());
        //Refresh the local cache
        inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);
    }
    ...
}

2. Optimizing the Auto-Increment Sequence Used for Bucket Routing

(1) Why optimize

(2) Solution

(1) Why optimize

Originally, every stock deduction located its target bucket via an auto-increment sequence number stored in the cache. Because all requests increment the same key, that key comes under heavy pressure at high concurrency; some requests block, and fetching the sequence number slows down.

Sample logs before the optimization:

...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- fetching the deduction bucket took: 1 ms
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- fetching the deduction bucket took: 6 ms
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- fetching the deduction bucket took: 238 ms
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- fetching the deduction bucket took: 258 ms
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- fetching the deduction bucket took: 1 ms
...(the remaining lines all report 0-4 ms)

Code before the optimization:

@Service
public class InventoryServiceImpl implements InventoryService {
    @Resource
    private TairCache tairCache;
    ...
    //Get the deduction counter for the product being sold
    private Integer getIncrementCount(InventoryRequest request) {
        String incrementKey = TairInventoryConstant.SELLER_SKU_STOCK_COUNT_PREFIX + request.getSellerId() + request.getSkuId();
        Integer incrementCount = tairCache.incr(incrementKey);
        return incrementCount;
    }
    ...
}

(2) Solution

Use a segment-based scheme: each increment fetches a batch of ten thousand sequence numbers. If the numbers are being consumed quickly, the segment length grows automatically, and a fresh batch of sequence numbers is generated ahead of time while the current batch is still in use.

With segments generating the sequence numbers, fetching the deduction bucket stabilizes at 0 ms. Apart from the first request, which pays a higher initialization cost when no sequence exists yet, performance is steady.

@Service
public class InventoryServiceImpl implements InventoryService {
    @Autowired
    private SegmentNoGen segmentNoGen;
    ...
    //Get the deduction counter for the product being sold.
    //Incrementing a single key under concurrency is too slow, so fetch a batch of numbers at once
    //and only go back for more once the batch is used up
    private Integer getIncrementCount(InventoryRequest request) {
        String incrementKey = TairInventoryConstant.SELLER_SKU_STOCK_COUNT_PREFIX + request.getSellerId() + request.getSkuId();
        Long incrementCount = segmentNoGen.genNewNo(incrementKey);
        if (incrementCount > 0) {
            return incrementCount.intValue();
        }
        //If the cache read fails and a negative value comes back, default to the first bucket; at least 1 bucket always exists
        return 0;
    }
    ...
}

//Segment-based ID generator component
@Service
public class SegmentIDGenImpl implements SegmentIDGen {
    //Ratio at which the next segment starts loading asynchronously
    public static final double NEXT_INIT_FACTOR = 0.9;
    //Maximum step size: 1,000,000
    private static final int MAX_STEP = 1000000;
    //A segment is expected to last 15 minutes.
    //If it is consumed within 15 minutes, the step doubles (capped at MAX_STEP).
    //If consuming it takes more than 15*2=30 minutes, the step halves (floored at MIN_STEP, the initial step value in the database)
    private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
    //Expansion factor: 2 means double on expansion, halve on shrink
    private static final int EXPAND_FACTOR = 2;

    private final ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new UpdateThreadFactory());

    @Autowired
    private LeafAllocNoRepository leafAllocNoRepository;
    @Resource
    private SegmentIDCache cache;

    //Generate a new ID
    @Override
    public Long genNewNo(String bizTag) {
        if (!cache.isInitOk()) {
            throw new RuntimeException("not init");
        }
        //Initialize the bizTag on first use
        if (!cache.containsKey(bizTag)) {
            leafAllocNoRepository.insertLeadAlloc(bizTag);
            cache.updateCacheFromDb(bizTag);
        }
        SegmentBuffer buffer = cache.getValue(bizTag);
        if (!buffer.isInitOk()) {
            synchronized (buffer) {
                if (!buffer.isInitOk()) {
                    try {
                        updateSegmentFromDb(bizTag, buffer.getCurrent());
                        log.info("Init buffer. Update leafkey {} {} from db", bizTag, buffer.getCurrent());
                        buffer.setInitOk(true);
                    } catch (Exception e) {
                        log.warn("Init buffer {} exception", buffer.getCurrent(), e);
                        throw new RuntimeException("init error:" + bizTag);
                    }
                }
            }
        }
        return getIdFromSegmentBuffer(buffer);
    }
    ...
}
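SegmentIDGenImpl above is abridged and depends on a DB-backed allocation table. As a hedged, self-contained sketch of the segment idea only: fetch a whole range of IDs in one round trip, serve them locally, and return to the shared store only when the range is exhausted. The class name and the in-memory counter standing in for the database are assumptions, not the article's code.

```java
import java.util.concurrent.atomic.AtomicLong;

//Minimal in-memory sketch of the segment scheme (assumed names; a real implementation
//persists the high-water mark in a database rather than an AtomicLong)
public class SegmentSketch {
    private final int step;
    private long current;      //next ID to hand out
    private long max;          //exclusive upper bound of the current segment
    private final AtomicLong store = new AtomicLong(0); //stand-in for the DB-backed max value

    public SegmentSketch(int step) {
        this.step = step;
    }

    public synchronized long next() {
        if (current >= max) {
            //Segment exhausted: claim a fresh range of `step` IDs from the shared store in one call
            max = store.addAndGet(step);
            current = max - step;
        }
        return current++;
    }

    public static void main(String[] args) {
        SegmentSketch gen = new SegmentSketch(10000);
        System.out.println(gen.next()); // 0, first ID of the first segment
        System.out.println(gen.next()); // 1, served locally without touching the store
    }
}
```

The production version layers step auto-tuning and asynchronous prefetch of the next segment on top of this core loop.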

3. Sending Stock Deduction Detail Messages to MQ Asynchronously

(1) Why optimize

(2) Solution

(1) Why optimize

Every stock deduction sends a message so that a deduction detail record can be written asynchronously. The original code waited for the send to succeed before returning, which caps message throughput at high concurrency and drags down overall deduction performance.

Code before the optimization:

@Service
public class InventoryServiceImpl implements InventoryService {
    @Resource
    private InventoryDetailProducer inventoryDetailProducer;
    ...
    //Deduct product stock
    @Override
    public JsonResult deductProductStock(InventoryRequest request) {
        //1. Validate the input
        checkDeductProductStock(request);
        //2. Build the deduction context
        BucketContext bucketContext = buildDeductProductStock(request);
        try {
            //3. Check whether a deduction detail record already exists
            String repeatDeductInfo = getRepeatDeductInfo(bucketContext);
            if (!StringUtils.isEmpty(repeatDeductInfo)) {
                return JsonResult.buildSuccess();
            }
            //4. Perform the deduction
            deductInventory(bucketContext);
            //5. Write the detail; a duplicate write fails and rolls the stock back
            writeInventoryDetail(bucketContext);
        } catch (Exception e) {
            e.printStackTrace();
            return JsonResult.buildError(e.getMessage());
        } finally {
            //6. Check whether the remaining stock crosses the refill threshold (back-to-source ratio); if so, notify for async refill
            checkInventoryBackSource(bucketContext);
        }
        return JsonResult.buildSuccess();
    }

    //Write the deduction detail to the cache (a failed write means it already exists and the stock must be rolled back)
    private void writeInventoryDetail(BucketContext bucketContext) {
        //Get the deduction detail
        InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
        String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();
        //Try to write the detail record; if the write fails, the stock must be rolled back
        Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));
        if (count < 0) {
            //The detail already exists, so the write failed; return the stock to the bucket
            tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());
        } else {
            //Send a message to write the deduction detail to the DB asynchronously
            inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());
        }
    }
    ...
}

@Component
public class InventoryDetailProducer {
    @Autowired
    private DefaultProducer defaultProducer;

    //Produce the stock deduction detail message
    public void sendInventoryDetail(InventoryDetail inventoryDetail) {
        //Send the message that saves the deduction detail
        defaultProducer.sendMessage(RocketMqConstant.INVENTORY_DETAIL_TOPIC, JSONObject.toJSONString(inventoryDetail), "stock deduction");
    }
}

@Component
public class DefaultProducer {
    private DefaultMQProducer producer;

    @Autowired
    public DefaultProducer(RocketMQProperties rocketMQProperties) {
        producer = new DefaultMQProducer(RocketMqConstant.ORDER_DEFAULT_PRODUCER_GROUP);
        producer.setNamesrvAddr(rocketMQProperties.getNameServer());
        start();
    }

    //Must be called once before use; can only be initialized once
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            log.error("producer start error", e);
        }
    }
    ...
    //Send a message
    public void sendMessage(String topic, String message, String type) {
        sendMessage(topic, message, -1, type);
    }

    //Send a message and wait synchronously for the send to succeed
    public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) {
        Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
        try {
            if (delayTimeLevel > 0) {
                msg.setDelayTimeLevel(delayTimeLevel);
            }
            SendResult send = producer.send(msg);
            if (SendStatus.SEND_OK == send.getSendStatus()) {
                log.info("MQ message sent, type:{}, message:{}", type, message);
            } else {
                throw new ProductBizException(send.getSendStatus().toString());
            }
        } catch (Exception e) {
            log.error("failed to send MQ message:", e);
            throw new ProductBizException(CommonErrorCodeEnum.SEND_MQ_FAILED);
        }
    }
    ...
}

(2) Solution

Send the message asynchronously, so the caller does not have to wait for the Broker's response. Deduction detail messages must not be lost, however, and an asynchronous send can fail, so sends that come back as failed need retry handling.

@Service
public class InventoryServiceImpl implements InventoryService {
    @Resource
    private InventoryDetailProducer inventoryDetailProducer;
    ...
    //Deduct product stock
    @Override
    public JsonResult deductProductStock(InventoryRequest request) {
        //1. Validate the input
        checkDeductProductStock(request);
        //2. Build the deduction context
        BucketContext bucketContext = buildDeductProductStock(request);
        try {
            //3. Check whether a deduction detail record already exists, i.e. whether this order ID is already in the cache
            String repeatDeductInfo = getRepeatDeductInfo(bucketContext);
            if (!StringUtils.isEmpty(repeatDeductInfo)) {
                return JsonResult.buildSuccess();
            }
            //4. Perform the deduction
            deductInventory(bucketContext);
            //5. Write the detail; a duplicate write fails and rolls the stock back
            writeInventoryDetail(bucketContext);
        } catch (Exception e) {
            log.error("stock deduction failed", e);
            return JsonResult.buildError(e.getMessage());
        } finally {
            //6. Check whether the remaining stock crosses the refill threshold (back-to-source ratio); if so, notify for async refill
            checkInventoryBackSource(bucketContext);
        }
        return JsonResult.buildSuccess();
    }

    //Write the deduction detail to the cache (a failed write means it already exists and the stock must be rolled back)
    private void writeInventoryDetail(BucketContext bucketContext) {
        //Get the deduction detail
        InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
        //Try to write the detail record; if the write fails, the stock must be rolled back
        Integer count = tairCache.exhsetNx(bucketContext.getInventoryDetailKey(), String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));
        if (count < 0) {
            //The detail already exists, so the write failed; return the stock to the bucket
            tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());
        } else {
            //Send a message to write the deduction detail to the DB asynchronously
            inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());
        }
    }
    ...
}

@Component
public class InventoryDetailProducer {
    @Autowired
    private DefaultProducer defaultProducer;

    //Produce the stock deduction detail message
    public void sendInventoryDetail(InventoryDetail inventoryDetail) {
        //Send the message that saves the deduction detail
        defaultProducer.sendAsyncMessage(RocketMqConstant.INVENTORY_DETAIL_TOPIC, JSONObject.toJSONString(inventoryDetail), "stock deduction detail");
    }
}

@Component
public class DefaultProducer {
    private DefaultMQProducer producer;

    @Autowired
    public DefaultProducer(RocketMQProperties rocketMQProperties) {
        producer = new DefaultMQProducer(RocketMqConstant.ORDER_DEFAULT_PRODUCER_GROUP);
        producer.setNamesrvAddr(rocketMQProperties.getNameServer());
        start();
    }

    public DefaultMQProducer getProducer() {
        return this.producer;
    }

    //Must be called once before use; can only be initialized once
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            log.error("producer start error", e);
        }
    }
    ...
    //Send a message asynchronously
    public void sendAsyncMessage(String topic, String message, String type) {
        Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
        try {
            //Asynchronous send
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                }

                @Override
                public void onException(Throwable e) {
                    //Failed messages need retry handling
                    log.error("failed to send MQ message, type:{}, message:{}", type, message, e);
                }
            });
        } catch (Exception e) {
            log.error("failed to send MQ message, type:{}, message:{}", type, message, e);
            throw new ProductBizException(CommonErrorCodeEnum.SEND_MQ_FAILED);
        }
    }
    ...
}
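Note that the onException handler above only logs, while the text calls for retrying failed sends. A minimal, hedged sketch of one way to bound such retries follows; the send callable and dead-letter hook are hypothetical stand-ins, not the RocketMQ API.

```java
import java.util.concurrent.*;
import java.util.function.Consumer;

//Bounded-retry wrapper for asynchronous sends (illustrative; names are assumptions)
public class AsyncRetrySender {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    //Submit the send; on failure, retry up to maxRetries more times,
    //then hand the message to a dead-letter consumer for durable storage and later replay
    public Future<?> sendWithRetry(Callable<Boolean> send, int maxRetries, Consumer<String> deadLetter, String message) {
        return pool.submit(() -> {
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                try {
                    if (send.call()) {
                        return; //send acknowledged
                    }
                } catch (Exception ignored) {
                    //treat an exception like a failed attempt and try again
                }
            }
            //All attempts failed: never drop a deduction detail message silently
            deadLetter.accept(message);
        });
    }

    public void shutdown() {
        pool.shutdown();
    }
}
```

In practice the retry loop would live inside (or be triggered by) the onException callback, and the dead-letter hook would write to a durable store so a compensation job can replay lost details.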

4. Spreading the Hot Cache Key for Stock Deduction Details

(1) Why optimize

(2) Solution

(1) Why optimize

After inventory is bucketed, concurrent deduction requests are spread evenly across multiple cache shards. But the deduction detail key is not sharded, so at high concurrency the hot detail keys all land on a single shard and degrade write performance, while the other shards are nowhere near their limits. To push inventory performance further, the hot key for deduction details has to be dealt with as well.

Code before the optimization:

@Service
public class InventoryServiceImpl implements InventoryService {
    @Resource
    private InventoryDetailProducer inventoryDetailProducer;
    ...
    //Write the deduction detail to the cache (a failed write means it already exists and the stock must be rolled back)
    private void writeInventoryDetail(BucketContext bucketContext) {
        //Get the deduction detail
        InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
        String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();
        //Try to write the detail record; if the write fails, the stock must be rolled back
        Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));
        if (count < 0) {
            //The detail already exists, so the write failed; return the stock to the bucket
            tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());
        } else {
            //Send a message to write the deduction detail to the DB asynchronously
            inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());
        }
    }
    ...
}

(2) Solution

When the buckets are generated, also generate a set of keys for the deduction details. At deduction time, hash the order ID and take it modulo the bucket count to compute which detail cache key to use, so detail writes are spread evenly across the shards by key.

Note the following in the code below:

BucketContext.setInventoryDetailKey() + getInventoryDetailKey();
BucketLocalCache.setBucketDetailKeyList() + getBucketDetailKeyList();
@Service
public class InventoryServiceImpl implements InventoryService {
    @Resource
    private InventoryDetailProducer inventoryDetailProducer;
    ...
    //Populate the bucket-related info for the deduction
    private BucketContext buildDeductBucketList(InventoryRequest request) {
        BucketContext context = new BucketContext();
        //Fetch the locally cached bucket metadata
        BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
        //Fetch the locally cached bucket list
        List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
        if (!CollectionUtils.isEmpty(availableList)) {
            //Get the deduction counter for this request, used to locate the bucket to deduct from
            Integer incrementCount = getIncrementCount(request);
            //Locate the bucket index for this request
            int index = incrementCount % availableList.size();
            //Pick the bucket for this deduction
            BucketCacheBO bucketCacheBO = availableList.get(index);
            context.getAvailableList().add(bucketCacheBO);
            //Keep a few backup buckets in case deduction fails (bucket offline or out of stock)
            //Using all buckets as backups would amount to merged deduction
            for (int i = 0; i < 2; i++) {
                //Add 2 backups; wrap around to index 0 past the end of the list
                int num = index + i;
                if (num >= availableList.size()) {
                    num = 0;
                }
                BucketCacheBO bucketCache = availableList.get(num);
                context.getAvailableList().add(bucketCache);
            }
        } else {
            //Under concurrency all buckets may be offline; in that case deduct from the center bucket
            //(stock from offline buckets flows back into the center bucket)
            BucketCacheBO bucketCacheBO = new BucketCacheBO();
            bucketCacheBO.setBucketNo(buildSellerInventoryKey(request.getSellerId(), request.getSkuId()));
            //The center bucket never needs refilling; reaching this branch means all buckets were taken offline under high concurrency.
            //Local and remote bucket metadata must stay consistent; for performance, taking a bucket offline does not hold a coarse-grained lock,
            //so whenever the center bucket is used, trigger another sync between the remote and local caches,
            //and make sure the remote cache keeps at least one available bucket
            bucketCacheBO.setBucketNum(0);
            context.getAvailableList().add(bucketCacheBO);
            //Send an async message to refresh the local cache
            bucketRefreshProducer.sendBucketOffline(request);
        }
        Long index = InventorBucketUtil.createDetailBucketKey(request.getOrderId(), bucketLocalCache.getInventoryBucketConfig().getBucketNum());
        String inventoryDetailKey = bucketLocalCache.getBucketDetailKeyList().get(Integer.valueOf(index + ""));
        context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());
        context.setInventoryDetailKey(inventoryDetailKey);
        inventoryBucketCache.threadLocalRemove();
        return context;
    }

    //Write the deduction detail to the cache (a failed write means it already exists and the stock must be rolled back)
    private void writeInventoryDetail(BucketContext bucketContext) {
        //Get the deduction detail
        InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
        //Try to write the detail record; if the write fails, the stock must be rolled back
        Integer count = tairCache.exhsetNx(bucketContext.getInventoryDetailKey(), String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));
        if (count < 0) {
            //The detail already exists, so the write failed; return the stock to the bucket
            tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());
        } else {
            //Send a message to write the deduction detail to the DB asynchronously
            inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());
        }
    }
    ...
}

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
    ...
    //Build the cache model
    //@param key
    //@param bucketNum             number of buckets
    //@param inventoryNum          stock allocated per bucket
    //@param residueNum            leftover stock that could not be divided evenly
    //@param inventoryBucketConfig bucket configuration
    private BucketLocalCache buildBucketCache(String key, Integer bucketNum, Integer inventoryNum, Integer residueNum, InventoryBucketConfigDO inventoryBucketConfig) {
        BucketLocalCache bucketLocalCache = new BucketLocalCache();
        //First get the evenly distributed bucket list for the slots this template config allows
        List<String> bucketNoList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum());
        List<BucketCacheBO> bucketCacheBOList = new ArrayList<>(bucketNum);
        List<BucketCacheBO> undistributedList = new ArrayList<>(bucketNum);
        //Build the bucket objects
        for (int i = 0; i < bucketNum; i++) {
            //Generate the bucket number so a specific bucket can be addressed
            BucketCacheBO bucketCache = new BucketCacheBO();
            String bucketNo = bucketNoList.get(i);
            bucketCache.setBucketNo(bucketNo);
            //The last bucket gets the average stock plus the undivided remainder
            if (i == bucketNum - 1) {
                bucketCache.setBucketNum(inventoryNum + residueNum);
            } else {
                bucketCache.setBucketNum(inventoryNum);
            }
            bucketCacheBOList.add(bucketCache);
        }
        //If more bucket objects were generated than can actually be allocated, keep the surplus as unavailable buckets for later onlining
        if (bucketNoList.size() > bucketNum) {
            for (int i = bucketNum; i < bucketNoList.size(); i++) {
                BucketCacheBO bucketCache = new BucketCacheBO();
                String bucketNo = bucketNoList.get(i);
                bucketCache.setBucketNo(bucketNo);
                undistributedList.add(bucketCache);
            }
        }
        //Generate the cached detail keys
        List<String> bucketDetailKeyList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum(), "%07d");
        //Set the bucket detail cache keys
        bucketLocalCache.setBucketDetailKeyList(bucketDetailKeyList);
        //Set the available bucket cache list
        bucketLocalCache.setAvailableList(bucketCacheBOList);
        //Set the unavailable/offline bucket cache list
        bucketLocalCache.setUndistributedList(undistributedList);
        return bucketLocalCache;
    }
    ...
}

public class InventorBucketUtil {
    private static final int MAX_SIZE = 100000;

    //Generate the slot keys; detail keys use one extra digit to tell them apart
    //@param key       seller ID + product skuId
    //@param bucketNum configured number of buckets
    //@return the reserved slot key list
    public static List<String> createBucketNoList(String key, Integer bucketNum, String format) {
        Map<Long, String> cacheKey = new HashMap<>(bucketNum);
        //bucketNoList holds the hashKey of each bucket
        List<String> bucketNoList = new ArrayList<>(bucketNum);
        //Assign bucket numbers
        for (int i = 1; i <= MAX_SIZE; i++) {
            String serialNum = String.format(format, i);
            //seller ID + product SKU ID + serial number
            String hashKey = key + serialNum;
            //murmur consistent hash
            long hash = HashUtil.murMurHash(hashKey.getBytes());
            //Take the hash modulo the bucket count
            long c = (hash %= bucketNum);
            //Make sure the chosen hashKeys hash to distinct buckets
            if (cacheKey.containsKey(c)) {
                continue;
            }
            cacheKey.put(c, hashKey);
            bucketNoList.add(hashKey);
            if (cacheKey.size() >= bucketNum) {
                break;
            }
        }
        return bucketNoList;
    }
    ...
}
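InventorBucketUtil.createDetailBucketKey, which the service calls to route an order to a detail key, is not shown above. A hedged sketch of the hash-and-mod routing it presumably performs follows; the class and method names here are illustrative assumptions, and Long.hashCode stands in for whatever hash the real utility uses.

```java
import java.util.ArrayList;
import java.util.List;

//Illustrative sketch: route an order ID to one of bucketNum detail-key slots
public class DetailKeyRouter {
    //Hash the order ID and take it modulo the bucket count
    public static int createDetailBucketKey(long orderId, int bucketNum) {
        //Long.hashCode mixes the high and low 32 bits of the ID
        int h = Long.hashCode(orderId);
        //floorMod guarantees a non-negative index even for negative hashes
        return Math.floorMod(h, bucketNum);
    }

    public static void main(String[] args) {
        int bucketNum = 32;
        //Stand-in for bucketLocalCache.getBucketDetailKeyList()
        List<String> detailKeys = new ArrayList<>();
        for (int i = 0; i < bucketNum; i++) {
            detailKeys.add("seller1skuA" + String.format("%07d", i + 1));
        }
        long orderId = 202400000123L;
        int index = createDetailBucketKey(orderId, bucketNum);
        System.out.println("order " + orderId + " -> detail key " + detailKeys.get(index));
    }
}
```

The important property is determinism: the same order ID always maps to the same detail key, so the duplicate-write check (exhsetNx) keeps working, while different orders spread across all shards.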

5. Quickly Taking a Bucket Offline When It Cannot Be Refilled

(1) Why optimize

(2) Solution

(1) Why optimize

When refilling a bucket, the lock dimension is seller ID + product ID, to avoid concurrent operations on the center bucket's stock. With the default configuration of 32 buckets, heavy concurrency can push several buckets past the offline threshold at almost the same instant.

Under high concurrency, every request queues inside the lock, waiting to check whether its bucket should be taken offline. At the same time, many deduction requests keep routing to a given bucket, so a bucket that has not yet been taken offline can have its stock drained very quickly.

In other words, a flood of deduction requests routes to a bucket whose stock is about to run out, and those deductions fail even though other buckets still have stock.

Code before the optimization:

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
    ...
    //Bucket refill (capacity) interface
    @Override
    public void bucketCapacity(BucketCapacity bucketCapacity) {
        //Lock the center bucket's stock first, so it cannot change underneath us
        String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
        String value = SnowflakeIdWorker.getCode();
        //1. Check whether the refill is already unnecessary; if so, finish early
        long startTime = System.currentTimeMillis();
        BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);
        if (!bucketCapacityContext.getIsCapacity()) {
            return;
        }
        //Take the distributed lock for the refill
        boolean lock = tairLock.tryLock(key, value);
        if (lock) {
            try {
                //Re-check whether a refill is needed; no concurrency allowed here
                bucketCapacityContext = checkBucketCapacity(bucketCapacity);
                if (bucketCapacityContext.getIsCapacity()) {
                    //2. Get the center bucket's stock
                    Integer residueNum = getCenterStock(bucketCapacity);
                    //3. Refill is possible: compute the stock that can flow back from the center bucket
                    if (residueNum > 0) {
                        backSourceInventory(residueNum, bucketCapacityContext);
                        log.info(bucketCapacity.getBucketNo() + " refill took {}", System.currentTimeMillis() - startTime);
                    } else {
                        //4. Center bucket is empty: check whether to take the bucket offline
                        checkBucketOffline(bucketCapacity);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                tairLock.unlock(key, value);
            }
        } else {
            throw new BaseBizException("busy, please retry later!");
        }
    }

    //Check whether this request still needs to perform a refill
    private BucketCapacityContext checkBucketCapacity(BucketCapacity bucketCapacity) {
        String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
        //1. Get the bucket's remote cached stock
        Integer residueNum = getBucketInventoryNum(bucketCapacity.getBucketNo());
        //2. Get the cached metadata
        BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
        //3. Check whether the refill is still needed
        List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
        InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();
        for (BucketCacheBO bucketCacheBO : availableList) {
            //Which bucket is being used for the deduction
            if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {
                //Back-to-source trigger percentage
                Integer backSourceProportion = inventoryBucketConfig.getBackSourceProportion();
                //Total stock allocated to this bucket
                Integer bucketNum = bucketCacheBO.getBucketNum();
                int backSourceNum = bucketNum * backSourceProportion / 100;
                //If the back-to-source share of the stock exceeds what remains, trigger an async refill
                return new BucketCapacityContext(residueNum, backSourceNum > residueNum, bucketCapacity);
            }
        }
        //Not being in the available list means the bucket is already offline; finish early
        return new BucketCapacityContext(residueNum, false, bucketCapacity);
    }

    //Check whether the bucket has hit the offline threshold
    private void checkBucketOffline(BucketCapacity bucketCapacity) {
        //1. Get the bucket's configuration
        String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
        BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
        //2. Check whether the bucket's stock has hit the offline threshold: get the bucket's current stock and the configured threshold
        Integer thresholdValue = bucketLocalCache.getInventoryBucketConfig().getThresholdValue();
        Integer inventoryNum = getBucketInventoryNum(bucketCapacity.getBucketNo());
        //3. If the threshold is hit, send a message to take the bucket offline
        if (thresholdValue > inventoryNum) {
            log.info("offline triggered: {}, threshold {}, current stock {}", thresholdValue > inventoryNum, thresholdValue, inventoryNum);
            sendAsynchronous(bucketCapacity);
        }
    }
    ...
}

(2)解决方案

一.在分布式锁外就开始验证该扩容请求是否会触发分桶下线操作

当出现大量扣减库存请求时,每个分桶都可能多次触发扩容。而是否触发分桶下线的检查,可以提前到分布式锁外部进行,从而提升分桶下线的触发效率。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
    ...

    //分桶扩容接口
    @Override
    public void bucketCapacity(BucketCapacity bucketCapacity) {
        long startTime = System.currentTimeMillis();
        //获取中心桶的剩余库存
        Integer residueNum = getCenterStock(bucketCapacity);
        if (residueNum <= 0) {
            //中心桶无剩余库存,检查是否触发下线
            checkBucketOffline(bucketCapacity);
            return;
        }
        //判断本次扩容的分桶,是否有多次扩容失败的情况
        String failNum = tairCache.get(TairInventoryConstant.BUCKET_CAPACITY_FAIL + bucketCapacity.getBucketNo());
        if (StringUtils.isNotBlank(failNum) && Integer.parseInt(failNum) >= 2) {
            //当前分桶扩容失败次数超过两次了,直接放弃这次扩容
            //因为失败太多并且还继续去尝试,会持续的扣减中心桶库存,可能会导致其他可以正常扩容的分桶,没有中心桶库存可以扣减
            return;
        }
        //1.校验是否已经无需扩容了,如果是则快速结束
        BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);
        if (!bucketCapacityContext.getIsCapacity()) {
            return;
        }
        //先锁住中心桶库存,避免此时库存发生变化
        String key = buildBucketLockKey(bucketCapacity.getSellerId(), bucketCapacity.getSkuId());
        String value = SnowflakeIdWorker.getCode();
        //获取分布式锁来进行扩容处理
        boolean lock = tairLock.tryLock(key, value);
        if (lock) {
            try {
                //再次校验是否需要扩容,此处不允许并发
                bucketCapacityContext = checkBucketCapacity(bucketCapacity);
                if (bucketCapacityContext.getIsCapacity()) {
                    //2.获取中心桶库存的库存
                    residueNum = getCenterStock(bucketCapacity);
                    //3.可以扩容,计算出可回源的库存进行处理
                    if (residueNum > 0) {
                        backSourceInventory(residueNum, bucketCapacityContext);
                        log.info(bucketCapacity.getBucketNo() + "处理扩容消耗时间{}", System.currentTimeMillis() - startTime);
                    } else {
                        //4.中心桶无库存,检查是否触发下线
                        checkBucketOffline(bucketCapacity);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                tairLock.unlock(key, value);
            }
        } else {
            throw new BaseBizException("请求繁忙,稍后重试!");
        }
    }
    ...
}

二.动态下线阈值计算

模板中默认会配置一个分桶下线阈值。同一个分桶,面对不同流量的访问,会有不同的库存扣减速度。所以,一个分桶应该根据具体的流量来选择对应的下线阈值,从而减少由于大量并发扣减请求而导致的库存扣减失败情况。

为了计算出动态的下线阈值,可以启动一个定时任务调度,每隔5秒检查一下库存的扣减速度,再根据可用分桶的数量得出目前单个分桶扣减库存的速度。这样分桶的下线阈值会随实际流量而变化:分桶的最小下线阈值不能低于模板配置的下线阈值,最大下线阈值不能超过当前模板配置的下线阈值比例。例如分桶深度30000、下线阈值最大比例10%,则最大下线阈值为3000。
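上述"下限取模板阈值、上限取深度比例"的钳制逻辑,可以用下面的示意代码表达(类名、方法名与参数均为示例假设,并非项目的真实实现):

```java
// 根据库存消耗速度动态估算分桶下线阈值的示意
public class ThresholdSketch {
    /**
     * @param consumePerSecond   该SKU每秒消耗的总库存
     * @param availableBucketNum 当前可用分桶数量
     * @param minThreshold       模板配置的最小下线阈值
     * @param maxDepthNum        分桶库存深度
     * @param proportion         下线阈值最大比例(如10表示10%)
     */
    public static int estimateThreshold(long consumePerSecond, int availableBucketNum,
                                        int minThreshold, int maxDepthNum, int proportion) {
        // 每秒消耗库存均摊到每个可用分桶,得到预估阈值
        long estimated = consumePerSecond / availableBucketNum;
        // 不能低于模板配置的最小下线阈值
        long threshold = Math.max(estimated, minThreshold);
        // 不能超过库存深度的最大比例,如深度30000、比例10%,上限为3000
        long maxThreshold = (long) maxDepthNum * proportion / 100;
        return (int) Math.min(threshold, maxThreshold);
    }

    public static void main(String[] args) {
        // 每秒消耗40000、10个可用分桶 => 预估4000,被上限3000钳制
        System.out.println(estimateThreshold(40000, 10, 500, 30000, 10)); // 3000
    }
}
```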

注意:InventoryBucketCache会缓存所有在售商品的库存分桶元数据信息。如果在售商品很多,可能需要考虑是否会OOM。

@Component
@Data
public class InventoryBucketCache {

    //本地缓存
    @Autowired
    private Cache cache;

    ...

    //获取本地缓存的所有分桶元数据
    public List<BucketLocalCache> getBucketLocalCacheList() {
        ConcurrentMap concurrentMap = cache.asMap();
        List<BucketLocalCache> bucketLocalCacheList = new ArrayList<BucketLocalCache>(concurrentMap.size());
        for (Object key : concurrentMap.keySet()) {
            Object o = concurrentMap.get(key);
            if (!Objects.isNull(o)) {
                if (o instanceof BucketLocalCache) {
                    BucketLocalCache bucketLocalCache = (BucketLocalCache) o;
                    bucketLocalCacheList.add(bucketLocalCache);
                }
            }
        }
        return bucketLocalCacheList;
    }
    ...
}

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {

    @Resource
    private CalculateThresholdQueue calculateThresholdQueue;

    ...

    //计算分桶某个时间区间的库存具体消费速度,生成预估的下线阈值
    @Override
    public void calculateOfflineThreshold() {
        //先获取到目前缓存的所有分桶元数据
        List<BucketLocalCache> bucketLocalCacheList = inventoryBucketCache.getBucketLocalCacheList();
        if (!CollectionUtils.isEmpty(bucketLocalCacheList)) {
            //将分桶元数据加入到队列中,计算出每个分桶的预估下线阈值
            for (BucketLocalCache bucketLocalCache : bucketLocalCacheList) {
                calculateThresholdQueue.offerByRoundRobin(bucketLocalCache);
            }
        }
    }
    ...
}

//用于计算分桶下线阈值的计算队列
@Component
public class CalculateThresholdQueue {

    //计算队列列表
    private final List<BlockingQueue> calculateQueue = new ArrayList<>();

    @Resource
    private TairCache tairCache;

    //处理下一个分桶元数据的计算队列在队列列表中的下标
    private AtomicInteger index = new AtomicInteger();

    //配置的计算队列数量
    @Value("${calculate.threshold-num:32}")
    private Integer thresholdQueueNum;

    @PostConstruct
    public void init() {
        ExecutorService executors = Executors.newFixedThreadPool(thresholdQueueNum);
        for (int i = 0; i < thresholdQueueNum; i++) {
            //设置一个队列最大容纳数量
            BlockingQueue blockingQueue = new ArrayBlockingQueue(150000);
            calculateQueue.add(blockingQueue);
            executors.execute(new CalculateThresholdRunner(blockingQueue, tairCache));
        }
    }

    //将分桶元数据提交到对应的计算队列
    public boolean offerByRoundRobin(Object object) {
        index.compareAndSet(thresholdQueueNum * 10000, 0);
        boolean offer = calculateQueue.get(index.getAndIncrement() % thresholdQueueNum).offer(object);
        return offer;
    }
}

分桶下线阈值的动态计算逻辑如下:

//多线程消费计算队列里的分桶元数据,来计算分桶下线阈值
public class CalculateThresholdRunner implements Runnable {

    //处理的计算队列
    private BlockingQueue blockingQueue;

    private TairCache tairCache;

    public CalculateThresholdRunner(BlockingQueue blockingQueue, TairCache tairCache) {
        this.blockingQueue = blockingQueue;
        this.tairCache = tairCache;
    }

    //内部线程计算每个SKU的缓存信息
    @Override
    public void run() {
        try {
            while (true) {
                BucketLocalCache bucketLocalCache = (BucketLocalCache) blockingQueue.take();
                String currentDate = DateFormatUtil.formatDateTime();
                //获取可用列表,不可用列表默认触发下线,库存暂不考虑计算入内,否则增加性能开销
                List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
                if (CollectionUtils.isEmpty(availableList)) {
                    return;
                }
                //获取可用列表的分桶缓存集合key
                List<String> cacheKeyList = availableList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());
                //批量获取的分桶库存数量
                List<String> bucketNumList = tairCache.mget(cacheKeyList);
                //构建中心桶的剩余库存key
                String sellerInventoryKey = buildSellerInventoryKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId());
                String inventoryNum = tairCache.get(sellerInventoryKey);
                //计算SKU所有分桶剩余的库存
                Integer residueNum = 0;
                if (!StringUtils.isEmpty(inventoryNum)) {
                    residueNum = residueNum + Integer.parseInt(inventoryNum);
                }
                //合并累计相加,得到当前商品SKU的总库存
                for (String bucketNum : bucketNumList) {
                    if (!StringUtils.isEmpty(bucketNum)) {
                        residueNum = residueNum + Integer.parseInt(bucketNum);
                    }
                }
                //获取之前缓存的商品SKU库存下线阈值信息,计算差集得到实际库存消费速度
                String key = buildSellerInventoryResidueKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId());
                //上次计算存储的库存实际值以及查询时间,用于计算均匀的每秒库存消费
                String oldCalculateInventory = tairCache.get(key);
                //默认为当前配置的分桶下线阈值
                Integer thresholdValue = bucketLocalCache.getInventoryBucketConfig().getThresholdValue();
                if (!StringUtils.isEmpty(oldCalculateInventory)) {
                    CalculateInventory calculateInventory = JsonUtil.json2Object(oldCalculateInventory, CalculateInventory.class);
                    //计算上一次的库存 减去此次的库存,得到这个时间点消耗了多少库存
                    int diffNum = calculateInventory.getOldResidueNum() - residueNum;
                    //上一次计算的创建时间
                    String createDate = calculateInventory.getCreateDate();
                    //当前时间减去上一次的计算的创建时间,得到间隔时间差,再通过差集的库存除以秒,得到每秒平均的消耗库存
                    long consumeStock = diffNum / (Long.parseLong(currentDate) - Long.parseLong(createDate));
                    if (consumeStock > 0) {
                        //每秒消耗的库存 / 当前存活的分桶数量,得到目前分桶的下线阈值
                        Long newThresholdValue = consumeStock / availableList.size();
                        //这里计算的下线阈值,最小值不能低于配置的最小阈值
                        if (newThresholdValue > thresholdValue) {
                            thresholdValue = newThresholdValue.intValue();
                            //阈值的最大值,不能超过库存深度的10%比例
                            int maxDepthNum = bucketLocalCache.getInventoryBucketConfig().getMaxDepthNum() / bucketLocalCache.getInventoryBucketConfig().getThresholdProportion();
                            if (thresholdValue > maxDepthNum) {
                                thresholdValue = maxDepthNum;
                            }
                        }
                        log.info("预估的分桶下线阈值{},实际使用的分桶下线阈值{}", newThresholdValue, thresholdValue);
                    }
                }
                //存储该商品SKU的预估下线阈值
                String thresholdKey = buildSellerInventoryThresholdKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId());
                tairCache.set(thresholdKey, String.valueOf(thresholdValue), 0);
                //存储该商品SKU这次计算的库存和时间
                CalculateInventory calculateInventory = new CalculateInventory();
                calculateInventory.setOldResidueNum(residueNum);
                calculateInventory.setCreateDate(DateFormatUtil.formatDateTime());
                tairCache.set(key, JSONObject.toJSONString(calculateInventory), 0);
            }
        } catch (Exception e) {
            log.error("处理库存分桶下线阈值异常", e);
        }
    }

    //中心桶库存的key
    private String buildSellerInventoryKey(String sellerId, String skuId) {
        return TairInventoryConstant.SELLER_INVENTORY_PREFIX + sellerId + skuId;
    }

    //计算商品SKU库存下线阈值的相关信息的key
    private String buildSellerInventoryResidueKey(String sellerId, String skuId) {
        return TairInventoryConstant.SELLER_INVENTORY_RESIDUE_PREFIX + sellerId + skuId;
    }

    //商品SKU库存下线阈值的key
    private String buildSellerInventoryThresholdKey(String sellerId, String skuId) {
        return TairInventoryConstant.SELLER_INVENTORY_THRESHOLD_PREFIX + sellerId + skuId;
    }
}

6.多个分桶同时触发下线的并发场景优化

(1)优化原因

(2)解决方案

(1)优化原因

首先需要明确几点:

一.库存分桶下线的时机

并非没有库存才下线,而是触发下线阈值就下线。

二.库存分桶下线后

其剩余的库存会返还到中心桶剩余库存。

三.库存分桶扩容时

会从中心桶获取剩余库存进行扩容。

四.进行库存分桶扣减时

会先处理扣减、然后检查扩容、最后检查下线。

五.在对分桶进行下线处理时

会通过发送消息进行异步下线。

当某个商品SKU出现大量的并发扣减库存请求时,可能其中一些扣减请求路由到分桶1234进行库存扣减处理,另外一些扣减请求路由到分桶4567进行库存扣减处理。路由到分桶1234的扣减请求,没法扩容,同时触发了分桶下线。在进行分桶1234下线时,这些分桶其实还有一些库存,可供扣减。也就是说,如果分桶1234下线成功,那么紧接其后路由到分桶4567的扣减请求,则可以触发扩容。

但是分桶扩容后的上线和分桶下线,会竞争同一把分布式锁,即用分布式锁来保证分桶的上线和下线不会互相覆盖分桶元数据的变更。这就导致:即便分桶下线成功、下线分桶的剩余库存已返还给中心桶,分桶上线也会由于等待锁,而不能及时将返还的库存添加到还没下线的分桶上。

所以在并发分桶下线 + 分桶扩容的场景下,虽然竞争到锁的几个分桶成功地快速下线了,但可能会导致很多请求访问到没法及时扩容的、快没库存的分桶上,从而出现大量库存不足、扣减失败的请求,而实际上中心桶还是有剩余库存的。所以对于面临需要下线的分桶,需要最小粒度的锁,来实现快速下线。

优化前的代码:

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
    ...

    //分桶下线接口
    @Override
    public void bucketOffline(InventorOfflineRequest request) {
        long start = System.currentTimeMillis();
        //1.验证入参必填
        checkInventorOfflineParams(request);
        //过滤只有一个分桶的无效请求
        Boolean isOffline = checkBucketOffline(request);
        if (isOffline) {
            //2.注意这里需要锁定中心桶库存
            String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + request.getSellerId() + request.getSkuId();
            String value = SnowflakeIdWorker.getCode();
            boolean lock = tairLock.tryLock(key, value);
            if (lock) {
                try {
                    //3.先将准备下线的分桶库存从本地和远程列表中移除至不可用列表,避免新的请求进来
                    updateBucket(request);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    tairLock.unlock(key, value);
                }
            } else {
                throw new BaseBizException("请求繁忙,稍后重试!");
            }
            log.info("分桶下线处理时间,request:{}, lock:{}, time:{}", JSON.toJSONString(request), lock, System.currentTimeMillis() - start);
        }
    }
    ...
}

(2)解决方案

一.减少分桶下线的锁粒度

锁不再是卖家 + 商品的维度,而是卖家 + 商品 + 下线分桶的维度,从而提升不同分桶下线的并发执行速度。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
    ...

    //分桶下线接口
    @Override
    public void bucketOffline(InventorOfflineRequest request) {
        //1.验证入参必填
        checkInventorOfflineParams(request);
        //过滤只有一个分桶的无效请求
        Boolean isOffline = checkBucketOffline(request);
        if (isOffline) {
            long start = System.currentTimeMillis();
            //2.注意这里需要锁定 下线分桶的变更,这个接口默认一次只有一个分桶
            String key = buildBucketOfflineLockKey(request.getSellerId(), request.getSkuId(), request.getBucketNoList().get(0));
            String value = SnowflakeIdWorker.getCode();
            boolean lock = tairLock.tryLock(key, value);
            if (lock) {
                try {
                    //3.先将准备下线的分桶库存从本地和远程列表中移除至不可用列表,避免新的请求进来
                    updateBucketCache(request);
                    log.info("分桶下线处理时间,下线分桶:{}, 当前时间:{}, 耗时:{}", JSON.toJSONString(request.getBucketNoList()), DateFormatUtil.formatDateTime(new Date()), System.currentTimeMillis() - start);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    tairLock.unlock(key, value);
                }
            } else {
                throw new BaseBizException("请求繁忙,稍后重试!");
            }
        }
    }
    ...
}

二.分桶下线并发更新元数据要避免脏数据覆盖

因为优化后采取的是单个分桶去验证分桶下线处理,而覆盖的是整个商品SKU的分桶元数据信息,这里的更新顺序已无法保证。如果不做处理直接简单覆盖,可能会造成已下线的分桶被错误地重复上线。

所以在更新各个分桶的元数据,包括广播消息消费更新本地元数据时,不能简单进行元数据覆盖,而是要diff出下线的分桶,只覆盖对应的分桶数据。而且因为覆盖本地元数据涉及分布式本地缓存,每台机器在更新自己的同时,还需要处理其它机器接收到的分桶元数据更新,所以还需要对商品更新元数据的操作进行本地内存级别锁的处理。

@Component
@Data
public class InventoryBucketCache {

    @Autowired
    private Cache cache;

    @Resource
    private TairCache tairCache;

    ...

    //本地存储关于分桶信息
    @CacheRefresh(cacheKey = "bucketKey", mqCacheKey = CacheConstant.INVENTORY_SKU_KEY, index = "1", messageType = CacheConstant.MESSAGE_TYPE_HOT, cacheType = CacheConstant.TAIR_CACHE_TYPE)
    public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) {
        String bucketLocalKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();
        synchronized (bucketLocalKey.intern()) {
            log.info("保存本地缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));
            BucketLocalCache bucketCache = getTairBucketCache(bucketKey);
            log.info("远程缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketCache));
            //如果本地缓存没有就直接写入
            if (Objects.isNull(bucketCache)) {
                setBucketCache(bucketKey, bucketLocalCache);
                cache.put(bucketKey, bucketLocalCache);
                return;
            }
            //本地缓存的元数据覆盖,考虑到是并发执行的,这里需要上内存级别的锁,并进行diff处理
            if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.AVAILABLE_STATUS.getCode())) {
                diffCacheOnline(bucketCache, bucketLocalCache);
            } else if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.OFFLINE_STATUS.getCode())) {
                diffCacheOffline(bucketCache, bucketLocalCache);
            }
            setBucketCache(bucketKey, bucketCache);
            cache.put(bucketKey, bucketCache);
            log.info("实际保存本地缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketCache));
        }
    }

    //处理原有元数据和更新元数据的下线分桶的处理
    //@param bucketCache      原始本地缓存元数据
    //@param bucketLocalCache 新的元数据
    public void diffCacheOffline(BucketLocalCache bucketCache, BucketLocalCache bucketLocalCache) {
        if (Objects.isNull(bucketCache) || Objects.isNull(bucketLocalCache)) {
            return;
        }
        //原始的已下线分桶元数据列表
        List<BucketCacheBO> oldUndistributedList = bucketCache.getUndistributedList();
        //新的已下线分桶元数据列表
        List<BucketCacheBO> newUndistributedList = bucketLocalCache.getUndistributedList();
        List<BucketCacheBO> diffUndistributedList = null;
        //转换一个集合为MAP,用于计算差集的下线分桶,主要是看新的下线分桶里面有没有比旧的多
        if (CollectionUtils.isEmpty(oldUndistributedList)) {
            diffUndistributedList = newUndistributedList;
        } else {
            Map<String, BucketCacheBO> bucketCacheBOMap = oldUndistributedList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
            //处理新的下线分桶里面 是否更新了一批新的下线分桶,并和原有的元数据下线分桶比较,看哪些是新增的
            if (!CollectionUtils.isEmpty(newUndistributedList)) {
                diffUndistributedList = new ArrayList<>();
                for (BucketCacheBO bucketCacheBO : newUndistributedList) {
                    if (!bucketCacheBOMap.containsKey(bucketCacheBO.getBucketNo()) && !StringUtils.isEmpty(bucketCacheBO.getBucketNo())) {
                        diffUndistributedList.add(bucketCacheBO);
                    }
                }
            }
        }
        Map<String, BucketCacheBO> availableMap = bucketCache.getAvailableList().stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
        //产生变化的 元数据下线分桶
        if (!CollectionUtils.isEmpty(diffUndistributedList)) {
            //处理下线的分桶
            for (BucketCacheBO bucketCacheBO : diffUndistributedList) {
                //最少保留一个分桶
                if (availableMap.size() > 1) {
                    //获取分桶下线触发的时间
                    log.info("本地元数据发生变更,分桶编号[{}]下线,下线时间{}", bucketCacheBO.getBucketNo(), DateFormatUtil.formatDateTime());
                    if (availableMap.containsKey(bucketCacheBO.getBucketNo()) && !StringUtils.isEmpty(bucketCacheBO.getBucketNo())) {
                        availableMap.remove(bucketCacheBO.getBucketNo());
                        //增加下线的分桶数据
                        bucketCache.getUndistributedList().add(bucketCacheBO);
                    }
                }
            }
            //从上线分桶中移除
            List<BucketCacheBO> availableList = availableMap.values().stream().collect(Collectors.toList());
            bucketCache.setAvailableList(availableList);
        }
        bucketCache.setVersion(bucketLocalCache.getVersion());
    }
    ...
}

7.高并发下分桶被全部下线如何修复

(1)优化原因

(2)解决方案

(1)优化原因

前面为了提高分桶下线的性能,会为可用分桶保留1个。但高并发下仍可能出现分桶全部被下线的情况。为了保证永远有一个可用的分桶不被下线,需要有一个兜底的分桶。

(2)解决方案

一.当所有的可用分桶都被扣减完时,可以使用中心桶库存来替代

构建扣减上下文对象时,如果发现没有一个可用的上线分桶,那么可选择中心桶来进行扣减尝试,避免此时无法扣减库存。

@Service
public class InventoryServiceImpl implements InventoryService {
    ...

    //填充扣减库存的分桶相关信息
    private BucketContext buildDeductBucketList(InventoryRequest request) {
        BucketContext context = new BucketContext();
        //获取本地缓存的分桶元数据
        BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
        //获取本地缓存的分桶列表
        List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
        if (!CollectionUtils.isEmpty(availableList)) {
            //获取本次扣减请求对应的扣减次数,用来定位具体使用的分桶库存
            Integer incrementCount = getIncrementCount(request);
            //通过运算得到本次访问所需要定位的分桶
            int index = incrementCount % availableList.size();
            //获取本次准备处理的分桶信息
            BucketCacheBO bucketCacheBO = availableList.get(index);
            context.getAvailableList().add(bucketCacheBO);
            //为了避免扣减失败(分桶已下线或者库存不足),可以多备份几个分桶
            //全部分桶都作为备份,就是合并扣减的实现了
            for (int i = 0; i < 2; i++) {
                //填充2个作为备份,如果超过下标则从0开始继续取
                int num = index + i;
                if (num >= availableList.size()) {
                    num = 0;
                }
                BucketCacheBO bucketCache = availableList.get(num);
                context.getAvailableList().add(bucketCache);
            }
        } else {
            //并发下,可能全部的分桶都下线了,这个时候使用中心桶进行库存扣减(因为其它分桶的下线库存回源会加到中心桶上)
            BucketCacheBO bucketCacheBO = new BucketCacheBO();
            bucketCacheBO.setBucketNo(buildSellerInventoryKey(request.getSellerId(), request.getSkuId()));
            //中心桶无需扩容,但是出现这种场景属于高并发下,分桶全部被下线了,此时需要保证分桶本地元数据和远程保持一致,为了性能,分桶下线未上粒度较大的锁
            //所以需要当遇到使用中心桶的时候,再次触发一次远程缓存和本地缓存同步的操作,并且需要保证远程缓存最少有一个可用分桶存在
            bucketCacheBO.setBucketNum(0);
            context.getAvailableList().add(bucketCacheBO);
            //发送消息异步刷新分布式本地缓存的消息
            bucketRefreshProducer.sendBucketOffline(request);
        }
        Long index = InventorBucketUtil.createDetailBucketKey(request.getOrderId(), bucketLocalCache.getInventoryBucketConfig().getBucketNum());
        String inventoryDetailKey = bucketLocalCache.getBucketDetailKeyList().get(Integer.valueOf(index + ""));
        context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());
        context.setInventoryDetailKey(inventoryDetailKey);
        inventoryBucketCache.threadLocalRemove();
        return context;
    }
    ...
}

二.异步刷新分布式库存服务机器的本地缓存

当扣减的上下文对象中存在中心桶作为分桶进行扣减时,需要发送一个消息,异步刷新分布式库存服务各机器的本地缓存,避免各机器的本地缓存出现不一致。

//刷新本地缓存的分桶元数据,从而让分布式库存服务的本地缓存一致
@Component
public class BucketRefreshListener implements MessageListenerConcurrently {

    @Autowired
    private Cache cache;

    @Resource
    private InventoryBucketCache inventoryBucketCache;

    @Resource
    private TairLock tairLock;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt messageExt : list) {
                String msg = new String(messageExt.getBody());
                InventoryRequest inventoryRequest = JsonUtil.json2Object(msg, InventoryRequest.class);
                //锁住这个商品的本地缓存同步,每次只会处理一个本地缓存元数据
                String key = inventoryRequest.getSellerId() + inventoryRequest.getSkuId();
                String value = SnowflakeIdWorker.getCode();
                boolean lock = tairLock.tryLock(TairInventoryConstant.SELLER_SYNC_BUCKET_PREFIX + key, value);
                if (lock) {
                    try {
                        String bucketLocalKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + key;
                        //远程缓存
                        BucketLocalCache bucketCache = inventoryBucketCache.getTairBucketCache(bucketLocalKey);
                        //本地缓存
                        BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
                        //以远程缓存为准进行本地缓存覆盖(缓存最少需要一个分桶列表,远程缓存没有就需要保证增加一个可用分桶)
                        BucketLocalCache bucketLocalCache1 = inventoryBucketCache.diffRefreshCache(bucketLocalCache, bucketCache);
                        //比较远程缓存和本地分桶是否一致
                        if (!Objects.isNull(bucketLocalCache1)) {
                            //覆盖本地缓存
                            cache.put(bucketLocalKey, bucketLocalCache1);
                            //更新远程缓存
                            inventoryBucketCache.setBucketCache(bucketLocalKey, bucketLocalCache1);
                        }
                    } catch (Exception e) {
                        log.error("consume error, 同步刷新本地缓存的分桶元数据失败", e);
                        //失败不重试
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } finally {
                        tairLock.unlock(key, value);
                    }
                }
            }
        } catch (Exception e) {
            log.error("consume error, 刷新本地缓存的分桶元数据失败", e);
            //失败不重试
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

@Component
@Data
public class InventoryBucketCache {

    @Autowired
    private Cache cache;

    @Resource
    private TairCache tairCache;

    ...

    //返回一个正确的可用分桶元数据对象
    //@param bucketCache      原始本地缓存元数据
    //@param bucketLocalCache 新的元数据
    public BucketLocalCache diffRefreshCache(BucketLocalCache bucketCache, BucketLocalCache bucketLocalCache) {
        if (Objects.isNull(bucketCache) || Objects.isNull(bucketLocalCache)) {
            return null;
        }
        //本地的上线分桶元数据列表
        List<BucketCacheBO> oldAvailableList = bucketCache.getAvailableList();
        //远程的已上线分桶元数据列表
        List<BucketCacheBO> newAvailableList = bucketLocalCache.getAvailableList();
        if (!CollectionUtils.isEmpty(oldAvailableList)) {
            Map<String, BucketCacheBO> bucketCacheBOMap = oldAvailableList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
            //验证本地的可用分桶列表和远程缓存的分桶列表差异,并处理保证缓存统一
            if (!CollectionUtils.isEmpty(newAvailableList)) {
                for (BucketCacheBO bucketCacheBO : newAvailableList) {
                    //如果有任意一个可用分桶和远程不一致,则统一以远程为准
                    if (!bucketCacheBOMap.containsKey(bucketCacheBO.getBucketNo())) {
                        return bucketLocalCache;
                    }
                }
            }
            //数据一致,不处理
            return null;
        } else {
            //本地分桶可用列表为空,远程缓存可用列表也为空
            if (CollectionUtils.isEmpty(newAvailableList)) {
                //从不可用列表选择一个分桶作为可用分桶使用
                List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList();
                bucketLocalCache.getAvailableList().add(undistributedList.get(0));
                bucketLocalCache.getUndistributedList().remove(0);
                return bucketLocalCache;
            } else {
                //远程缓存有可用分桶,直接使用远程缓存覆盖本地缓存
                return bucketLocalCache;
            }
        }
    }
    ...
}

8.优化后的库存SQL

CREATE TABLE `inventory_bucket_config` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `bucket_num` int(10) NOT NULL DEFAULT '0' COMMENT '分桶数量',
  `max_depth_num` int(10) NOT NULL DEFAULT '0' COMMENT '最大库存深度',
  `min_depth_num` int(10) NOT NULL DEFAULT '0' COMMENT '最小库存深度',
  `threshold_value` int(10) NOT NULL DEFAULT '0' COMMENT '分桶下线阈值',
  `threshold_proportion` int(10) DEFAULT NULL COMMENT '分桶下线动态比例',
  `back_source_proportion` int(10) NOT NULL DEFAULT '0' COMMENT '回源比例,从1-100设定比例',
  `back_source_step` int(10) NOT NULL DEFAULT '0' COMMENT '回源步长,桶扩容的时候默认每次分配的库存大小',
  `template_name` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '模板名称',
  `is_default` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否默认模板,只允许一个,1为默认模板',
  `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',
  `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',
  `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建人',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新人',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存分桶配置模板表';

CREATE TABLE `inventory_allot_detail` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'skuId',
  `inventor_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '库存申请业务编号',
  `seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID',
  `inventor_num` int(10) NOT NULL DEFAULT '0' COMMENT '库存变更数量',
  `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',
  `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',
  `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建人',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新人',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `inde_unique_inventor_no` (`inventor_no`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=265 DEFAULT CHARSET=utf8 COMMENT='库存分配记录表';

CREATE TABLE `inventory_deduction_detail` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `order_id` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '订单id',
  `refund_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '退款编号',
  `inventory_num` int(10) NOT NULL DEFAULT '0' COMMENT '扣减库存数量',
  `sku_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品skuId',
  `seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID',
  `bucket_no` int(10) NOT NULL COMMENT '扣减分桶编号',
  `deduction_type` int(2) NOT NULL COMMENT '库存操作类型(10库存扣减,20库存退货)',
  `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',
  `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建人',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新人',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存扣减明细表';

-- 分布式库存扣减版本新增
CREATE TABLE `inventory_bucket_operate` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `operate_id` varchar(32) NOT NULL COMMENT '操作id',
  `seller_id` varchar(64) NOT NULL COMMENT '卖家id',
  `sku_id` varchar(64) NOT NULL COMMENT '商品sku',
  `operate_type` tinyint(3) NOT NULL COMMENT '操作类型:1-初始化,2-增加库存,3-分桶上线,4-分桶扩容,5-分桶下线',
  `bucket` text COMMENT '分桶变动信息',
  `inventory_num` int(11) DEFAULT NULL COMMENT '变动库存',
  `feature` text COMMENT '扩展信息',
  `operate_status` tinyint(4) DEFAULT '0' COMMENT '操作状态',
  `del_flag` tinyint(1) DEFAULT '1' COMMENT '删除标记',
  `create_user` int(11) DEFAULT NULL COMMENT '创建人',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_user` int(11) DEFAULT NULL COMMENT '更新人',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=58 DEFAULT CHARSET=utf8mb4 COMMENT='库存分桶操作表';

CREATE TABLE `inventory_operate_fail` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `operate_id` varchar(32) NOT NULL COMMENT '操作id',
  `fail_type` varchar(32) DEFAULT NULL COMMENT '操作类型',
  `bucket_no` varchar(32) DEFAULT NULL COMMENT '分桶编号',
  `inventory_num` int(11) DEFAULT NULL COMMENT '变动库存数量',
  `del_flag` tinyint(1) DEFAULT NULL COMMENT '删除标识',
  `create_user` int(11) DEFAULT NULL COMMENT '创建人',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_user` int(11) DEFAULT NULL COMMENT '更新人',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存操作失败记录表';

9.其他的优化

(1)对库存扣减明细消息的处理

(2)分桶数量的处理

(3)合理进行库存缩减

(1)对库存扣减明细消息的处理

库存扣减明细的消息是通过异步来进行发送的,如果异步发送消息失败了,则会导致消息丢失。

可以考虑增加一个队列来接收异步发送失败的消息:通过每秒一次刷入的方式,将写队列的数据切换到读队列后再进行重新写入,然后清空读队列。队列中的数据可先顺序写本地文件,保证机器宕机时数据不丢失。
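上述读写双队列的思路可以用下面的示意代码表达(类名与方法均为示例假设,实际还需要把消息顺序写本地文件并重新投递到MQ,这里省略):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// 异步发送失败消息的读写双队列示意
public class FailedMessageBuffer {

    // 写队列:接收异步发送失败的库存扣减明细消息
    private ConcurrentLinkedQueue<String> writeQueue = new ConcurrentLinkedQueue<>();

    // 发送失败的消息先进入写队列
    public void offer(String message) {
        writeQueue.offer(message);
    }

    // 模拟每秒一次的刷入:把写队列整体切换为读队列,逐条取出重新写入后清空
    public List<String> flushOnce() {
        // 切换写队列,让新的失败消息进入新队列,旧队列变为只读
        ConcurrentLinkedQueue<String> readQueue = writeQueue;
        writeQueue = new ConcurrentLinkedQueue<>();
        List<String> retried = new ArrayList<>();
        String msg;
        while ((msg = readQueue.poll()) != null) {
            // 此处省略:顺序写本地文件 + 重新投递到MQ
            retried.add(msg);
        }
        return retried;
    }

    public static void main(String[] args) {
        FailedMessageBuffer buffer = new FailedMessageBuffer();
        buffer.offer("detail-1");
        buffer.offer("detail-2");
        System.out.println(buffer.flushOnce().size()); // 2
    }
}
```

注意这只是单机内存缓冲的草图,写队列切换并非严格原子,生产实现还需要考虑切换瞬间的并发写入。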

(2)分桶数量的处理

分桶的数量,一般是最开始初始化添加库存的时候生成到对应缓存分片中的。但对于不同的场景,分桶需要的数量是不一样的。比如1000库存分散到32个分桶比较合理,那1万库存还是分散到32个分桶吗?或者库存只有20,还要用32个分桶吗?

所以分桶的数量并非初始化后就永远不变的。库存分桶也需要根据一些规则或者人工调整,进行分桶的增加和减少。
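按库存总量动态决定分桶数量的一种规则,可以用下面的示意代码表达(规则中的数值与类名均为示例假设,实际应由模板配置或人工调整):

```java
// 根据库存总量动态决定分桶数量的示意
public class BucketCountSketch {

    // 每个分桶期望承载的库存深度下限,库存太少时不值得拆太多桶
    private static final int MIN_DEPTH_PER_BUCKET = 100;
    // 分桶数量上限
    private static final int MAX_BUCKET_NUM = 32;

    public static int calculateBucketNum(int totalStock) {
        if (totalStock <= 0) {
            return 1;
        }
        // 保证每个分桶至少有MIN_DEPTH_PER_BUCKET库存,且分桶数不超过上限
        int num = totalStock / MIN_DEPTH_PER_BUCKET;
        if (num < 1) {
            num = 1;
        }
        return Math.min(num, MAX_BUCKET_NUM);
    }

    public static void main(String[] args) {
        System.out.println(calculateBucketNum(20));     // 库存20 -> 1个分桶
        System.out.println(calculateBucketNum(1000));   // -> 10个分桶
        System.out.println(calculateBucketNum(100000)); // -> 32个分桶(封顶)
    }
}
```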

(3)合理进行库存缩减

比如已为分桶分配了5000库存,但此时要减去2000库存,应该怎么从已上线的、已分配库存的分桶里面对库存进行合理的减少。
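一种可能的做法是按各分桶剩余库存的占比分摊缩减量,可以用下面的示意代码表达(类名、方法与数据结构均为示例假设,实际缩减还需要锁住对应分桶、校验并发扣减):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// 从已上线分桶中按剩余库存比例缩减库存的示意
public class StockReduceSketch {

    // buckets: 分桶编号 -> 当前剩余库存;reduceNum: 需要缩减的总库存
    // 返回每个分桶应缩减的库存数量
    public static Map<String, Integer> reduceProportionally(Map<String, Integer> buckets, int reduceNum) {
        int total = buckets.values().stream().mapToInt(Integer::intValue).sum();
        if (reduceNum > total) {
            throw new IllegalArgumentException("缩减库存超过分桶剩余总库存");
        }
        Map<String, Integer> plan = new LinkedHashMap<>();
        int reduced = 0;
        for (Map.Entry<String, Integer> entry : buckets.entrySet()) {
            // 按该分桶库存占比分摊缩减量,向下取整
            int part = (int) ((long) reduceNum * entry.getValue() / total);
            plan.put(entry.getKey(), part);
            reduced += part;
        }
        // 向下取整产生的剩余差额,从第一个分桶补扣
        int remainder = reduceNum - reduced;
        String firstKey = buckets.keySet().iterator().next();
        plan.put(firstKey, plan.get(firstKey) + remainder);
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Integer> buckets = new LinkedHashMap<>();
        buckets.put("A", 3000);
        buckets.put("B", 2000);
        System.out.println(reduceProportionally(buckets, 2000)); // {A=1200, B=800}
    }
}
```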
