#简易线程池...实现原理
代码实现(带详细注释)
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 简单线程池实现
* 1. 使用阻塞队列管理待执行任务
* 2. 固定数量的工作线程处理任务
* 3. 支持优雅关闭
*/
public class SimpleThreadPool {
// 任务队列:用于存储待执行的任务,采用线程安全的LinkedBlockingQueue实现
private final BlockingQueue<Runnable> taskQueue;
// 工作线程数组:用于执行任务的实际线程
private final Worker[] workers;
// 线程池状态标志:volatile保证多线程可见性
private volatile boolean isStopped = false;
/**
* 构造函数:初始化线程池
* @param numThreads 线程池中工作线程的数量
*/
public SimpleThreadPool(int numThreads) {
// 创建无界任务队列(可根据需求改为有界队列)
this.taskQueue = new LinkedBlockingQueue<>();
// 初始化工作线程数组
this.workers = new Worker[numThreads];
// 创建并启动所有工作线程
for (int i = 0; i < numThreads; i++) {
workers[i] = new Worker();
workers[i].start(); // 启动工作线程
// !!!重点!!!! 是 SimpleThreadPool 的构造方法,在被创建后就会执行,开启的多个线程就start了
}
}
/**
* 提交任务到线程池
* @param task 待执行的任务(Runnable接口实现)
* @throws IllegalStateException 如果线程池已关闭
*/
public void submit(Runnable task) {
// 检查线程池状态
if (!isStopped) {
try {
// 将任务放入队列(如果队列满会阻塞)
taskQueue.put(task);
} catch (InterruptedException e) {
// 恢复中断状态
Thread.currentThread().interrupt();
}
} else {
throw new IllegalStateException("ThreadPool is stopped");
}
}
/**
* 关闭线程池
* 1. 设置停止标志
* 2. 中断所有工作线程
*/
public void shutdown() {
// 设置停止标志
isStopped = true;
// 中断所有工作线程
for (Worker worker : workers) {
worker.interrupt();
}
}
/**
* 工作线程内部类
* 负责从任务队列中获取并执行任务
*/
private class Worker extends Thread {
@Override
public void run() {
// 只要线程池未关闭,就持续执行任务
while (!isStopped) {
try {
// 从队列获取任务(队列空时阻塞)
Runnable task = taskQueue.take();
// 执行任务(同步执行)
task.run();
/* !!!!重点!!!!
//我们都知道一个Runnable对象的run方法,在直接调用时,它是没有开启一个新的线程的
//在这个案例中,每一个Worker是一个线程, 其中执行runnable对象的run方法 ,
// 续上句: 这就相当于是 开了新的线程去执行Runnable对象的run方法 , 虽然这里对线程对象复用了,但原理是这样.
//线程对象 的任务是 从阻塞队列 取出Runnable对象, 并执行其run方法
*/
} catch (InterruptedException e) {
// 检查是否是因关闭线程池导致的中断
if (isStopped) {
break; // 如果是关闭操作,则退出循环
}
// 其他中断情况继续循环
}
}
}
}
/**
* 测试主方法
*/
public static void main(String[] args) {
// 创建包含3个工作线程的线程池
SimpleThreadPool threadPool = new SimpleThreadPool(3);
// 提交10个任务
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
threadPool.submit(() -> {
System.out.println("开始执行任务 " + taskNumber + ",执行线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟任务执行耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("完成任务 " + taskNumber);
});
}
// 关闭线程池
threadPool.shutdown();
}
}
执行流程与原理详解
1. 初始化阶段
- 创建线程池实例:调用构造函数
SimpleThreadPool(3)
创建包含3个工作线程的线程池 - 初始化任务队列:创建
LinkedBlockingQueue
作为任务缓冲区 - 创建工作线程:
- 创建3个
Worker
线程实例 - 调用
start()
方法启动每个工作线程
- 创建3个
- 工作线程启动:
- 每个
Worker
线程进入run()
方法 - 立即调用
taskQueue.take()
尝试获取任务(此时队列为空,线程阻塞)
- 每个
2. 任务提交阶段
- 提交任务:主线程调用
submit()
方法提交任务 - 任务入队:
taskQueue.put(task)
将任务放入队列- 如果队列已满,该方法会阻塞(本示例使用无界队列,不会出现)
- 唤醒工作线程:
put()
操作会唤醒一个阻塞在take()
的工作线程- 被唤醒的线程获取到任务并开始执行
3. 任务执行阶段
- 任务获取:工作线程从
taskQueue.take()
获取任务 - 同步执行:直接调用
task.run()
执行任务 - 执行完成:
- 任务执行完毕后,工作线程再次调用
take()
获取新任务 - 如果队列为空,线程再次阻塞等待
- 任务执行完毕后,工作线程再次调用
4. 线程池关闭阶段
- 设置关闭标志:
shutdown()
方法将isStopped
设为true - 中断工作线程:调用每个工作线程的
interrupt()
方法 - 线程终止:
- 工作线程捕获中断异常
- 检查
isStopped
为true后退出循环 - 线程自然终止
核心设计原理
1. 生产者-消费者模式
- 生产者:调用
submit()
方法的主线程 - 消费者:
Worker
工作线程 - 缓冲区:
BlockingQueue
任务队列
2. 工作线程管理
- 线程复用:每个
Worker
线程通过循环持续处理任务,避免频繁创建/销毁线程 - 负载均衡:多个工作线程自动竞争获取任务,实现任务均衡分配
3. 线程安全保证
- 队列安全:
LinkedBlockingQueue
内部使用锁保证线程安全 - 状态可见性:
isStopped
使用volatile保证多线程可见性
4. 优雅关闭机制
- 标志位检查:
isStopped
控制线程是否继续运行 - 中断响应:通过中断唤醒阻塞在队列的工作线程
- 资源清理:确保所有线程能够正常退出
性能特点分析
-
优点:
- 实现简单直观
- 有效控制并发线程数量
- 减少线程创建销毁开销
- 任务缓冲避免直接拒绝
-
局限:
- 固定线程数,无法动态扩展
- 无任务拒绝策略(队列无界)
- 关闭时可能丢失队列中的任务
扩展建议
- 动态线程数:增加核心线程数和最大线程数配置
- 拒绝策略:添加队列满时的处理策略
- 任务优先级:改用
PriorityBlockingQueue
- 执行结果获取:增加
Future
机制 - 线程池监控:添加任务计数、线程状态等监控指标
这个实现展示了线程池的核心工作原理,实际开发中建议使用Java标准库的ThreadPoolExecutor
,它提供了更完善的功能和更好的性能。