多线程代码案例(线程池)- 4
目录
引入
标准库中的线程池 -- ThreadPoolExecutor
研究一下这个方法的几个参数
1. int corePoolSize
2. int maximumPoolSize
3. long keepAliveTime
4. TimeUnit unit
6. ThreadFactory threadFactory
7. RejectedExecutionHandler handler
四种拒绝策略
Executors 工厂类
如何确定线程池的数量???
实现一个简单的线程池
完
引入
池,是一个非常重要的概念,我们有常量池,数据库连接池,线程池,进程池,内存池....
池的作用:
1. 提前把要用的对象准备好。
2. 把用完的对象也不立即释放,先留着以备下次使用 ==》 提高效率!!!
举个栗子: 我是个美女,谈了一个男朋友。如果我有一天对这个男人厌倦了,如何才能提高更换男朋友的效率呢? ==》 备胎池...
最开始,进程能够解决并发编程的问题,之后因为频繁的创建和销毁进程,成本太高了,所以我们引入了轻量级的进程 --> 线程,但如果创建和销毁线程的频率进一步提高,此时线程的创建和销毁的开销,也不能够无视了。(抛开剂量谈毒性,都是耍流氓)我们就需要想办法来优化此处的线程的创建销毁的效率。
1. 引入轻量级 线程 --> 纤程 / 协程(Java 21 里引入 “虚拟线程” 就是这个东西)。协程本质,是程序员在用户态的代码中进行调度,不是靠内核的调度器调度 ==》节省了很多调度上的开销。(我们在此处不做深入研究...)
2. 线程池 把要使用的线程提前创建好了,用完了也不直接释放而是以备下次使用。这样就节省了创建 / 销毁 线程的开销。在这个过程中,并没有真的频繁创建销毁线程,只是从线程池里面,取线程使用,用完了就还给线程池。(在这个过程中,会占用比较多的空间,这个代价是无法避免的,可以接收)
那为什么,从线程池里面取线程,就比从系统申请更加高效呢???
举个栗子来说明:还是银行的例子,柜台里面,相当于内核态,大堂相当于用户态。
当我们要办理业务,需要一个身份证复印件的时候,我们并没有带,这时候,柜员就说我们有两个途径取解决:
1. 自己拿着身份证,去自助复印机上复印即可(纯用户态代码)
2. 把身份证交给柜员,他拿着身份证帮你去复印。(柜员拿到我们的身份证之后,就消失在我们的视野中了,此时我们无法知道他要花费多长时间,也不知道他都要做那些事情,我们唯一能做的,就是等,等他回来 ~~)
基本的结论:
如果一个工作,滑稽老铁自己就能完成,就更加可控,更加高效。 从线程池里面取线程,就是纯用户代码,就是可控的。
如果一个工作,滑稽老铁要拜托银行的柜员来完成,就不可控,更低效。通过系统申请创建线程,就是需要内核来完成的,不太可控。
标准库中的线程池 -- ThreadPoolExecutor
我们可以在 Java 官方文档中,找到 java.util.concurrent 包,在下面的 Classes 中就可以找到 ThreadPoolExecutor,往下翻可以找到构造方法,有 4 种
我们只需要关注最后一个即可(最后一个的参数是最全的)
研究一下这个方法的几个参数
1. int corePoolSize
表示的是 核心线程数(一个线程池里面,最少得有多少个线程)
2. int maximumPoolSize
表示的是 最大线程数(一个线程池里面,最多最多能有多少个线程)
注意: 标准库提供的线程池,持有的线程个数,并非是一成不变的,会根据当前任务量,自适应线程的个数。(任务非常多,就多搞几个线程;任务比较少,就少搞几个线程)
举个栗子:假设一个公司,里面有 10 个员工(正式签劳动合同的员工)。当公司的业务非常繁忙的时候,10 个人干不过来了,就需要招聘,一个成本比较低的做法是,招聘实习生(非正式员工),比如可以再招聘 5 个实习生(廉价劳动力)
过了一段时间,公司没那么忙了,大家都闲下来开始摸鱼了,10 个正式员工,是不能被随便裁员的(劳动仲裁~~~)但是这 5 个实习生,是可以随便裁的。把这 5 个实习生裁掉,使当前这 10 个正式员工也没有那么空闲了,整体的成本就降低了。
如果过了一段时间,公司业务又多了起来,10 个人又忙不过来了,此时重新再招几个实习生就好啦~~~
通过实习生,来应对突发的峰值!!!
10 个正式员工,就是核心线程数(参数叫 核心线程数, 而不是 最小线程数)
10 + 5 正式员工 + 实习生 就是最大线程数了
3. long keepAliveTime
表示的是 保持存活时间
4. TimeUnit unit
表示的是 时间单位(s,min,ms,hour)
再用我们的栗子解释:keepAliveTime 就是实习生线程,允许最大的空闲摸鱼时间。即,如果发现某个实习生正在摸鱼(这个线程空闲了),此时要立即马上把这个实习生开除掉吗???不应该!!!可能发生,这边一空闲马上就开除,结果下一时刻,任务又突然多起来了!!!
此处的 keepAliveTime 意思就是实习生线程,空闲时间超过了这个时间阈值,就会被销毁掉。注意:实习生线程被销毁之后,就没有了,在未来的某一天,线程还会重新招聘实习生,但不是之前的那个了
5. BolckingQueue<Runnable> workQueue
这个和定时器是类似的阻塞队列,用来存放等待执行的任务。当核心线程都在忙碌的时候,新任务会被放入这个队列中排队等待。用 Runnable 来作为描述任务的主体。 ==》 也可以设置 PriorityBlockingQueue 带有优先级
6. ThreadFactory threadFactory
这个表示线程工厂。
工厂模式,也是一种常见的设计模式。通过专门的 “工厂类 / 工厂对象”来创建指定的对象~~
工厂模式,本质上是给 Java 语法填坑的
举个栗子:
我们会发现,上面的代码,无法通过编译~~
在 c++ / Java 中要想提供多个版本的构造方法,就需要让这多个方法能够构成重载
重载的要求:
1. 方法名字相同(构造方法的名字本身都相同)
2. 形参的 个数 / 类型 不同!
上面的代码不符合第二个要求,所以无法通过编译。为了解决上述问题,就引入了 “工厂模式”,使用普通的方法来创建对象,把构造方法封装了一层。
如果把工厂方法放到一个其他的类里面,这个其他的类就叫做“工厂类”。
总的来说,通过静态方法封装 new 操作,无需实例化对象,在方法内部设定不同的属性完成对象初始化,构造对象的过程,就是工厂模式。
回过头来说我们的参数:ThreadFactory threadFactory 通过这个工厂类,来创建线程对象(Thread 对象)在这个类里面提供了方法(也不一定非得是静态的)让方法封装 new Thread 的操作,并且给 Thread 设置一些属性,就构成了 ThreadFactory 线程工厂!
7. RejectedExecutionHandler handler
上述参数中,这个是最重要的!!!
这个表示的是拒绝策略。在线程池中,有一个阻塞队列,能够容纳的元素是有上限的。当任务队列已经满了的时候,如果继续往队列里面添加元素,那么线程池会怎么办呢??? ==》 就是这个拒绝策略参数了!
在官方文档中,构造方法的上面就是拒绝策略
四种拒绝策略
1. 继续添加任务,直接抛出异常。此时就是“撂挑子”的状态,新任务 旧任务 都不执行了!!!
2. 新的任务,由添加任务的线程负责执行,此时新的任务会执行,不过并不是线程池执行,而是调用者执行。同事让我帮忙,我自己都忙的焦头烂额,只能忙自己的,同事的忙还得他自己解决。
3. 丢弃最老的任务。将最老的任务舍弃一个,然后执行新的任务。
4. 丢弃最新的任务。直接抛弃新的任务,新的任务就无了,不执行了,调用的线程不会进程, 线程池也不会执行。
Executors 工厂类
ThreadPoolExecutor 本身使用起来还是比较复杂, 因此 Java 标准库还提供了另一个版本,把 ThreadPoolExecutor 给封装了一下~~
Executors 工厂类,通过这个类来创建出不同的线程池对象(在内部把 ThreadPoolExecutor 创建好了并且设置了不同的参数)
我们可以创建一个线程池用如下的方式
可以看到 Executors 这个工厂类中有许多不同的线程池
newSingleThreadExecutor() 是一个定时器类似物,也能延时执行任务
newScheduleThreadPool 是只包含单个线程的线程池
newCachedThreadPool 是线程数目能够动态扩容是线程池
newFixedThreadPool() 是线程数目固定的线程池
示例代码如下:
打印结果如下:
ThreadPoolExecutor 也是通过 submit 添加任务的,只是构造方法不同
什么时候使用 Executors 什么时候使用 ThreadPoolExecutor 呢???
当我们只是简单使用一个线程池的时候,就可以使用 Executors
当我们需要一个高度定制化的线程池的时候,就可以使用 ThreadPoolExecutor
网上流传的 阿里巴巴Java开发编程规范中,写了不建议使用 Executors,一定要使用 ThreadPoolExecutor,用 ThreadPoolExecutor 意味着一切尽在掌握之中,不会出现一些不可控的因素~~ 我们可以参考,但还是要以具体的公司编程规范要求为准啦...
如何确定线程池的数量???
创建线程池的时候,很多时候,需要设定线程池的数量。这个数量应该怎么设置比较合适???我们上面只是随意的设置了一个 4 ,到底怎么样是合适的呢?
网上有很多说法,假如 CPU 的逻辑核心数是 N ,网上的说法:线程数量应该是 N,N + 1,1.5N,2N... ==》 都是错误的。
不同的程序,能够设定的线程的数量是不同的,必须要具体问题具体分析。
要区分,一个线程是 CPU 密集型的任务,还是 IO 密集型的任务。
CPU 密集型的任务:这个线程大部分的时间,都在要 CPU 上运行,进行计算。 比如,在线程 run 里面计算 1 + 2 + ... + 10w 这种就是 CPU 密集型
IO 密集型的任务:这个线程大部分的时间都在等待 IO,不需要去 CPU 上运行,比如,线程 run 里,搞一个 scanner,读取用户的输入,就是 IO 密集型
如果一个进程中,所有的线程都是 CPU 密集型的,每个线程所有的工作都是在 CPU 上执行的(假定的一种极端情况~~~)此时,线程的数目就不应该超过 N(CPU 逻辑核心数)
如果一个进程中,所有的线程都是 IO 密集型的,每个线程的大部分工作都是在等待 IO,CPU 消耗非常少,此时线程的数目就可以很多很多,远远超过 N(CPU 逻辑核心数)
上面的两个场景,是两种非常极端的情况,实际上,一个进程中的线程,一部分是 IO,一份是 CPU,这里的比例是不好确定的。
综上,由于程序的复杂性,很难直接对线程的数量进行估计。更合适的做法应该是:通过实验 / 测试的方式,找到合适的线程数目。==》 尝试给线程池,设定不同的线程数目,分别进行性能测试,衡量每种线程数目下,总的时间开销 和 系统资源占用的开销,找到这两者的合适值。
实现一个简单的线程池
我们这里直接写一个固定数目的线程池,暂时不考虑线程数目的增多和减少。
1. 提供构造方法,指定创建多少个线程。
2. 在构造方法中,把这些线程都创建好
3. 有一个阻塞队列,能够持有要执行的方法
4. 提供 subbmit 方法,可以添加新的任务。
代码如下:
成员变量有 threadList,用来存储管理线程;queue 用来保存任务的队列,这里因为 ArrayBlockingQueue 是线程安全的,所以在下面的构造方法并没有上锁。在线程中,利用 n 来创建指定个线程。在 while(true) 循环中,线程持续运行,不断从任务队列中 take 任务并 run 执行。
submit 方法:将新的任务添加到队列中
解释:
测试代码:
但是在测试代码中,发现有一个小小的红色波浪线,在 i 下面,为啥会编译报错呢?
==》 变量捕获!!!
run 回调函数访问当前外部作用域的变量就是变量捕获,我们之前讲过,变量捕获的值,要不然是 final 修饰的常量值,要不然是一个“事实 final” 变量,但现在 i 是一直变化的,怎么办呢?
在创建一个 n,把 i 赋值给 n,此处的 n 就是一个“事实 final” 变量,每次循环,都是一个新的 n,n 本身没有改变,就可以被捕获!!!
运行起来,注意,这些多个线程之间的执行顺序的不确定的!!!某个线程获取到了某个任务,但是并非立即执行,这个过程中可能其他线程就到前面执行了。(此处的这些线程,彼此之间都是等价的...)
完整代码如下:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class MyThreadPoolExecutor {
// 用 List 数据结构来存储线程
private List<Thread> threadList = new ArrayList<>();
// 是一个用来保存任务的队列
private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
// 通过 n 指定创建多少个线程
public MyThreadPoolExecutor (int n) {
for (int i = 0; i < n; i++) {
Thread t = new Thread(() -> {
// 线程要做的事情是 把任务队列中的任务不停的取出来,并且进行执行
while (true) {
try {
// 此处的 take 是带有阻塞功能的
// 如果队列为空,此处的 take 会阻塞
Runnable runnable = queue.take();
// 取出一个任务执行一个任务
runnable.run();
} catch (InterruptedException e){
e.printStackTrace();
}
}
});
t.start();
threadList.add(t);
}
}
// 将新任务添加到任务队列里面!
public void submit(Runnable runnable) throws InterruptedException {
queue.put(runnable);
}
}
public class ThreadDemo43 {
public static void main(String[] args) throws InterruptedException {
MyThreadPoolExecutor executor = new MyThreadPoolExecutor(4);
for (int i = 0; i < 1000; i++) {
int n = i;
executor.submit(new Runnable() {
@Override
public void run() {
System.out.println("执行任务 " + n + "当前线程为:+ " + Thread.currentThread().getName());
}
});
}
}
}
代码细节完善:
1. 我们还可以补充一个关闭线程的方法 shutdown
2. 补充 shutdown 的成员变量 isShutdown 并且优化捕捉到异常时候的操作
3. submit 前先进行一个判断:
4. 打印完毕之后,大约 5s 程序结束
完整改善代码:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class MyThreadPoolExecutor {
// 用 List 数据结构来存储线程
private List<Thread> threadList = new ArrayList<>();
// 是一个用来保存任务的队列
private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
// 线程是否关闭的标志
private volatile boolean isShutdown = false;
// 通过 n 指定创建多少个线程
public MyThreadPoolExecutor (int n) {
for (int i = 0; i < n; i++) {
Thread t = new Thread(() -> {
// 线程要做的事情是 把任务队列中的任务不停的取出来,并且进行执行
while (!isShutdown) {
try {
// 此处的 take 是带有阻塞功能的
// 如果队列为空,此处的 take 会阻塞
Runnable runnable = queue.take();
// 取出一个任务执行一个任务
runnable.run();
} catch (InterruptedException e){
// 当捕捉到 异常的时候,恢复线程的中断状态并退出循环
Thread.currentThread().interrupt();
break;
}
}
});
t.start();
threadList.add(t);
}
}
// 将新任务添加到任务队列里面!
public void submit(Runnable runnable) throws InterruptedException {
if (isShutdown) {
throw new IllegalStateException("线程池已经关闭,无法提交新的任务");
}
queue.put(runnable);
}
public void shutdown() {
isShutdown = true;
for (Thread t : threadList) {
t.interrupt();
}
}
}
public class ThreadDemo43 {
public static void main(String[] args) throws InterruptedException {
MyThreadPoolExecutor executor = new MyThreadPoolExecutor(4);
for (int i = 0; i < 1000; i++) {
int n = i;
executor.submit(new Runnable() {
@Override
public void run() {
System.out.println("执行任务 " + n + "当前线程为:+ " + Thread.currentThread().getName());
}
});
}
Thread.sleep(5000);
executor.shutdown();
}
}