当前位置: 首页 > news >正文

Spring Boot+Redis Zset:三步构建高可靠延迟队列系统

系统设计

架构图

+----------------+       +-----------------+       +----------------+
|                |       |                 |       |                |
|  生产者        |------>| Redis ZSet      |------>| 定时任务消费者  |
|  (添加延迟任务) |       | (延迟队列存储)  |       | (扫描并处理任务)|
|                |       |                 |       |                |
+----------------+       +-----------------+       +----------------+↑                                              ||                                              ↓|                                   +---------------------++-----------------------------------|     任务处理器       || (执行具体业务逻辑)   |+---------------------+

核心流程

  1. 生产者将任务添加到Redis ZSet中,score为任务执行时间戳

  2. 定时任务定期扫描ZSet,找出score小于当前时间的任务

  3. 消费者线程池处理到期的任务

  4. 任务处理完成后从ZSet中移除

实现步骤

步骤一:添加依赖(pom.xml)

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Redis集成 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 定时任务 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency><!-- JSON处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Lombok简化代码 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>

步骤二:配置Redis(application.yml)

spring:redis:host: localhostport: 6379password: database: 0lettuce:pool:max-active: 20max-idle: 10min-idle: 2max-wait: 10000ms# 自定义延迟队列配置
delay:queue:key: "delay_queue"  # Redis ZSet键名batch-size: 10      # 每次处理任务数量interval: 5000      # 定时任务执行间隔(ms)thread-pool-size: 5 # 消费者线程池大小

步骤三:创建任务模型(DelayTask.java)

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayTask {/*** 任务类型枚举*/public enum TaskType {ORDER_TIMEOUT,   // 订单超时处理EMAIL_REMINDER,  // 邮件提醒TASK_EXECUTION   // 定时任务执行}private TaskType type;    // 任务类型private String taskId;    // 任务唯一IDprivate String content;   // 任务内容private long createTime;  // 任务创建时间private long executeTime; // 任务执行时间// 重写toString方法用于序列化@Overridepublic String toString() {return "DelayTask{" +"type=" + type +", taskId='" + taskId + '\'' +", content='" + content + '\'' +", createTime=" + createTime +", executeTime=" + executeTime +'}';}
}

步骤四:创建Redis配置类(RedisConfig.java)

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用Jackson序列化GenericJackson2JsonRedisSerializer jacksonSerializer = new GenericJackson2JsonRedisSerializer();// Key序列化template.setKeySerializer(new StringRedisSerializer());// Value序列化template.setValueSerializer(jacksonSerializer);// Hash Key序列化template.setHashKeySerializer(new StringRedisSerializer());// Hash Value序列化template.setHashValueSerializer(jacksonSerializer);template.afterPropertiesSet();return template;}
}

步骤五:创建线程池配置类(ThreadPoolConfig.java)

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
public class ThreadPoolConfig {@Value("${delay.queue.thread-pool-size:5}")private int threadPoolSize;@Bean("taskExecutor")public Executor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心线程数executor.setCorePoolSize(threadPoolSize);// 最大线程数executor.setMaxPoolSize(threadPoolSize * 2);// 队列大小executor.setQueueCapacity(100);// 线程名前缀executor.setThreadNamePrefix("delay-task-");// 拒绝策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 初始化executor.initialize();return executor;}
}

步骤六:创建延迟队列服务(DelayQueueService.java)

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Set;
import java.util.concurrent.Executor;@Slf4j
@Service
public class DelayQueueService {@Value("${delay.queue.key}")private String delayQueueKey;@Value("${delay.queue.batch-size}")private int batchSize;@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate Executor taskExecutor;/*** 添加延迟任务* @param task 任务对象* @param delaySeconds 延迟秒数*/public void addTask(DelayTask task, long delaySeconds) {long executeTime = System.currentTimeMillis() + (delaySeconds * 1000);task.setExecuteTime(executeTime);// 添加到Redis ZSetredisTemplate.opsForZSet().add(delayQueueKey, task, executeTime);log.info("添加延迟任务成功, 任务ID: {}, 执行时间: {}", task.getTaskId(), executeTime);}/*** 定时扫描任务(每5秒执行一次)*/@Scheduled(fixedRateString = "${delay.queue.interval}")public void scanExpiredTasks() {long now = System.currentTimeMillis();log.debug("开始扫描延迟队列, 当前时间: {}", now);// 获取当前时间之前的所有任务Set<ZSetOperations.TypedTuple<Object>> tasks = redisTemplate.opsForZSet().rangeByScoreWithScores(delayQueueKey, 0, now, 0, batchSize);if (tasks == null || tasks.isEmpty()) {log.debug("未找到待处理任务");return;}log.info("发现 {} 个待处理任务", tasks.size());for (ZSetOperations.TypedTuple<Object> tuple : tasks) {Object taskObj = tuple.getValue();if (taskObj instanceof DelayTask) {DelayTask task = (DelayTask) taskObj;// 使用线程池异步处理任务taskExecutor.execute(() -> processTask(task));}}}/*** 处理任务* @param task 延迟任务*/@Asyncpublic void processTask(DelayTask task) {try {log.info("开始处理任务: {}", task.getTaskId());// 根据任务类型执行不同逻辑switch (task.getType()) {case ORDER_TIMEOUT:handleOrderTimeout(task);break;case EMAIL_REMINDER:sendReminderEmail(task);break;case TASK_EXECUTION:executeScheduledTask(task);break;default:log.warn("未知任务类型: {}", task.getType());}// 处理完成后从队列中移除redisTemplate.opsForZSet().remove(delayQueueKey, task);log.info("任务处理完成并移除: {}", task.getTaskId());} catch (Exception e) {log.error("任务处理失败: {}", task.getTaskId(), e);handleProcessingError(task);}}// 示例:订单超时处理private void handleOrderTimeout(DelayTask task) {log.info("处理订单超时任务: {}", task.getContent());// 实际业务逻辑:取消订单、释放库存等// 模拟处理时间try {Thread.sleep(1000);} catch (InterruptedException ignored) {}}// 示例:发送提醒邮件private void sendReminderEmail(DelayTask task) {log.info("发送提醒邮件: {}", task.getContent());// 实际业务逻辑:调用邮件服务发送邮件}// 示例:执行定时任务private void executeScheduledTask(DelayTask task) {log.info("执行定时任务: {}", task.getContent());// 实际业务逻辑:执行定时任务}// 错误处理private void handleProcessingError(DelayTask task) {log.error("任务处理失败,加入死信队列: {}", task.getTaskId());// 可以将失败任务移到死信队列redisTemplate.opsForList().rightPush("delay:dead-letter", task);}
}

步骤七:创建测试Controller(DelayQueueController.java)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/delay")
public class DelayQueueController {@Autowiredprivate DelayQueueService delayQueueService;/*** 添加延迟任务* @param type 任务类型 (1-订单超时, 2-邮件提醒, 3-定时任务)* @param seconds 延迟秒数* @param content 任务内容*/@PostMapping("/add")public String addDelayTask(@RequestParam("type") int type,@RequestParam("seconds") long seconds,@RequestParam("content") String content) {// 创建任务IDString taskId = "TASK-" + System.currentTimeMillis();// 转换任务类型DelayTask.TaskType taskType;switch (type) {case 1: taskType = DelayTask.TaskType.ORDER_TIMEOUT; break;case 2: taskType = DelayTask.TaskType.EMAIL_REMINDER; break;case 3: taskType = DelayTask.TaskType.TASK_EXECUTION; break;default: throw new IllegalArgumentException("无效的任务类型");}// 创建任务DelayTask task = new DelayTask(taskType, taskId, content, System.currentTimeMillis(), 0);// 添加任务delayQueueService.addTask(task, seconds);return "任务添加成功! ID: " + taskId;}/*** 查看队列状态*/@GetMapping("/status")public String queueStatus() {long size = delayQueueService.getQueueSize();return "当前延迟队列任务数量: " + size;}
}

启动类(DelayQueueApplication.java)

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableScheduling // 启用定时任务
@EnableAsync     // 启用异步方法
public class DelayQueueApplication {public static void main(String[] args) {SpringApplication.run(DelayQueueApplication.class, args);}
}

方案优势与注意事项

优势

  1. 高性能:利用Redis内存操作和ZSet有序特性

  2. 低延迟:定时任务扫描保证任务及时处理

  3. 高可靠:任务处理失败可进入死信队列

  4. 可扩展:线程池支持并行处理多个任务

  5. 灵活配置:支持批量处理大小、扫描间隔等参数配置

注意事项

  1. 任务幂等性:确保任务可重复处理而不产生副作用

  2. 任务超时处理:长时间任务需考虑超时机制

  3. Redis持久化:根据业务需求配置RDB或AOF

  4. 分布式环境:多实例部署时需考虑任务竞争问题

  5. 监控告警:添加队列积压监控和任务失败告警

扩展建议

  1. 添加管理界面

    • 查看队列中的任务

    • 手动重试失败任务

    • 统计任务处理成功率

  2. 分布式锁优化

    // 在scanExpiredTasks方法中
    String lockKey = "delay_queue_lock";
    Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 30, TimeUnit.SECONDS);if (lockAcquired != null && lockAcquired) {try {// 执行扫描任务逻辑} finally {redisTemplate.delete(lockKey);}
    }

  3. 任务优先级支持

    // 在添加任务时,可将优先级加入score计算
    double score = executeTime + (priority * 0.001);
  4. 延迟时间精确控制

    • 使用Redisson的DelayedQueue组件

    • 或使用Redis的Keyspace通知功能

这个实现方案提供了一个完整、可扩展的延迟队列系统,适用于订单超时处理、定时提醒、延迟任务执行等多种业务场景。

http://www.dtcms.com/a/292868.html

相关文章:

  • 博物馆智慧导览系统AR交互与自动感应技术:从虚实融合到智能讲解的技术实践
  • Kubernetes调度器
  • 数据结构 堆(2)---堆的实现
  • 第三章 Freertos物联网实战esp8266模块
  • MySQL 学习一 存储结构和log
  • JDBC编程
  • 刀客doc:Netflix与YouTube开始在广告战场正面交锋
  • 数组——初识数据结构
  • 算法第26天|贪心算法:用最少数量的箭引爆气球、无重叠区间、划分字母区间
  • 35.安卓逆向2-frida hook技术-过root检测
  • 元宇宙游戏与VR的关联性及发展分析(截至2025年7月)
  • 【Spring拦截器实战】路径拦截与访问控制系统设计
  • MybatisPlus入门指南
  • SonarQube 代码分析工具
  • docker 中安装 ONLYOFFICE 服务
  • C++基础学习——文件操作详解
  • netframe4.5 的mvc 框架 layui 组件的引用
  • 模运算常见定律
  • .net 警告【代码 CS1998】此异步方法缺少 “await“ 运算符,将以同步方式运行。
  • Linux命令集锦-个人整理(偏向进程和端口的查询)
  • CS231n-2017 Lecture5卷积神经网络笔记
  • 如何把jar包打成docker镜像(SpringBoot项目打包成Docker )部署到Linux
  • CMOS知识点 离子注入工艺
  • OpenCV Mat UMat GpuMat Matx HostMem InputArray等设计哲学
  • Arduino学习笔记【快速入门】
  • 蓝牙通信架构(Bluetooth/BLE)
  • Windows系统暂停更新工具
  • 每日面试题12:JVM垃圾回收机制
  • 分布式数据库中间件ShardingSphere
  • Unity UI的未来之路:从UGUI到UI Toolkit的架构演进与特性剖析(1)