当前位置: 首页 > news >正文

东营建网站wordpress商城汉化主题

东营建网站,wordpress商城汉化主题,专做化妆品网站,淘宝客免费建网站前言 之前使用redis实现了消息队列,但是没有延迟消费的功能,现在编写一个可以实现延迟消费的功能,同时也能满足及时消费,只需要将延迟时间设置0就行了,用到了Redission,不需要基于stream进行一些复杂配置。…

前言

之前使用redis实现了消息队列,但是没有延迟消费的功能,现在编写一个可以实现延迟消费的功能,同时也能满足及时消费,只需要将延迟时间设置0就行了,用到了Redission,不需要基于stream进行一些复杂配置。

参考文章

【redis缓存】怎么使用 Redis 实现一个延时队列?_redis实现延时队列-CSDN博客

Redisson 的延迟队列真的能用吗?一文看透原理 + 坑点_redission延时队列原理-CSDN博客

Spring Boot 集成 Redisson 实现消息队列_springboot redis消息队列-CSDN博客


引入相关依赖

         <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.23.3</version></dependency><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId><version>2.13.5</version></dependency>

redission配置类

@Configuration
@Slf4j
public class RedissonConfig {private final static String REDISSON_PREFIX = "redis://";@ResourceRedisProperties redisProperties;@Beanpublic RedissonClient redissonClient() {Config config = new Config();String url = REDISSON_PREFIX + redisProperties.getHost() + ":" + redisProperties.getPort();config.useSingleServer().setAddress(url).setPassword(redisProperties.getPassword()).setDatabase(redisProperties.getDatabase()).setPingConnectionInterval(2000);  //设置2秒心跳间隔config.setLockWatchdogTimeout(10000L);     //看门狗超时缩短(分布式锁自动续期更灵敏)显式设置为10秒try {return Redisson.create(config);} catch (Exception e) {log.error("RedissonClient init redis url:{}Exception:{}", url, e.getMessage());return null;}}
}

延迟队列配置类

相关的配置文件

server:port: ${SERVER_PORT:9211}# Spring
spring:application:# 应用名称name: ruoyi-redis-msg2redis:host: localhostport: 6379password: 123456mvc:pathmatch:matching-strategy: ant_path_matcher# 是否启用redis延迟队列
redission:delayqueue:enable: truequeues:- queueName: goodsDelayQueuebeanId: goodsDelayConsumeHandlerdesc: 商品消费的延迟队列- queueName: orderDelayQueuebeanId: orderDelayConsumeHandlerdesc: 订单消费的延迟队列
@Configuration
@ConfigurationProperties(prefix = "redission.delayqueue")
@Slf4j
@Data
public class RedisDelayQueueConfigProperties {private List<QueueConfig> queues;@Datapublic static class QueueConfig {private String queueName;private String desc;private String beanId;}
}

上面的配置文件中配置了2个重要参数

  1. 队列名
  2. 每个队列的处理类

下面的配置类,会为每个队列创建一个新的线程并且循环运行,阻塞监听队列(当队列有元素则take获取,如果没有则阻塞等待)

/*** @Description: redis延迟队列配置*/
@Configuration
@ConditionalOnProperty(value = "redission.delayqueue.enable")
@Slf4j
public class RedisDelayQueueConfig {@Autowiredprivate RedisDelayQueueConfigProperties configProperties;/*** @Description 线程池*/@Bean("delayExecutor")public ExecutorService getDelayExecutor() {return Executors.newFixedThreadPool(5);}@Beanpublic List<RedisDelayQueueConfigProperties.QueueConfig> startRedisDelayQueue(ApplicationContext applicationContext,RedissonClient redissonClient,@Qualifier("delayExecutor") ExecutorService executorService) {//根据配置的队列创建对应的线程,1个队列对应1个线程List<RedisDelayQueueConfigProperties.QueueConfig> queueConfigs = configProperties.getQueues();for (RedisDelayQueueConfigProperties.QueueConfig queueConfig : queueConfigs) {startThread(applicationContext, redissonClient, executorService, queueConfig);}return queueConfigs;}private <T> void startThread(ApplicationContext applicationContext,RedissonClient redissonClient,ExecutorService executorService,RedisDelayQueueConfigProperties.QueueConfig queueConfig) {//redissonClient获取阻塞队列RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueConfig.getQueueName());// 由于此线程需要常驻,可以新建线程,不用交给线程池管理Thread thread = new Thread(() -> {log.info("启动监听队列线程:{}", queueConfig.getQueueName());while (true) {try {// 获取到执行类RedisDelayQueueHandler redisDelayQueueHandler = applicationContext.getBean(queueConfig.getBeanId(),RedisDelayQueueHandler.class);// 从阻塞队列中获取被执行对象,为空时阻塞,作为参数传递给redisDelayQueueHandlerT t = blockingFairQueue.take();log.info("监听队列成功:{},交给处理类:{}", queueConfig.getQueueName(),queueConfig.getBeanId());//池线程执行消费executorService.submit(() -> redisDelayQueueHandler.exec(t));} catch (Exception e) {log.error("监听队列错误,", e);try {Thread.sleep(10000);} catch (InterruptedException ex) {ex.fillInStackTrace();}}}});thread.setName(queueConfig.getQueueName());thread.start();}
}

消费者处理类

这里我们创建一个接口,这样在刚才配置中根据配置文件,就能根据队列名选择对应的消费者

/*** @Description: 延迟队列执行方法,需要具体实现*/
public interface RedisDelayQueueHandler<T> {/*** @Description 执行方法*/void exec(T t);
}/*** @Description: 商品延时消费处理类*/
@Component
@Slf4j
public class GoodsDelayConsumeHandler implements RedisDelayQueueHandler<String> {@Overridepublic void exec(String str) {log.info("开始消费:{}" , str);}
}/*** @Description: 订单延时消费处理类*/
@Component
@Slf4j
public class OrderDelayConsumeHandler implements RedisDelayQueueHandler<String> {@Overridepublic void exec(String str) {log.info("开始消费:{}" , str);}
}

生产者和工具类

@Data
public class Msg {String queueName;String content;long delay;
}

生产者只需要传入队列名、消息体、延迟时间

@RestController
@RequestMapping("/msg")
@Api(tags = "MsgController")
@Slf4j
public class MsgController {@ResourceRedisDelayQueueConfigProperties properties;@Resourceprivate RedisDelayQueueUtil redisDelayQueueUtil;@PostMapping("/addDelayQueue")@ApiOperation(value = "添加延时消息")public R<?> addDelayQueue(@RequestBody Msg msg) {// 模拟业务中添加延迟任务boolean b = redisDelayQueueUtil.addDelayQueue(msg.getQueueName(),msg.getContent(),msg.getDelay());return R.toR(b);}@PostMapping("/findQueues")@ApiOperation(value = "查询可用队列")public R<?> findQueues() {List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueues();return R.okList(queues,queues.size());}}
/*** @Description: 延迟队列增删工具类*/
@Slf4j
@Component
public class RedisDelayQueueUtil {@Resourceprivate RedissonClient redissonClient;@Resourceprivate RedisDelayQueueConfigProperties properties;/*** 添加延时队列* @param queueName* @param content* @param delay* @return*/public boolean addDelayQueue(String queueName, String content,Long delay) {return addDelayQueue(queueName,content,delay,TimeUnit.SECONDS);}/*** 添加延时队列* @param queueName* @param content* @param endTime* @return*/public  boolean addDelayQueue(String queueName, String content, Date endTime) {long seconds = DateUtils.diffTime(DateUtils.getNowDate(), endTime);if (seconds <= 0) {log.error("不能小于当前时间");throw new RuntimeException("不能小于当前时间");}return addDelayQueue(queueName,content, seconds, TimeUnit.SECONDS);}/*** @Description 添加延迟队列*/private boolean addDelayQueue(String queueName,String content,Long delay,TimeUnit timeUnit) {validateParam(queueName, content);try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.offer(content, delay, timeUnit);log.info("添加延时队列成功:{},content={},delay={}", queueName,content,timeUnit.toSeconds(delay) + "秒");} catch (Exception e) {log.info("添加延时队列失败:{},content={},delay={}", queueName,content,timeUnit.toSeconds(delay) + "秒");throw new RuntimeException(e.getMessage());}return true;}/*** 获取延迟队列** @param queueName*/public Object getDelayQueue(String queueName){if (StringUtils.isBlank(queueName)) {throw new ServiceException("队列名不能为空");}RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);Object value = delayedQueue.poll();return value;}/*** 删除指定队列中的消息** @param content 指定删除的消息对象队列值(同队列需保证唯一性)* @param queueName 指定队列键*/public boolean removeDelayedQueue(String queueName,String content) {validateParam(queueName, content);RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);return delayedQueue.remove(content);}/*** 校验参数** @param queueName 消息主题* @param content   消息体*/private void validateParam(String queueName, String content) {List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueues();if(CollUtil.isEmpty(queues)){throw new ServiceException("请配置队列名");}if (CharSequenceUtil.isBlank(queueName)) {throw new ServiceException("队列名不能为空");}boolean b = queues.stream().noneMatch(q -> q.getQueueName().equals(queueName));if (b) {throw new ServiceException("没有配置该队列");}if (CharSequenceUtil.isBlank(content)) {throw new ServiceException("消息体不能为空");}}}

测试流程

1、首先查一下可用的队列,只有配置了的才能使用

2、然后我们往goodsDelayQueue队列插入两条数据,延时时间分别为100s、60s

3、往redis插入数据

这里每个创建两个key

1、redisson_delay_queue:{队列名}

2、redisson_delay_queue_timeout:{队列名}

第1个key是由redisson_delay_queue固定前缀+队列名组成,里面的值是ist集合

第2个key是由redisson_delay_queue_timeout固定前缀+队列名组成,里面的值是Zset集合

Zset集合解释

redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score,将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务执行时间(比如30min),
任务内容(比如订单超时支付系统自动取消)等信息体。然后另起一个线程,该线程会周期性地从zset中取出score最小(即最早要执行的)的任务,如果该任务的score小于当前时间戳,则执行任务,否则等待一段时间再次检查,
直到任务可以执行,执行任务后,通过Redis的remove命令删除已经成功执行的任务即可。

当Zset到了执行时间,队列的线程就会从take中获取到数据,然后再开一个线程交给消费者处理

http://www.dtcms.com/a/506598.html

相关文章:

  • 合肥昱天建设有限公司网站2016手机网站制作规范
  • 网站制作 青岛seo工具下载
  • 做初中物理题目的网站photoshop 做网站logo
  • 网站开发技术项目邢台网站建设最新报价
  • 木地板企业网站模版网站空间到期怎么办
  • 佛山著名网站建设公司赣州瑞金网站建设
  • 免费模板下载网站推荐免费asp企业网站源码
  • 华企立方网站深圳广告设计公司深圳画册设计
  • 网站制作全包价格中国商标商标查询网
  • 做学校和企业对接的网站做企业网站接单
  • 开发区网站开发语言网站建设5000费用
  • 自己做网站用什么软件深圳建设交易宝安
  • 建站教程下载网站建设首页模板下载
  • 网站建设中英文版软件科技公司网站模板
  • 内部网站建设青青网站怎么做
  • 做孵化的网站php网站开发书籍
  • 手机网站app辅助wordpress 页面编辑器
  • 网站建设免费视频教程wordpress5.0.3下载
  • 网站的推广代码是什么河南搜索引擎推广公司
  • 小程序代理与加盟windows优化
  • 做网站的软件wd的叫啥源码网
  • 怎么用html5做自适应网站品牌营销策略论文
  • 网站专题报道页面怎么做的建设牌安全带官方网站
  • 一家只做正品的网站河北人工智能建站系统软件
  • 学校网站制作素材美橙互联 wordpress
  • jsp建设网站教程网站建设合同违约金细节
  • 烟台有哪些网站建站推广公司云凡济南网站建设开发
  • 货运 东莞网站建设装修网站建设案例
  • 本地顺德网站建设上海网站设计价
  • 找外包做网站不给代码网站建设投标书免费