ThreadPoolExecutor 最佳实践
1. 基本定位
ThreadPoolExecutor
是 Java 并发包核心线程池实现,管理线程生命周期和任务调度,主要解决:
线程复用(减少频繁创建销毁线程的开销)
任务调度(控制并发度、任务队列、拒绝策略)
资源隔离(不同任务用不同线程池,避免互相阻塞)
它实现了:
Executor
(任务执行接口)ExecutorService
(任务提交/关闭接口)支持生命周期管理(
shutdown
、shutdownNow
)内置任务统计与监控 API
2. 构造方法与参数详解
public ThreadPoolExecutor(int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数long keepAliveTime, // 非核心线程空闲回收时间TimeUnit unit, // 时间单位BlockingQueue<Runnable> workQueue,// 任务队列ThreadFactory threadFactory, // 线程工厂RejectedExecutionHandler handler // 拒绝策略
)
参数 | 含义 | 核心作用 | 调优建议 |
---|---|---|---|
corePoolSize | 核心线程数 | 常驻执行任务的线程数(默认不回收) | CPU 密集型 = CPU 核数+1;IO 密集型 = CPU 核数 × 2~4 |
maximumPoolSize | 最大线程数 | 峰值时可扩容到的线程数 | 有界队列下才有意义 |
keepAliveTime | 空闲时间 | 非核心线程的回收时间 | 流量波动大时调短,减少空闲线程占用 |
unit | 时间单位 | keepAliveTime 的单位 | 常用 SECONDS |
workQueue | 任务队列 | 决定任务缓冲与调度策略 | 有界队列防 OOM |
threadFactory | 线程工厂 | 线程命名、优先级、守护状态 | 建议自定义,方便排查 |
handler | 拒绝策略 | 队列与线程均满时的处理逻辑 | 核心任务用 CallerRunsPolicy |
3. 执行流程(源码级)
调用 execute(Runnable)
时,流程如下:
核心线程不足
→ 直接创建新线程执行任务(addWorker(command, true)
)核心线程已满
→ 尝试放入任务队列(workQueue.offer(command)
)队列已满
→ 尝试创建非核心线程(addWorker(command, false)
)非核心线程已满
→ 执行拒绝策略(handler.reject()
)
源码关键片段:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}private static int runStateOf(int c) { return c & ~COUNT_MASK; }private static int workerCountOf(int c) { return c & COUNT_MASK; }
4. 内部核心结构
4.1 ctl(线程池状态 + 线程数)
高 3 位表示线程池状态(RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED)
低 29 位表示当前线程数
4.2 Worker(工作线程)
ThreadPoolExecutor.Worker
继承 AQS,封装了线程对象和首个任务持有独占锁,防止在执行任务时被错误中断
4.3 getTask()(任务获取逻辑)
核心线程:
queue.take()
(阻塞等待)非核心线程:
queue.poll(keepAliveTime)
(超时等待,超时回收)
5. 常用任务队列类型
队列类型 | 是否有界 | 特点 | 适用场景 |
---|---|---|---|
SynchronousQueue | 容量=0 | 任务直接交给线程,不排队 | 高并发短任务 |
LinkedBlockingQueue | 默认无界 | 可缓冲大量任务,FIFO | 低风险流量,需指定容量 |
ArrayBlockingQueue | 有界 | 固定容量,FIFO | 内存可控,高峰可触发扩容 |
PriorityBlockingQueue | 默认无界 | 按优先级执行 | 调度系统 |
DelayQueue | 默认无界 | 延迟任务队列 | 定时任务 |
6. 拒绝策略(RejectedExecutionHandler)
AbortPolicy(默认) → 抛异常
CallerRunsPolicy → 调用方线程执行任务
DiscardPolicy → 直接丢弃
DiscardOldestPolicy → 丢弃最老任务,重试入队
生产建议:
核心业务:CallerRunsPolicy + 日志告警
非关键业务:DiscardPolicy
7. 最佳实践配置
7.1 CPU 密集型任务
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1,Runtime.getRuntime().availableProcessors() + 1,0L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000),threadFactory,new ThreadPoolExecutor.CallerRunsPolicy()
);
7.2 IO 密集型任务
new ThreadPoolExecutor(cpu * 2,cpu * 4,60L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2000),threadFactory,new ThreadPoolExecutor.CallerRunsPolicy()
);
7.3 流量波动场景(核心线程可回收)
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 20,60L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(500),threadFactory
);
executor.allowCoreThreadTimeOut(true);
8. 调优要点
避免无界队列(LinkedBlockingQueue 默认无界 → maximumPoolSize 失效)
与容器资源绑定
corePoolSize ≈ CPU Limit
避免超配导致 CPU 抢占
监控指标
activeCount / poolSize / queue.size()
completedTaskCount
优雅停机
@PreDestroy
(spring)
调用shutdown()
+awaitTermination
隔离不同业务
核心业务与非核心业务用不同线程池
拒绝策略监控
自定义 handler 记录拒绝任务日志
9. 常见误区
newFixedThreadPool
+LinkedBlockingQueue
默认无界 → OOMkeepAliveTime=0
且不allowCoreThreadTimeOut
→ 核心线程常驻浪费资源无界队列下 maximumPoolSize 无效
任务执行时间过长 → 队列堆积,延迟高
线程池被 Spring 代理关闭 → 任务丢失(需显式
@Bean(destroyMethod="shutdown")
)