当前位置: 首页 > news >正文

Java万级并发场景-实战解决

今天我们来做一个典型的消费力度能达到万级别的并发场景,老师点名-学生签到

正常情况

正常情况来说是不同班级下的老师发布不同的点名--然后不同班级下的很多学生同一时间进行签到,签到成功就去修改数据库,签到失败就返回,但是这样的话 签到的学生一多,数据库修改每一行的内容,都会加上行锁,那么改的多了,数据库很可能出现卡顿的情况,导致学生明明在规定时间内签到了,但是却出现签到结束的情况,或者说出现其他的冗余签到的情况,这样显然是不希望我们看到的,也不希望学生看到

并发级处理

怎么解决前面的那种签到错误的场景呢?

那么当然就是传统级别的 面对并发情况下的重拳三连了哈哈哈

mysql-redis-rabbitMq

首先 我们这个业务需要怎么写?

redis的key怎么选择,学生的key怎么选都是一个问题,下面我们来一一的进行分析

MySQL表的业务数据关联

因为我们是测试demo,所以我们只做出了关键的表结构关联,像老师表我们是没有做的

看上图,首先我们最顶部有一个课程表,写的有一个课程id和名称,还有还有学生表,学生表和课程表之间有一个中间的表关联,叫学生课程表(student-courses),然后我们老师点名的时候是属于课堂活动表,里面记录的课堂的活动,比如点名和提问,这个表(class_activities)与课程表关联,最后的是每一个学生在该课程下的做出的课堂活动,也就是学生活动表(student-activities),她关联了学生表,课堂活动表和课程表。

主要流程

老师发布点名,然后课堂互动表记录一条会过期的课堂活动,状态是进行中,然后学生签到,签到之后,找到该课程下的该签到过的学生,像学生活动表中添加一条签到过的数据

Redis业务

在redis方面,我们主要做的就是对学生签到数据的存储,对老师发布的签到数据的存储

我们知道 redis的string的数据类型是比较占用空间的,所以对于我们单个的老师发布的签到数据,我们可以用string类型,对于不同班级下的多个学生的签到情况,我们可以用hash结构 ,因为对于ihash结构,我们的数据一般是使用ziplist压缩,更省空间

RabbitMQ业务

我们mq主要做的就是读取redis中的签到过的学生数据,然后把学生数据做一个异步写入mysql,这样减缓签到高峰时段mysql的压力

我们mq首先从redis中查到签到过的学生数据,然后跟该课程下的学生数据做对比,如果该课程下学生有数据,redis中学生签到无数据,那么该学生就是未签到

如果签到,就把签到数据存入数据库

总体代码

老师点名-学生签到

package com.example.tabledemo.controller;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.RandomUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.tabledemo.config.RabbitConfig;
import com.example.tabledemo.generator.service.ClassActivitiesService;
import com.example.tabledemo.generator.service.CourseService;
import com.example.tabledemo.pojo.Result;
import com.example.tabledemo.pojo.entity.ClassActivitiesEntity;
import com.example.tabledemo.pojo.entity.CourseEntity;
import com.example.tabledemo.pojo.request.ClassActivitiesRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Objects;

import static cn.hutool.core.date.DateTime.now;

/**
 * @Author: wyz
 * @Date: 2025-04-08-16:17
 * @Description:课堂活动
 */
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/class/activities")
public class ClassActivitiesController {
    private final ClassActivitiesService classActivitiesService;
    private final CourseService courseService;
    private final StringRedisTemplate redisTemplate;
    private final RabbitTemplate rabbitTemplate;

    /**
     * 老师点名
     */
    @PostMapping("/teacher/rollCall")
    public Result teacherRollCall(@RequestBody ClassActivitiesRequest.TeacherRollCall teacherRollCall) {
        //判断是否有课程
        CourseEntity course = courseService.getById(teacherRollCall.getCourseId());
        if (Objects.isNull(course)) {
            return Result.fail("没有该课程");
        }
        //查看该课程下是否有点名活动
        LambdaQueryWrapper<ClassActivitiesEntity> eq = Wrappers.lambdaQuery(ClassActivitiesEntity.class)
                .eq(ClassActivitiesEntity::getCourseId, teacherRollCall.getCourseId())
                .eq(ClassActivitiesEntity::getActiveType, 1)
                .eq(ClassActivitiesEntity::getActiveStatus, 0);
        ClassActivitiesEntity one = classActivitiesService.getOne(eq);
        if(!Objects.isNull(one)){
            return Result.fail("该课程已存在点名,请勿重复点名");
        }

        //生成签到码
       //
        // String signCode = RandomUtil.randomNumbers(4);
        String signCode = "1234";

        ClassActivitiesEntity classActivitiesEntity = new ClassActivitiesEntity();
        classActivitiesEntity.setCourseId(teacherRollCall.getCourseId());
        // 获取当前时间
        DateTime now = now();
        classActivitiesEntity.setStartTime(now);
        // 使用Calendar计算未来时间
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(now);
        calendar.add(Calendar.SECOND, teacherRollCall.getSignSeconds());
        Date endTime = calendar.getTime();
        classActivitiesEntity.setEndTime(endTime);
        classActivitiesEntity.setActiveType(1);
        classActivitiesEntity.setActiveStatus(0);
        //课堂活动存入数据库
        boolean save = classActivitiesService.save(classActivitiesEntity);
        //redis中生成签到码的key
        String signCodeKey = "sign_" + teacherRollCall.getCourseId() + "_" + signCode;
            redisTemplate.opsForValue().set(signCodeKey, signCode);
            //发给rabbitmq 延迟队列 让延迟队列处理 最终的签到情况
            //1. 学生查看课堂的活动的信息 应该在 课堂活动表中查看
            //2. 延迟队列处理 签到结束后的情况
            HashMap<Object, Object> map = new HashMap<>();

            map.put("course_id", teacherRollCall.getCourseId());
            map.put("class_activities_id", classActivitiesEntity.getId());
            map.put("sign_code", signCode);
            rabbitTemplate.convertAndSend(RabbitConfig.ROLL_CALL_DEAD_EXCHANGE, RabbitConfig.ROLL_CALL_DEAD_ROUTING_KEY, map, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {

                    message.getMessageProperties().setDelay(teacherRollCall.getSignSeconds()*1000);
                    return message;
                }
            });
            return Result.success("发布签到成功",signCode);

    }


    /**
     * 学生签到
     */
    @PostMapping("/student/sign")
    public Result studentSign(@RequestBody ClassActivitiesRequest.StudentSign studentSign) {
        //判断该学生是否在班级当中
        //这里我们不判断 知道就行
        String signCodeKey = "sign_" + studentSign.getCourseId() + "_" + studentSign.getSignCode();
        //不为空 证明有该签到
        String signCode = redisTemplate.opsForValue().get(signCodeKey);
        if (!Objects.isNull(signCode)) {
            if (!signCode.equals(studentSign.getSignCode())) {
                return Result.fail("签到码错误,签到失败");
            }
            //学生签到key
            String studentSignKey="student_sign_"+studentSign.getStudentId();
            if(redisTemplate.opsForHash().hasKey("h"+signCodeKey,studentSignKey)){
                return  Result.fail("您已经签到成功,请勿重复签到");
            }
            //value正常应该是 签到时间  我们换成签到码
            redisTemplate.opsForHash().put("h"+signCodeKey,studentSignKey,signCode);
            return Result.success("签到成功");
        } else {
            return Result.fail("签到已过期或已被删除");
        }


    }
}

mq配置

package com.example.tabledemo.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: wyz
 * @Date: 2025-04-08-17:19
 * @Description:
 */
@Configuration
public class RabbitConfig {
    @Bean
    public MessageConverter messageConverter() {
        // 定义消息转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        return jackson2JsonMessageConverter;
    }

//    //点名延迟交换机
//    public static final String ROLL_CALL_EXCHANGE = "roll_call_exchange";
//    //点名延迟队列
//    public static final String ROLL_CALL_QUEUE = "roll_call_queue";
    //点名死信交换机
    public static final String ROLL_CALL_DEAD_EXCHANGE = "roll_call_dead_exchange";
    //点名死信队列
    public static final String ROLL_CALL_DEAD_QUEUE = "roll_call_dead_queue";
    public static final String ROLL_CALL_DEAD_ROUTING_KEY = "roll_call";

    /**
     * 绑定 点名消息队列 -> 点名私信交换机->点名私信队列
     *
     * @return
     */
//    @Bean
//    public Queue bindMsgDeadQueue() {
//        return QueueBuilder.durable(ROLL_CALL_QUEUE)
//                .deadLetterExchange(ROLL_CALL_DEAD_EXCHANGE)
//                .deadLetterRoutingKey(ROLL_CALL_DEAD_ROUTING_KEY)
//
//                .build();
//    }
//
//
//
//
//    /**
//     * 声明点名交换机
//     */
//    @Bean
//    Exchange rollCallExchange() {
//        return ExchangeBuilder.directExchange(ROLL_CALL_EXCHANGE)
//                .durable(true)
//                .build();
//    }
//
//    /**
//     * 绑定 点名 交换机队列
//     */
//    @Bean
//    Binding bingingRollCallExchangeQueue() {
//        return BindingBuilder.bind(bindMsgDeadQueue())
//                .to(rollCallExchange())
//                .with(ROLL_CALL_DEAD_ROUTING_KEY).noargs();
//    }

    /**
     * 声明点名死信队列
     */

    @Bean
    Queue rollCallDeadQueue() {
        return QueueBuilder.durable(ROLL_CALL_DEAD_QUEUE).build();
    }

    /**
     * 声明点名 死信交换机
     */
    @Bean
    Exchange rollCallDeadExchange() {
        return ExchangeBuilder.directExchange(ROLL_CALL_DEAD_EXCHANGE)
                .delayed()
                .durable(true)
                .build();
    }

    /**
     * 绑定点名 私信交换机队列
     */
    @Bean
    Binding bindingRollCallExchangeQueue() {
        return BindingBuilder
                .bind(rollCallDeadQueue())
                .to(rollCallDeadExchange())
                .with(ROLL_CALL_DEAD_ROUTING_KEY)
                .noargs();
    }

}

消费者配置

package com.example.tabledemo.consumer;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.tabledemo.config.RabbitConfig;
import com.example.tabledemo.generator.service.ClassActivitiesService;
import com.example.tabledemo.generator.service.StudentActivitiesService;
import com.example.tabledemo.pojo.entity.ClassActivitiesEntity;
import com.example.tabledemo.pojo.entity.StudentActivitiesEntity;
import com.example.tabledemo.student.StudentCoursesEntity;
import com.example.tabledemo.student.service.StudentCoursesService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;


import static java.time.LocalTime.now;

/**
 * @Author: wyz
 * @Date: 2025-04-08-20:40
 * @Description:处理学生签到的消费者
 */
@Component
@Slf4j
public class SignConsumer {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private StudentCoursesService studentCoursesService;
    @Autowired
    private ClassActivitiesService classActivitiesService;
    @Autowired
    private StudentActivitiesService studentActivitiesService;


    @RabbitListener(queues = RabbitConfig.ROLL_CALL_DEAD_QUEUE)
    @RabbitHandler// 直接引用队列名
    public void studentSignConsumer(HashMap<Object, Object> map, Channel channel, Message message) throws IOException {
        try {

            log.info(now() + "----------老师点名延迟消息处理开始----------");
            //解析消息
            Integer courseId = (Integer) map.get("course_id");
            Integer classActivitiesId = (Integer) map.get("class_activities_id");
            String signCode = (String) map.get("sign_code");
            //业务幂等性判断

            ClassActivitiesEntity byId = classActivitiesService.getById(classActivitiesId);
            //证明已经消费过了 本来是额外存的这里 只用状态判断
            if(byId.getActiveStatus()==1){
                    return;
            }

           //拿到redis中的学生签到数据
            String signCodeKey = "sign_" + courseId + "_" + signCode;
            Map<Object, Object> studentSignMap = redisTemplate.opsForHash().entries("h" + signCodeKey);

            //课堂活动状态改为已经结束
            LambdaUpdateWrapper<ClassActivitiesEntity> eq1 = Wrappers.lambdaUpdate(ClassActivitiesEntity.class)
                    .set(ClassActivitiesEntity::getActiveStatus, 1)
                    .eq(ClassActivitiesEntity::getId, classActivitiesId);
            classActivitiesService.update(eq1);
            //学生签到key
            //String studentSignKey="student_sign_"+studentSign.getStudentId();
            List<Integer> studentSignIdList = studentSignMap.entrySet().stream()
                    .map(i -> {
                        String studentSignKey = (String) i.getKey();
                        log.info("学生信息为{}", studentSignKey);
                        Integer studentId = Integer.valueOf(studentSignKey.split("_")[2]);
                        log.info("学生id为{}", studentId);
                        return studentId;
                    }).collect(Collectors.toList());
            //查出该课程下 的所有学生id
            LambdaQueryWrapper<StudentCoursesEntity> eq = Wrappers.lambdaQuery(StudentCoursesEntity.class)
                    .eq(StudentCoursesEntity::getCourseId, courseId);
            List<StudentCoursesEntity> list = studentCoursesService.list(eq);
            List<Integer> studentIds = list.stream().map(i -> i.getStudentId()).collect(Collectors.toList());
            //正常是 会有课程状态 课程结课什么的 ,这里我们模拟 不做处理
            ArrayList<StudentActivitiesEntity> studentActivitiesEntities = new ArrayList<>();
            studentIds.stream()
                    .forEach(studentId -> {
                        StudentActivitiesEntity studentActivitiesEntity = new StudentActivitiesEntity();
                        studentActivitiesEntity.setStudentId(studentId);
                        studentActivitiesEntity.setClassActivitiesId(classActivitiesId);
                        studentActivitiesEntity.setCourseId(courseId);
                        studentActivitiesEntity.setStudentActivitiesStatus(0);
                        if (studentSignIdList.contains(studentId)) {
                            log.info("有学生签到了");
                            studentActivitiesEntity.setStudentActivitiesStatus(1);
                        }
                        studentActivitiesEntities.add(studentActivitiesEntity);
                    });
            //构建学生活动表的数据
            studentActivitiesService.saveBatch(studentActivitiesEntities);
            //删除redis数据
            redisTemplate.delete(signCodeKey);
            redisTemplate.delete("h" + signCodeKey);

            //true 和false 代表着 是否 确认该条消息之前的  true 是确认  false 不确认
            // 假设队列中有消息 deliveryTag=5,6,7  现在是6
            // 结果:仅消息6被确认删除,消息5和7仍在队列中
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info(now() + "----------老师点名延迟消息处理结束----------");
        } catch (Exception e) {
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if (redelivered) {
                log.info(now() + "----------老师点名延迟消息处理异常,已被重新投递,丢弃消息----------");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                log.info(now() + "----------老师点名延迟消息处理异常,消息重新投递----------");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            throw e;
        }
    }
}

测试流程

接口测试

jmeter 压测

数据库数据查看 

可见 已经测试成功了 

相关文章:

  • AI大模型原理可视化工具:深入浅出理解大语言模型的工作原理
  • 机器学习02——RNN
  • 【2025年五一数学建模竞赛A题】完整思路和代码
  • 代码随想录动态规划part02
  • 【信息系统项目管理师】高分论文:论信息系统项目的范围管理(电网公司保供电可视化系统)
  • 图像处理算法面经1
  • 产品需求设计评审会:三步精准定位需求核心
  • std::enable_shared_from_this 模板类的作用是什么?
  • KEGG注释脚本kofam2kegg.py--脚本010
  • 小程序页面传值的多种方式
  • SQL语言
  • 力扣hot100_技巧_python版本
  • Multisim使用说明详尽版--(2025最新版)
  • 高效爬虫:一文掌握 Crawlee 的详细使用(web高效抓取和浏览器自动化库)
  • CS5346 - Interactivity in Visualization 可视化中的交互
  • Java 架构设计:从单体架构到微服务的转型之路
  • 大语言模型深度思考与交互增强
  • 策略模式随笔~
  • 适合单片机裸机环境的运行的软件定时器框架
  • Linux 下 Module 工具的介绍与使用
  • 解放日报:这是一场需要定力和实力的“科技长征”
  • 逛了6个小时的上海车展。有些不太成熟的感受。与你分享。
  • 对谈|李钧鹏、周忆粟:安德鲁·阿伯特过程社会学的魅力
  • 王毅出席金砖国家外长会晤
  • 坚守刑事检察一线13年,“在我心中每次庭审都是一次大考”
  • 人民日报:光荣属于每一个挺膺担当的奋斗者