当前位置: 首页 > news >正文

Java并发编程-线程池(四)

文章目录

  • 线程池实现原理
    • Worker
      • Worker 核心设计总结
    • runWorker(Worker w)
    • 总结

线程池实现原理

上一篇我们看了 addWork 方法,那接下来就让我们详细看看内部类Worker。

Worker

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{final Thread thread; //worker自己的线程Runnable firstTask;Worker(Runnable firstTask) {setState(-1); // 默认禁止中断this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this); // 调用ThreadPoolExecutor的runWorker方法(后面会讲)}protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) { //仅当状态为0时获取锁,不可重入setExclusiveOwnerThread(Thread.currentThread());//课后习题return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {//调用链:shutdownNow()->interruptWorkers()->interruptIfStarted()Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {// 中断条件:运行中/运行结束 && 线程初始化过 && 没有被中断过try {t.interrupt();} catch (SecurityException ignore) {}}}
}

Worker类也继承了AQS,主要目的是利用state实现线程执行任务时的中断控制,确保线程池的优雅关闭机制。利用AQS的state字段来区分线程的初始状态、运行状态及中断权限。

  • 初始化锁状态:在Worker的构造函数中调用setState(-1),将状态设置为非0值(-1),表示**默认禁止中断** 。这样可以避免新创建的Worker线程在未执行任务前(即未调用runWorker方法时)被中断干扰。

    • 实际任务执行时:在runWorker方法中,通过调用unlock()方法将状态从-1重置为0,表示允许中断。
  • shutdownNow()的中断条件:运行中/运行结束 && 线程初始化过 && 没有被中断过

  • shutdown()的中断条件:

final void runWorker(Worker w) {...w.unlock(); // 允许中断...w.lock();...w.unlock();...
}
//调用链:shutdown()->interruptIdleWorkers()
private void interruptIdleWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {// 中断条件:没有被中断 && 没有任务在执行try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}}} finally {mainLock.unlock();}
}

Worker 核心设计总结

  • 同步机制:通过AQS的自定义锁管理,实现任务的原子执行和中断控制。

  • 状态标记:利用AQS的state字段区分线程的初始状态、运行状态及中断权限。

  • 线程池管理:支持线程池在关闭时区分处理空闲线程和运行线程,确保任务完整性。

Worker会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点:

runWorker(Worker w)

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 允许中断//标识线程是否因异常(如任务抛出未捕获错误)而非正常退出boolean completedAbruptly = true; try {// 循环获取工作队列里的任务来执行while (task != null || (task = getTask()) != null) {// 当前有任务 或者 能从队列中拿到任务w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||//若线程池正在关闭(STOP 状态)(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&// Thread.interrupted() 会清除当前线程的中断状态(返回历史值),// 若为 true 且线程池已停止,需重新标记中断防止代码忽略此请求。!wt.isInterrupted())//在强制关闭(如 shutdownNow())时,快速中断正在处理的任务wt.interrupt();try {//空方法(钩子),可被子类覆写以添加日志、监控或自定义逻辑(如任务执行时间统计)beforeExecute(wt, task);Throwable thrown = null;try {task.run(); // 执行任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;//避免任务对象被错误复用或内存泄漏w.completedTasks++;//统计Worker完成的任务数(用于线程池监控或扩容策略)w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly)//补偿操作:若线程异常终止(未正常调整计数),//需主动减少工作线程计数(workerCount)以保持准确性decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;//累计完成数workers.remove(w);//移除失效线程} finally {mainLock.unlock();}//当前线程退出后可能满足线程池终止条件(如所有线程退出且任务队列为空)//尝试将线程池状态过渡到TERMINATED(需符合SHUTDOWN/STOP状态且无活动线程和待处理任务)tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {//线程池仍允许处理任务(如RUNNING/SHUTDOWN)if (!completedAbruptly) {//若允许核心线程超时(allowCoreThreadTimeOut),最小值为 0;//否则为 corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;// 队列非空时至少保留一个线程处理任务if (workerCountOf(c) >= min)return; //当前线程数足够,无需补充}addWorker(null, false);//补充新Worker}
}

总结

ThreadPoolExecutor中线程执行任务的示意图
在这里插入图片描述

线程池中的线程执行任务分两种情况。

  1. 在execute()方法中创建一个线程时,会让这个线程执行当前任务。

  2. 这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行。

相关文章:

  • 安全版4.5.8开启审计后,hac+读写分离主备切换异常
  • 基于springboot+vue的机场乘客服务系统
  • 图像对比度调整(局域拉普拉斯滤波)
  • 记一次缓存填坑省市区级联获取的操作
  • 2025-5-16Vue3快速上手
  • SqlHelper 实现类,支持多数据库,提供异步操作、自动重试、事务、存储过程、分页、缓存等功能。
  • Spring MVC 中请求处理流程及核心组件解析
  • RK3588 ADB使用
  • 衡量 5G 和未来网络的安全性
  • 算法练习:19.JZ29 顺时针打印矩阵
  • 官方 Elasticsearch SQL NLPChina Elasticsearch SQL
  • 将嵌入映射到 Elasticsearch 字段类型:semantic_text、dense_vector、sparse_vector
  • HarmonyOS开发组件基础
  • 前端脚手架开发指南:提高开发效率的核心操作
  • 在 Kotlin 中,什么是解构,如何使用?
  • 探索嵌入式硬件的世界:技术、应用与未来趋势
  • 小结:jvm 类加载过程
  • python中列表的操作
  • VR场景制作如何完成?
  • 服务器内部可以访问外部网络,docker内部无法访问外部网络,只能docker内部访问
  • 美联储官员:美国经济增速可能放缓,现行关税政策仍将导致物价上涨
  • 戛纳打破“疑罪从无”惯例,一法国男演员被拒之门外
  • 国家统计局向多省份反馈统计督察意见
  • 证券日报:降准今日正式落地,年内或还有降准空间
  • 美叙领导人25年来首次会面探索关系正常化,特朗普下令解除对叙经济制裁
  • 秦洪看盘|指标股发力,A股渐有突破态势