大型微服务项目:听书——多端重复提交订单问题适配器模式实现不同支付方式的选择零钱支付逻辑
14 多端重复提交订单问题&适配器模式实现不同支付方式的选择&零钱支付逻辑
14.1 Redis + 有代表性的Token解决多端重复提交订单问题
-
多端重复提交问题,指的是同一个用户在不同设备(如手机、电脑、平板等)同时或短时间内多次提交同一订单的情况。在
13.9 提交订单接口业务层校验实现
中实现的Redis + Lua 脚本防重方案只能解决单端重复提交,但无法完全解决多端重复提交,原因:- Redis + Lua 脚本防重的核心逻辑:
- 第一次提交:Redis 中存在
orderRepeatSubmitKey
,Lua 脚本删除它并返回1
,允许提交; - 重复提交:Redis 中不存在
orderRepeatSubmitKey
(说明已经提交过,被删除了),Lua 脚本返回0
,抛出异常;
- 第一次提交:Redis 中存在
- 如果用户同时在设备A和设备B提交订单:
- 设备A先执行,删除
orderRepeatSubmitKey
,返回1
,允许提交; - 设备B紧接着执行,由于
orderRepeatSubmitKey
已被删除,返回0
,抛出异常; - 但实际上,由于网络延迟或并发问题,可能出现以下情况:设备A和设备B几乎同时执行
exists
,此时orderRepeatSubmitKey
仍存在,两者都返回1
,导致 两笔订单都被放行(多端重复提交成功);
- 设备A先执行,删除
- 虽然 Lua 脚本是原子性的,但
trade()
和submitOrder()
是两个独立的方法:trade()
生成tradeNo
并写入 Redis(set orderRepeatSubmitKey "1"
);submitOrder()
用 Lua 脚本检查并删除orderRepeatSubmitKey
;- 如果设备A调用
trade()
生成tradeNo
,但还未调用submitOrder()
,此时设备B也调用trade()
生成相同的tradeNo
(虽然概率低,但可能发生),导致两笔订单使用同一个tradeNo
,最终都能提交;
- Redis + Lua 脚本防重的核心逻辑:
-
可以用**Redis + 有代表性的Token(即有代表性的键名)**来解决,只要是同一个商品,让他们的 Token 一致即可;
-
修改结算页展示接口:
/*** 展示结算页** @param tradeVo* @return*/ @Override public OrderInfoVo trade(TradeVo tradeVo) {// ……其它逻辑List<OrderDetailVo> productList = orderInfoVo.getOrderDetailVoList().stream().collect(Collectors.toList());// productList不为空才生成签名if(!CollectionUtils.isEmpty(productList)){return orderInfoVo;}// 签名(Sign),防止订单重复提交// 设置当前时间戳到orderInfoVo对象中,用于后续签名验证orderInfoVo.setTimestamp(System.currentTimeMillis());// 将orderInfoVo对象转换为Map,调用SignHelper.getSign方法生成签名,并将签名设置回orderInfoVo对象String sign = SignHelper.getSign(JSONObject.parseObject(JSONObject.toJSONString(orderInfoVo), Map.class));orderInfoVo.setSign(sign);// 使用MD5加密生成一个代表商品列表的唯一令牌String representativeToken = MD5.encrypt(new String(productList + "")); // 此处最好不要用productList.toString()// 组合用户ID和商品令牌生成一个唯一的Redis键,用于标识订单是否重复提交String orderRepeatSubmitKey = userId + ":" + representativeToken; // 有代表性(代表买的商品内容)// 将生成的键存入Redis,值为"1",表示该订单尚未提交过,用于防止重复提交redisTemplate.opsForValue().set(orderRepeatSubmitKey, "1"); // 表示订单未提交过return orderInfoVo; }
-
修改提交订单接口:
/*** 提交订单* @param orderInfoVo* @return*/ @Override public Map<String, Object> submitOrder(OrderInfoVo orderInfoVo) {// 1.3 订单重复提交的校验// 第一次提交订单之后,未看见系统有响应(可能是网络原因、接口的并发高导致处理慢等),但是用户误以为失败或者没点上,接着又点击一次提交订单,这就导致了重复提交// 接口幂等性保证的解决方案:各种锁机制(分布式锁、本地锁)、MySQL的唯一索引+防重表+本地事务、Redis+Token(常用)// 处理单端重复提交。TODO:多端重复提交如何实现?// 判断Redis中是否存在指定的key(防重提交标识):// 1. 如果key存在(值为1),说明是第一次提交,删除key并返回1// 2. 如果key不存在(值为0),说明已经提交过了,key已经被删除了,是重复提交,返回0 // String orderRepeatSubmitKey = AuthContextHolder.getUserId() + ":" + orderInfoVo.getTradeNo(); // String luaScript = "if redis.call(\"exists\",KEYS[1])\n" + "then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; // Long execute = redisTemplate.execute(new DefaultRedisScript<Long>(luaScript, Long.class), Arrays.asList(orderRepeatSubmitKey)); // if (execute == 0) { // throw new ShisanException(201, "订单重复提交"); // }// 处理多端重复提交Long userId = AuthContextHolder.getUserId();List<OrderDetailVo> productList = orderInfoVo.getOrderDetailVoList().stream().collect(Collectors.toList());String representativeToken = MD5.encrypt(new String(productList + ""));String orderRepeatSubmitKey = userId + ":" + representativeToken;Long increment = redisTemplate.opsForValue().increment(orderRepeatSubmitKey);if (increment > 2) {throw new ShisanException(201, "订单重复提交");}// TODO 若不允许购买一个商品多次,需要查询一下订单表是否有该商品的记录。若允许则忽略此步骤// TODO 若用户提交订单后未支付,再次购买相同商品时,应提示“订单未支付”(具体要看用户支付流程是怎样的,不一定需要这一步)// 2.保存订单信息// 3.生成订单编号// 4.判断支付方式// 5.订单编号返回Map<String, Object> map = new HashMap<>();map.put("orderNo", "");redisTemplate.delete(orderRepeatSubmitKey);return map; }
14.2 保存订单信息
-
修改:
/*** 保存订单信息* @param orderInfoVo* @param userId* @param orderNo* @return*/ OrderInfo saveOrderInfo(OrderInfoVo orderInfoVo, Long userId, String orderNo);
-
修改:
@Autowired private OrderDetailMapper orderDetailMapper;@Autowired private OrderDerateMapper orderDerateMapper;/*** 保存订单信息* @param orderInfoVo* @param userId* @param orderNo* @return*/ @Transactional(rollbackFor = Exception.class) @Override public OrderInfo saveOrderInfo(OrderInfoVo orderInfoVo, Long userId, String orderNo) {OrderInfo orderInfo = null;try {// 保存订单基本信息orderInfo = saveOrderBasicInfo(orderInfoVo, userId, orderNo);// 保存订单详情saveOrderDetail(orderInfo.getId(), orderInfoVo);// 保存订单减免saveOrderDerate(orderInfo.getId(), orderInfoVo);} catch (Exception e) {throw new ShisanException(400, "服务内部解析数据出现了异常");}return orderInfo; }/*** 保存订单基本信息* @param orderInfoVo* @param userId* @param orderNo* @return*/ private OrderInfo saveOrderBasicInfo(OrderInfoVo orderInfoVo, Long userId, String orderNo) {OrderInfo orderInfo = new OrderInfo();orderInfo.setUserId(userId);orderInfo.setOrderTitle(orderInfoVo.getOrderDetailVoList().get(0).getItemName()); // 订单标题orderInfo.setOrderNo(orderNo); // 订单编号orderInfo.setOrderStatus(SystemConstant.ORDER_STATUS_UNPAID); // 订单状态:未支付orderInfo.setOriginalAmount(orderInfoVo.getOriginalAmount()); // 订单的原始金额orderInfo.setDerateAmount(orderInfoVo.getDerateAmount()); // 订单的减免金额orderInfo.setOrderAmount(orderInfoVo.getOrderAmount()); // 订单的实际金额orderInfo.setItemType(orderInfoVo.getItemType()); // 付款项类型orderInfo.setPayWay(orderInfoVo.getPayWay()); // 支付方式orderInfoMapper.insert(orderInfo);return orderInfo; }/*** 保存订单详情* @param orderId* @param orderInfoVo*/ private void saveOrderDetail(Long orderId, OrderInfoVo orderInfoVo) {orderInfoVo.getOrderDetailVoList().stream().forEach(orderDetailVo -> {OrderDetail orderDetail = new OrderDetail();orderDetail.setOrderId(orderId);orderDetail.setItemId(orderDetailVo.getItemId());orderDetail.setItemName(orderDetailVo.getItemName());orderDetail.setItemUrl(orderDetailVo.getItemUrl());orderDetail.setItemPrice(orderDetailVo.getItemPrice());orderDetailMapper.insert(orderDetail);}); }/*** 保存订单减免* @param orderId* @param orderInfoVo*/ private void saveOrderDerate(Long orderId, OrderInfoVo orderInfoVo) {orderInfoVo.getOrderDerateVoList().forEach(orderDerateVo -> {OrderDerate orderDerate = new OrderDerate();orderDerate.setOrderId(orderId);orderDerate.setDerateType(orderDerateVo.getDerateType()); // 减免类型orderDerate.setDerateAmount(orderDerateVo.getDerateAmount()); // 减免金额orderDerate.setRemarks("商品有减免");orderDerateMapper.insert(orderDerate);}); }
14.3 提交订单完整实现(适配器设计模式实现不同支付方式的选择)
-
修改:
@Autowired private List<PayWay> payWayService;/*** 提交订单** @param orderInfoVo* @return*/ @Override public Map<String, Object> submitOrder(OrderInfoVo orderInfoVo) {// 大致流程:// 将前端提交的数据 保存到tingshu_order库下的三张表中// 生成订单编号// 支付方法的判断,零钱支付?微信支付?// 将订单编号返回// orderInfoVo(originalAmount:200)--->Map(originalAmount:200)---签名---200 | "atguigu123"---md5---"123456789xb"// 在将数据存储到数据库之前,要先做提交订单的校验// 1. 基础数据的校验(Spring已经做好了,比如OrderInfoVO上的@NotEmpty注解,submitOrder()接口上的@Validated注解等)// 2. 业务层的校验// a)验证订单信息的完整性// b)验证订单信息的时效性(保证订单尽可能小的被篡改的风险)// c)重复提交的校验// d)非法请求的校验// e)请求次数限制的校验// e)请求的ip做地域的校验// f)请求ip做黑白名单校验// g)在窗口期做请求速率的限制// .....// 1.业务层的校验// 1.1 校验请求的非法性,即交易号不能为空String tradeNo = orderInfoVo.getTradeNo();if (StringUtils.isEmpty(tradeNo)) {throw new ShisanException(201, "非法请求");}// 1.2 校验订单信息的完整性以及时效性String orderInfoVoStr = JSONObject.toJSONString(orderInfoVo); // 将orderInfoVo对象转成json字符串Map signMap = JSONObject.parseObject(orderInfoVoStr, Map.class); // 将json字符串转成map对象signMap.put("payWay", ""); // 往map中添加一个key为payWay的value为空的键值对SignHelper.checkSign(signMap); // 对 signMap 进行 签名校验(确保数据未被篡改)// 1.3 订单重复提交的校验// 第一次提交订单之后,未看见系统有响应(可能是网络原因、接口的并发高导致处理慢等),但是用户误以为失败或者没点上,接着又点击一次提交订单,这就导致了重复提交// 接口幂等性保证的解决方案:各种锁机制(分布式锁、本地锁)、MySQL的唯一索引+防重表+本地事务、Redis+Token(常用)// 处理单端重复提交。TODO:多端重复提交如何实现?// 判断Redis中是否存在指定的key(防重提交标识):// 1. 如果key存在(值为1),说明是第一次提交,删除key并返回1// 2. 如果key不存在(值为0),说明已经提交过了,key已经被删除了,是重复提交,返回0 // String orderRepeatSubmitKey = AuthContextHolder.getUserId() + ":" + orderInfoVo.getTradeNo(); // String luaScript = "if redis.call(\"exists\",KEYS[1])\n" + "then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; // Long execute = redisTemplate.execute(new DefaultRedisScript<Long>(luaScript, Long.class), Arrays.asList(orderRepeatSubmitKey)); // if (execute == 0) { // throw new ShisanException(201, "订单重复提交"); // }// 处理多端重复提交Long userId = AuthContextHolder.getUserId();List<OrderDetailVo> productList = orderInfoVo.getOrderDetailVoList().stream().collect(Collectors.toList());String representativeToken = MD5.encrypt(new String(productList + ""));String orderRepeatSubmitKey = userId + ":" + representativeToken;Long increment = redisTemplate.opsForValue().increment(orderRepeatSubmitKey);if (increment > 2) {throw new ShisanException(201, "订单重复提交");}// TODO 若不允许购买一个商品多次,需要查询一下订单表是否有该商品的记录。若允许则忽略此步骤// TODO 若用户提交订单后未支付,再次购买相同商品时,应提示“订单未支付”(具体要看用户支付流程是怎样的,不一定需要这一步)// 2.生成订单编号String orderNo = RandomStringUtils.random(12, true, true);// 3.判断支付方式,保存订单信息String payWay = orderInfoVo.getPayWay(); // if ("1101".equals(payWay)) { // // 微信处理逻辑 // } else if ("1102".equals(payWay)) { // // 支付宝处理逻辑 // } else { // // 零钱支付方式 // }// 适配器设计模式for (PayWay way : payWayService) {if (way.isSupport(payWay)) {way.dealPayWay(orderInfoVo, userId, orderNo);}}// 5.订单编号返回Map<String, Object> map = new HashMap<>();map.put("orderNo", "");redisTemplate.delete(orderRepeatSubmitKey);return map; }
-
新建:其它三个实现类集成
PayWay
接口,并先暂时实现其中的两个方法,暂时不编写逻辑/*** 支付方式适配器*/ public interface PayWay {/*** 定义适配某一种具体支付方式的适配方式*/public boolean isSupport(String payWay);/*** 具体支付方式的处理支付逻辑*/public void dealPayWay(OrderInfoVo orderInfoVo,Long userId,String orderNo); }
14.3 远程调用用户微服务,查询并锁定用户余额(利用CAS思想+分布式锁)&记录用户账户流水接口
-
修改:记得给
UserAccountFeignClient
接口的@FeignClient
注解加上path = "/api/inner/accountinfo"
参数/*** 查询并锁定用户余额* @param accountLockVo* @return*/ @PostMapping("/checkAndLockAmount") Result<AccountLockResultVo> checkAndLockAmount(AccountLockVo accountLockVo);
-
新建:
/*** 查询并锁定用户余额* @param accountLockVo* @return*/ @PostMapping("/checkAndLockAmount") Result<AccountLockResultVo> checkAndLockAmount(@RequestBody AccountLockVo accountLockVo) {return userAccountService.checkAndLockAmount(accountLockVo); }
-
修改:
/*** 查询并锁定用户余额* @param accountLockVo* @return*/ Result<AccountLockResultVo> checkAndLockAmount(AccountLockVo accountLockVo);/*** 记录用户账户流水*/ void log(Long userId, BigDecimal amount, String content, String orderNo, String tradeType);
-
修改:
@Autowired private UserAccountMapper userAccountMapper;@Autowired private StringRedisTemplate redisTemplate;@Autowired private UserAccountDetailMapper userAccountDetailMapper;/*** 查询并锁定用户余额* @param accountLockVo* @return*/ // 注意事项一:查询余额是否充足 // 如果余额不充足,抛出异常 // 如果余额充足,锁定余额(利用CAS思想,即Compare And Swap比较并交换,在锁定余额之前,查询一下要锁定的余额是否是符合预期的,将查询和锁定余额的操作写在同一条SQL上) // 如果不将查询和锁定余额的操作写在同一条SQL上,可能会有以下情况: // (1)线程A查询余额是否充足,充足,来锁定余额; // (2)在线程A锁定余额之前,线程B查询余额是否充足,充足,来锁定余额; // (3)在线程B锁定余额之前,线程A已经将余额锁定,此时线程B锁定的余额就是已经被线程A修改过后的余额,此时就出现了错误// 注意事项二:利用分布式锁解决并发问题 // 线程A在执行业务的时候,线程B不能进来执行业务。线程A在执行业务之前,加分布式锁,执行业务之后,将锁定金额返回对象保存到Redis中,然后删除分布式锁 // 线程A执行完业务的时候,线程B才能进来执行业务。线程B进来之后,先查询Redis中是否有锁定金额返回对象,如果有,说明线程A已经执行完业务了,线程B直接将结果返回给用户 @Override @Transactional(rollbackFor = Exception.class) public Result<AccountLockResultVo> checkAndLockAmount(AccountLockVo accountLockVo) {// 1.获取变量Long userId = accountLockVo.getUserId();BigDecimal amount = accountLockVo.getAmount();String content = accountLockVo.getContent();String orderNo = accountLockVo.getOrderNo();String dataKey = "user:account:amount:" + orderNo; // 锁定金额返回对象的keyString lockKey = "user:account:amount:lock" + orderNo; // 余额的分布式锁key// 2.加分布式锁Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lockKey, "1");if (!aBoolean) {return Result.build(null, ResultCodeEnum.ACCOUNT_LOCK_REPEAT); // 线程2进来,线程1已经锁定了,线程2不能进来}// 3.将判断余额是否充足和修改余额作为原子操作(利用CAS思想,即比较并交换)try {String result = redisTemplate.opsForValue().get(dataKey);if (!StringUtils.isEmpty(result)) { // 如果redis中已经存在锁定金额返回对象,说明已经锁定过余额了,直接将结果返回给线程Breturn Result.build(JSONObject.parseObject(result, AccountLockResultVo.class), ResultCodeEnum.ACCOUNT_LOCK_REPEAT);}int count = userAccountMapper.checkAndLockAmount(userId, amount);// 3.1 锁定失败if (count == 0) {return Result.build(null, ResultCodeEnum.ACCOUNT_LOCK_ERROR);}// 3.2 锁定成功,构建锁定金额返回对象,即AccountLockResultVoAccountLockResultVo accountLockResultVo = new AccountLockResultVo();accountLockResultVo.setUserId(userId);accountLockResultVo.setAmount(amount);accountLockResultVo.setContent(content);// 3.3 记录用户账户流水,即用户对钱有哪些操作this.log(userId, amount, "锁定:" + content, orderNo, "1202"); // 流水号 1202 代表锁定金额// 3.4 将锁定金额返回对象保存到redis中,主要为了解锁和消费的时候取数据方便redisTemplate.opsForValue().set(dataKey, JSONObject.toJSONString(accountLockResultVo));// 3.5 返回锁定对象return Result.ok(accountLockResultVo);} catch (Exception e) {// TODO:OpenFeign业务执行期间出现了异常,让OpenFeign进行重试// OpenFeign只会对超时和网络异常进行重试,业务异常不进行重试throw new ShisanException(400, "服务内部处理数据出现了异常");} finally {redisTemplate.delete(lockKey); // 线程1执行完,删除锁,其它线程才可以进来操作余额} }/*** 记录用户账户流水* @param userId* @param amount* @param content* @param orderNo* @param tradeType*/ @Override public void log(Long userId, BigDecimal amount, String content, String orderNo, String tradeType) {UserAccountDetail userAccountDetail = new UserAccountDetail();userAccountDetail.setUserId(userId);userAccountDetail.setTitle(content);userAccountDetail.setTradeType(tradeType);userAccountDetail.setAmount(amount);userAccountDetail.setOrderNo(orderNo);userAccountDetailMapper.insert(userAccountDetail); }
-
修改:
/*** 判断余额是否充足并修改余额* @param userId* @param amount* @return*/ int checkAndLockAmount(@Param("userId") Long userId, @Param("amount") BigDecimal amount);
-
修改:
<update id="checkAndLockAmount">update user_accountset lock_amount=lock_amount + #{amount},available_amount=available_amount - #{amount}where user_account.user_id = #{userId}and user_account.available_amount >= #{amount} </update>
-
修改:
@Override public Result<AccountLockResultVo> checkAndLockAmount(AccountLockVo accountLockVo) {return Result.fail(); }
14.4 本地消息表
-
具体介绍见
14.6
; -
运行sql脚本:创建本地消息表,主要字段就两个,即消息内容和状态
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0;-- ---------------------------- -- Table structure for t_local_msg -- ---------------------------- DROP TABLE IF EXISTS `t_local_msg`; CREATE TABLE `t_local_msg` (`id` bigint NOT NULL AUTO_INCREMENT,`msg_content` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,`status` tinyint NOT NULL,`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;-- ---------------------------- -- Records of t_local_msg -- ----------------------------SET FOREIGN_KEY_CHECKS = 1;
-
创建本地事务表的实体类对象:
@TableName("t_local_msg") @Data public class LocalMsg {@TableId(type = IdType.AUTO)private Long id;@TableField(value = "msg_content")private String msgContent;@TableField(value = "status")private Integer status;@TableField("create_time")private Date createTime; // Mon Sep 02 10:38:08 CST 2024@JsonIgnore // 不会参与序列化@TableField("update_time")private Date updateTime; }
-
修改:在RabbitMQ的常量类中添加本地消息表的交换机、路由键、队列的常量
/*** 本地消息表*/ public static final String EXCHANGE_LOCAL_MSG = "local.msg.exchange"; public static final String ROUTING_LOCAL_MSG = "local.msg.rk"; public static final String QUEUE_LOCAL_MSG = "local.msg.queue";
-
新建:
public interface MqOpsService {/*** 修改本地消息表的状态* @param content*/void updateLocalMsgStatus(String content); }
-
新建:
@Service public class MqOpsServiceImpl implements MqOpsService {@Autowiredprivate LocalMsgMapper localMsgMapper;/*** 修改本地消息表的状态* @param content*/@Overridepublic void updateLocalMsgStatus(String content) {LambdaQueryWrapper<LocalMsg> wrapper = new LambdaQueryWrapper<LocalMsg>();wrapper.eq(LocalMsg::getMsgContent, content);LocalMsg localMsg = localMsgMapper.selectOne(wrapper);if (localMsg != null) {localMsg.setStatus(1);localMsgMapper.updateById(localMsg);}} }
-
新建:
@Mapper public interface LocalMsgMapper extends BaseMapper<LocalMsg> {}
14.5 下游监听MQ,然后解锁“锁定金额返回对象”或消费
-
修改:
@Autowired private RabbitService rabbitService;@RabbitListener(bindings = @QueueBinding(value = @Queue(value = MqConst.QUEUE_ACCOUNT_UNLOCK, durable = "true"),exchange = @Exchange(value = MqConst.EXCHANGE_ACCOUNT, durable = "true"),key = MqConst.ROUTING_ACCOUNT_UNLOCK )) @SneakyThrows public void listenUserAccountUnlock(String orderNo, Message message, Channel channel) {// 判断消息是否存在if (StringUtils.isEmpty(orderNo)) {// 消息不存在,直接返回return;}// 消费消息long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 消息的唯一标识try {mqOpsService.listenUserAccountUnlock(orderNo);// 向本地消息表发送签收消息rabbitService.sendMessage(MqConst.EXCHANGE_LOCAL_MSG, MqConst.ROUTING_LOCAL_MSG, orderNo);channel.basicAck(deliveryTag, false);} catch (ShisanException e) {String msgRetryKey = "msg:retry:" + orderNo;Long count = redisTemplate.opsForValue().increment(msgRetryKey);// 三次重试if (count >= 3) { // 不能重试log.error("消息重试{}次失败,异常:{}", count, e.getMessage());channel.basicNack(deliveryTag, false, false);redisTemplate.delete(msgRetryKey); // 删除计数器} else {log.error("消息第{}次重试", count);channel.basicNack(deliveryTag, false, true);}} catch (Exception e) {log.error("网络故障导致手动应答消息失败,异常:{}", e.getMessage());channel.basicNack(deliveryTag, false, false);} }@RabbitListener(bindings = @QueueBinding(value = @Queue(value = MqConst.QUEUE_ACCOUNT_MINUS, durable = "true"),exchange = @Exchange(value = MqConst.EXCHANGE_ACCOUNT, durable = "true"),key = MqConst.ROUTING_ACCOUNT_MINUS )) @SneakyThrows public void listenUserAccountMinus(String orderNo, Message message, Channel channel) {// 判断消息是否存在if (StringUtils.isEmpty(orderNo)) {// 消息不存在,直接返回return;}// 消费消息long deliveryTag = message.getMessageProperties().getDeliveryTag();try {mqOpsService.listenUserAccountMinus(orderNo);// 向本地消息表发送签收消息rabbitService.sendMessage(MqConst.EXCHANGE_LOCAL_MSG, MqConst.ROUTING_LOCAL_MSG, orderNo);channel.basicAck(deliveryTag, false);} catch (ShisanException e) {String msgRetryKey = "msg:retry:" + orderNo;Long count = redisTemplate.opsForValue().increment(msgRetryKey);// 三次重试if (count >= 3) { // 不能重试log.error("消息重试{}次失败,异常:{}", count, e.getMessage());channel.basicNack(deliveryTag, false, false);redisTemplate.delete(msgRetryKey); // 删除计数器} else {log.error("消息第{}次重试", count);channel.basicNack(deliveryTag, false, true);}} catch (Exception e) {log.error("网络故障导致手动应答消息失败,异常:{}", e.getMessage());channel.basicNack(deliveryTag, false, false);} }
-
修改:
/*** 解锁“锁定金额返回对象”* @param orderNo*/ void listenUserAccountUnlock(String orderNo);/*** 消费* @param orderNo*/ void listenUserAccountMinus(String orderNo);
-
修改:
@Autowired private StringRedisTemplate redisTemplate;@Autowired private UserAccountService userAccountService;/*** 解锁“锁定金额返回对象”* @param orderNo*/ @Override public void listenUserAccountUnlock(String orderNo) {// 修改tingshu_account表的lock_amount(-)以及avaliable_amount(+)// 1、从Redis获取锁定的对象String dataKey = "user:account:amount:" + orderNo;String lockKey = "user:account:amount:lock:retry" + orderNo;String lockAmountResultStr = redisTemplate.opsForValue().get(dataKey); // CAS思想。防止正在解锁的时候,又来了一个解锁请求if (StringUtils.isEmpty(lockAmountResultStr)) { // 如果redis中已经没有锁定金额返回对象,说明已经解锁了return;}AccountLockResultVo accountLockResultVo = JSONObject.parseObject(lockAmountResultStr, AccountLockResultVo.class);Long userId = accountLockResultVo.getUserId();BigDecimal amount = accountLockResultVo.getAmount();String content = accountLockResultVo.getContent();Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lockKey, "1");if (aBoolean) {// 2.解锁try {int count = userAccountMapper.unLock(userId, amount);// 3.在流水表中记录解锁操作userAccountService.log(userId, amount, "解锁:" + content, orderNo, "1203-解锁");redisTemplate.delete(dataKey); // 正常干完活才能删除缓存标识key(不能放到finally中)} catch (Exception e) {throw new ShisanException(201, "解锁失败");} finally {// 4.删除缓存中的锁定对象redisTemplate.delete(lockKey);}} }/*** 消费* @param orderNo*/ @Override public void listenUserAccountMinus(String orderNo) {// 修改 lock_amount(-) 以及total_amount(-)// 1、从Redis获取锁定的对象String dataKey = "user:account:amount:" + orderNo;String lockKey = "user:account:minus:lock:" + orderNo;String lockAmountResultStr = redisTemplate.opsForValue().get(dataKey);if (StringUtils.isEmpty(lockAmountResultStr)) {return;}AccountLockResultVo accountLockResultVo = JSONObject.parseObject(lockAmountResultStr, AccountLockResultVo.class);Long userId = accountLockResultVo.getUserId();BigDecimal amount = accountLockResultVo.getAmount();String content = accountLockResultVo.getContent();// 2.解锁Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lockKey, "1");if (aBoolean) {try {int count = userAccountMapper.minus(userId, amount);// 3.在流水表中记录消费操作userAccountService.log(userId, amount, "消费:" + content, orderNo, "1204-消费");// 4.删除缓存中的锁定对象redisTemplate.delete(dataKey);} catch (Exception e) {throw new ShisanException(201, "消费失败");} finally {redisTemplate.delete(lockKey);}} }
-
修改:
/*** 解锁“锁定金额返回对象”* @param userId* @param amount* @return*/ int unLock(@Param("userId") Long userId, @Param("amount") BigDecimal amount);/*** 消费* @param userId* @param amount* @return*/ int minus(@Param("userId") Long userId, @Param("amount") BigDecimal amount);
-
修改:
<update id="unLock">update user_accountset lock_amount=lock_amount - #{amount},available_amount=available_amount + #{amount}where user_account.user_id = #{userId} </update><update id="minus">update user_accountset lock_amount=lock_amount - #{amount},total_amount=total_amount - #{amount},total_pay_amount=total_pay_amount + #{amount}where user_account.user_id = #{userId} </update>
14.6 零钱支付逻辑(定时任务+本地消息表解决消息队列导致的分布式事务问题)
-
修改:
package com.shisan.tingshu.order.adapter.impl;import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.shisan.tingshu.account.client.UserAccountFeignClient; import com.shisan.tingshu.common.execption.ShisanException; import com.shisan.tingshu.common.rabbit.constant.MqConst; import com.shisan.tingshu.common.rabbit.service.RabbitService; import com.shisan.tingshu.common.result.Result; import com.shisan.tingshu.common.result.ResultCodeEnum; import com.shisan.tingshu.model.order.LocalMsg; import com.shisan.tingshu.order.adapter.PayWay; import com.shisan.tingshu.order.mapper.LocalMsgMapper; import com.shisan.tingshu.order.service.OrderInfoService; import com.shisan.tingshu.vo.account.AccountLockResultVo; import com.shisan.tingshu.vo.account.AccountLockVo; import com.shisan.tingshu.vo.order.OrderInfoVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils;import java.util.List;/*** 零钱支付逻辑*/ @Service public class RechargePayWayImpl implements PayWay {@Autowiredprivate OrderInfoService orderInfoService;@Autowiredprivate UserAccountFeignClient userAccountFeignClient;@Autowiredprivate RabbitService rabbitService;@Autowiredprivate LocalMsgMapper localMsgMapper;@Autowiredprivate StringRedisTemplate redisTemplate;String unLockCacheKey = "";String minusCacheKey = "";/*** 定时任务。让“发消息给解锁队列”这个操作一直执行。只有等下游回复解锁成功后,才停止* @param* @return*/@Scheduled(fixedDelay = 1000 * 60)public void scanLocalMsgAndRetryMsg() {// 1.查询本地消息表LambdaQueryWrapper<LocalMsg> wrapper = new LambdaQueryWrapper<>();wrapper.eq(LocalMsg::getStatus, 0); // 取出状态为0的消息// 2.获取消息没有修改的订单编号List<LocalMsg> localMsgs = localMsgMapper.selectList(wrapper);for (LocalMsg localMsg : localMsgs) {// 3.发送检索消息状态是0的消息 选择发送String unLockStr = redisTemplate.opsForValue().get(unLockCacheKey);if (!StringUtils.isEmpty(unLockStr)) {rabbitService.sendMessage(MqConst.EXCHANGE_ACCOUNT, MqConst.ROUTING_ACCOUNT_UNLOCK, localMsg.getMsgContent());}String minusStr = redisTemplate.opsForValue().get(minusCacheKey);if (!StringUtils.isEmpty(minusStr)) {rabbitService.sendMessage(MqConst.EXCHANGE_ACCOUNT, MqConst.ROUTING_ACCOUNT_MINUS, localMsg.getMsgContent());}}}@Overridepublic boolean isSupport(String payWay) {return "1103".equals(payWay);}/*** 分布式事务产生的原因及解决:* 1、若是RPC远程调用导致的分布式事务,解决1:异常机制+反向操作(反向rpc或反向发消息);解决2:使用Seata的分布式事务框架* 2、若是消息队列导致分布式事务,解决:本地消息表+重试机制(比如定时任务的重试)* <p>* 1.使用OpenFeign的时候,由于重试机制,所以会导致接口被重复调用,因此一定要解决由于OpenFeign重试机制的幂等性* 2.使用消息队列的时候,由于发送方为了保证消息百分百发送成功,也会引入重试机制(定时任务的重试)。那么也会导致下游消费者会重复消费消息,所以也一定要控制幂等性** 零钱支付方式的处理支付逻辑* @param orderInfoVo* @param userId* @param orderNo*/@Overridepublic void dealPayWay(OrderInfoVo orderInfoVo, Long userId, String orderNo) {// 零钱支付逻辑// 1.检查零钱是否充足,并锁定余额// 如果零钱充足,才保存订单相关信息// 如果零钱不充足,不用保存订单相关信息// 远程调用账户微服务查询并锁定余额,此处只能用RPC,不能用消息队列AccountLockVo accountLockVo = prePareUserAccountVo(orderNo, userId, orderInfoVo);Result<AccountLockResultVo> lockResultVo = userAccountFeignClient.checkAndLockAmount(accountLockVo);if (lockResultVo.getCode() != 200) {throw new ShisanException(ResultCodeEnum.ACCOUNT_LOCK_ERROR);}// 2.保存订单信息try {// 2.1 初始化本地消息表initLocalMsg(orderNo);// 2.2 保存订单信息orderInfoService.saveOrderInfo(orderInfoVo, userId, orderNo);// 3.解锁以及真正地扣减余额(tingshu_account表的total_amount字段和lock_amount字段),使用同步RPC或者异步消息队列都可以rabbitService.sendMessage(MqConst.EXCHANGE_ACCOUNT, MqConst.ROUTING_ACCOUNT_MINUS, orderNo);} catch (ShisanException e) {// 捕获`2.2 保存订单信息`抛出的异常,即如果保存订单信息失败// 反向解锁“锁定金额返回对象”rabbitService.sendMessage(MqConst.EXCHANGE_ACCOUNT, MqConst.ROUTING_ACCOUNT_UNLOCK, orderNo); // 这一行代码不一定能够执行成功,所以需要使用本地消息表+定时任务重试机制unLockCacheKey = "unlock:fail:flag" + orderNo; // 在Redis中存储一个key,当定时任务执行的时候,会去查询这个key是否存在,如果存在,就继续执行解锁操作redisTemplate.opsForValue().set(unLockCacheKey, "1");throw new ShisanException(201, "反向解锁“锁定金额返回对象”失败");} catch (Exception e) { // 捕获` 3.解锁以及真正地扣减余额`抛出的异常// 反向修改tingshu_account表的total_amount字段和lock_amount字段,使用同步RPC或者异步消息队列都可以rabbitService.sendMessage(MqConst.EXCHANGE_ACCOUNT, MqConst.ROUTING_ACCOUNT_MINUS, orderNo); // 这一行代码不一定能够执行成功,所以需要使用本地消息表+定时任务重试机制minusCacheKey = "minus:fail:flag" + orderNo; // 在Redis中存储一个key,当定时任务执行的时候,会去查询这个key是否存在,如果存在,就继续执行消费操作redisTemplate.opsForValue().set(minusCacheKey, "1");throw new ShisanException(201, "反向扣减余额失败");}}/*** 初始化本地消息表* @param orderNo*/public void initLocalMsg(String orderNo) {LocalMsg localMsg = new LocalMsg();localMsg.setMsgContent(orderNo);localMsg.setStatus(0);localMsgMapper.insert(localMsg);}/*** 准备用户账户信息* @param orderNo* @param userId* @param orderInfoVo* @return*/private AccountLockVo prePareUserAccountVo(String orderNo, Long userId, OrderInfoVo orderInfoVo) {AccountLockVo accountLockVo = new AccountLockVo();accountLockVo.setOrderNo(orderNo);accountLockVo.setUserId(userId);accountLockVo.setAmount(orderInfoVo.getOrderAmount());// 实际买商品要花的钱accountLockVo.setContent(orderInfoVo.getOrderDetailVoList().get(0).getItemName());return accountLockVo;} }