当前位置: 首页 > news >正文

Redission 实现延迟队列

前言

之前使用redis实现了消息队列,但是没有延迟消费的功能,现在编写一个可以实现延迟消费的功能,同时也能满足及时消费,只需要将延迟时间设置0就行了,用到了Redission,不需要基于stream进行一些复杂配置。

参考文章

【redis缓存】怎么使用 Redis 实现一个延时队列?_redis实现延时队列-CSDN博客

Redisson 的延迟队列真的能用吗?一文看透原理 + 坑点_redission延时队列原理-CSDN博客

Spring Boot 集成 Redisson 实现消息队列_springboot redis消息队列-CSDN博客


引入相关依赖

         <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.23.3</version></dependency><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId><version>2.13.5</version></dependency>

redission配置类

@Configuration
@Slf4j
public class RedissonConfig {private final static String REDISSON_PREFIX = "redis://";@ResourceRedisProperties redisProperties;@Beanpublic RedissonClient redissonClient() {Config config = new Config();String url = REDISSON_PREFIX + redisProperties.getHost() + ":" + redisProperties.getPort();config.useSingleServer().setAddress(url).setPassword(redisProperties.getPassword()).setDatabase(redisProperties.getDatabase()).setPingConnectionInterval(2000);  //设置2秒心跳间隔config.setLockWatchdogTimeout(10000L);     //看门狗超时缩短(分布式锁自动续期更灵敏)显式设置为10秒try {return Redisson.create(config);} catch (Exception e) {log.error("RedissonClient init redis url:{}Exception:{}", url, e.getMessage());return null;}}
}

延迟队列配置类

相关的配置文件

server:port: ${SERVER_PORT:9211}# Spring
spring:application:# 应用名称name: ruoyi-redis-msg2redis:host: localhostport: 6379password: 123456mvc:pathmatch:matching-strategy: ant_path_matcher# 是否启用redis延迟队列
redission:delayqueue:enable: truequeues:- queueName: goodsDelayQueuebeanId: goodsDelayConsumeHandlerdesc: 商品消费的延迟队列- queueName: orderDelayQueuebeanId: orderDelayConsumeHandlerdesc: 订单消费的延迟队列
@Configuration
@ConfigurationProperties(prefix = "redission.delayqueue")
@Slf4j
@Data
public class RedisDelayQueueConfigProperties {private List<QueueConfig> queues;@Datapublic static class QueueConfig {private String queueName;private String desc;private String beanId;}
}

上面的配置文件中配置了2个重要参数

  1. 队列名
  2. 每个队列的处理类

下面的配置类,会为每个队列创建一个新的线程并且循环运行,阻塞监听队列(当队列有元素则take获取,如果没有则阻塞等待)

/*** @Description: redis延迟队列配置*/
@Configuration
@ConditionalOnProperty(value = "redission.delayqueue.enable")
@Slf4j
public class RedisDelayQueueConfig {@Autowiredprivate RedisDelayQueueConfigProperties configProperties;/*** @Description 线程池*/@Bean("delayExecutor")public ExecutorService getDelayExecutor() {return Executors.newFixedThreadPool(5);}@Beanpublic List<RedisDelayQueueConfigProperties.QueueConfig> startRedisDelayQueue(ApplicationContext applicationContext,RedissonClient redissonClient,@Qualifier("delayExecutor") ExecutorService executorService) {//根据配置的队列创建对应的线程,1个队列对应1个线程List<RedisDelayQueueConfigProperties.QueueConfig> queueConfigs = configProperties.getQueues();for (RedisDelayQueueConfigProperties.QueueConfig queueConfig : queueConfigs) {startThread(applicationContext, redissonClient, executorService, queueConfig);}return queueConfigs;}private <T> void startThread(ApplicationContext applicationContext,RedissonClient redissonClient,ExecutorService executorService,RedisDelayQueueConfigProperties.QueueConfig queueConfig) {//redissonClient获取阻塞队列RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueConfig.getQueueName());// 由于此线程需要常驻,可以新建线程,不用交给线程池管理Thread thread = new Thread(() -> {log.info("启动监听队列线程:{}", queueConfig.getQueueName());while (true) {try {// 获取到执行类RedisDelayQueueHandler redisDelayQueueHandler = applicationContext.getBean(queueConfig.getBeanId(),RedisDelayQueueHandler.class);// 从阻塞队列中获取被执行对象,为空时阻塞,作为参数传递给redisDelayQueueHandlerT t = blockingFairQueue.take();log.info("监听队列成功:{},交给处理类:{}", queueConfig.getQueueName(),queueConfig.getBeanId());//池线程执行消费executorService.submit(() -> redisDelayQueueHandler.exec(t));} catch (Exception e) {log.error("监听队列错误,", e);try {Thread.sleep(10000);} catch (InterruptedException ex) {ex.fillInStackTrace();}}}});thread.setName(queueConfig.getQueueName());thread.start();}
}

消费者处理类

这里我们创建一个接口,这样在刚才配置中根据配置文件,就能根据队列名选择对应的消费者

/*** @Description: 延迟队列执行方法,需要具体实现*/
public interface RedisDelayQueueHandler<T> {/*** @Description 执行方法*/void exec(T t);
}/*** @Description: 商品延时消费处理类*/
@Component
@Slf4j
public class GoodsDelayConsumeHandler implements RedisDelayQueueHandler<String> {@Overridepublic void exec(String str) {log.info("开始消费:{}" , str);}
}/*** @Description: 订单延时消费处理类*/
@Component
@Slf4j
public class OrderDelayConsumeHandler implements RedisDelayQueueHandler<String> {@Overridepublic void exec(String str) {log.info("开始消费:{}" , str);}
}

生产者和工具类

@Data
public class Msg {String queueName;String content;long delay;
}

生产者只需要传入队列名、消息体、延迟时间

@RestController
@RequestMapping("/msg")
@Api(tags = "MsgController")
@Slf4j
public class MsgController {@ResourceRedisDelayQueueConfigProperties properties;@Resourceprivate RedisDelayQueueUtil redisDelayQueueUtil;@PostMapping("/addDelayQueue")@ApiOperation(value = "添加延时消息")public R<?> addDelayQueue(@RequestBody Msg msg) {// 模拟业务中添加延迟任务boolean b = redisDelayQueueUtil.addDelayQueue(msg.getQueueName(),msg.getContent(),msg.getDelay());return R.toR(b);}@PostMapping("/findQueues")@ApiOperation(value = "查询可用队列")public R<?> findQueues() {List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueues();return R.okList(queues,queues.size());}}
/*** @Description: 延迟队列增删工具类*/
@Slf4j
@Component
public class RedisDelayQueueUtil {@Resourceprivate RedissonClient redissonClient;@Resourceprivate RedisDelayQueueConfigProperties properties;/*** 添加延时队列* @param queueName* @param content* @param delay* @return*/public boolean addDelayQueue(String queueName, String content,Long delay) {return addDelayQueue(queueName,content,delay,TimeUnit.SECONDS);}/*** 添加延时队列* @param queueName* @param content* @param endTime* @return*/public  boolean addDelayQueue(String queueName, String content, Date endTime) {long seconds = DateUtils.diffTime(DateUtils.getNowDate(), endTime);if (seconds <= 0) {log.error("不能小于当前时间");throw new RuntimeException("不能小于当前时间");}return addDelayQueue(queueName,content, seconds, TimeUnit.SECONDS);}/*** @Description 添加延迟队列*/private boolean addDelayQueue(String queueName,String content,Long delay,TimeUnit timeUnit) {validateParam(queueName, content);try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.offer(content, delay, timeUnit);log.info("添加延时队列成功:{},content={},delay={}", queueName,content,timeUnit.toSeconds(delay) + "秒");} catch (Exception e) {log.info("添加延时队列失败:{},content={},delay={}", queueName,content,timeUnit.toSeconds(delay) + "秒");throw new RuntimeException(e.getMessage());}return true;}/*** 获取延迟队列** @param queueName*/public Object getDelayQueue(String queueName){if (StringUtils.isBlank(queueName)) {throw new ServiceException("队列名不能为空");}RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);Object value = delayedQueue.poll();return value;}/*** 删除指定队列中的消息** @param content 指定删除的消息对象队列值(同队列需保证唯一性)* @param queueName 指定队列键*/public boolean removeDelayedQueue(String queueName,String content) {validateParam(queueName, content);RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);return delayedQueue.remove(content);}/*** 校验参数** @param queueName 消息主题* @param content   消息体*/private void validateParam(String queueName, String content) {List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueues();if(CollUtil.isEmpty(queues)){throw new ServiceException("请配置队列名");}if (CharSequenceUtil.isBlank(queueName)) {throw new ServiceException("队列名不能为空");}boolean b = queues.stream().noneMatch(q -> q.getQueueName().equals(queueName));if (b) {throw new ServiceException("没有配置该队列");}if (CharSequenceUtil.isBlank(content)) {throw new ServiceException("消息体不能为空");}}}

测试流程

1、首先查一下可用的队列,只有配置了的才能使用

2、然后我们往goodsDelayQueue队列插入两条数据,延时时间分别为100s、60s

3、往redis插入数据

这里每个创建两个key

1、redisson_delay_queue:{队列名}

2、redisson_delay_queue_timeout:{队列名}

第1个key是由redisson_delay_queue固定前缀+队列名组成,里面的值是ist集合

第2个key是由redisson_delay_queue_timeout固定前缀+队列名组成,里面的值是Zset集合

Zset集合解释

redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score,将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务执行时间(比如30min),
任务内容(比如订单超时支付系统自动取消)等信息体。然后另起一个线程,该线程会周期性地从zset中取出score最小(即最早要执行的)的任务,如果该任务的score小于当前时间戳,则执行任务,否则等待一段时间再次检查,
直到任务可以执行,执行任务后,通过Redis的remove命令删除已经成功执行的任务即可。

当Zset到了执行时间,队列的线程就会从take中获取到数据,然后再开一个线程交给消费者处理

http://www.dtcms.com/a/358447.html

相关文章:

  • 鸿蒙NEXT布局全解析:从线性到瀑布流,构建自适应UI界面
  • Notepad++常用设置
  • 金仓数据库迁移评估系统(KDMS)V4正式上线,助力企业高效完成数据库国产化替代
  • 果蔬采摘机器人:自动驾驶融合视觉识别,精准定位,高效作业
  • 【SoC】【W800】基于W800的PWM实现
  • 类和反射的机制
  • hashmap计算key的hash的时候为什么要右移16位
  • 鸿蒙ArkTS 核心篇-16-循环渲染(组件)
  • Ruoyi-vue-plus-5.x第一篇Sa-Token权限认证体系深度解析:1.3 权限控制与注解使用
  • 【计算机组成原理】LRU计数器问题
  • Vue3 + GeoScene 地图点击事件系统设计
  • Selenium + PO 框架进阶实践:接入 Allure 报告与 Jenkins 持续集成
  • macOs上ffmpeg带入libx264库交叉编译
  • docker 启动一个clickhouse , docker 创建ck数据库
  • Python远程文件管理移动端适配与跨平台优化实战
  • vue3多个el-checkbox勾选框设置必选一个
  • 【OpenGL ES】光栅化插值原理和射线拾取原理
  • Day17(前端:JavaScript基础阶段)
  • Cocos游戏中自定义按钮组件(BtnEventComponent)的详细分析与实现
  • HAProxy 负载均衡全解析:从基础部署、负载策略到会话保持及性能优化指南
  • Spring : 事务管理
  • 音视频学习(六十一):H265中的VPS
  • Prompt Engineering:高效构建智能文本生成的策略与实践
  • 深层语义在自然语言处理中的理论框架与技术融合研究
  • AI大模型:(二)5.2 文生视频(Text-to-Video)模型训练实践
  • FPGA增量式方差与均值计算
  • 响应式编程框架Reactor【4】
  • FPGA学习笔记——SPI读写FLASH
  • 优化器全指南:从原理到调优实战
  • 原子操作与锁实现