当前位置: 首页 > news >正文

并发工具【上】——线程池及其操作

6.1 线程池

对于一个任务,我们可以创建一个线程来处理,进而提高效率,但是如果每次一个新任务到来就创建一个新线程,频繁创建/销毁线程会带来的性能开销,因此对于这样的模式,不需要每次有新任务来就要创建新的线程,而可以充分发挥和利用已有线程的潜力,因此可以使用线程池。这也是享元模式的思想。

JDK的线程池:一般使用ThreadPoolExecutor。有五种状态:

ThreadPoolExecutor 使用 int 高 3 位表示线程池状态,低 29 位表示线程数。

状态名高3位接收新任务处理阻塞队列任务说明
RUNNING111YY可接收新任务,处理阻塞队列任务
SHUTDOWN000NY不再接收新任务,但会处理队列剩余任务,因此较为温和
STOP001NN会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING010--任务全部执行完,活动线程为0,即将进入终结
TERMINATED011--终结状态

数字上:TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

  • 状态信息和线程数存储在一个原子变量 ctl 中,高3位是状态,低29位是线程数。这样可以用一次CAS原子操作同时设置状态和线程数。
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)));// rs 为高3位线程池状态, wc为低29代表线程个数,ctl是合并值
private static int ctlOf(int rs, int wc) { return rs | wc; }
6.1.1 线程池的构造方法:
public ThreadPoolExecutor(int corePoolSize,                  // 核心线程数int maximumPoolSize,               // 最大线程数long keepAliveTime,                // 空闲线程存活时间TimeUnit unit,                     // 存活时间单位BlockingQueue<Runnable> workQueue, // 任务队列ThreadFactory threadFactory,       // 线程工厂RejectedExecutionHandler handler   // 拒绝策略
)

参数含义如下:

参数含义
corePoolSize核心线程数,线程池始终保有的线程数,即使这些线程是空闲也不会销毁
maximumPoolSize最大线程数,线程池能容纳的最大线程数(包括核心线程)
keepAliveTime非核心线程的最大空闲时长,超时后会被回收;且当allowCoreThreadTimeOut(true)时,核心线程也会超时被回收
unitkeepAliveTime的时间单位,如TimeUnit.SECONDS
workQueue任务队列(如LinkedBlockingQueueArrayBlockingQueue等),用于保存等待执行的任务
threadFactory线程工厂,用来定制创建新线程(可设定线程名字、优先级等)
handler拒绝策略。任务队列满且线程池达到最大线程数时,如何处理新任务。常见值有AbortPolicy、DiscardPolicy等

工作流程如下:

  1. 线程数 < corePoolSize:创建新线程处理任务(即便有空闲线程也不会复用,且创建了的核心线程不会消失)
  2. 线程数 >= corePoolSize,队列未满:任务放入队列等待
  3. 线程数 >= corePoolSize,队列已满,且线程数 < maximumPoolSize:创建救急线程处理任务,救急线程有存活时间
  4. 线程数达到maximumPoolSize且队列已满:走拒绝策略

拒绝策略大致如下:

  • AbortPolicy (默认):直接抛出异常。用于强一致、不可丢任务的主流程(建议加监控/报警)。
  • DiscardPolicy :直接丢弃本次任务。适合不重要的辅助任务,如日志打点、刷缓存。
  • DiscardOldestPolicy :丢弃队头的任务。适合高频率,关注最新数据的场景,如监控采样、用户操作实时日志等。
  • CallerRunsPolicy :任务由线程调用者自己执行。适合不希望丢任务且能容忍请求变慢,业务希望自动“降流”,“让系统自我保护”。
6.1.2 线程池工厂方法:

指通过一系列静态方法,直接帮你生成常用线程池实例 ,而无需关心底层复杂参数。一般是java.util.concurrent.Executors中的静态方法。

  1. Executors.newFixedThreadPool(int n):

​ 固定线程数线程池 有n个核心线程,队列为无界LinkedBlockingQueue。

  1. Executors.newCachedThreadPool() :

    可缓存线程池 没有核心线程,最大线程数Integer.MAX_VALUE,空闲60s自动回收。

  2. Executors.newSingleThreadExecutor() :

    单线程线程池,只有一个线程,串行执行任务,队列是无界的linkedBlockingQueue。和直接使用单线程的区别是:如果是单线程执行失败会直接结束,但是使用线程池则任务失败还会创建一个新的线程,保证后续工作。和调用newFixedThreadPool(1)的区别是newSingleThreadExecutor()返回的是被包装后的 FinalizableDelegatedExecutorService,仅暴露ExecutorService通用方法,不能使用 ThreadPoolExecutor 的特殊方法(如 getPoolSize/setCorePoolSize 等)。也就是说,newFixedThreadPool还可以调用setCorePoolSize来进行大小转换,不一定只保持核心线程和总线程数只为1的情况。

6.1.3 线程池方法

先说一下callable和future:

callable和runnable是一对,callable可以有返回值,而runnable则单纯是执行任务。

Callable<Integer> task = () -> {// 复杂计算return 42;
};Runnable task = () -> {// 复杂计算int r = 100 + 20;// 不能将 r 直接传递回主线程
};

future则是往往和submit一起使用,submit是提交任务,而不管如何使用submit,都会有返回值,返回值为一个future对象,future代表的是这次提交的任务,可以用future对象的方法来获取任务执行的状态,例如正在执行或者执行结束:

ExecutorService pool = Executors.newFixedThreadPool(2);Future<Integer> future = pool.submit(() -> {Thread.sleep(1000);return 99;
});if (!future.isDone()) {System.out.println("任务还没完成");
}Integer result = future.get(); // 阻塞直到任务完成,拿结果
System.out.println("任务完成,结果:" + result);

下面是线程池常用的方法:

execute(Runnable command):提交一个不带返回值的任务给线程池执行。

threadPool.execute(() -> System.out.println("Hello!"));

submit(Callable task):提交一个带返回值的任务,返回一个Future对象。这里结合上面的runnable和callable来看:

ExecutorService pool = Executors.newFixedThreadPool(2);// 1. 提交一个Runnable,没有返回值
Future<?> f1 = pool.submit(() -> System.out.println("abc"));
Object r1 = f1.get();          // r1为null(无返回值)// 2. 提交一个Runnable,指定特殊返回值
Future<String> f2 = pool.submit(() -> System.out.println("abc"), "done");
String r2 = f2.get();          // r2为"done"// 3. 提交一个Callable,直接返回结果
Future<Integer> f3 = pool.submit(() -> 123);
int r3 = f3.get();             // r3为123

这里要说一下,execute和submit没有先后关系,不是要先提交才能执行,不要被名称迷惑。两种只是带不带返回值的关系,可以各自独立执行,execute只是接受runnable类型的task执行,并且不带返回值;而submit则更多样,且一定返回一个future。通常来说,execute可以执行的是简单的不需要返回值的任务,而submit则是需要结果/状态的任务。

invokeAll:

有两种,带超时时间和不带超时时间的。作用是:批量提交一组 Callable 任务给线程池。会阻塞 直到所有任务都执行完毕 或抛出异常。返回一个和任务列表顺序对应的 Future<T> 列表(每个代表一个任务结果)。适用于需要全量结果批处理的情况。

List<Callable<Integer>> list = Arrays.asList(() -> { Thread.sleep(10000); return 1; },() -> { Thread.sleep(10000); return 2; },() -> { Thread.sleep(10000); return 3; }
);ExecutorService pool = Executors.newFixedThreadPool(3);long start = System.currentTimeMillis();
List<Future<Integer>> results = pool.invokeAll(list);  // 阻塞~10s
long dur = System.currentTimeMillis() - start;System.out.println("主线程阻塞时长: " + dur/1000 + "秒"); // 输出大概是10s

invokeAny:

批量提交一组 Callable 任务给线程池。只要有任意一个任务成功返回结果(未抛异常) ,就立刻阻塞返回其结果,其它任务被取消。适用于多个接口,只要有一个接口能出结果就使用的场景。也因此返回值是泛型T,而不是future。

List<Callable<Integer>> jobs = Arrays.asList(() -> { Thread.sleep(200); return 1; },    // 慢() -> { Thread.sleep(100); return 2; },    // 快() -> { Thread.sleep(300); return 3; }     // 慢
);ExecutorService pool = Executors.newFixedThreadPool(3);
Integer res = pool.invokeAny(jobs); // 得到最快完成的那个(此例为2)
System.out.println(res);

shutdown&shutdownNow:

作用是较为温和的关闭线程池,并且将状态改为shutdown。温和是指:不会接受新任务,且会等待旧任务/已经提交的任务都执行结束。这个可以和shutdownNow进行对比,后者是立即关闭线程池,未执行的任务会被移除,正在执行的任务会被中断。

例如如下的例子:如果使用shutdown则可以都执行,因为for循环的部分是submit全部任务,submit后才会执行shutdown,没有被执行的任务会存放在任务队列中。而shutdownNow则会触发catch中的内容,因为是强制终止。

import java.util.concurrent.*;
import java.util.List;
public class Main {public static void main(String[] args) throws Exception {ExecutorService pool = Executors.newFixedThreadPool(2);// 提交五个任务,每个任务都会休眠1秒for (int i = 0; i < 5; i++) {final int taskId = i;pool.submit(() -> {System.out.println("Task " + taskId + " started");try { Thread.sleep(1000); } catch (InterruptedException e) {System.out.println("Task " + taskId + " interrupted");}System.out.println("Task " + taskId + " finished");});}// 使用shutdown(),等待现有任务完成//pool.shutdown();// 或使用shutdownNow(),尝试立即终止List<Runnable> notStarted = pool.shutdownNow();System.out.println("未启动的任务数: " + notStarted.size());// 关闭后继续提交任务将抛RejectedExecutionException// pool.submit(() -> System.out.println("new task"));// 等待线程池完全终止pool.awaitTermination(5, TimeUnit.SECONDS);System.out.println("线程池已关闭");}
}

线程池的使用原则:

  1. 不同任务创建不同的线程池来进行,避免全部的任务都给一个线程池来操作。

  2. 根据cpu密集型和io密集型来决定线程池的线程数量。如果线程数量过少则不能充分利用系统资源会导致饥饿,过多则浪费资源。

    • cpu密集型:是主要消耗CPU运算资源的任务,任务大部分时间都在跑CPU指令,很少主动等待外部(磁盘/网络)IO,例如大规模数学运算、解压缩算法。线程数则通常是cpu数+1。+1是保证当线程由于页缺失的问题或者其他原因导致暂停,额外的线程可以接替工作。

    • io密集型:主要时间消耗在IO操作,对于开发可能更加常见,例如RPC调用、数据库操作,就会大量设计IO操作。因此CPU会闲置,可以使用如下公式:线程数=核数 * 期望CPU利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间。例如,4核的cpu计算时间是50%,等待时间是50%,希望cpu完全得到利用,因此线程数=4 * 1 * 1/0.5 = 8,也就是要8个线程。或者是:线程数 = CPU核心数 × 若干倍(2-10倍)。

相关文章:

  • Elasticsearch的插件(Plugin)系统介绍
  • 多态(全)
  • 企业级实战之Iptables防火墙案例分析
  • 11. MySQL事务管理(上)
  • 极客大挑战 2019 EasySQL 1(万能账号密码,SQL注入,HackBar)
  • 3.spring基础入门(三)
  • 打卡day44
  • typescript中的type如何使用
  • 信息学奥赛一本通 1570:【例 2】能量项链 | 1843:【06NOIP提高组】能量项链 | 洛谷 P1063 [NOIP 2006 提高组] 能量项链
  • MySQL 索引:为使用 B+树作为索引数据结构,而非 B树、哈希表或二叉树?
  • React-native实战系列
  • 论文速读《VideoMimic:通过视觉模仿实现人形机器人感知控制》
  • 玩转Docker | 使用Docker部署Qwerty Learner英语单词学习网站
  • 第八部分:第三节 - 事件处理:响应顾客的操作
  • AXPM11584:颠覆传统,发现新可能
  • 省赛中药检测模型调优
  • 电路图识图基础知识-降压启动(十五)
  • Java面试题及答案整理( 2025年最新版,持续更新...)
  • 理解继承与组合的本质:Qt 项目中的设计选择指南
  • 《射频识别(RFID)原理与应用》期末复习 RFID第一章 射频识别技术概论(知识点总结+习题巩固)
  • 没有做robots对网站有影响/游戏代理加盟
  • 比较好看的网站设计/目前最好的引流推广方法
  • 哪个网站做首饰批发好/seo综合查询 站长工具
  • 西安做网站哪家好/营销广告文案
  • 南京大型网站建设/百度网盘人工客服
  • js弹出网站/电脑培训学校排名