当前位置: 首页 > news >正文

订单初版—6.生单链路实现的重构文档

大纲

1.库存服务的数据库与缓存双写的实现

2.如何发起异构存储下的Seata TCC分布式事务

3.异构存储下的Seata TCC分布式事务原理

4.生单链路锁定库存的Seata TCC分布式事务实现

5.库存服务异构存储双写TCC异常处理

6.库存服务TCC事务的空悬挂问题

7.库存服务TCC二阶段重试的幂等问题

8.假设使用异步锁库存方案可能会导致的几种问题

9.生单链路的AT + TCC混合事务方案流程总结

10.生单链路非分布式事务的纯补偿方案

1.库存服务的数据库与缓存双写的实现

(1)生单链路使用Seata AT分布式事务的原理流程图

(2)生单链路Seata AT模式下的并发问题分析

(3)生单链路如何解决库存全局锁争用问题

(4)库存服务双写数据库 + 缓存的实现

(1)生单链路使用Seata AT分布式事务的原理流程图

库存服务写库存时,通常都会进行数据库 + 缓存的双写处理。写缓存是库存服务写库存的一个常规性操作,因为需要支撑高并发的库存扣减。

(2)生单链路Seata AT模式下的并发问题分析

生单链路中的分布式事务环节在于:锁定优惠券 + 锁定库存 + 生成订单。

一.在锁定优惠券环节

每个用户都会有属于自己的优惠券。日常情况下,都是不同的用户使用不同的优惠券购买商品,所以并不会出现并发获取同一优惠券数据的全局锁的情况。

二.在锁定库存环节

对于爆品或秒杀,大量用户可能都会基于某商品进行下单扣减库存,因此会出现并发获取同一个SKU数据的全局锁。

第一个获取到某SKU数据的全局锁的事务,在进行生成订单环节由于需要插入多条SQL,所以可能会比较耗时,从而导致并发等待获取该SKU数据的全局锁的其他事务等待时间过长。

(3)生单链路如何解决库存全局锁争用问题

一.问题分析

一个商品SKU就对应一条库存数据记录。如果大量用户同时购买一个商品SKU,必然导致:多个分布式事务都去竞争和等待同一个SKU库存数据的全局锁。

二.解决方案

方案一:库存分桶方案

例如对库存表进行库存分桶。一般一个SKU就一条库存数据,该方案下一个SKU会有多条库存数据。比如1万的库存可分为1000条库存数据,每条库存数据可扣库存为10。每次扣减库存时,按照一定的规则和算法,选择一个库存分桶去扣减。

方案二:RocketMQ柔性事务方案

通过RocketMQ柔性事务方案来替换掉Seata刚性事务方案。在互联网公司里,一般的业务系统,都是使用RocketMQ柔性事务。大多情况下,RocketMQ柔性事务都能确保数据是一致的。

刚性事务:分支事务出现异常或者失败,则全局回滚。柔性事务:分支事务出现异常或者失败,则不断重试消费,直到成功。使用RocketMQ柔性事务方案,需要确保消息能被投递到RocketMQ。

方案三:使用没有全局锁的分布式事务方案

Seata支持AT、TCC、Saga、XA这几种事务方案。生单链路的建议是:锁定营销使用AT模式 + 锁定库存使用TCC模式的混合分布式事务方案。

三.生单链路中锁库存的技术方案重构

为了提升生单链路的性能,避免扣减库存时出现大量的全局锁争用。锁定库存使用Seata的TCC模式,纳入到全局事务中。而且为了让写库存时双写数据库 + 缓存的数据一致性,也可以用TCC模式实现。

四.存在异构存储的服务解决数据一致性问题的方案

使用Seata分布式事务的TCC模式。

(4)库存服务双写数据库 + 缓存的实现

@RestController
@RequestMapping("/inventory")
public class InventoryController {@Autowiredprivate InventoryService inventoryService;//新增商品库存@PostMapping("/addProductStock")public JsonResult<Boolean> addProductStock(@RequestBody AddProductStockRequest request) {inventoryService.addProductStock(request);return JsonResult.buildSuccess(true);}...
}@Service
public class InventoryServiceImpl implements InventoryService {...@Overridepublic Boolean addProductStock(AddProductStockRequest request) {log.info("新增商品库存:request={}", JSONObject.toJSONString(request));//1.校验入参checkAddProductStockRequest(request);//2.查询商品库存ProductStockDO productStock = productStockDAO.getBySkuCode(request.getSkuCode());ParamCheckUtil.checkObjectNull(productStock, InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_EXISTED_ERROR);//3.添加Redis锁,防并发String lockKey = RedisLockKeyConstants.ADD_PRODUCT_STOCK_KEY + request.getSkuCode();Boolean locked = redisLock.lock(lockKey);if (!locked) {throw new InventoryBizException(InventoryErrorCodeEnum.ADD_PRODUCT_SKU_STOCK_ERROR);}try {//4.执行添加商品库存逻辑addProductStockProcessor.doAdd(request);} finally {//5.解锁redisLock.unlock(lockKey);}return true;}...
}@Component
public class AddProductStockProcessor {@Autowiredprivate RedisCache redisCache;@Autowiredprivate ProductStockDAO productStockDAO;//执行添加商品库存逻辑@Transactional(rollbackFor = Exception.class)public void doAdd(AddProductStockRequest request) {//1.构造商品库存ProductStockDO productStock = buildProductStock(request);//2.保存商品库存到MySQLproductStockDAO.save(productStock);//3.保存商品库存到RedisaddStockToRedis(productStock);}//保存商品库存到Redispublic void addStockToRedis(ProductStockDO productStock) {String productStockKey = CacheSupport.buildProductStockKey(productStock.getSkuCode());Map<String, String> productStockValue = CacheSupport.buildProductStockValue(productStock.getSaleStockQuantity(), productStock.getSaledStockQuantity());redisCache.hPutAll(productStockKey, productStockValue);}private ProductStockDO buildProductStock(AddProductStockRequest request) {ProductStockDO productStock = new ProductStockDO();productStock.setSkuCode(request.getSkuCode());productStock.setSaleStockQuantity(request.getSaleStockQuantity());productStock.setSaledStockQuantity(0L);return productStock;}
}public interface CacheSupport {String PREFIX_PRODUCT_STOCK = "PRODUCT_STOCK:";//可销售库存keyString SALE_STOCK = "saleStock";//已销售库存keyString SALED_STOCK = "saledStock";//构造缓存商品库存keystatic String buildProductStockKey(String skuCode) {return PREFIX_PRODUCT_STOCK + ":" + skuCode;}//构造缓存商品库存valuestatic Map<String, String> buildProductStockValue(Long saleStockQuantity, Long saledStockQuantity) {Map<String, String> value = new HashMap<>();value.put(SALE_STOCK, String.valueOf(saleStockQuantity));value.put(SALED_STOCK, String.valueOf(saledStockQuantity));return value;}
}

2.如何发起异构存储下的Seata TCC分布式事务

一.在双写数据库 + 缓存的入口添加Seata的@GlobalTransactional注解。

二.在写数据库接口和写缓存接口上添加Seata的@TwoPhaseBusinessAction注解。

三.在提供双写数据库 + 库存接口的服务上添加Seata的@LocalTCC注解。

3.异构存储下的Seata TCC分布式事务原理

(1)TCC的核心逻辑

(2)TCC的事务流程

(1)TCC的核心逻辑

TCC的核心逻辑就是:try、commit、cancel。

一.try

会预留一些资源,但实际的动作并没有执行,当然实际应用比如写库中也可以直接执行实际的动作即提交SQL。

二.commit

分支事务执行try都成功后,就会让所有分支事务都执行commit,commit会执行实际的动作。

三.cancel

如果存在分支事务try失败了,那么所有分支事务都要执行cancel。执行cancel时会对预留的资源进行逆向补偿,取消资源预留。

(2)TCC的事务流程

4.生单链路锁定库存的Seata TCC分布式事务实现

(1)生单链路中生成订单到扣减库存的实现

(2)库存服务中扣减库存的双写数据库 + 缓存实现

(1)生单链路中生成订单到扣减库存的实现

createOrder()时锁定优惠券 + 生成订单到数据库会使用Seata的AT模式来实现分布式事务。

@DubboService(version = "1.0.0", interfaceClass = OrderApi.class, retries = 0)
public class OrderApiImpl implements OrderApi {@Autowiredprivate OrderService orderService;...//提交订单/生成订单接口@Overridepublic JsonResult<CreateOrderDTO> createOrder(CreateOrderRequest createOrderRequest) {try {CreateOrderDTO createOrderDTO = orderService.createOrder(createOrderRequest);return JsonResult.buildSuccess(createOrderDTO);} catch (OrderBizException e) {log.error("biz error", e);return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());} catch (Exception e) {log.error("system error", e);return JsonResult.buildError(e.getMessage());}}...
}@Service
public class OrderServiceImpl implements OrderService {@Autowiredprivate OrderManager orderManager;...//提交订单/生成订单接口@Overridepublic CreateOrderDTO createOrder(CreateOrderRequest createOrderRequest) {//1.入参检查checkCreateOrderRequestParam(createOrderRequest);//2.风控检查checkRisk(createOrderRequest);//3.获取商品信息List<ProductSkuDTO> productSkuList = listProductSkus(createOrderRequest);//4.计算订单价格CalculateOrderAmountDTO calculateOrderAmountDTO = calculateOrderAmount(createOrderRequest, productSkuList);//5.验证订单实付金额checkRealPayAmount(createOrderRequest, calculateOrderAmountDTO);//6.生成订单(包含锁定优惠券、扣减库存等逻辑)createOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);//7.发送订单延迟消息用于支付超时自动关单sendPayOrderTimeoutDelayMessage(createOrderRequest);//返回订单信息CreateOrderDTO createOrderDTO = new CreateOrderDTO();createOrderDTO.setOrderId(createOrderRequest.getOrderId());return createOrderDTO;}//插入订单到数据库private void createOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) {//插入订单到数据库orderManager.createOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);}...
}@Service
public class OrderManagerImpl implements OrderManager {@DubboReference(version = "1.0.0", retries = 0)private InventoryApi inventoryApi;//库存服务...//生成订单//由于锁定优惠券不会出现竞争AT模式下的全局锁,所以锁定优惠券+生成订单可以一起使用Seata的AT模式//但扣减库存继续使用Seata的AT模式则会出现竞争全局锁,所以扣减库存使用Seata的TCC模式@Override@GlobalTransactional(rollbackFor = Exception.class)public void createOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) {//锁定优惠券lockUserCoupon(createOrderRequest);//扣减库存deductProductStock(createOrderRequest);//生成订单到数据库addNewOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);}//锁定商品库存private void deductProductStock(CreateOrderRequest createOrderRequest) {String orderId = createOrderRequest.getOrderId();List<DeductProductStockRequest.OrderItemRequest> orderItemRequestList = ObjectUtil.convertList(createOrderRequest.getOrderItemRequestList(), DeductProductStockRequest.OrderItemRequest.class);DeductProductStockRequest lockProductStockRequest = new DeductProductStockRequest();lockProductStockRequest.setOrderId(orderId);lockProductStockRequest.setOrderItemRequestList(orderItemRequestList);JsonResult<Boolean> jsonResult = inventoryApi.deductProductStock(lockProductStockRequest);//检查锁定商品库存结果if (!jsonResult.getSuccess()) {throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());}}...
}@DubboService(version = "1.0.0", interfaceClass = InventoryApi.class, retries = 0)
public class InventoryApiImpl implements InventoryApi {@Autowiredprivate InventoryService inventoryService;//扣减商品库存@Overridepublic JsonResult<Boolean> deductProductStock(DeductProductStockRequest deductProductStockRequest) {try {Boolean result = inventoryService.deductProductStock(deductProductStockRequest);return JsonResult.buildSuccess(result);} catch (InventoryBizException e) {log.error("biz error", e);return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());} catch (Exception e) {log.error("system error", e);return JsonResult.buildError(e.getMessage());}}...
}

(2)库存服务中扣减库存的双写数据库 + 缓存实现

doDeduct()扣减库存时双写数据库 + 缓存会使用Seata的TCC模式来实现分布式事务。TCC模式特别适合这种多写异构存储的业务,关键的注解是@LocalTCC和@TwoPhaseBusinessAction。

@Service
public class InventoryServiceImpl implements InventoryService {...//扣减商品库存@Overridepublic Boolean deductProductStock(DeductProductStockRequest deductProductStockRequest) {//检查入参checkLockProductStockRequest(deductProductStockRequest);String orderId = deductProductStockRequest.getOrderId();List<DeductProductStockRequest.OrderItemRequest> orderItemRequestList = deductProductStockRequest.getOrderItemRequestList();for (DeductProductStockRequest.OrderItemRequest orderItemRequest : orderItemRequestList) {String skuCode = orderItemRequest.getSkuCode();//1.查询MySQL库存数据ProductStockDO productStockDO = productStockDAO.getBySkuCode(skuCode);if (productStockDO == null) {log.error("商品库存记录不存在,skuCode={}", skuCode);throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_NOT_FOUND_ERROR);}//2.查询Redis库存数据String productStockKey = CacheSupport.buildProductStockKey(skuCode);Map<String, String> productStockValue = redisCache.hGetAll(productStockKey);if (productStockValue.isEmpty()) {//如果查询不到Redis库存数据,将MySQL库存数据放入Redis,以MySQL的数据为准addProductStockProcessor.addStockToRedis(productStockDO);}//3.添加Redis锁,防并发String lockKey = MessageFormat.format(RedisLockKeyConstants.DEDUCT_PRODUCT_STOCK_KEY, orderId, skuCode);Boolean locked = redisLock.lock(lockKey);if (!locked) {log.error("无法获取扣减库存锁,orderId={},skuCode={}", orderId, skuCode);throw new InventoryBizException(InventoryErrorCodeEnum.DEDUCT_PRODUCT_SKU_STOCK_ERROR);}try {//4.查询库存扣减日志ProductStockLogDO productStockLog = productStockLogDAO.getLog(orderId, skuCode);if (null != productStockLog) {log.info("已扣减过,扣减库存日志已存在,orderId={},skuCode={}", orderId, skuCode);return true;}Integer saleQuantity = orderItemRequest.getSaleQuantity();Integer originSaleStock = productStockDO.getSaleStockQuantity().intValue();Integer originSaledStock = productStockDO.getSaledStockQuantity().intValue();//5.执行执库存扣减DeductStockDTO deductStock = new DeductStockDTO(orderId, skuCode, saleQuantity, originSaleStock, originSaledStock);deductProductStockProcessor.doDeduct(deductStock);} finally {redisLock.unlock(lockKey);}}return true;}...
}//扣减商品库存处理器
@Component
public class DeductProductStockProcessor {@Autowiredprivate LockMysqlStockTccService lockMysqlStockTccService;@Autowiredprivate LockRedisStockTccService lockRedisStockTccService;@Autowiredprivate ProductStockLogDAO productStockLogDAO;//执行扣减商品库存逻辑,由于createOrder()已经加了@GlobalTransactional注解,这里就不用加了//@GlobalTransactional(rollbackFor = Exception.class)public void doDeduct(DeductStockDTO deductStock) {//1.执行执行MySQL库存扣减,就是执行TCC的tryboolean result = lockMysqlStockTccService.deductStock(null, deductStock);if (!result) {throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_NOT_FOUND_ERROR);}//2.执行Redis库存扣减,就是执行TCC的tryresult = lockRedisStockTccService.deductStock(null, deductStock);if (!result) {throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_NOT_FOUND_ERROR);}}
}//锁定MySQL库存,Seata TCC模式的Service
@LocalTCC
public interface LockMysqlStockTccService {//一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try//所有分支事务的try执行成功,则所有事务执行commit方法;//存在分支事务的try执行失败,则所有事务执行rollback方法;@TwoPhaseBusinessAction(name = "lockMysqlStockTccService", commitMethod = "commit", rollbackMethod = "rollback")boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);//二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commitvoid commit(BusinessActionContext actionContext);//回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancelvoid rollback(BusinessActionContext actionContext);
}//锁定Redis库存,Seata TCC模式的Service
@LocalTCC
public interface LockRedisStockTccService {//一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try//所有分支事务的try执行成功,则所有事务执行commit方法;//存在分支事务的try执行失败,则所有事务执行rollback方法;@TwoPhaseBusinessAction(name = "lockRedisStockTccService", commitMethod = "commit", rollbackMethod = "rollback")boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);//二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commitvoid commit(BusinessActionContext actionContext);//回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancelvoid rollback(BusinessActionContext actionContext);
}

其中,try操作可以是预留资源,也可以是直接执行动作(即等于commit)。比如锁库存,会在try操作中把写数据库或写缓存直接处理了。try操作具体是预留资源还是直接执行,往往会根据业务来决定。

@Service
public class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {...@Transactional(rollbackFor = Exception.class)@Overridepublic boolean deductStock(BusinessActionContext actionContext, DeductStockDTO deductStock) {//actionContext上下文获取全局事务xidString xid = actionContext.getXid();String skuCode = deductStock.getSkuCode();Integer saleQuantity = deductStock.getSaleQuantity();Integer originSaleStock = deductStock.getOriginSaleStock();Integer originSaledStock = deductStock.getOriginSaledStock();//标识try阶段开始执行TccResultHolder.tagTryStart(getClass(), skuCode, xid);if (isEmptyRollback()) {return false;}log.info("一阶段方法:扣减MySQL销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);int result = productStockDAO.deductSaleStock(skuCode, saleQuantity, originSaleStock);//标识try阶段执行成功if (result > 0) {TccResultHolder.tagTrySuccess(getClass(), skuCode, xid);}return result > 0;}...
}@Service
public class LockRedisStockTccServiceImpl implements LockRedisStockTccService {...@Overridepublic boolean deductStock(BusinessActionContext actionContext, DeductStockDTO deductStock) {String xid = actionContext.getXid();String skuCode = deductStock.getSkuCode();Integer saleQuantity = deductStock.getSaleQuantity();Integer originSaleStock = deductStock.getOriginSaleStock();Integer originSaledStock = deductStock.getOriginSaledStock();//标识try阶段开始执行TccResultHolder.tagTryStart(getClass(), skuCode, xid);log.info("一阶段方法:扣减redis销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);if (isEmptyRollback()) {return false;}String luaScript = LuaScript.DEDUCT_SALE_STOCK;String saleStockKey = CacheSupport.SALE_STOCK;String productStockKey = CacheSupport.buildProductStockKey(skuCode);Long result = redisCache.execute(new DefaultRedisScript<>(luaScript, Long.class),Arrays.asList(productStockKey, saleStockKey), String.valueOf(saleQuantity), String.valueOf(originSaleStock));//标识try阶段执行成功if (result > 0) {TccResultHolder.tagTrySuccess(getClass(), skuCode, xid);}return result > 0;}...
}//存储TCC第一阶段执行结果,用于解决TCC幂等,空回滚,悬挂问题
public class TccResultHolder {//标识TCC try阶段开始执行的标识private static final String TRY_START = "TRY_START";//标识TCC try阶段执行成功的标识private static final String TRY_SUCCESS = "TRY_SUCCESS";//保存TCC事务执行过程的状态private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();...//标记try阶段开始执行public static void tagTryStart(Class<?> tccClass, String bizKey, String xid) {setResult(tccClass, bizKey, xid, TRY_START);}//标记try阶段执行成功public static void tagTrySuccess(Class<?> tccClass, String bizKey, String xid) {setResult(tccClass, bizKey, xid, TRY_SUCCESS);}//一个tccClass代表了TCC的一个分支事务public static void setResult(Class<?> tccClass, String bizKey, String xid, String v) {Map<String, String> results = map.get(tccClass);if (results == null) {synchronized (map) {if (results == null) {results = new ConcurrentHashMap<>();map.put(tccClass, results);}}}results.put(getTccExecution(xid, bizKey), v);//保存当前分布式事务id}...
}

5.库存服务异构存储双写TCC异常处理

只要TCC的分支事务在try过程中出现异常,都需要回滚所有分支事务。

如果没有出现异常,则执行commit()方法。如果出现异常,则执行rollback()方法。

//锁定MySQL库存,Seata TCC模式的Service
@LocalTCC
public interface LockMysqlStockTccService {//一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try//所有分支事务的try执行成功,则所有事务执行commit方法;//存在分支事务的try执行失败,则所有事务执行rollback方法;@TwoPhaseBusinessAction(name = "lockMysqlStockTccService", commitMethod = "commit", rollbackMethod = "rollback")boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);//二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commitvoid commit(BusinessActionContext actionContext);//回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancelvoid rollback(BusinessActionContext actionContext);
}//锁定Redis库存,Seata TCC模式的Service
@LocalTCC
public interface LockRedisStockTccService {//一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try//所有分支事务的try执行成功,则所有事务执行commit方法;//存在分支事务的try执行失败,则所有事务执行rollback方法;@TwoPhaseBusinessAction(name = "lockRedisStockTccService", commitMethod = "commit", rollbackMethod = "rollback")boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);//二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commitvoid commit(BusinessActionContext actionContext);//回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancelvoid rollback(BusinessActionContext actionContext);
}

写数据库的commit()和rollback():

@Service
public class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {@Autowiredprivate ProductStockDAO productStockDAO;@Autowiredprivate ProductStockLogDAO productStockLogDAO;...@Overridepublic void commit(BusinessActionContext actionContext) {String xid = actionContext.getXid();DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);String skuCode = deductStock.getSkuCode();Integer saleQuantity = deductStock.getSaleQuantity();Integer originSaleStock = deductStock.getOriginSaleStock();Integer originSaledStock = deductStock.getOriginSaledStock();log.info("二阶段方法:增加mysql已销售库存,deductStock,xid={}", JSONObject.toJSONString(deductStock), xid);//幂等//当出现网络异常或者TC Server异常时,会出现重复调用commit阶段的情况,所以需要进行幂等操作if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {return;}//1.增加已销售库存productStockDAO.increaseSaledStock(skuCode, saleQuantity, originSaledStock);//2.插入一条扣减日志表log.info("插入一条扣减日志表");productStockLogDAO.save(buildStockLog(deductStock));//移除标识TccResultHolder.removeResult(getClass(), skuCode, xid);}@Overridepublic void rollback(BusinessActionContext actionContext) {String xid = actionContext.getXid();DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);String skuCode = deductStock.getSkuCode();Integer saleQuantity = deductStock.getSaleQuantity();Integer originSaleStock = deductStock.getOriginSaleStock();Integer originSaledStock = deductStock.getOriginSaledStock();log.info("回滚:增加mysql销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);//空回滚处理if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) {log.info("mysql:出现空回滚");insertEmptyRollbackTag();return;}//幂等处理//try阶段没有完成的情况下,不必执行回滚,因为try阶段有本地事务,事务失败时已经进行了回滚//如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {log.info("mysql:无需回滚");return;}//1.还原销售库存productStockDAO.restoreSaleStock(skuCode, saleQuantity, originSaleStock - saleQuantity);//2.删除库存扣减日志ProductStockLogDO logDO = productStockLogDAO.getLog(deductStock.getOrderId(), skuCode);if (null != logDO) {productStockLogDAO.removeById(logDO.getId());}//移除标识TccResultHolder.removeResult(getClass(), skuCode, xid);}...
}

写缓存的commit()和rollback():

@Service
public class LockRedisStockTccServiceImpl implements LockRedisStockTccService {@Autowiredprivate RedisCache redisCache;...@Overridepublic void commit(BusinessActionContext actionContext) {String xid = actionContext.getXid();DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);String skuCode = deductStock.getSkuCode();Integer saleQuantity = deductStock.getSaleQuantity();Integer originSaleStock = deductStock.getOriginSaleStock();Integer originSaledStock = deductStock.getOriginSaledStock();log.info("二阶段方法:增加redis已销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);//幂等//当出现网络异常或者TC Server异常时,会出现重复调用commit阶段的情况,所以需要进行幂等操作if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {log.info("已经执行过commit阶段");return;}String luaScript = LuaScript.INCREASE_SALED_STOCK;String saledStockKey = CacheSupport.SALED_STOCK;String productStockKey = CacheSupport.buildProductStockKey(skuCode);redisCache.execute(new DefaultRedisScript<>(luaScript, Long.class), Arrays.asList(productStockKey, saledStockKey), String.valueOf(saleQuantity), String.valueOf(originSaledStock));//移除标识TccResultHolder.removeResult(getClass(), skuCode, xid);}@Overridepublic void rollback(BusinessActionContext actionContext) {String xid = actionContext.getXid();DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);String skuCode = deductStock.getSkuCode();Integer saleQuantity = deductStock.getSaleQuantity();Integer originSaleStock = deductStock.getOriginSaleStock();Integer originSaledStock = deductStock.getOriginSaledStock();log.info("回滚:增加redis销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);//空回滚处理if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) {log.info("redis:出现空回滚");insertEmptyRollbackTag();return;}//幂等处理//try阶段没有完成的情况下,不必执行回滚//如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {log.info("redis:无需回滚");return;}String luaScript = LuaScript.RESTORE_SALE_STOCK;String saleStockKey = CacheSupport.SALE_STOCK;String productStockKey = CacheSupport.buildProductStockKey(skuCode);redisCache.execute(new DefaultRedisScript<>(luaScript, Long.class), Arrays.asList(productStockKey, saleStockKey), String.valueOf(saleQuantity), String.valueOf(originSaleStock - saleQuantity));//移除标识TccResultHolder.removeResult(getClass(), skuCode, xid);}...
}

6.库存服务TCC事务的空悬挂问题

(1)空悬挂问题

(2)解决空悬挂的思路

(1)空悬挂问题

因为网络延迟等原因,分支事务的rollback()方法可能会比try()方法先执行,即rollback()方法进行了空回滚,然后try()方法才执行,从而导致try()方法预留的资源无法被取消。

(2)解决空悬挂的思路

当rollback()方法出现空回滚时,需要进行标识(如在数据库中查一条记录),然后在try()方法里会判断是否发生了空回滚。

@Service
public class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {...//这就是TCC的try@Transactional(rollbackFor = Exception.class)@Overridepublic boolean deductStock(BusinessActionContext actionContext, DeductStockDTO deductStock) {//actionContext上下文获取全局事务xidString xid = actionContext.getXid();String skuCode = deductStock.getSkuCode();Integer saleQuantity = deductStock.getSaleQuantity();Integer originSaleStock = deductStock.getOriginSaleStock();Integer originSaledStock = deductStock.getOriginSaledStock();//标识try阶段开始执行TccResultHolder.tagTryStart(getClass(), skuCode, xid);//悬挂问题:rollback接口比try接口先执行,即rollback接口进行了空回滚,try接口才执行,导致try接口预留的资源无法被取消//解决空悬挂的思路:即当rollback接口出现空回滚时,需要打一个标识(在数据库中查一条记录),在try()里判断是否发生了空回滚if (isEmptyRollback()) {return false;}log.info("一阶段方法:扣减MySQL销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);int result = productStockDAO.deductSaleStock(skuCode, saleQuantity, originSaleStock);//标识try阶段执行成功if (result > 0) {TccResultHolder.tagTrySuccess(getClass(), skuCode, xid);}return result > 0;}//判断是否发生的空回滚private Boolean isEmptyRollback() {//需要查询本地数据库,看是否发生了空回滚return false;}//插入空回滚标识private void insertEmptyRollbackTag() {//在数据库插入空回滚的标识}@Overridepublic void rollback(BusinessActionContext actionContext) {String xid = actionContext.getXid();DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);String skuCode = deductStock.getSkuCode();Integer saleQuantity = deductStock.getSaleQuantity();Integer originSaleStock = deductStock.getOriginSaleStock();Integer originSaledStock = deductStock.getOriginSaledStock();log.info("回滚:增加mysql销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);//空回滚处理if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) {log.info("mysql:出现空回滚");//插入空回滚标识insertEmptyRollbackTag();return;}//幂等处理//try阶段没有完成的情况下,不必执行回滚,因为try阶段有本地事务,事务失败时已经进行了回滚//如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {log.info("mysql:无需回滚");return;}//1.还原销售库存productStockDAO.restoreSaleStock(skuCode, saleQuantity, originSaleStock - saleQuantity);//2.删除库存扣减日志ProductStockLogDO logDO = productStockLogDAO.getLog(deductStock.getOrderId(), skuCode);if (null != logDO) {productStockLogDAO.removeById(logDO.getId());}//移除标识TccResultHolder.removeResult(getClass(), skuCode, xid);}...
}

7.库存服务TCC二阶段重试的幂等问题

如果执行commit失败,Seata Server会让分支事务不断重试commit。如果执行cancel失败,Seata Server会让分支事务不断重试cancel。只要出现重试,就需要保证重试操作的方法是幂等的。

当try开始执行时,会添加标识,表明开启了TCC事务。当标识被移除掉后,则说明commit或cancel执行成功。

重复执行commit或cancel时,通过判断标识是否为空,就能拦截掉重复执行的commit或cancel,从而实现幂等。

@Service
public class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {@Autowiredprivate ProductStockDAO productStockDAO;@Autowiredprivate ProductStockLogDAO productStockLogDAO;...@Overridepublic void commit(BusinessActionContext actionContext) {String xid = actionContext.getXid();DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);String skuCode = deductStock.getSkuCode();Integer saleQuantity = deductStock.getSaleQuantity();Integer originSaleStock = deductStock.getOriginSaleStock();Integer originSaledStock = deductStock.getOriginSaledStock();log.info("二阶段方法:增加mysql已销售库存,deductStock,xid={}", JSONObject.toJSONString(deductStock), xid);//幂等//当出现网络异常或者TC Server异常时,会出现重复调用commit阶段的情况,所以需要进行幂等操作if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {return;}//1.增加已销售库存productStockDAO.increaseSaledStock(skuCode, saleQuantity, originSaledStock);//2.插入一条扣减日志表log.info("插入一条扣减日志表");productStockLogDAO.save(buildStockLog(deductStock));//移除标识TccResultHolder.removeResult(getClass(), skuCode, xid);}@Overridepublic void rollback(BusinessActionContext actionContext) {String xid = actionContext.getXid();DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class);String skuCode = deductStock.getSkuCode();Integer saleQuantity = deductStock.getSaleQuantity();Integer originSaleStock = deductStock.getOriginSaleStock();Integer originSaledStock = deductStock.getOriginSaledStock();log.info("回滚:增加mysql销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);//空回滚处理if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) {log.info("mysql:出现空回滚");insertEmptyRollbackTag();return;}//幂等处理//try阶段没有完成的情况下,不必执行回滚,因为try阶段有本地事务,事务失败时已经进行了回滚//如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) {log.info("mysql:无需回滚");return;}//1.还原销售库存productStockDAO.restoreSaleStock(skuCode, saleQuantity, originSaleStock - saleQuantity);//2.删除库存扣减日志ProductStockLogDO logDO = productStockLogDAO.getLog(deductStock.getOrderId(), skuCode);if (null != logDO) {productStockLogDAO.removeById(logDO.getId());}//移除标识TccResultHolder.removeResult(getClass(), skuCode, xid);}...
}//存储TCC第一阶段执行结果,用于解决TCC幂等,空回滚,悬挂问题
public class TccResultHolder {//标识TCC try阶段开始执行的标识private static final String TRY_START = "TRY_START";//标识TCC try阶段执行成功的标识private static final String TRY_SUCCESS = "TRY_SUCCESS";//保存TCC事务执行过程的状态private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();...//判断try阶段是否执行成功public static boolean isTrySuccess(Class<?> tccClass, String bizKey, String xid) {String v = getResult(tccClass, bizKey, xid);if (StringUtils.isNotBlank(v) && TRY_SUCCESS.equals(v)) {return true;}return false;}public static String getResult(Class<?> tccClass, String bizKey, String xid) {Map<String, String> results = map.get(tccClass);if (results != null) {return results.get(getTccExecution(xid, bizKey));}return null;}public static void removeResult(Class<?> tccClass, String bizKey, String xid) {Map<String, String> results = map.get(tccClass);if (results != null) {results.remove(getTccExecution(xid, bizKey));}}...
}

8.假设使用异步锁库存方案可能会导致的几种问题

(1)发送库存扣减消息到MQ失败导致超卖

(2)消费库存扣减消息失败重试时出现重复消费

(3)大量并发扣减请求进来扣库存导致大量扣减失败

假如锁定优惠券 + 生成订单数据写库之后,通过发送库存扣减消息到MQ。后续再由消费者消费MQ的库存扣减消息,完成数据库 + 缓存的双写。通过这种异步锁库存的方式,来实现隔离Seata AT模式和Seata TCC模式。那么就可能会出现如下问题:

(1)发送库存扣减消息到MQ失败导致超卖

比如10个库存,发了11次消息,其中1次失败,那么就会造成10个库存扣完了,但是生成了11个订单。

(2)消费库存扣减消息失败重试时出现重复消费

比如10个库存,发了9次消息,其中一条消息重复消费了两次。这样库存扣完了,但只生成9个订单。

(3)大量并发扣减请求进来扣库存导致大量扣减失败

出现生成订单成功,但是后续出现大量扣库存失败,导致退款。

所以,异步锁库存并不科学,锁库存还是要使用同步。也就是生成订单到数据库 + 锁优惠券 + 锁库存,使用AT模式绑定成刚性事务。但由于锁库存使用了TCC模式,所以锁库存的分支事务不用再竞争全局锁,从而提高了锁库存的并发性能,而且TCC模式也保证了双写数据库 + 缓存的数据一致性。

9.生单链路的AT + TCC混合事务方案流程总结

一.生成订单数据 + 锁定优惠券,使用的是AT模式

订单数据和营销数据通常不需要做异构存储,使用数据库存储即可。往数据库写入订单数据 + 锁优惠券,由于都是与用户关联,所以即使并发情况下也不会出现竞争全局锁。

二.锁库存双写数据库 + 缓存,使用的是TCC模式

库存数据需要异构存储,所以扣减库存时,需要操作数据库 + 缓存。双写数据库 + 缓存会面临数据一致性问题,TCC模式可以保证数据一致性。

锁库存使用TCC模式后,即便出现大量并发请求锁库存,也不需要竞争AT模式下的全局锁了。

10.生单链路非分布式事务的纯补偿方案

也有很多公司并没有使用Seata分布式事务这种比较复杂的技术,而是使用纯补偿方案来实现生单链路。

生单链路纯补偿方案需要引入操作日志来实现补偿检查,锁定优惠券需要有操作日志,锁定库存也需要有操作日志。

无论生单是否成功,都要发送消息到MQ,以生单请求为基础去检查锁定优惠券和锁定库存的操作日志。

如果生单成功,但锁定优惠券或锁定库存的操作日志缺失,则进行锁定优惠券或锁定库存的补偿操作。

如果生单失败,但锁定优惠券或锁定库存的操作日志显示锁定成功,则需要释放优惠券或库存的资源。

http://www.dtcms.com/a/278619.html

相关文章:

  • Vue3 学习教程,从入门到精通,Vue 3 表单控件绑定详解与案例(7)
  • 设计模式--适配器模式
  • PHP password_get_info() 函数
  • 第一章 uniapp实现兼容多端的树状族谱关系图,创建可缩放移动区域
  • 商城系统的架构与功能模块
  • flink 中配置hadoop 遇到问题解决
  • 用Python向PDF添加文本:精确插入文本到PDF文档
  • vue3+uniapp 使用vue-plugin-hiprint中实现打印效果
  • Triton Inference Server 架构与前后处理方案梳理
  • 打破空间边界!Nas-Cab用模块化设计重构个人存储逻辑
  • JAVA进阶--JVM
  • 设备发出、接收数据帧的工作机制
  • 无人机迫降模式模块运行方式概述!
  • 掉线监测-tezos rpc不能用,改为残疾网页监测
  • .net winfrom 获取上传的Excel文件 单元格的背景色
  • 深入浅出Kafka Producer源码解析:架构设计与编码艺术
  • 创客匠人:创始人 IP 打造的破局点,藏在 “小而精” 的需求里
  • React源码3:update、fiber.updateQueue对象数据结构和updateContainer()中enqueueUpdate()阶段
  • 分布式系统中设计临时节点授权的自动化安全审计
  • postgreSQL的sql语句
  • 时序预测 | Pytorch实现CNN-LSTM-KAN电力负荷时间序列预测模型
  • 2025 春秋杯夏季个人挑战赛 Web
  • lesson13:Python的datetime模块
  • 登录校验与异常处理(web后端笔记第三期)
  • NAT原理与实验指南:网络地址转换技术解析与实践
  • 中国AI应用“三分天下”:国企成主力、中小企偏订阅、C端仍在观望
  • 使用axios向服务器请求信息并渲染页面
  • TCP心跳机制详解
  • 【Linux系统】进程切换 | 进程调度——O(1)调度队列
  • 如何在服务器上运行一个github项目