当前位置: 首页 > news >正文

【线程池】ThreadPoolTaskExecutor和redis的配置案例

【线程池】ThreadPoolTaskExecutor和redis的配置案例

  • 【一】功能描述
    • 【1】线程池配置 (ThreadPoolTaskExecutor)
    • 【2】分布式锁
    • 【3】线程编排
    • 【4】异常处理
    • 【5】缓存策略
  • 【二】配置过程
    • 【1】配置config
      • (1)RedisConfig配置
      • (2)ThreadConfig
    • 【2】controller
    • 【3】model
    • 【4】impl

【一】功能描述

【1】线程池配置 (ThreadPoolTaskExecutor)

(1)通过@Bean定义线程池并注入到 Service 中
(2)使用executor.submit()提交有返回值的任务(返回Future)
(3)使用executor.execute()提交无返回值的任务
(4)所有异步操作都显式通过线程池执行,更灵活可控

【2】分布式锁

(1)使用 Redisson 的RLock实现分布式锁
(2)采用tryLock方式获取锁,避免死锁
(3)设置合理的锁等待时间和自动释放时间
(4)确保锁的释放放在finally块中,保证资源释放

【3】线程编排

(1)使用CountDownLatch实现多线程同步等待
(2)支持超时控制(latch.await(10, TimeUnit.SECONDS))
(3)收集所有线程的Future结果,统一处理

【4】异常处理

(1)线程任务中使用try-catch捕获异常
(2)通过Future获取任务执行结果时处理异常
(3)线程中断处理(Thread.currentThread().interrupt())

【5】缓存策略

(1)实现查询缓存:先查 Redis,再查数据库
(2)更新时采用缓存延时双删策略:
1-更新前删除缓存
2-数据库更新完成后,延迟 500ms 再次删除缓存
(3)缓存设置随机过期时间(5-10 分钟),避免缓存雪崩

【二】配置过程

【1】配置config

(1)RedisConfig配置

package com.example.config;import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {/*** RedisTemplate配置*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 字符串序列化器StringRedisSerializer stringSerializer = new StringRedisSerializer();// JSON序列化器GenericJackson2JsonRedisSerializer jsonSerializer = new GenericJackson2JsonRedisSerializer();template.setKeySerializer(stringSerializer);template.setValueSerializer(jsonSerializer);template.setHashKeySerializer(stringSerializer);template.setHashValueSerializer(jsonSerializer);template.afterPropertiesSet();return template;}/*** Redisson配置(分布式锁)*/@Beanpublic RedissonClient redissonClient() {Config config = new Config();// 单机模式(实际生产环境建议配置集群)config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);return Redisson.create(config);}
}

(2)ThreadConfig

package com.example.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
public class ThreadConfig {/*** 自定义线程池配置*/@Bean(name = "businessExecutor")public Executor businessExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心线程数executor.setCorePoolSize(5);// 最大线程数executor.setMaxPoolSize(10);// 队列容量executor.setQueueCapacity(20);// 线程空闲时间(秒)executor.setKeepAliveSeconds(60);// 线程名称前缀executor.setThreadNamePrefix("business-");// 拒绝策略:当线程和队列都满时,由提交任务的线程执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 线程池关闭时等待所有任务完成executor.setWaitForTasksToCompleteOnShutdown(true);// 等待时间(秒)executor.setAwaitTerminationSeconds(60);// 初始化executor.initialize();return executor;}
}

【2】controller

package com.example.controller;import com.example.model.Product;
import com.example.service.ProductService;
import org.springframework.web.bind.annotation.*;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;@RestController
@RequestMapping("/products")
public class ProductController {private final ProductService productService;public ProductController(ProductService productService) {this.productService = productService;}/*** 获取商品信息*/@GetMapping("/{id}")public Product getProduct(@PathVariable Long id) {return productService.getProduct(id);}/*** 更新商品信息*/@PutMappingpublic String updateProduct(@RequestBody Product product) {productService.updateProduct(product);return "商品更新成功";}/*** 扣减库存*/@PostMapping("/{id}/deduct")public String deductStock(@PathVariable Long id, @RequestParam int quantity) {boolean success = productService.deductStock(id, quantity);return success ? "库存扣减成功" : "库存扣减失败";}/*** 批量处理商品*/@PostMapping("/batch-process")public String batchProcessProducts() throws InterruptedException, ExecutionException {// 测试批量处理商品ID 1-5List<Long> productIds = Arrays.asList(1L, 2L, 3L, 4L, 5L);productService.batchProcessProducts(productIds);return "批量处理完成";}
}

【3】model

package com.example.model;import java.io.Serializable;
import java.math.BigDecimal;public class Product implements Serializable {private Long id;private String name;private BigDecimal price;private Integer stock;private String description;// 构造函数、getter和setterpublic Product() {}public Product(Long id, String name, BigDecimal price, Integer stock, String description) {this.id = id;this.name = name;this.price = price;this.stock = stock;this.description = description;}// getter和setter方法public Long getId() { return id; }public void setId(Long id) { this.id = id; }public String getName() { return name; }public void setName(String name) { this.name = name; }public BigDecimal getPrice() { return price; }public void setPrice(BigDecimal price) { this.price = price; }public Integer getStock() { return stock; }public void setStock(Integer stock) { this.stock = stock; }public String getDescription() { return description; }public void setDescription(String description) { this.description = description; }
}

【4】impl

package com.example.service;import com.example.model.Product;
import java.util.List;
import java.util.concurrent.ExecutionException;public interface ProductService {// 获取商品信息(带缓存)Product getProduct(Long id);// 更新商品信息(带缓存双删)void updateProduct(Product product);// 扣减库存(带分布式锁)boolean deductStock(Long productId, int quantity);// 批量处理商品(线程池+CountDownLatch)void batchProcessProducts(List<Long> productIds) throws InterruptedException, ExecutionException;
}
package com.example.service.impl;import com.example.model.Product;
import com.example.service.ProductService;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;@Service
public class ProductServiceImpl implements ProductService {// Redis缓存键前缀private static final String CACHE_KEY_PREFIX = "product:";// 分布式锁键前缀private static final String LOCK_KEY_PREFIX = "lock:product:";// 随机数生成器(用于缓存过期时间)private static final Random RANDOM = new Random();private final ThreadPoolTaskExecutor businessExecutor;private final RedisTemplate<String, Object> redisTemplate;private final RedissonClient redissonClient;// 构造函数注入依赖public ProductServiceImpl(ThreadPoolTaskExecutor businessExecutor,RedisTemplate<String, Object> redisTemplate,RedissonClient redissonClient) {this.businessExecutor = businessExecutor;this.redisTemplate = redisTemplate;this.redissonClient = redissonClient;}/*** 获取商品信息(先查缓存,再查数据库)*/@Overridepublic Product getProduct(Long id) {String cacheKey = CACHE_KEY_PREFIX + id;// 1. 先查询Redis缓存Product product = (Product) redisTemplate.opsForValue().get(cacheKey);if (product != null) {System.out.println("从缓存获取商品信息: " + id);return product;}// 2. 缓存未命中,查询数据库(实际项目中替换为真实DB查询)System.out.println("从数据库获取商品信息: " + id);product = mockDbQuery(id);if (product != null) {// 3. 写入缓存,设置随机过期时间(5-10分钟)防止缓存雪崩int expireTime = 300 + RANDOM.nextInt(300);redisTemplate.opsForValue().set(cacheKey, product, expireTime, TimeUnit.SECONDS);System.out.println("商品信息写入缓存: " + id + ", 过期时间: " + expireTime + "秒");}return product;}/*** 更新商品信息(缓存延时双删策略)*/@Overridepublic void updateProduct(Product product) {String cacheKey = CACHE_KEY_PREFIX + product.getId();// 1. 先删除缓存redisTemplate.delete(cacheKey);System.out.println("更新前删除缓存: " + product.getId());// 2. 更新数据库(实际项目中替换为真实DB更新)System.out.println("更新数据库商品信息: " + product.getId());mockDbUpdate(product);// 3. 延迟一段时间后再次删除缓存(解决更新期间的脏读)businessExecutor.execute(() -> {try {// 延迟500ms,确保数据库更新完成TimeUnit.MILLISECONDS.sleep(500);redisTemplate.delete(cacheKey);System.out.println("延迟删除缓存: " + product.getId());} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("延迟删除缓存异常: " + e.getMessage());}});}/*** 扣减库存(使用Redisson分布式锁)*/@Overridepublic boolean deductStock(Long productId, int quantity) {String lockKey = LOCK_KEY_PREFIX + productId;RLock lock = redissonClient.getLock(lockKey);boolean success = false;try {// 尝试获取锁,最多等待3秒,10秒后自动释放boolean locked = lock.tryLock(3, 10, TimeUnit.SECONDS);if (locked) {System.out.println("获取分布式锁成功: " + lockKey);// 查询商品信息Product product = getProduct(productId);if (product == null) {System.out.println("商品不存在: " + productId);return false;}// 检查库存if (product.getStock() < quantity) {System.out.println("库存不足: " + productId + ", 当前库存: " + product.getStock() + ", 请求扣减: " + quantity);return false;}// 扣减库存(实际项目中替换为真实DB操作)product.setStock(product.getStock() - quantity);mockDbUpdate(product);System.out.println("库存扣减成功: " + productId + ", 剩余库存: " + product.getStock());// 更新缓存redisTemplate.opsForValue().set(CACHE_KEY_PREFIX + productId, product);success = true;} else {System.out.println("获取分布式锁失败: " + lockKey);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("扣减库存异常: " + e.getMessage());} finally {// 释放锁(只有持有锁的线程才能释放)if (lock.isHeldByCurrentThread()) {lock.unlock();System.out.println("释放分布式锁: " + lockKey);}}return success;}/*** 批量处理商品(使用线程池和CountDownLatch编排)*/@Overridepublic void batchProcessProducts(List<Long> productIds) throws InterruptedException, ExecutionException {if (productIds == null || productIds.isEmpty()) {return;}System.out.println("开始批量处理商品,共" + productIds.size() + "个");CountDownLatch latch = new CountDownLatch(productIds.size());long startTime = System.currentTimeMillis();// 提交所有任务到线程池List<Future<Product>> futures = productIds.stream().map(id -> businessExecutor.submit(() -> {try {// 模拟处理逻辑:查询商品并更新最后访问时间Product product = getProduct(id);if (product != null) {System.out.println("线程" + Thread.currentThread().getName() + "处理商品: " + id);// 模拟耗时操作TimeUnit.MILLISECONDS.sleep(100 + RANDOM.nextInt(200));}return product;} catch (Exception e) {System.err.println("处理商品" + id + "异常: " + e.getMessage());throw e; // 抛出异常,让Future捕获} finally {latch.countDown(); // 计数器减1}})).collect(Collectors.toList());// 等待所有任务完成,最多等待10秒boolean allCompleted = latch.await(10, TimeUnit.SECONDS);long endTime = System.currentTimeMillis();if (allCompleted) {System.out.println("所有商品处理完成,耗时: " + (endTime - startTime) + "ms");// 处理所有任务结果List<Product> processedProducts = futures.stream().map(future -> {try {return future.get();} catch (Exception e) {System.err.println("获取任务结果异常: " + e.getMessage());return null;}}).filter(p -> p != null).collect(Collectors.toList());System.out.println("成功处理商品数量: " + processedProducts.size());} else {System.out.println("部分商品处理超时,已完成" + (productIds.size() - latch.getCount()) + "/" + productIds.size());}}/*** 模拟数据库查询*/private Product mockDbQuery(Long id) {// 模拟数据库查询延迟try {TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {Thread.currentThread().interrupt();}// 模拟数据return new Product(id, "商品" + id, new java.math.BigDecimal("99.99"), 100, "这是商品" + id + "的描述");}/*** 模拟数据库更新*/private void mockDbUpdate(Product product) {// 模拟数据库更新延迟try {TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
http://www.dtcms.com/a/347007.html

相关文章:

  • 《UE教程》第一章第十一回——UE5.6打包安卓
  • Python 字符串查找,计数,判断,修改
  • Linux服务器利用Systemd配置定时任务
  • 手机横屏适配方案
  • Python 实战:内网渗透中的信息收集自动化脚本(2)
  • Python爬虫实战:构建港口物流数据采集和分析系统
  • 英伟达显卡GPU驱动的本质
  • Ubuntu 的 apt-get 强制使用 IPv4 网络
  • rust语言 (1.88) egui (0.32.1) 学习笔记(逐行注释)(九)数值拖拽控件、进度条、滑动条
  • JupyterLab在线调试实验室
  • 【C语言16天强化训练】从基础入门到进阶:Day 7
  • 【Github】SourceTree远端链接Github
  • 173-基于Flask的微博舆情数据分析系统
  • Dism++备份系统时报错[句柄无效]的解决方法
  • 大模型训练方法全面解析:SFT、RFT、TRPO、DPO、PPO、GRPO、RLH、RLHF技术深度剖析
  • chromadb使用hugging face模型时利用镜像网站下载注意事项
  • SQL Server Service Broker超全介绍
  • linux内核 - slab 分配器
  • 微信小程序界面常用操作
  • 【200页PPT】IT战略规划架构设计报告(附下载方式)
  • SpringAi和LangChain4j揭开面纱
  • 高速CANFD收发器ASM1042在割草机器人轮毂电机通信系统中的适配性研究
  • LeakyReLU和ReLU的区别
  • 【51单片机学习】直流电机驱动(PWM)、AD/DA、红外遥控(外部中断)
  • 脚本:git push直到成功(windows powershell命令)(Github连不上、Github断开)
  • UE5.3 中键盘按键和操作绑定
  • 37_基于深度学习的铝材缺陷检测识别系统(yolo11、yolov8、yolov5+UI界面+Python项目源码+模型+标注好的数据集)
  • openharmony之一多开发:产品形态配置讲解
  • 解码LLM量化:深入剖析最常见8位与4位核心算法
  • 【机器学习深度学习】多模态典型任务与应用全景