当前位置: 首页 > news >正文

多线程代码案例(线程池)- 4

目录

引入

标准库中的线程池 -- ThreadPoolExecutor

研究一下这个方法的几个参数

1. int corePoolSize

2. int maximumPoolSize 

3. long keepAliveTime

4. TimeUnit unit

5. BolckingQueue workQueue

6. ThreadFactory threadFactory 

7. RejectedExecutionHandler handler

四种拒绝策略

Executors 工厂类

如何确定线程池的数量???

实现一个简单的线程池


引入

池,是一个非常重要的概念,我们有常量池,数据库连接池,线程池,进程池,内存池....

池的作用:

        1. 提前把要用的对象准备好。

        2. 把用完的对象也不立即释放,先留着以备下次使用 ==》 提高效率!!!

举个栗子: 我是个美女,谈了一个男朋友。如果我有一天对这个男人厌倦了,如何才能提高更换男朋友的效率呢? ==》 备胎池...

最开始,进程能够解决并发编程的问题,之后因为频繁的创建和销毁进程,成本太高了,所以我们引入了轻量级的进程 --> 线程,但如果创建和销毁线程的频率进一步提高,此时线程的创建和销毁的开销,也不能够无视了。(抛开剂量谈毒性,都是耍流氓)我们就需要想办法来优化此处的线程的创建销毁的效率。

        1. 引入轻量级 线程 --> 纤程 / 协程(Java 21 里引入 “虚拟线程” 就是这个东西)。协程本质,是程序员在用户态的代码中进行调度,不是靠内核的调度器调度 ==》节省了很多调度上的开销。(我们在此处不做深入研究...)

        2. 线程池 把要使用的线程提前创建好了,用完了也不直接释放而是以备下次使用。这样就节省了创建 / 销毁 线程的开销。在这个过程中,并没有真的频繁创建销毁线程,只是从线程池里面,取线程使用,用完了就还给线程池。(在这个过程中,会占用比较多的空间,这个代价是无法避免的,可以接收)

那为什么,从线程池里面取线程,就比从系统申请更加高效呢???

举个栗子来说明:还是银行的例子,柜台里面,相当于内核态,大堂相当于用户态。

当我们要办理业务,需要一个身份证复印件的时候,我们并没有带,这时候,柜员就说我们有两个途径取解决:

        1. 自己拿着身份证,去自助复印机上复印即可(纯用户态代码)

        2. 把身份证交给柜员,他拿着身份证帮你去复印。(柜员拿到我们的身份证之后,就消失在我们的视野中了,此时我们无法知道他要花费多长时间,也不知道他都要做那些事情,我们唯一能做的,就是等,等他回来 ~~)

基本的结论:

如果一个工作,滑稽老铁自己就能完成,就更加可控,更加高效。 从线程池里面取线程,就是纯用户代码,就是可控的。

如果一个工作,滑稽老铁要拜托银行的柜员来完成,就不可控,更低效。通过系统申请创建线程,就是需要内核来完成的,不太可控。

标准库中的线程池 -- ThreadPoolExecutor

我们可以在 Java 官方文档中,找到 java.util.concurrent 包,在下面的 Classes 中就可以找到 ThreadPoolExecutor,往下翻可以找到构造方法,有 4 种

我们只需要关注最后一个即可(最后一个的参数是最全的)

研究一下这个方法的几个参数

1. int corePoolSize

表示的是 核心线程数(一个线程池里面,最少得有多少个线程)

2. int maximumPoolSize 

表示的是 最大线程数(一个线程池里面,最多最多能有多少个线程)

注意: 标准库提供的线程池,持有的线程个数,并非是一成不变的,会根据当前任务量,自适应线程的个数。(任务非常多,就多搞几个线程;任务比较少,就少搞几个线程)

举个栗子:假设一个公司,里面有 10 个员工(正式签劳动合同的员工)。当公司的业务非常繁忙的时候,10 个人干不过来了,就需要招聘,一个成本比较低的做法是,招聘实习生(非正式员工),比如可以再招聘 5 个实习生(廉价劳动力)

过了一段时间,公司没那么忙了,大家都闲下来开始摸鱼了,10 个正式员工,是不能被随便裁员的(劳动仲裁~~~)但是这 5 个实习生,是可以随便裁的。把这 5 个实习生裁掉,使当前这 10 个正式员工也没有那么空闲了,整体的成本就降低了。

如果过了一段时间,公司业务又多了起来,10 个人又忙不过来了,此时重新再招几个实习生就好啦~~~

通过实习生,来应对突发的峰值!!!

10 个正式员工,就是核心线程数(参数叫 核心线程数, 而不是 最小线程数)

10 + 5 正式员工 + 实习生 就是最大线程数了

3. long keepAliveTime

表示的是 保持存活时间

4. TimeUnit unit

表示的是 时间单位(s,min,ms,hour)

再用我们的栗子解释:keepAliveTime 就是实习生线程,允许最大的空闲摸鱼时间。即,如果发现某个实习生正在摸鱼(这个线程空闲了),此时要立即马上把这个实习生开除掉吗???不应该!!!可能发生,这边一空闲马上就开除,结果下一时刻,任务又突然多起来了!!!

此处的 keepAliveTime 意思就是实习生线程,空闲时间超过了这个时间阈值,就会被销毁掉。注意:实习生线程被销毁之后,就没有了,在未来的某一天,线程还会重新招聘实习生,但不是之前的那个了

5. BolckingQueue<Runnable> workQueue

这个和定时器是类似的阻塞队列,用来存放等待执行的任务。当核心线程都在忙碌的时候,新任务会被放入这个队列中排队等待。用 Runnable 来作为描述任务的主体。 ==》 也可以设置 PriorityBlockingQueue 带有优先级

6. ThreadFactory threadFactory 

这个表示线程工厂

工厂模式,也是一种常见的设计模式。通过专门的 “工厂类 / 工厂对象”来创建指定的对象~~

工厂模式,本质上是给 Java 语法填坑的 

举个栗子:

我们会发现,上面的代码,无法通过编译~~

在 c++ / Java 中要想提供多个版本的构造方法,就需要让这多个方法能够构成重载

重载的要求:

1. 方法名字相同(构造方法的名字本身都相同)

2. 形参的 个数 / 类型 不同!

上面的代码不符合第二个要求,所以无法通过编译。为了解决上述问题,就引入了 “工厂模式”,使用普通的方法来创建对象,把构造方法封装了一层

如果把工厂方法放到一个其他的类里面,这个其他的类就叫做“工厂类”。

总的来说,通过静态方法封装 new 操作,无需实例化对象,在方法内部设定不同的属性完成对象初始化,构造对象的过程,就是工厂模式。

回过头来说我们的参数:ThreadFactory threadFactory 通过这个工厂类,来创建线程对象(Thread 对象)在这个类里面提供了方法(也不一定非得是静态的)让方法封装 new Thread 的操作,并且给 Thread 设置一些属性,就构成了 ThreadFactory 线程工厂!

7. RejectedExecutionHandler handler

上述参数中,这个是最重要的!!!

这个表示的是拒绝策略。在线程池中,有一个阻塞队列,能够容纳的元素是有上限的。当任务队列已经满了的时候,如果继续往队列里面添加元素,那么线程池会怎么办呢??? ==》 就是这个拒绝策略参数了!

在官方文档中,构造方法的上面就是拒绝策略

四种拒绝策略

1. 继续添加任务,直接抛出异常。此时就是“撂挑子”的状态,新任务 旧任务 都不执行了!!!

2. 新的任务,由添加任务的线程负责执行,此时新的任务会执行,不过并不是线程池执行,而是调用者执行。同事让我帮忙,我自己都忙的焦头烂额,只能忙自己的,同事的忙还得他自己解决。

3. 丢弃最老的任务。将最老的任务舍弃一个,然后执行新的任务。

4. 丢弃最新的任务。直接抛弃新的任务,新的任务就无了,不执行了,调用的线程不会进程, 线程池也不会执行。

Executors 工厂类

ThreadPoolExecutor 本身使用起来还是比较复杂, 因此 Java 标准库还提供了另一个版本,把 ThreadPoolExecutor 给封装了一下~~

Executors 工厂类,通过这个类来创建出不同的线程池对象(在内部把 ThreadPoolExecutor 创建好了并且设置了不同的参数)

我们可以创建一个线程池用如下的方式

可以看到 Executors 这个工厂类中有许多不同的线程池

newSingleThreadExecutor() 是一个定时器类似物,也能延时执行任务

newScheduleThreadPool 是只包含单个线程的线程池

newCachedThreadPool 是线程数目能够动态扩容是线程池

newFixedThreadPool() 是线程数目固定的线程池

示例代码如下:

打印结果如下:

ThreadPoolExecutor 也是通过 submit 添加任务的,只是构造方法不同

什么时候使用 Executors 什么时候使用 ThreadPoolExecutor 呢???

当我们只是简单使用一个线程池的时候,就可以使用 Executors

当我们需要一个高度定制化的线程池的时候,就可以使用 ThreadPoolExecutor

网上流传的 阿里巴巴Java开发编程规范中,写了不建议使用 Executors,一定要使用 ThreadPoolExecutor,用 ThreadPoolExecutor 意味着一切尽在掌握之中,不会出现一些不可控的因素~~ 我们可以参考,但还是要以具体的公司编程规范要求为准啦...

如何确定线程池的数量???

创建线程池的时候,很多时候,需要设定线程池的数量。这个数量应该怎么设置比较合适???我们上面只是随意的设置了一个 4 ,到底怎么样是合适的呢?

网上有很多说法,假如 CPU 的逻辑核心数是 N ,网上的说法:线程数量应该是 N,N + 1,1.5N,2N... ==》 都是错误的。

不同的程序,能够设定的线程的数量是不同的,必须要具体问题具体分析。

要区分,一个线程是 CPU 密集型的任务,还是 IO 密集型的任务。

CPU 密集型的任务:这个线程大部分的时间,都在要 CPU 上运行,进行计算。 比如,在线程 run 里面计算 1 + 2 + ... + 10w 这种就是 CPU 密集型

IO 密集型的任务:这个线程大部分的时间都在等待 IO,不需要去 CPU 上运行,比如,线程 run 里,搞一个 scanner,读取用户的输入,就是 IO 密集型

如果一个进程中,所有的线程都是 CPU 密集型的,每个线程所有的工作都是在 CPU 上执行的(假定的一种极端情况~~~)此时,线程的数目就不应该超过 N(CPU 逻辑核心数)

如果一个进程中,所有的线程都是 IO 密集型的,每个线程的大部分工作都是在等待 IO,CPU 消耗非常少,此时线程的数目就可以很多很多,远远超过 N(CPU 逻辑核心数)

上面的两个场景,是两种非常极端的情况,实际上,一个进程中的线程,一部分是 IO,一份是 CPU,这里的比例是不好确定的。

综上,由于程序的复杂性,很难直接对线程的数量进行估计。更合适的做法应该是:通过实验 / 测试的方式,找到合适的线程数目。==》 尝试给线程池,设定不同的线程数目,分别进行性能测试,衡量每种线程数目下,总的时间开销 和 系统资源占用的开销,找到这两者的合适值。

实现一个简单的线程池

我们这里直接写一个固定数目的线程池,暂时不考虑线程数目的增多和减少。

        1. 提供构造方法,指定创建多少个线程。

        2. 在构造方法中,把这些线程都创建好

        3. 有一个阻塞队列,能够持有要执行的方法

        4. 提供 subbmit 方法,可以添加新的任务。

代码如下:

成员变量有 threadList,用来存储管理线程;queue 用来保存任务的队列,这里因为 ArrayBlockingQueue 是线程安全的,所以在下面的构造方法并没有上锁。在线程中,利用 n 来创建指定个线程。在 while(true) 循环中,线程持续运行,不断从任务队列中 take 任务并 run 执行。

submit 方法:将新的任务添加到队列中

解释:

测试代码:

但是在测试代码中,发现有一个小小的红色波浪线,在 i 下面,为啥会编译报错呢?

==》 变量捕获!!!

run 回调函数访问当前外部作用域的变量就是变量捕获,我们之前讲过,变量捕获的值,要不然是 final 修饰的常量值,要不然是一个“事实 final” 变量,但现在 i 是一直变化的,怎么办呢?

在创建一个 n,把 i 赋值给 n,此处的 n 就是一个“事实 final” 变量,每次循环,都是一个新的 n,n 本身没有改变,就可以被捕获!!!

运行起来,注意,这些多个线程之间的执行顺序的不确定的!!!某个线程获取到了某个任务,但是并非立即执行,这个过程中可能其他线程就到前面执行了。(此处的这些线程,彼此之间都是等价的...)

完整代码如下:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class MyThreadPoolExecutor {

    // 用 List 数据结构来存储线程
    private List<Thread> threadList = new ArrayList<>();

    // 是一个用来保存任务的队列
    private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);

    // 通过 n 指定创建多少个线程
    public MyThreadPoolExecutor (int n) {
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                // 线程要做的事情是 把任务队列中的任务不停的取出来,并且进行执行
                while (true) {
                    try {
                        // 此处的 take 是带有阻塞功能的
                        // 如果队列为空,此处的 take 会阻塞
                        Runnable runnable = queue.take();
                        // 取出一个任务执行一个任务
                        runnable.run();
                    } catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            });
            t.start();
            threadList.add(t);
        }
    }

    // 将新任务添加到任务队列里面!
    public void submit(Runnable runnable) throws InterruptedException {
        queue.put(runnable);
    }
    
}
public class ThreadDemo43 {
    public static void main(String[] args) throws InterruptedException {
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(4);
        for (int i = 0; i < 1000; i++) {
            int n = i;
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("执行任务 " + n + "当前线程为:+ " + Thread.currentThread().getName());
                }
            });
        }
    }
}

代码细节完善: 

        1. 我们还可以补充一个关闭线程的方法 shutdown

        2. 补充 shutdown 的成员变量 isShutdown 并且优化捕捉到异常时候的操作

        3. submit 前先进行一个判断:

        4. 打印完毕之后,大约 5s 程序结束

完整改善代码:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class MyThreadPoolExecutor {

    // 用 List 数据结构来存储线程
    private List<Thread> threadList = new ArrayList<>();

    // 是一个用来保存任务的队列
    private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);

    // 线程是否关闭的标志
    private volatile boolean isShutdown = false;

    // 通过 n 指定创建多少个线程
    public MyThreadPoolExecutor (int n) {
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                // 线程要做的事情是 把任务队列中的任务不停的取出来,并且进行执行
                while (!isShutdown) {
                    try {
                        // 此处的 take 是带有阻塞功能的
                        // 如果队列为空,此处的 take 会阻塞
                        Runnable runnable = queue.take();
                        // 取出一个任务执行一个任务
                        runnable.run();
                    } catch (InterruptedException e){
                        // 当捕捉到 异常的时候,恢复线程的中断状态并退出循环
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
            t.start();
            threadList.add(t);
        }
    }

    // 将新任务添加到任务队列里面!
    public void submit(Runnable runnable) throws InterruptedException {
        if (isShutdown) {
            throw new IllegalStateException("线程池已经关闭,无法提交新的任务");
        }
        queue.put(runnable);
    }

    public void shutdown() {
        isShutdown = true;
        for (Thread t : threadList) {
            t.interrupt();
        }
    }
}
public class ThreadDemo43 {
    public static void main(String[] args) throws InterruptedException {
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(4);
        for (int i = 0; i < 1000; i++) {
            int n = i;
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("执行任务 " + n + "当前线程为:+ " + Thread.currentThread().getName());
                }
            });
        }
        Thread.sleep(5000);
        executor.shutdown();
    }
}

相关文章:

  • Rust 之四 运算符、标量、元组、数组、字符串、结构体、枚举
  • springboot Filter实现请求响应全链路拦截!完整日志监控方案​​
  • DeepSeek底层揭秘——《推理时Scaling方法》技术对比浅析
  • AI日报 - 2025年4月9日
  • 信息系统项目管理师-第十三章-项目资源管理
  • 2024 Jiangsu Collegiate Programming Contest H
  • 漫步·简单二进制
  • 基于STM32_HAL库的电动车报警器项目
  • 随机数据下的最短路问题(Dijstra优先队列)
  • golang通过飞书邮件服务API发送邮件功能详解
  • echart实现动态折线图(vue3+ts)
  • react的redux总结
  • telophoto源码查看记录
  • Nextjs15 实战 - React Notes CURD 实现
  • Dockerfile中CMD命令未生效
  • MyBatis的第四天学习笔记下
  • 动态规划算法深度解析:0-1背包问题(含完整流程)
  • 【Mysql】主从复制和读写分离
  • linux 处理2个文件的差集
  • 运动规划实战案例 | 基于四叉树分解的路径规划(附ROS C++/Python仿真)
  • 马上评|“衣服越来越难买”,对市场是一个提醒
  • 受美关税影响,本田预计新财年净利下降七成,并推迟加拿大建厂计划
  • 外交部亚洲司司长刘劲松会见印度驻华大使罗国栋
  • 科普|揭秘女性压力性尿失禁的真相
  • 牛市早报|中美日内瓦经贸会谈联合声明公布
  • 欧元区财长会讨论国际形势及应对美国关税政策