分布式之抢购
数据库准备
新建数据库snap_up
DROP TABLE IF EXISTS `t_goods`;
create table `t_goods`(`id` bigint(20) not null auto_increment,`goods_no` varChar(255) DEFAULT NULL COMMENT '商品编号',total int(11) DEFAULT '0' COMMENT '剩余数量',PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;INSERT INTO `t_goods` VALUES(2,'WZY1001',100);DROP TABLE IF EXISTS `user_good`;
create table `user_good`(`id` bigint(20) not null auto_increment,`user_id` bigint(20) DEFAULT NULL ,`goods_no` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
代码准备
创建项目
添加依赖
<!--fastjson依赖-->
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.33</version>
</dependency>
<!-- mybatis-spring依赖包 -->
<dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.3.0</version>
</dependency>
<!-- mybatisPlus依赖包 -->
<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.1</version>
</dependency>
<!-- redis依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
修改配置
把application.properties
修改为application.yml
。
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/snap_up?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=trueusername: rootpassword: rootredis:password: 123456port: 6379host: 127.0.0.1database: 0
mybatis-plus:mapper-locations: classpath:mappers/*.xml # 扫描mappers映射文件type-aliases-package: com.hsh.pojo # 扫描别名configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 打印sql到控制台map-underscore-to-camel-case: true # 开启驼峰映射
删除文件
删除如下文件
pojo层
package com.hsh.pojo;
import com.baomidou.mybatisplus.annotation.TableId;
import lombok.Data;@Data
public class TGoods {@TableIdprivate Integer id;// 商品idprivate String goodsNo;// 商品编号比如 WZY1001private Integer total;// 总库存
}
mapper层
package com.hsh.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.hsh.pojo.TGoods;public interface TGoodsMapper extends BaseMapper<TGoods> {}
utils包
RedisUtils
package com.hsh.utils;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.support.atomic.RedisAtomicLong;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;/*** Redis工具类,整合RedisTemplate和StringRedisTemplate* 提供针对字符串和通用对象的全面操作*/
@Component
public class RedisUtils {@AutowiredStringRedisTemplate stringRedisTemplate;@AutowiredRedisTemplate<Object, Object> redisTemplate;
// @Resource(name = "stringRedisTemplate")
// ValueOperations<String, String> valOpsStr;
// @Resource(name = "redisTemplate")
// ValueOperations<Object, Object> valOpsObj;// public String getStr(String key){
// return valOpsStr.get(key);
// }
//
// public long getIncrement(String key) {
// return valOpsStr.increment(key);
// }public String getStr(String key) {return stringRedisTemplate.opsForValue().get(key);}public long getIncrement(String key) {return stringRedisTemplate.opsForValue().increment(key);}/*** 存储对象* @param key Redis键* @param value 存储的对象*/public void setObj(String key, Object value) {redisTemplate.opsForValue().set(key, value);}
}
config包
序列化Redis
package com.hsh.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<Object,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);//使用Jackson2JsonRedisSerialize替换默认序列化Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL,JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(objectMapper);//设置vaLue的序列化规则和key的序列化规则redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);redisTemplate.afterPropertiesSet();return redisTemplate;}
}
修改启动类
package com.hsh;import com.hsh.mapper.TGoodsMapper;
import com.hsh.pojo.TGoods;
import com.hsh.utils.RedisUtils;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
@MapperScan("com.hsh.mapper")
public class SnapUpApplication {public static void main(String[] args) {SpringApplication.run(SnapUpApplication.class, args);}@Autowiredprivate TGoodsMapper tGoodsMapper;@Autowiredprivate RedisUtils redisUtil;@Beanpublic void infoGoods(){TGoods tGoods =tGoodsMapper.selectById(2);redisUtil.setObj("snap_up_goods:"+tGoods.getGoodsNo(),tGoods.getTotal());System.out.println("11111111111");}
}
测试
启动项目,redis存入数据说明成功
controller(核心逻辑)
package com.hsh.controller;import com.hsh.utils.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/*** @author xrkhy* @date 2025/9/25 9:11* @description*/
@RestController
@RequestMapping("/index")
public class IndexController {@Autowiredprivate RedisUtils redisUtils;@GetMapping("/snapUp")public String snapUp(Integer userId, String goodsNo){
// goodsNo = "WZY1001";// 判断用户是否已经抢购if (redisUtils.getObj("snap_up_record:"+goodsNo+":"+userId) != null){return "用户"+userId+"已经抢购过了了.·.·商品"+goodsNo;}// 检查库存数量int total = Integer.parseInt(redisUtils.getObj("snap_up_goods:"+goodsNo).toString());if (total > 0){// 购买// 减库存total--;redisUtils.setObj("snap_up_goods:"+goodsNo, total);// 插入购买记录redisUtils.setObj("snap_up_record:"+goodsNo+":"+userId, 1);// 抢到了return "用户"+userId+"商品购买成功.·.·商品"+goodsNo;}else {//库存不足return "商品"+ goodsNo +"库存不足";}}}
测试
访问http://localhost:8080/index/snapUp?userId=1&goodsNo=WZY1001
上锁解决高并发
问题演示
我们这里看似没有问题,但是遇到在秒内有1000多个用户同时访问此接口就会有问题。接下来我就演示一下高并发。
为了方便演示我先修改一下controller代码
package com.hsh.controller;import com.hsh.utils.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/index")
public class IndexController {@Autowiredprivate RedisUtils redisUtils;Integer userId = 0;@GetMapping("/snapUp")public String snapUp(){// 模拟用户登录String goodsNo = "WZY1001";userId++;// 判断用户是否已经抢购if (redisUtils.getObj("snap_up_record:"+goodsNo+":"+userId) != null){return "用户"+userId+"已经抢购过了了.·.·商品"+goodsNo;}// 检查库存数量int total = Integer.parseInt(redisUtils.getObj("snap_up_goods:"+goodsNo).toString());if (total > 0){// 购买// 减库存total--;redisUtils.setObj("snap_up_goods:"+goodsNo, total);// 插入购买记录redisUtils.setObj("snap_up_record:"+goodsNo+":"+userId, 1);// 抢到了return "用户"+userId+"商品购买成功.·.·商品"+goodsNo;}else {//库存不足return "商品"+ goodsNo +"库存不足";}}
}
接下来我们进行压力测试,需要下载一个工具jmeter
。
安装jmeter
下载网址:https://jmeter.apache.org/download_jmeter.cgi
,
选择Binaries
的apache-jmeter-5.6.3.zip
下载完毕直接解压即可
启动jmeter
打开刚刚的bin文件夹
双击jmeter.bat
启动,等待5s
注意,弹出的黑窗口千万不关闭
界面如下
修改jmeter的语言
修改jmeter的字体大小
添加线程组
添加http请求
http://localhost:8080/index/snapUp?userId=1&goodsNo=WZY1001
重启项目
再重启之前先把redis中的所有数据清空
启动jmeter
会提示你是否保存此次请求的记录点击yes
给请求起个名字保存
查看运行结果
我们发现商品卖了217个,库存剩余71个。这个显然不对。
为什么会造成这个问题呢?
就是下面的代码,当第一个用户进来时还没来的急执行total--
第二个用户就进来了,此时库存还是100,所有就会出现只有一百个库存确卖出了217个的现象。
解决
我们加锁就行了。
如果你还不知道什么是锁请看我的博客java的多线程中的线程同步
这一小节
这里我们使用同步方法
加锁
原理:每次只能一个线程进入,执行完毕以后自动解锁,其他线程才可以进来执行。
// 语法
修饰符 synchronized 返回值类型 方法名称(形参列表) {操作共享资源的代码
}
代码编写
package com.hsh.controller;@RestController
@RequestMapping("/index")
public class IndexController {// ....public synchronized String snapUp(){// ...}}
解决过后的运行结果
再重启之前先把redis中的所有数据清空
再次启动 项目和jmeter,刷新redis的可视化工具(多刷几次)。
分布式锁
问题分析
上面还有问题,比如说我的项目是分布式项目。一个项目使用81端口另一个项目使用80端口,而我们两个项目的同步方法
加锁是独立,他针对的单个项目。如果有多个项目,那么就会出现上面的情况。当81端口项目中
的商品还没来的急--操作
,80端口项目中
又来了一个人。这样就还会出现购买数比库存多的问题。
解决办法
redis的setnx命令
使用redis的setnx
命令
> setnx snap-up 100
1
> setnx snap-up 99
0
查看snap-up
发现并没有被覆盖。可使用redis这个特性做一个redis的全局锁。
思路讲解
分别给81和80端口做一个循环判断,如果上锁就不让进redis进行修改。解锁就是删除setnx的键即可
代码演示
注意这里我把所有的
redisUtils.setObj
改成了redisUtils.setStr
所有的redisUtils.getObj
改成了redisUtils.getStr
因为报错了,说我类型转化错误,估计是序列化有问题。
package com.hsh.controller;import com.hsh.utils.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/*** @author xrkhy* @date 2025/9/25 9:11* @description*/
@RestController
@RequestMapping("/index")
public class IndexController {@Autowiredprivate RedisUtils redisUtils;Integer userId = 0;@GetMapping("/snapUp")public String snapUp(){// 模拟用户登录String goodsNo = "WZY1001";userId++;// 判断是否加锁 这个是5秒的锁while (!redisUtils.lock("lock_"+goodsNo, 5L)){// 加锁失败进入try {Thread.sleep(200);} catch (InterruptedException e) {throw new RuntimeException(e);}}// 定义结果 用于统一收集结果返回String result = "";// 判断用户是否已经抢购if (redisUtils.getStr("snap_up_record:"+goodsNo+":"+userId) != null){result = "用户"+userId+"已经抢购过了了.·.·商品"+goodsNo;}else {// 检查库存数量Integer total = Integer.parseInt(redisUtils.getStr("snap_up_goods:"+goodsNo).toString());if (total > 0){// 购买// 减库存total--;redisUtils.setStr("snap_up_goods:"+goodsNo, total.toString());// 插入购买记录redisUtils.setStr("snap_up_record:"+goodsNo+":"+userId, "1");// 抢到了result = "用户"+userId+"商品购买成功.·.·商品"+goodsNo;}else {//库存不足result = "商品"+ goodsNo +"库存不足";}}// 解锁redisUtils.unLock("lock_"+goodsNo);return result;}
}
运行结果
再重启之前先把redis中的所有数据清空
再次启动 项目和jmeter,刷新redis的可视化工具(多刷几次)。可能需要等待10s左右,因为加锁会影响性能。
代码解析
我们只是在上面代码的基础上再前后加个上锁
和解锁
。这个锁就是个全局的标识说明我能不能进入。
package com.hsh.utils;/*** Redis工具类,整合RedisTemplate和StringRedisTemplate* 提供针对字符串和通用对象的全面操作*/
@Component
public class RedisUtils {@AutowiredStringRedisTemplate stringRedisTemplate;@AutowiredRedisTemplate<Object, Object> redisTemplate;// .../*** 上锁* @param key* @param expire* @return*/public boolean lock(String key,Long expire){RedisConnection redisConnection=redisTemplate.getConnectionFactory().getConnection();//设置序列化方法redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(new StringRedisSerializer());if(redisConnection.setNX(key.getBytes(),new byte[]{1})){redisTemplate.expire(key,expire,TimeUnit.SECONDS);redisConnection.close();return true;}else {redisConnection.close();return false;}}/*** 解锁方法* @param key*/public void unLock(String key){redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(new StringRedisSerializer());redisTemplate.delete(key);}
}
RabbitMQ解决串行化问题
上面加锁虽然解决了问题,但是我们发现速度太慢了。
如果我们不是分布式还用分布式锁,会影响性能,此时我们就可使用RabbitMQ来解决。
我们定义controller
准备工作
引入RabbitMQ依赖
<!-- RabbitMQ依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加RabbitMQ配置
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/snap_up?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=trueusername: rootpassword: rootredis:password: 123456port: 6379host: 127.0.0.1database: 0rabbitmq: # 添加RabbitMQ配置listener:simple:auto-startup: true # 启动时自动启动容器prefetch: 1 # 限流(消息者每次从队列中获取的消息数量)max-concurrency: 1 # 最大消费者数量concurrency: 1 # 最小消费者数量acknowledge-mode: manual # 手动反馈username: guestpassword: guesthost: 127.0.0.1port: 5672
mybatis-plus:mapper-locations: classpath:mappers/*.xml # 扫描mappers映射文件type-aliases-package: com.hsh.pojo # 扫描别名configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 打印sql到控制台map-underscore-to-camel-case: true # 开启驼峰映射
新建实体类
package com.hsh.pojo;
import lombok.Data;
import java.io.Serializable;
@Data
public class UserGoods implements Serializable {private Integer id;private String goodsNo;private Integer userId;
}
config
package com.hsh.config;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {@Beanpublic Queue snapUp() {System.out.println("chsuihi");Map<String, Object> map = new HashMap<>();// 指定消息队列长度map.put("x-max-length", 10);// 当队列满时,多余的消息直接拒绝接收,多余的消息被丢弃map.put("x-overflow", "reject-publish");return new Queue("snap_up",false,false,false,map);}// 注册 RabbitAdmin Bean
// @Bean
// public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
// return new RabbitAdmin(connectionFactory);
// }
}
编写controller
生产者
package com.hsh.controller;import com.hsh.pojo.UserGoods;
import com.hsh.utils.RedisUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author xrkhy* @date 2025/9/25 22:45* @description*/
@RestController
@RequestMapping("/rabbit")
public class RabbitController {@Autowiredprivate RabbitTemplate rabbitTemplate;Integer userId = 0;@GetMapping("/send")public String send(){userId++;UserGoods userGoods = new UserGoods();userGoods.setUserId(userId);userGoods.setGoodsNo("WZY1001");// 向队列发送消息rabbitTemplate.convertAndSend("snap_up", userGoods);return "放入队列排队成功.....";}
}
消费者
package com.hsh.controller;import com.hsh.pojo.UserGoods;
import com.hsh.utils.RedisUtils;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class RabbitMQListener {@Autowiredprivate RedisUtils redisUtils;@RabbitListener(queuesToDeclare =@Queue("snap_up"))public void queueListener(Message message, Channel channel){try {UserGoods userGoods = (UserGoods) SerializationUtils.deserialize(message.getBody());System.out.println(userGoods);// 判断是否已经抢购if (redisUtils.getStr("snap_up_record:"+userGoods.getGoodsNo()+":"+userGoods.getUserId()) == null){// 查看储存Integer total = Integer.parseInt(redisUtils.getStr("snap_up_goods:"+userGoods.getGoodsNo()).toString());if (total > 0){total--;redisUtils.setStr("snap_up_goods:"+userGoods.getGoodsNo(), total.toString());// 插入购买记录redisUtils.setStr("snap_up_record:"+userGoods.getGoodsNo()+":"+userGoods.getUserId(), "1");System.out.println("用户"+userGoods.getUserId()+"购买商品"+userGoods.getGoodsNo() + "成功");}else {System.out.println("商品"+userGoods.getGoodsNo() + "库存不足");}}else {System.out.println("用户已经抢购过了"+userGoods.getGoodsNo()+"商品");}// Ack 确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();// Nack 报错重新入队try {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} catch (IOException ex) {throw new RuntimeException(ex);}}}
}
运行结果
再重启之前先把redis中的所有数据清空
运行压力测试工具
结果如下