当前位置: 首页 > news >正文

#简易线程池...实现原理

代码实现(带详细注释)

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 简单线程池实现
 * 1. 使用阻塞队列管理待执行任务
 * 2. 固定数量的工作线程处理任务
 * 3. 支持优雅关闭
 */
public class SimpleThreadPool {
    // 任务队列:用于存储待执行的任务,采用线程安全的LinkedBlockingQueue实现
    private final BlockingQueue<Runnable> taskQueue;
    
    // 工作线程数组:用于执行任务的实际线程
    private final Worker[] workers;
    
    // 线程池状态标志:volatile保证多线程可见性
    private volatile boolean isStopped = false;

    /**
     * 构造函数:初始化线程池 
     * @param numThreads 线程池中工作线程的数量
     */
    public SimpleThreadPool(int numThreads) {
        // 创建无界任务队列(可根据需求改为有界队列)
        this.taskQueue = new LinkedBlockingQueue<>();
        
        // 初始化工作线程数组
        this.workers = new Worker[numThreads];

        // 创建并启动所有工作线程
        for (int i = 0; i < numThreads; i++) {
            workers[i] = new Worker();
            workers[i].start();  // 启动工作线程

            // !!!重点!!!! 是 SimpleThreadPool 的构造方法,在被创建后就会执行,开启的多个线程就start了

        }
    }

    /**
     * 提交任务到线程池
     * @param task 待执行的任务(Runnable接口实现)
     * @throws IllegalStateException 如果线程池已关闭
     */
    public void submit(Runnable task) {
        // 检查线程池状态
        if (!isStopped) {
            try {
                // 将任务放入队列(如果队列满会阻塞)
                taskQueue.put(task); 
            } catch (InterruptedException e) {
                // 恢复中断状态
                Thread.currentThread().interrupt();
            }
        } else {
            throw new IllegalStateException("ThreadPool is stopped");
        }
    }

    /**
     * 关闭线程池
     * 1. 设置停止标志
     * 2. 中断所有工作线程
     */
    public void shutdown() {
        // 设置停止标志
        isStopped = true;
        
        // 中断所有工作线程
        for (Worker worker : workers) {
            worker.interrupt(); 
        }
    }

    /**
     * 工作线程内部类
     * 负责从任务队列中获取并执行任务
     */
    private class Worker extends Thread {  
        @Override
        public void run() {
            // 只要线程池未关闭,就持续执行任务
            while (!isStopped) {
                try {
                    // 从队列获取任务(队列空时阻塞)
                    Runnable task = taskQueue.take(); 
                    
                    // 执行任务(同步执行)
                    task.run(); 
                    
                    /* !!!!重点!!!!
                    //我们都知道一个Runnable对象的run方法,在直接调用时,它是没有开启一个新的线程的
                    //在这个案例中,每一个Worker是一个线程, 其中执行runnable对象的run方法 , 
                    //   续上句: 这就相当于是 开了新的线程去执行Runnable对象的run方法 , 虽然这里对线程对象复用了,但原理是这样.
                    //线程对象 的任务是 从阻塞队列 取出Runnable对象, 并执行其run方法 
                    */
                    
                } catch (InterruptedException e) {
                    // 检查是否是因关闭线程池导致的中断
                    if (isStopped) {
                        break;  // 如果是关闭操作,则退出循环
                    }
                    // 其他中断情况继续循环
                }
            }
        }
    }

    /**
     * 测试主方法
     */
    public static void main(String[] args) {
        // 创建包含3个工作线程的线程池
        SimpleThreadPool threadPool = new SimpleThreadPool(3); 

        // 提交10个任务
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            threadPool.submit(() -> {
                System.out.println("开始执行任务 " + taskNumber + ",执行线程:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行耗时
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("完成任务 " + taskNumber);
            });
        }

        // 关闭线程池
        threadPool.shutdown(); 
    }
}

执行流程与原理详解

1. 初始化阶段

  1. 创建线程池实例:调用构造函数SimpleThreadPool(3)创建包含3个工作线程的线程池
  2. 初始化任务队列:创建LinkedBlockingQueue作为任务缓冲区
  3. 创建工作线程
    • 创建3个Worker线程实例
    • 调用start()方法启动每个工作线程
  4. 工作线程启动
    • 每个Worker线程进入run()方法
    • 立即调用taskQueue.take()尝试获取任务(此时队列为空,线程阻塞)

2. 任务提交阶段

  1. 提交任务:主线程调用submit()方法提交任务
  2. 任务入队
    • taskQueue.put(task)将任务放入队列
    • 如果队列已满,该方法会阻塞(本示例使用无界队列,不会出现)
  3. 唤醒工作线程
    • put()操作会唤醒一个阻塞在take()的工作线程
    • 被唤醒的线程获取到任务并开始执行

3. 任务执行阶段

  1. 任务获取:工作线程从taskQueue.take()获取任务
  2. 同步执行:直接调用task.run()执行任务
  3. 执行完成
    • 任务执行完毕后,工作线程再次调用take()获取新任务
    • 如果队列为空,线程再次阻塞等待

4. 线程池关闭阶段

  1. 设置关闭标志shutdown()方法将isStopped设为true
  2. 中断工作线程:调用每个工作线程的interrupt()方法
  3. 线程终止
    • 工作线程捕获中断异常
    • 检查isStopped为true后退出循环
    • 线程自然终止

核心设计原理

1. 生产者-消费者模式

  • 生产者:调用submit()方法的主线程
  • 消费者Worker工作线程
  • 缓冲区BlockingQueue任务队列

2. 工作线程管理

  • 线程复用:每个Worker线程通过循环持续处理任务,避免频繁创建/销毁线程
  • 负载均衡:多个工作线程自动竞争获取任务,实现任务均衡分配

3. 线程安全保证

  • 队列安全LinkedBlockingQueue内部使用锁保证线程安全
  • 状态可见性isStopped使用volatile保证多线程可见性

4. 优雅关闭机制

  1. 标志位检查isStopped控制线程是否继续运行
  2. 中断响应:通过中断唤醒阻塞在队列的工作线程
  3. 资源清理:确保所有线程能够正常退出

性能特点分析

  1. 优点

    • 实现简单直观
    • 有效控制并发线程数量
    • 减少线程创建销毁开销
    • 任务缓冲避免直接拒绝
  2. 局限

    • 固定线程数,无法动态扩展
    • 无任务拒绝策略(队列无界)
    • 关闭时可能丢失队列中的任务

扩展建议

  1. 动态线程数:增加核心线程数和最大线程数配置
  2. 拒绝策略:添加队列满时的处理策略
  3. 任务优先级:改用PriorityBlockingQueue
  4. 执行结果获取:增加Future机制
  5. 线程池监控:添加任务计数、线程状态等监控指标

这个实现展示了线程池的核心工作原理,实际开发中建议使用Java标准库的ThreadPoolExecutor,它提供了更完善的功能和更好的性能。

相关文章:

  • 从零开始的图论讲解(1)——图的概念,图的存储,图的遍历与图的拓扑排序
  • ubuntu 20.04 编译和运行A-LOAM
  • std::async 和 std::thread 的主要区别
  • 使用Vue、Nodejs以及websocket搭建一个简易聊天室
  • 项目难点亮点
  • 国密算法(SM2/SM3/SM4)与国际算法(AES/RSA/SHA-256)
  • 数据集的训练-测试拆分在机器学习中的重要性
  • Mac下Homebrew的安装与使用
  • SLAAC 与 DHCPv6 笔记250405
  • adb devices报错 ADB server didn‘t ACK
  • 第八届蓝桥杯大赛软件赛省赛C/C++ 大学 B 组 购物单
  • 23种设计模式-行为型模式-状态
  • 使用 Qt 和 OBS 工具检测系统硬件编码器支持情况(NVENC、QSV、AMF)
  • InceptionNeXt:When Inception Meets ConvNeXt论文翻译
  • ML:Sigmoid 饱和函数
  • C++第1讲:基础语法;通讯录管理系统
  • ROS云课三分钟-差动移动机器人巡逻报告如何撰写-评分良好
  • python:获取某路径下所有图片的名称
  • 拉普拉斯变换
  • COMSOL 与人工智能融合的多物理场应用:28个案例的思路、方法与工具概述
  • 怎么用axure做网站导航栏/北京软件培训机构前十名
  • qq排名优化网站/域名注册信息查询whois
  • 网站开发与维护是做什么工作/推广百度百科
  • 付费网站建设/池州网站seo
  • 江门营销型网站建设多少钱/seo关键词挖掘工具
  • 939w78w78w乳液永久/登封网站关键词优化软件