Java万级并发场景-实战解决
今天我们来做一个典型的消费力度能达到万级别的并发场景,老师点名-学生签到
正常情况
正常情况来说是不同班级下的老师发布不同的点名--然后不同班级下的很多学生同一时间进行签到,签到成功就去修改数据库,签到失败就返回,但是这样的话 签到的学生一多,数据库修改每一行的内容,都会加上行锁,那么改的多了,数据库很可能出现卡顿的情况,导致学生明明在规定时间内签到了,但是却出现签到结束的情况,或者说出现其他的冗余签到的情况,这样显然是不希望我们看到的,也不希望学生看到
并发级处理
怎么解决前面的那种签到错误的场景呢?
那么当然就是传统级别的 面对并发情况下的重拳三连了哈哈哈
mysql-redis-rabbitMq
首先 我们这个业务需要怎么写?
redis的key怎么选择,学生的key怎么选都是一个问题,下面我们来一一的进行分析
MySQL表的业务数据关联
因为我们是测试demo,所以我们只做出了关键的表结构关联,像老师表我们是没有做的
看上图,首先我们最顶部有一个课程表,写的有一个课程id和名称,还有还有学生表,学生表和课程表之间有一个中间的表关联,叫学生课程表(student-courses),然后我们老师点名的时候是属于课堂活动表,里面记录的课堂的活动,比如点名和提问,这个表(class_activities)与课程表关联,最后的是每一个学生在该课程下的做出的课堂活动,也就是学生活动表(student-activities),她关联了学生表,课堂活动表和课程表。
主要流程
老师发布点名,然后课堂互动表记录一条会过期的课堂活动,状态是进行中,然后学生签到,签到之后,找到该课程下的该签到过的学生,像学生活动表中添加一条签到过的数据
Redis业务
在redis方面,我们主要做的就是对学生签到数据的存储,对老师发布的签到数据的存储
我们知道 redis的string的数据类型是比较占用空间的,所以对于我们单个的老师发布的签到数据,我们可以用string类型,对于不同班级下的多个学生的签到情况,我们可以用hash结构 ,因为对于ihash结构,我们的数据一般是使用ziplist压缩,更省空间
RabbitMQ业务
我们mq主要做的就是读取redis中的签到过的学生数据,然后把学生数据做一个异步写入mysql,这样减缓签到高峰时段mysql的压力
我们mq首先从redis中查到签到过的学生数据,然后跟该课程下的学生数据做对比,如果该课程下学生有数据,redis中学生签到无数据,那么该学生就是未签到
如果签到,就把签到数据存入数据库
总体代码
老师点名-学生签到
package com.example.tabledemo.controller;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.RandomUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.tabledemo.config.RabbitConfig;
import com.example.tabledemo.generator.service.ClassActivitiesService;
import com.example.tabledemo.generator.service.CourseService;
import com.example.tabledemo.pojo.Result;
import com.example.tabledemo.pojo.entity.ClassActivitiesEntity;
import com.example.tabledemo.pojo.entity.CourseEntity;
import com.example.tabledemo.pojo.request.ClassActivitiesRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Objects;
import static cn.hutool.core.date.DateTime.now;
/**
* @Author: wyz
* @Date: 2025-04-08-16:17
* @Description:课堂活动
*/
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/class/activities")
public class ClassActivitiesController {
private final ClassActivitiesService classActivitiesService;
private final CourseService courseService;
private final StringRedisTemplate redisTemplate;
private final RabbitTemplate rabbitTemplate;
/**
* 老师点名
*/
@PostMapping("/teacher/rollCall")
public Result teacherRollCall(@RequestBody ClassActivitiesRequest.TeacherRollCall teacherRollCall) {
//判断是否有课程
CourseEntity course = courseService.getById(teacherRollCall.getCourseId());
if (Objects.isNull(course)) {
return Result.fail("没有该课程");
}
//查看该课程下是否有点名活动
LambdaQueryWrapper<ClassActivitiesEntity> eq = Wrappers.lambdaQuery(ClassActivitiesEntity.class)
.eq(ClassActivitiesEntity::getCourseId, teacherRollCall.getCourseId())
.eq(ClassActivitiesEntity::getActiveType, 1)
.eq(ClassActivitiesEntity::getActiveStatus, 0);
ClassActivitiesEntity one = classActivitiesService.getOne(eq);
if(!Objects.isNull(one)){
return Result.fail("该课程已存在点名,请勿重复点名");
}
//生成签到码
//
// String signCode = RandomUtil.randomNumbers(4);
String signCode = "1234";
ClassActivitiesEntity classActivitiesEntity = new ClassActivitiesEntity();
classActivitiesEntity.setCourseId(teacherRollCall.getCourseId());
// 获取当前时间
DateTime now = now();
classActivitiesEntity.setStartTime(now);
// 使用Calendar计算未来时间
Calendar calendar = Calendar.getInstance();
calendar.setTime(now);
calendar.add(Calendar.SECOND, teacherRollCall.getSignSeconds());
Date endTime = calendar.getTime();
classActivitiesEntity.setEndTime(endTime);
classActivitiesEntity.setActiveType(1);
classActivitiesEntity.setActiveStatus(0);
//课堂活动存入数据库
boolean save = classActivitiesService.save(classActivitiesEntity);
//redis中生成签到码的key
String signCodeKey = "sign_" + teacherRollCall.getCourseId() + "_" + signCode;
redisTemplate.opsForValue().set(signCodeKey, signCode);
//发给rabbitmq 延迟队列 让延迟队列处理 最终的签到情况
//1. 学生查看课堂的活动的信息 应该在 课堂活动表中查看
//2. 延迟队列处理 签到结束后的情况
HashMap<Object, Object> map = new HashMap<>();
map.put("course_id", teacherRollCall.getCourseId());
map.put("class_activities_id", classActivitiesEntity.getId());
map.put("sign_code", signCode);
rabbitTemplate.convertAndSend(RabbitConfig.ROLL_CALL_DEAD_EXCHANGE, RabbitConfig.ROLL_CALL_DEAD_ROUTING_KEY, map, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(teacherRollCall.getSignSeconds()*1000);
return message;
}
});
return Result.success("发布签到成功",signCode);
}
/**
* 学生签到
*/
@PostMapping("/student/sign")
public Result studentSign(@RequestBody ClassActivitiesRequest.StudentSign studentSign) {
//判断该学生是否在班级当中
//这里我们不判断 知道就行
String signCodeKey = "sign_" + studentSign.getCourseId() + "_" + studentSign.getSignCode();
//不为空 证明有该签到
String signCode = redisTemplate.opsForValue().get(signCodeKey);
if (!Objects.isNull(signCode)) {
if (!signCode.equals(studentSign.getSignCode())) {
return Result.fail("签到码错误,签到失败");
}
//学生签到key
String studentSignKey="student_sign_"+studentSign.getStudentId();
if(redisTemplate.opsForHash().hasKey("h"+signCodeKey,studentSignKey)){
return Result.fail("您已经签到成功,请勿重复签到");
}
//value正常应该是 签到时间 我们换成签到码
redisTemplate.opsForHash().put("h"+signCodeKey,studentSignKey,signCode);
return Result.success("签到成功");
} else {
return Result.fail("签到已过期或已被删除");
}
}
}
mq配置
package com.example.tabledemo.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: wyz
* @Date: 2025-04-08-17:19
* @Description:
*/
@Configuration
public class RabbitConfig {
@Bean
public MessageConverter messageConverter() {
// 定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
return jackson2JsonMessageConverter;
}
// //点名延迟交换机
// public static final String ROLL_CALL_EXCHANGE = "roll_call_exchange";
// //点名延迟队列
// public static final String ROLL_CALL_QUEUE = "roll_call_queue";
//点名死信交换机
public static final String ROLL_CALL_DEAD_EXCHANGE = "roll_call_dead_exchange";
//点名死信队列
public static final String ROLL_CALL_DEAD_QUEUE = "roll_call_dead_queue";
public static final String ROLL_CALL_DEAD_ROUTING_KEY = "roll_call";
/**
* 绑定 点名消息队列 -> 点名私信交换机->点名私信队列
*
* @return
*/
// @Bean
// public Queue bindMsgDeadQueue() {
// return QueueBuilder.durable(ROLL_CALL_QUEUE)
// .deadLetterExchange(ROLL_CALL_DEAD_EXCHANGE)
// .deadLetterRoutingKey(ROLL_CALL_DEAD_ROUTING_KEY)
//
// .build();
// }
//
//
//
//
// /**
// * 声明点名交换机
// */
// @Bean
// Exchange rollCallExchange() {
// return ExchangeBuilder.directExchange(ROLL_CALL_EXCHANGE)
// .durable(true)
// .build();
// }
//
// /**
// * 绑定 点名 交换机队列
// */
// @Bean
// Binding bingingRollCallExchangeQueue() {
// return BindingBuilder.bind(bindMsgDeadQueue())
// .to(rollCallExchange())
// .with(ROLL_CALL_DEAD_ROUTING_KEY).noargs();
// }
/**
* 声明点名死信队列
*/
@Bean
Queue rollCallDeadQueue() {
return QueueBuilder.durable(ROLL_CALL_DEAD_QUEUE).build();
}
/**
* 声明点名 死信交换机
*/
@Bean
Exchange rollCallDeadExchange() {
return ExchangeBuilder.directExchange(ROLL_CALL_DEAD_EXCHANGE)
.delayed()
.durable(true)
.build();
}
/**
* 绑定点名 私信交换机队列
*/
@Bean
Binding bindingRollCallExchangeQueue() {
return BindingBuilder
.bind(rollCallDeadQueue())
.to(rollCallDeadExchange())
.with(ROLL_CALL_DEAD_ROUTING_KEY)
.noargs();
}
}
消费者配置
package com.example.tabledemo.consumer;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.tabledemo.config.RabbitConfig;
import com.example.tabledemo.generator.service.ClassActivitiesService;
import com.example.tabledemo.generator.service.StudentActivitiesService;
import com.example.tabledemo.pojo.entity.ClassActivitiesEntity;
import com.example.tabledemo.pojo.entity.StudentActivitiesEntity;
import com.example.tabledemo.student.StudentCoursesEntity;
import com.example.tabledemo.student.service.StudentCoursesService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static java.time.LocalTime.now;
/**
* @Author: wyz
* @Date: 2025-04-08-20:40
* @Description:处理学生签到的消费者
*/
@Component
@Slf4j
public class SignConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private StudentCoursesService studentCoursesService;
@Autowired
private ClassActivitiesService classActivitiesService;
@Autowired
private StudentActivitiesService studentActivitiesService;
@RabbitListener(queues = RabbitConfig.ROLL_CALL_DEAD_QUEUE)
@RabbitHandler// 直接引用队列名
public void studentSignConsumer(HashMap<Object, Object> map, Channel channel, Message message) throws IOException {
try {
log.info(now() + "----------老师点名延迟消息处理开始----------");
//解析消息
Integer courseId = (Integer) map.get("course_id");
Integer classActivitiesId = (Integer) map.get("class_activities_id");
String signCode = (String) map.get("sign_code");
//业务幂等性判断
ClassActivitiesEntity byId = classActivitiesService.getById(classActivitiesId);
//证明已经消费过了 本来是额外存的这里 只用状态判断
if(byId.getActiveStatus()==1){
return;
}
//拿到redis中的学生签到数据
String signCodeKey = "sign_" + courseId + "_" + signCode;
Map<Object, Object> studentSignMap = redisTemplate.opsForHash().entries("h" + signCodeKey);
//课堂活动状态改为已经结束
LambdaUpdateWrapper<ClassActivitiesEntity> eq1 = Wrappers.lambdaUpdate(ClassActivitiesEntity.class)
.set(ClassActivitiesEntity::getActiveStatus, 1)
.eq(ClassActivitiesEntity::getId, classActivitiesId);
classActivitiesService.update(eq1);
//学生签到key
//String studentSignKey="student_sign_"+studentSign.getStudentId();
List<Integer> studentSignIdList = studentSignMap.entrySet().stream()
.map(i -> {
String studentSignKey = (String) i.getKey();
log.info("学生信息为{}", studentSignKey);
Integer studentId = Integer.valueOf(studentSignKey.split("_")[2]);
log.info("学生id为{}", studentId);
return studentId;
}).collect(Collectors.toList());
//查出该课程下 的所有学生id
LambdaQueryWrapper<StudentCoursesEntity> eq = Wrappers.lambdaQuery(StudentCoursesEntity.class)
.eq(StudentCoursesEntity::getCourseId, courseId);
List<StudentCoursesEntity> list = studentCoursesService.list(eq);
List<Integer> studentIds = list.stream().map(i -> i.getStudentId()).collect(Collectors.toList());
//正常是 会有课程状态 课程结课什么的 ,这里我们模拟 不做处理
ArrayList<StudentActivitiesEntity> studentActivitiesEntities = new ArrayList<>();
studentIds.stream()
.forEach(studentId -> {
StudentActivitiesEntity studentActivitiesEntity = new StudentActivitiesEntity();
studentActivitiesEntity.setStudentId(studentId);
studentActivitiesEntity.setClassActivitiesId(classActivitiesId);
studentActivitiesEntity.setCourseId(courseId);
studentActivitiesEntity.setStudentActivitiesStatus(0);
if (studentSignIdList.contains(studentId)) {
log.info("有学生签到了");
studentActivitiesEntity.setStudentActivitiesStatus(1);
}
studentActivitiesEntities.add(studentActivitiesEntity);
});
//构建学生活动表的数据
studentActivitiesService.saveBatch(studentActivitiesEntities);
//删除redis数据
redisTemplate.delete(signCodeKey);
redisTemplate.delete("h" + signCodeKey);
//true 和false 代表着 是否 确认该条消息之前的 true 是确认 false 不确认
// 假设队列中有消息 deliveryTag=5,6,7 现在是6
// 结果:仅消息6被确认删除,消息5和7仍在队列中
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info(now() + "----------老师点名延迟消息处理结束----------");
} catch (Exception e) {
Boolean redelivered = message.getMessageProperties().getRedelivered();
if (redelivered) {
log.info(now() + "----------老师点名延迟消息处理异常,已被重新投递,丢弃消息----------");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
log.info(now() + "----------老师点名延迟消息处理异常,消息重新投递----------");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
throw e;
}
}
}
测试流程
接口测试
jmeter 压测
数据库数据查看
可见 已经测试成功了