并发工具【上】——线程池及其操作
6.1 线程池
对于一个任务,我们可以创建一个线程来处理,进而提高效率,但是如果每次一个新任务到来就创建一个新线程,频繁创建/销毁线程会带来的性能开销,因此对于这样的模式,不需要每次有新任务来就要创建新的线程,而可以充分发挥和利用已有线程的潜力,因此可以使用线程池。这也是享元模式的思想。
JDK的线程池:一般使用ThreadPoolExecutor。有五种状态:
ThreadPoolExecutor 使用 int 高 3 位表示线程池状态,低 29 位表示线程数。
状态名 | 高3位 | 接收新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | 可接收新任务,处理阻塞队列任务 |
SHUTDOWN | 000 | N | Y | 不再接收新任务,但会处理队列剩余任务,因此较为温和 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列任务 |
TIDYING | 010 | - | - | 任务全部执行完,活动线程为0,即将进入终结 |
TERMINATED | 011 | - | - | 终结状态 |
数字上:TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
- 状态信息和线程数存储在一个原子变量 ctl 中,高3位是状态,低29位是线程数。这样可以用一次CAS原子操作同时设置状态和线程数。
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)));// rs 为高3位线程池状态, wc为低29代表线程个数,ctl是合并值
private static int ctlOf(int rs, int wc) { return rs | wc; }
6.1.1 线程池的构造方法:
public ThreadPoolExecutor(int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数long keepAliveTime, // 空闲线程存活时间TimeUnit unit, // 存活时间单位BlockingQueue<Runnable> workQueue, // 任务队列ThreadFactory threadFactory, // 线程工厂RejectedExecutionHandler handler // 拒绝策略
)
参数含义如下:
参数 | 含义 |
---|---|
corePoolSize | 核心线程数,线程池始终保有的线程数,即使这些线程是空闲也不会销毁 |
maximumPoolSize | 最大线程数,线程池能容纳的最大线程数(包括核心线程) |
keepAliveTime | 非核心线程的最大空闲时长,超时后会被回收;且当allowCoreThreadTimeOut(true) 时,核心线程也会超时被回收 |
unit | keepAliveTime的时间单位,如TimeUnit.SECONDS |
workQueue | 任务队列(如LinkedBlockingQueue 、ArrayBlockingQueue 等),用于保存等待执行的任务 |
threadFactory | 线程工厂,用来定制创建新线程(可设定线程名字、优先级等) |
handler | 拒绝策略。任务队列满且线程池达到最大线程数时,如何处理新任务。常见值有AbortPolicy、DiscardPolicy等 |
工作流程如下:
- 线程数 < corePoolSize:创建新线程处理任务(即便有空闲线程也不会复用,且创建了的核心线程不会消失)
- 线程数 >= corePoolSize,队列未满:任务放入队列等待
- 线程数 >= corePoolSize,队列已满,且线程数 < maximumPoolSize:创建救急线程处理任务,救急线程有存活时间
- 线程数达到maximumPoolSize且队列已满:走拒绝策略
拒绝策略大致如下:
- AbortPolicy (默认):直接抛出异常。用于强一致、不可丢任务的主流程(建议加监控/报警)。
- DiscardPolicy :直接丢弃本次任务。适合不重要的辅助任务,如日志打点、刷缓存。
- DiscardOldestPolicy :丢弃队头的任务。适合高频率,关注最新数据的场景,如监控采样、用户操作实时日志等。
- CallerRunsPolicy :任务由线程调用者自己执行。适合不希望丢任务且能容忍请求变慢,业务希望自动“降流”,“让系统自我保护”。
6.1.2 线程池工厂方法:
指通过一系列静态方法,直接帮你生成常用线程池实例 ,而无需关心底层复杂参数。一般是java.util.concurrent.Executors中的静态方法。
- Executors.newFixedThreadPool(int n):
固定线程数线程池 有n个核心线程,队列为无界LinkedBlockingQueue。
-
Executors.newCachedThreadPool() :
可缓存线程池 没有核心线程,最大线程数Integer.MAX_VALUE,空闲60s自动回收。
-
Executors.newSingleThreadExecutor() :
单线程线程池,只有一个线程,串行执行任务,队列是无界的linkedBlockingQueue。和直接使用单线程的区别是:如果是单线程执行失败会直接结束,但是使用线程池则任务失败还会创建一个新的线程,保证后续工作。和调用newFixedThreadPool(1)的区别是newSingleThreadExecutor()返回的是被包装后的 FinalizableDelegatedExecutorService,仅暴露
ExecutorService
通用方法,不能使用ThreadPoolExecutor
的特殊方法(如 getPoolSize/setCorePoolSize 等)。也就是说,newFixedThreadPool还可以调用setCorePoolSize来进行大小转换,不一定只保持核心线程和总线程数只为1的情况。
6.1.3 线程池方法
先说一下callable和future:
callable和runnable是一对,callable可以有返回值,而runnable则单纯是执行任务。
Callable<Integer> task = () -> {// 复杂计算return 42;
};Runnable task = () -> {// 复杂计算int r = 100 + 20;// 不能将 r 直接传递回主线程
};
future则是往往和submit一起使用,submit是提交任务,而不管如何使用submit,都会有返回值,返回值为一个future对象,future代表的是这次提交的任务,可以用future对象的方法来获取任务执行的状态,例如正在执行或者执行结束:
ExecutorService pool = Executors.newFixedThreadPool(2);Future<Integer> future = pool.submit(() -> {Thread.sleep(1000);return 99;
});if (!future.isDone()) {System.out.println("任务还没完成");
}Integer result = future.get(); // 阻塞直到任务完成,拿结果
System.out.println("任务完成,结果:" + result);
下面是线程池常用的方法:
execute(Runnable command):提交一个不带返回值的任务给线程池执行。
threadPool.execute(() -> System.out.println("Hello!"));
submit(Callable task):提交一个带返回值的任务,返回一个Future对象。这里结合上面的runnable和callable来看:
ExecutorService pool = Executors.newFixedThreadPool(2);// 1. 提交一个Runnable,没有返回值
Future<?> f1 = pool.submit(() -> System.out.println("abc"));
Object r1 = f1.get(); // r1为null(无返回值)// 2. 提交一个Runnable,指定特殊返回值
Future<String> f2 = pool.submit(() -> System.out.println("abc"), "done");
String r2 = f2.get(); // r2为"done"// 3. 提交一个Callable,直接返回结果
Future<Integer> f3 = pool.submit(() -> 123);
int r3 = f3.get(); // r3为123
这里要说一下,execute和submit没有先后关系,不是要先提交才能执行,不要被名称迷惑。两种只是带不带返回值的关系,可以各自独立执行,execute只是接受runnable类型的task执行,并且不带返回值;而submit则更多样,且一定返回一个future。通常来说,execute可以执行的是简单的不需要返回值的任务,而submit则是需要结果/状态的任务。
invokeAll:
有两种,带超时时间和不带超时时间的。作用是:批量提交一组 Callable
任务给线程池。会阻塞 直到所有任务都执行完毕 或抛出异常。返回一个和任务列表顺序对应的 Future<T>
列表(每个代表一个任务结果)。适用于需要全量结果批处理的情况。
List<Callable<Integer>> list = Arrays.asList(() -> { Thread.sleep(10000); return 1; },() -> { Thread.sleep(10000); return 2; },() -> { Thread.sleep(10000); return 3; }
);ExecutorService pool = Executors.newFixedThreadPool(3);long start = System.currentTimeMillis();
List<Future<Integer>> results = pool.invokeAll(list); // 阻塞~10s
long dur = System.currentTimeMillis() - start;System.out.println("主线程阻塞时长: " + dur/1000 + "秒"); // 输出大概是10s
invokeAny:
批量提交一组 Callable
任务给线程池。只要有任意一个任务成功返回结果(未抛异常) ,就立刻阻塞返回其结果,其它任务被取消。适用于多个接口,只要有一个接口能出结果就使用的场景。也因此返回值是泛型T,而不是future。
List<Callable<Integer>> jobs = Arrays.asList(() -> { Thread.sleep(200); return 1; }, // 慢() -> { Thread.sleep(100); return 2; }, // 快() -> { Thread.sleep(300); return 3; } // 慢
);ExecutorService pool = Executors.newFixedThreadPool(3);
Integer res = pool.invokeAny(jobs); // 得到最快完成的那个(此例为2)
System.out.println(res);
shutdown&shutdownNow:
作用是较为温和的关闭线程池,并且将状态改为shutdown。温和是指:不会接受新任务,且会等待旧任务/已经提交的任务都执行结束。这个可以和shutdownNow进行对比,后者是立即关闭线程池,未执行的任务会被移除,正在执行的任务会被中断。
例如如下的例子:如果使用shutdown则可以都执行,因为for循环的部分是submit全部任务,submit后才会执行shutdown,没有被执行的任务会存放在任务队列中。而shutdownNow则会触发catch中的内容,因为是强制终止。
import java.util.concurrent.*;
import java.util.List;
public class Main {public static void main(String[] args) throws Exception {ExecutorService pool = Executors.newFixedThreadPool(2);// 提交五个任务,每个任务都会休眠1秒for (int i = 0; i < 5; i++) {final int taskId = i;pool.submit(() -> {System.out.println("Task " + taskId + " started");try { Thread.sleep(1000); } catch (InterruptedException e) {System.out.println("Task " + taskId + " interrupted");}System.out.println("Task " + taskId + " finished");});}// 使用shutdown(),等待现有任务完成//pool.shutdown();// 或使用shutdownNow(),尝试立即终止List<Runnable> notStarted = pool.shutdownNow();System.out.println("未启动的任务数: " + notStarted.size());// 关闭后继续提交任务将抛RejectedExecutionException// pool.submit(() -> System.out.println("new task"));// 等待线程池完全终止pool.awaitTermination(5, TimeUnit.SECONDS);System.out.println("线程池已关闭");}
}
线程池的使用原则:
-
不同任务创建不同的线程池来进行,避免全部的任务都给一个线程池来操作。
-
根据cpu密集型和io密集型来决定线程池的线程数量。如果线程数量过少则不能充分利用系统资源会导致饥饿,过多则浪费资源。
-
cpu密集型:是主要消耗CPU运算资源的任务,任务大部分时间都在跑CPU指令,很少主动等待外部(磁盘/网络)IO,例如大规模数学运算、解压缩算法。线程数则通常是cpu数+1。+1是保证当线程由于页缺失的问题或者其他原因导致暂停,额外的线程可以接替工作。
-
io密集型:主要时间消耗在IO操作,对于开发可能更加常见,例如RPC调用、数据库操作,就会大量设计IO操作。因此CPU会闲置,可以使用如下公式:线程数=核数 * 期望CPU利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间。例如,4核的cpu计算时间是50%,等待时间是50%,希望cpu完全得到利用,因此线程数=4 * 1 * 1/0.5 = 8,也就是要8个线程。或者是:线程数 = CPU核心数 × 若干倍(2-10倍)。
-