说一下Java里面线程池的拒绝策略
文章目录
- 四种标准的拒绝策略
- 1. `ThreadPoolExecutor.AbortPolicy` (中止策略)
- 2. `ThreadPoolExecutor.CallerRunsPolicy` (调用者运行策略)
- 3. `ThreadPoolExecutor.DiscardPolicy` (丢弃策略)
- 4. `ThreadPoolExecutor.DiscardOldestPolicy` (丢弃最旧策略)
- 总结对比
- 自定义拒绝策略
Java 线程池的 拒绝策略 (Rejection Policy) 是一个非常核心且重要的概念。当线程池的 工作队列已满,并且 活动线程数已达到最大线程数时,新提交的任务就会被拒绝。此时,线程池会根据设定的拒绝策略来处理这个被拒绝的任务。
这个策略由 RejectedExecutionHandler
接口定义,ThreadPoolExecutor
提供了四种开箱即用的标准实现。
四种标准的拒绝策略
这四种策略都作为 ThreadPoolExecutor
的内部静态类存在。
1. ThreadPoolExecutor.AbortPolicy
(中止策略)
- 行为: 这是默认的拒绝策略。当新任务被拒绝时,它会直接抛出
RejectedExecutionException
异常。这是一个运行时异常 (Unchecked Exception)。 - 优点: 能够清晰地告知调用者(任务提交方)线程池已满,迫使调用者捕获异常并处理这个问题(例如,可以进行重试、降级处理或记录日志)。
- 适用场景: 适用于那些不能容忍任务丢失的关键业务。通过抛出异常,可以及时发现系统瓶颈并进行相应处理。
示例代码:
// 默认策略,不指定 RejectedExecutionHandler 时就是它
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1), // 队列容量为1new ThreadPoolExecutor.AbortPolicy() // 明确指定
);pool.execute(() -> { /* 任务1 */ }); // 核心线程执行
pool.execute(() -> { /* 任务2 */ }); // 入队
pool.execute(() -> { /* 任务3 */ }); // 创建非核心线程执行// 此时池已满 (1个核心 + 1个队列 + 1个非核心)
pool.execute(() -> { /* 任务4 */ }); // 触发拒绝策略,抛出 RejectedExecutionException
2. ThreadPoolExecutor.CallerRunsPolicy
(调用者运行策略)
- 行为: 当新任务被拒绝时,该任务不会被丢弃,也不会抛出异常。而是由提交该任务的线程来亲自执行。例如,如果
main
线程调用pool.execute(task)
时任务被拒绝,那么task
就会在main
线程中同步执行。 - 优点:
- 不会丢失任务。
- 通过占用提交任务的线程,可以减慢任务提交的速度,形成一种天然的“反压”或“降速”机制,给线程池一个喘息和处理积压任务的机会。
- 适用场景: 适用于需要处理高负载,但又不希望丢失任务的场景。它能有效地调节任务的生产和消费速度。
3. ThreadPoolExecutor.DiscardPolicy
(丢弃策略)
- 行为: 当新任务被拒绝时,它会悄无声息地丢弃该任务,不做任何处理,也不会抛出任何异常。
- 优点: 实现简单,对系统影响小。
- 缺点: 任务会静默丢失,这在大多数业务场景下是不可接受的,因为你无法知道哪些任务因为系统繁忙而被丢弃了。
- 适用场景: 仅适用于那些允许任务丢失的非关键业务,例如一些无关紧要的日志记录。
4. ThreadPoolExecutor.DiscardOldestPolicy
(丢弃最旧策略)
- 行为: 当新任务被拒绝时,它会丢弃工作队列队头的一个任务(即等待时间最长的任务),然后尝试重新提交当前被拒绝的任务。
- 优点: 这是一种权衡策略,优先保证新任务的执行,因为它认为新任务的优先级可能更高。
- 缺点: 同样会导致任务丢失,并且丢弃的是最先进入队列的任务。如果队列中的任务有严格的先后顺序要求,这个策略会破坏业务逻辑。
- 适用场景: 适用于那些希望尽可能处理最新数据,而可以丢弃旧数据的场景,例如发布/订阅模型中的最新消息处理。
总结对比
ps:注意新旧任务区别,在阻塞队列里面,旧任务是队头元素,新任务是队尾元素,因为是先入先出嘛!!!
策略 (Policy) | 行为描述 | 优点 | 缺点 | 适用场景 |
---|---|---|---|---|
AbortPolicy | 抛出 RejectedExecutionException | 能及时反馈系统状态 | 中断调用者流程 | 默认,关键业务,不容忍任务丢失 |
CallerRunsPolicy | 由提交任务的线程执行 | 不丢任务,有反压效果 | 会阻塞提交任务的线程 | 高负载,需要限流,不希望丢任务 |
DiscardPolicy | 直接丢弃新任务 | 实现简单 | 任务静默丢失 | 非关键业务,允许数据丢失 |
DiscardOldestPolicy | 丢弃队列中最旧的任务,再提交新任务 | 优先处理新任务 | 旧任务会丢失 | 追求最新数据处理的场景 |
自定义拒绝策略
除了以上四种,你还可以通过实现 RejectedExecutionHandler
接口来创建自己的拒绝策略。这个接口只有一个方法需要实现:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
这提供了极大的灵活性,你可以根据业务需求实现任何逻辑,例如:
- 记录日志:将无法执行的任务信息记录到日志中,方便后续排查。
- 持久化存储:将任务保存到数据库、Redis 或消息队列(如 Kafka/RabbitMQ)中,等待系统负载降低后,再由一个专门的线程从这些地方取出任务重新执行。
- 带超时的重试:尝试在一定时间内重新将任务放入队列。
自定义策略示例(记录日志并丢弃):
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class LoggingDiscardPolicy implements RejectedExecutionHandler {private static final Logger logger = LoggerFactory.getLogger(LoggingDiscardPolicy.class);@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 记录被拒绝的任务信息logger.warn("Task {} rejected from executor {}", r.toString(), executor.toString());// 然后什么也不做(即丢弃)}
}// 使用自定义策略
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1),new LoggingDiscardPolicy() // 设置自定义策略
);
选择哪种策略取决于你的业务需求,正确配置拒绝策略是构建一个健壮、高可用的并发程序的关键一环。