当前位置: 首页 > news >正文

线程池ThreadPoolExecutor

1 概述

JDK提供了一个线程池的实现ThreadPoolExecutor,线程的创建和销毁都比较耗费CPU和内存资源,线程池通过线程共享的方式,减少这些资源的消耗,但想用好线程池,需要对其原理有一定的了解。

2 原理

2.1 支持的参数

ThreadPoolExecutor最多支持7个参数:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
}
  • corePoolSize:保持在线程池的线程数量,如果没有设置allowCoreThreadTimeOut为true,即使线程是空闲的也保持着,allowCoreThreadTimeOut默认为false。在线程数没有达到corePoolSize时,新的任务会启动新的线程执行。
  • maximumPoolSize:线程池运行的最大线程数量。如果任务数超过最大线程数,则会在队列中等待。
  • keepAliveTime:当线程是空闲的时候,如果线程数量超过corePoolSize,空闲线程最大的等待时间为keepAliveTime,超过这个时间会关闭该空闲线程。
  • unit:等待时间keepAliveTime的单位。
  • workQueue:在任务被执行前,任务先放到该队列中。此队列仅用于用execute方法提交的任务。
  • threadFactory:创建线程的工厂。
  • handler:当到达线程数量限制或者队列容量限制时的处理方式。默认为AbortPolicy,即抛异常拒绝。

2.2 线程工厂ThreadFactory

java.util.concurrent.Executors提供了两种创建线程的工厂。

1) 默认工厂

主要设置了线程的名称带pool标识,设置成非后台线程,优先级为普通。

static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}
}

2) 带权限校验工厂

static class PrivilegedThreadFactory extends DefaultThreadFactory {private final AccessControlContext acc;private final ClassLoader ccl;PrivilegedThreadFactory() {super();SecurityManager sm = System.getSecurityManager();if (sm != null) {// Calls to getContextClassLoader from this class// never trigger a security check, but we check// whether our callers have this permission anyways.sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);// Fail fastsm.checkPermission(new RuntimePermission("setContextClassLoader"));}this.acc = AccessController.getContext();this.ccl = Thread.currentThread().getContextClassLoader();}public Thread newThread(final Runnable r) {return super.newThread(new Runnable() {public void run() {AccessController.doPrivileged(new PrivilegedAction<Void>() {public Void run() {Thread.currentThread().setContextClassLoader(ccl);r.run();return null;}}, acc);}});}
}

2.3 拒绝策略RejectedExecutionHandler

在ThreadPoolExecutor里提供了下面4种拒绝策略:

1) 直接使用调用者线程执行CallerRunsPolicy,适用于不允许数据丢弃的场景,用调用者线程直接执行,也延缓调用者提交数据的速度。

public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}
}

2) 抛异常拒绝AbortPolicy,适用于数据不允许丢弃的场景,抛异常阻止数据提交,让提交者处理。

/*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.*/
public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}
}

3) 直接丢弃,适用于数据没有时效性,允许丢弃的场景。

/*** A handler for rejected tasks that silently discards the* rejected task.*/
public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {// 啥也没做}
}

4) 丢弃最老的而执行当前的,适用于老数据没有新数据重要,允许数据丢弃的场景。

/*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}
}

2.4 线程池控制状态 ctl(AtomicInteger)

线程池控制状态 ctl 是一个原子整数,它封装了两个概念性字段:
  • workerCount:表示有效线程数量
  • runState:表示线程池状态(如运行中、正在关闭等)
    • RUNNING:接受新任务并处理队列中的任务。
    • SHUTDOWN:不接受新任务,但会处理队列中的任务。
    • STOP:不接受新任务,不处理队列中的任务,且会中断正在执行的任务。
    • TIDYING:所有任务已终止,workerCount 为 0,即将转换到该状态的线程会执行 terminated() 钩子方法。
    • TERMINATED:terminated() 方法已执行完成。
// java.util.concurrent.ThreadPoolExecutor
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 控制状态
private static final int COUNT_BITS = Integer.SIZE - 3; // 状态比特3位,在高位
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 容量比特(32-3)=29位,在低位
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
这样设计的好处:
  • 保证操作的原子性,避免竞态条件
    • 线程池的状态(如从 RUNNING 转为 SHUTDOWN)和工作线程数量(如新增或减少线程)在并发场景下需要协同修改。
      • 当线程池接收新任务时,需要同时检查 “当前状态是否允许接收任务” 和 “当前线程数是否未达上限”。
      • 当线程池关闭时,需要同时修改状态并调整线程数量。
    • 通过一个 AtomicInteger 存储,可通过 CAS 操作(compareAndSet) 原子性地同时修改两个字段。
  • 减少同步加锁开销,提升性能
    • 线程池是高并发场景下的核心组件,需要频繁处理任务提交、线程创建 / 销毁、状态变更等操作。
    • 若使用两个独立的 AtomicInteger 分别存储状态和数量,每次协同操作(如 “检查状态并调整线程数”)需要加锁来保证两者的一致性,开销较高。
    • 合并为一个变量后,一次 CAS 操作即可完成状态和数量的协同更新,操作更轻量,减少了并发冲突的可能性,显著提升高并发场景下的性能。
  • 简化代码逻辑,降低复杂度
    • 线程池的状态转换和线程数量调整存在强关联(例如 STOP 状态必须伴随线程中断和数量清零)。将两者封装在一个变量中,可通过统一的位运算逻辑处理状态判断和数量计算,避免了两个变量之间的协调逻辑,使代码更紧凑、可读性更高。

3 小结

  • 池化:初始化比较耗资源的对象应该使用池化技术,线程池、对象池、连接池等。
    • 保留的数量、最高数量、空闲处理(超时关闭/直接关闭)
  • 多数据不用锁:合并到一个AtomicInteger或AtomicLong里。
  • 队列满处理策略:新来的同步处理、直接丢弃、留新的丢弃老的、抛异常拒绝
http://www.dtcms.com/a/533674.html

相关文章:

  • 合同的系统培训约定
  • cf租号网站怎么做的河南建设安全协会网站
  • 建设的网站别人登录密码公司企业邮箱号
  • 电商网站怎样做天津网站建设哪个好
  • 长沙网站建设接单wordpress站点名没有更改
  • 手机wap网站建设一套完整的app 开发流程
  • 在线制作wap网站中牟建设工程信息网站
  • K8S--标签(labels)和选择器(selectors)的作用
  • G1 垃圾收集器
  • 湛江免费建站公司磁力在线搜索引擎
  • 广东省城乡建设厅网站首页如何判断网站有cdn加速
  • Bugku-Web题目-no select
  • 健康门户网站建设内容求职网站开发
  • Linux 信号处理视角下的 volatile 关键字
  • 广告文案优秀网站网络推广营销方式
  • 增城高端定制网站建设网站建设全域云
  • 小型深圳网站页面设计网页制作基础教程代码
  • 周学习记录
  • 建设网站职业证书查询wordpress菜单前面加图标
  • 南昌网站建设服务平台兰州网站设计公司
  • 河南网站建设公司价格wordpress 引导页
  • C-文件操作
  • 【第一章】基于Simulink的控制器开发教程——目录
  • 重庆个人网站建设单页网站制作视频教程
  • Dubbo 消费者是如何与 Spring 融合的?
  • 徐州发布网站怎样临沂网站建设
  • 怎么免费做一个网站盛世阳光-网站建设
  • Nav2 Lifecycle Manager:生命周期管理器的设计哲学与源码级运行机制
  • 云服务器上安装Tomcat
  • 高端网站建设 n磐石网络广州市增城区住房和建设局网站