当前位置: 首页 > news >正文

线程池自顶向下

在一些场景下,线程会被频繁创建和销毁,但他们却始终在完成相似的任务

这个场景下我们回去引入一个线程池的概念

可以简单总结为:

任务提交 → 核心线程执行 → 任务队列缓存 → 非核心线程执行 → 拒绝策略处理。

话不多说先看一个简单的线程池代码

通过 ThreadPoolExecutor 自定义(推荐)

先创建一个线程池

 
int corePoolSize = 5;           // 核心线程数
int maxPoolSize = 10;           // 最大线程数
long keepAliveTime = 60;        // 线程空闲时间
TimeUnit unit = TimeUnit.SECONDS; // 时间单位
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100); // 任务队列
ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 线程工厂
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize,
    maxPoolSize,
    keepAliveTime,
    unit,
    workQueue,
    threadFactory,
    handler
);

提交线程池任务

// 提交Runnable任务
executor.execute(() -> {
    System.out.println("Task executed by " + Thread.currentThread().getName());
});
// 提交Callable任务(获取返回值)
Future<String> future = executor.submit(() -> {
    return "Result from Callable";
});
try {
    String result = future.get(); // 阻塞等待结果
    System.out.println(result);
} catch (Exception e) {
    e.printStackTrace();
}

但是在springboot的项目中我们一般不会像上面这样做,我们会像下面这样去做

这是一个项目结构

src/main/java
└── com.example.demo
    ├── AdminApplication.java (启动类)
    ├── config
    │   └── ExecutorConfig.java (线程池配置)
    ├── controller
    │   └── AsyncController.java
    └── service
        └── AsyncService.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication(scanBasePackages = {"com.example"})
@EnableAsync(proxyTargetClass = true) // 启用异步支持
public class AdminApplication {
    public static void main(String[] args) {
        SpringApplication.run(AdminApplication.class, args);
    }
}

 

  1. YAML 配置文件 (application.yml)

async:
  executor:
    thread:
      core_pool_size: 5       # 核心线程数
      max_pool_size: 5         # 最大线程数
      queue_capacity: 99999    # 队列容量
      name:
        prefix: 异步执行线程池  # 线程名称前缀
  1. 线程池配置类

 
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync // 启用异步支持
public class ExecutorConfig {
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;
    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix(namePrefix + "-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

  1. 在 Service 层方法使用

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
public class AsyncService {
    // 使用指定线程池执行异步方法
    @Async("asyncServiceExecutor") // 必须指定配置的线程池名称
    public void asyncMethod() {
        System.out.println("异步执行开始 - 线程名:" + Thread.currentThread().getName());
        try {
            // 模拟耗时操作
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("异步执行结束");
    }
}
  1. Controller 中调用异步方法

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AsyncController {
    private final AsyncService asyncService;
    public AsyncController(AsyncService asyncService) {
        this.asyncService = asyncService;
    }
    @GetMapping("/execute-async")
    public String executeAsync() {
        System.out.println("主线程开始处理请求 - 线程名:" + Thread.currentThread().getName());
        asyncService.asyncMethod(); // 触发异步执行
        System.out.println("主线程继续处理其他任务");
        return "异步任务已提交";
    }
}

这样我们就会去异步执行这个线程池的任务

但是线程池内部是怎么执行的呢

任务提交 → 核心线程执行 → 任务队列缓存 → 非核心线程执行 → 拒绝策略处理。

第一步,线程池通过 submit() 提交任务。

ExecutorService threadPool = Executors.newFixedThreadPool(5);threadPool.submit(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "办理业务");});

第二步,线程池会先创建核心线程来执行任务。

 

if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) { return; }}

第三步,如果核心线程都在忙,任务会被放入任务队列中。

 

workQueue.offer(task);

第四步,如果任务队列已满,且当前线程数量小于最大线程数,线程池会创建新的线程来处理任务。

 

if (!addWorker(command, false))

第五步,如果线程池中的线程数量已经达到最大线程数,且任务队列已满,线程池会执行拒绝策略。

 

handler.rejectedExecution(command, this);

线程池的参数

线程池有 7 个参数,需要重点关注的有核心线程数、最大线程数、等待队列、拒绝策略。

一、核心线程数(corePoolSize)

作用
  • 线程池常驻线程数量:即使空闲也不会被销毁(除非设置 allowCoreThreadTimeOut

  • 任务处理的基础保障

设置建议
场景类型推荐值原理说明
​CPU密集型CPU核数 + 1避免线程切换开销
​IO密集型CPU核数 × 2利用等待IO的时间处理其他任务
​混合型根据任务比例动态调整需监控CPU和IO等待时间
代码示例(Spring Boot)

 

async: executor: core-pool-size: 10 # CPU核数为4时,IO密集型设为8


二、最大线程数(maxPoolSize)

作用
  • 线程池扩容上限:应对突发流量高峰

  • 与核心线程数的差值决定弹性能力

设置建议
场景计算公式典型值
​秒杀场景maxPoolSize = corePoolSize × 250 → 100
​普通Web服务maxPoolSize = corePoolSize + 5020 → 70
​批处理任务maxPoolSize = 总任务数 / 单线程耗时按需计算
注意事项
  • 内存限制:每个线程占用约 1MB 栈空间,100线程需 100MB

  • JVM限制:通过 -Xss 调整栈大小(如 -Xss256k


三、等待队列(workQueue)

类型对比与选型
队列类型特性适用场景
​无界队列容量无限(LinkedBlockingQueue)任务量可控,防止OOM
​有界队列容量固定(ArrayBlockingQueue)严格控制内存使用
​同步移交队列容量0(SynchronousQueue)实时处理,拒绝策略配合使用

①、ArrayBlockingQueue:一个有界的先进先出的阻塞队列,底层是一个数组,适合固定大小的线程池。

②、LinkedBlockingQueue:底层是链表,如果不指定大小,默认大小是 Integer.MAX_VALUE,几乎相当于一个无界队列。

③、PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。任务按照其自然顺序或 Comparator 来排序。

适用于需要按照给定优先级处理任务的场景,比如优先处理紧急任务。

④、DelayQueue:类似于 PriorityBlockingQueue,由二叉堆实现的无界优先级阻塞队列。

Executors 中的 newScheduledThreadPool() 就使用了 DelayQueue 来实现延迟执行。

⑤、SynchronousQueue:每个插入操作必须等待另一个线程的移除操作,同样,任何一个移除操作都必须等待另一个线程的插入操作。

Executors.newCachedThreadPool() 就使用了 SynchronousQueue,这个线程池会根据需要创建新线程,如果有空闲线程则会重复使用,线程空闲 60 秒后会被回收。


四、拒绝策略(RejectedExecutionHandler)

策略对比与选择
策略行为使用场景
​AbortPolicy抛出异常(默认)需要快速失败场景
​CallerRunsPolicy提交线程执行任务控制提交速率,平滑削峰
​DiscardPolicy直接丢弃任务允许任务丢失(日志等场景)
​DiscardOldestPolicy丢弃最旧任务后重试实时性要求高场景
配置实践
// 自定义拒绝策略(记录日志并丢弃)
executor.setRejectedExecutionHandler(runnable -> {
    log.warn("Task rejected: {}", runnable);
});
// Spring Boot 配置示例
@Bean
public ThreadPoolTaskExecutor executor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return executor;
}

五、参数联动配置方案

场景驱动配置模板
场景corePoolSizemaxPoolSizequeueCapacity拒绝策略
​电商秒杀50100100CallerRunsPolicy
​微服务API网关20501000AbortPolicy
​数据清洗服务1020无界队列DiscardOldestPolicy
​后台批处理48500DiscardPolicy

相关文章:

  • 关于 @Autowired 和 @Value 使用 private 字段的警告问题分析与解决方案
  • # C++初阶——内存管理
  • 【mysql】日志:binLog、redoLog和undoLog
  • openwebui和keycloak集成,使用keycloak的用户名和密码登录
  • Ubuntu 安全限制遭突破:攻击者可利用内核漏洞提权
  • 如何使用AI去水印(ChatGPT去除图片水印)
  • Proxmox pct 部署debian
  • Elasticsearch安全加固指南:启用登录认证与SSL加密
  • Linux服务器组建与管理
  • 使用 Selenium 构建简单高效的网页爬虫
  • 4.1 代码随想录第三十二天打卡
  • ​Android 集成 Facebook 登录
  • 2025.4.6机器学习笔记:文献阅读
  • AI与.NET技术实操系列(四):使用 Semantic Kernel 和 DeepSeek 构建AI应用
  • Sink Token
  • Java关于抽象类和抽象方法
  • 使用Python解析PPT文件并生成JSON结构详解
  • 25 python 迭代器与生成器
  • 教你快速理解linux中的NUMA节点探测是干什么用的?
  • 配置多区域集成IS-IS和抓包分析
  • 习近平会见哥伦比亚总统佩特罗
  • Manus向全球用户开放注册
  • 一海南救护车在西藏无任务拉警笛开道,墨脱警方:已处罚教育
  • 从这些电影与影像,看到包容开放的上海
  • 盖茨说对中国技术封锁起到反作用
  • 马上评丨75万采购300元设备,仅仅终止采购还不够