刺猬的开发笔记之企业级兑换奖励c端链路开发
业务代码逻辑链路如下
// facade 使用了门面设计模式public RedPacketResult redPacketOpen(RedPacketOpenRequest request)// service public RedPacketResult redPacketOpen(long redPacketId, long userId, long userGameId) {// 上锁redisMutexLock.tryLock(lockName, lockId)// 获取红包状态RedPacketExchange redPacketExchange = redPacketExchangeDomainService.getByRedPacketId(userId,redPacketId);// 获得档位int stage = redPacketExchange.getStage();// 获得档位对应元素数量、红包金额Optional<ExchangeConfig> exchangeConfigOptional =getExchangeConfigs().stream().filter(o -> {return o.getStage() == stage;}).findFirst();if (!exchangeConfigOptional.isPresent()) {throw new BizException(ErrorCode.RED_PACKET_EXCHANGE_CONFIG_NON);}ExchangeConfig exchangeConfig = exchangeConfigOptional.get();int groups = exchangeConfig.getGroups();// 消耗资源 具体方法逻辑详见后文checkAndDecrItems(items, dh, ActivityTypeEnum.ACTIVITY.getType());// 发放红包 具体方法逻辑详见后文try {redPacketExchange = redPacketExchangeDomainService.exchangeRedPacket(userGameId, userId, redPacketId);}catch (Exception e){log.error("redPacketOpen error, 道具资源回滚,userGameId:{},userId:{},redPacketId:{}", userGameId, userId,redPacketId,e);rewardOrDecrItem(items, dh, ActivityTypeEnum.ACTIVITY.getType());throw e;}// 返回最新红包状态ActivityDO activityDO = getActivityDO(dh);return getRedPacketResult(activityDO, redPacketExchange);// 释放锁redisMutexLock.releaseLock(lockName, lockId);}
扣减资源
public void checkAndDecrItems(List<Item> items, SimpleMgcDataHolder dataHolder, Integer activityId) {ActivityDO activityDO = getActivityDO(dataHolder);for (Item item : items) {if (MapUtils.isEmpty(activityDO.getItems())) {throw new BizException(ErrorCode.ACTIVITY_ITEM_NOT_ENOUGH);}Integer itemNum = activityDO.getItems().get(item.getItemId());if (Objects.isNull(itemNum)) {throw new BizException(ErrorCode.ACTIVITY_ITEM_NOT_ENOUGH);}if (item.getItemNum() > itemNum) {throw new BizException(ErrorCode.ACTIVITY_ITEM_NOT_ENOUGH);}}// 扣减 对应changeType为2for (Item item : items) {activityDO.getItems().put(item.getItemId(), activityDO.getItems().get(item.getItemId()) - item.getItemNum());}saveActivityDO(dataHolder);log.info("checkAndDecrItems, activityId:{}, userGameId:{}, items:{}", activityId, dataHolder.getEntityId(), items);// 发起事件ItemChangeEvent.builder().activityId(activityId).userGameId(dataHolder.getEntityId()).changeType(2).changeItems(items).remainItems(activityDO.getItems()).dataHolder(dataHolder).build().fire();}
发送奖励
public RedPacketExchange exchangeRedPacket(long userGameId, long userId, long redPacketId) {RedPacketExchange redPacketExchange = redPacketExchangeRepository.getByRedPacketId(userId, redPacketId);if (redPacketExchange == null) {throw new BizException(ErrorCode.RED_PACKET_NON);}if (RedPacketExchangeStatusEnum.EXCHANGED.getStatus() == redPacketExchange.getStatus()) {throw new BizException(ErrorCode.RED_PACKET_EXCHANGED);}int update = redPacketExchangeRepository.updateRedPacketExchanged(userId, redPacketId);if (update != 1) {throw new BizException(ErrorCode.RED_PACKET_EXCHANGED_FAIL);}// 实际发送奖励的逻辑 自主实现即可boolean result = orderGateway.sendCash();if (!result) {// 回滚红包状态 具体方法逻辑详见后文redPacketExchangeRepository.updateRedPacketExchangeFailed(userId, redPacketId);throw new BizException(ErrorCode.RED_PACKET_EXCHANGED_FAIL);}// 解锁下一档位unlockNext(userId);return redPacketExchange;
}
public int updateRedPacketExchangeFailed(long userId, long redPacketId) {RedPacketExchangeExample example = new RedPacketExchangeExample();example.createCriteria().andUserIdEqualTo(userId).andIdEqualTo(redPacketId).andStatusEqualTo(RedPacketExchangeStatusEnum.EXCHANGED.getStatus());RedPacketExchange redPacketExchange = new RedPacketExchange();redPacketExchange.setStatus(RedPacketExchangeStatusEnum.EXCHANGE_FAILED.getStatus());return redPacketExchangeMapper.updateByExampleSelective(redPacketExchange, example);}
这里给一个发送奖励的例子,仅有逻辑,保证企业级资金安全。
public boolean sendCash() {// 构造参数ResSendReq req = new ResSendReq();// ...省略...// 预校验// 直接回滚资源 // ...省略...// 实际发放// 1. 当出现未知错误码时进行一次重试// 1.1 若重试出现异常则认为发放成功 不回滚资源// 2. 当出现库存不足及其他已知报错 回滚资源// 3. 超时认为成功 回滚资源
借助EventBus采取在资源变化的时候进行事件监听的机制,注册同步和异步的监听器,实现在资源集齐时对红包可领取状态的修改
事件总线注册器
@Component
@SuppressWarnings("UnstableApiUsage")
public class EventBusRegistrant implements InitializingBean {@Resourceprivate AsyncSubscriber asyncSubscriber;@Resourceprivate SyncSubscriber syncSubscriber;@Resource(name = "asyncEventBus")private EventBus asyncEventBus;@Resource(name = "eventBus")private EventBus eventBus;@Overridepublic void afterPropertiesSet() {this.eventBus.register(syncSubscriber);this.asyncEventBus.register(asyncSubscriber);}
}
@Service
@Slf4j
public class AsyncSubscriber{@Subscribe@AllowConcurrentEventspublic void itemChane(ItemChangeEvent itemChangeEvent) {log.info("itemChane:{}", JsonUtils.object2JsonStr(itemChangeEvent));if (CollectionUtils.isEmpty(itemChangeEvent.getChangeItems())) {return;}Map<Integer, Integer> itemMap =itemChangeEvent.getChangeItems().stream().collect(Collectors.toMap(Item::getItemId,Item::getItemNum, Integer::sum));Map<Integer, Integer> remainItemMap = itemChangeEvent.getRemainItems();// 道具增加时的逻辑if (hasChange) {try {long userId = mgcPlayerInnerService.queryUserId(itemChangeEvent.getUserGameId());RedPacketExchange redPacketExchange =redPacketExchangeDomainService.getLatestByUserId(userId);if (redPacketExchange != null) {ActivityDO activityDO = getActivityDO(dataHolder);// 切换红包领取状态ImmutablePair<RedPacketExchangeProgressEnum, Integer> progressPair =redPacketExchangeDomainService.getRedPacketExchangeProgressWithDiff(redPacketExchange, activityDO);if (RedPacketExchangeProgressEnum.EXCHANGEABLE == progressPair.getLeft()) {int stage = redPacketExchange.getStage();// 每个stage档位只推送一次String stageKey = "push_" + userId + "_" + stage;try {// 使用Redis的setnx确保每个档位只推送一次,过期时间设置为30天StoreKey storeKey = StoreKey("activity", stageKey);// 设置过期时间为30天boolean isFirstTime = redisStoreClient.setnx(storeKey, 1, 30 * 24 * 3600);if (isFirstTime) {log.info("用户{}首次达到stage{}档位,触发推送", userId, stage);// 主动推送triggerRewardNotify(userId);} else {log.info("用户{}的stage{}档位已推送过,跳过推送", userId, stage);}} catch (Exception e) {log.warn("检查stage推送状态失败,用户:{}, stage:{}", userId, stage, e);}}}} catch (Exception e) {log.warn("处理兑换推送逻辑失败", e);}}}}
}