自定义线程池 3.1
自定义线程池 3.1
1. 简介
上次我们实现了自定义线程池的 3.0 版本,使用了策略模式,为线程池增加了无法执行任务时的拒绝策略,本文我们将对线程池进行如下的优化:
- 让线程池中线程的创建使用线程工厂。
- 写一个工具类用来创建不同类型的线程池。
2. 优化一
2.1 引入
之前测试的时候可以发现,线程池中创建的线程的名字一直是 thread-?
,对于多线程程序调试很不友好,建议给每个线程一个有意义的名字。比如线程池若是用来执行斐波那契数列中的值,则可以给线程取名 fib-?
。
这样一来,当程序运行在服务器中,查看日志,发现有一个线程抛出一个异常,就可以 快速定位线程的定义位置。除此之外,通过 htop
可以查看线程运行情况,如果线程有一个有意义的名字,那就可以 快速定位到想观察的线程。
接下来想一想怎么实现吧。如果只是给一个线程起一个名字,那么直接使用 public Thread(Runnable target, String name)
这个构造方法即可。但是如果想给一个线程池中的线程起名字,那么就得使用一些别的方法,你能想到吗?
揭晓答案:使用 ThreadFactory
类的 Thread newThread(Runnable r)
方法。让使用者传递一个线程工厂,然后在创建线程时使用即可。
2.2 线程工厂的其他用处
在 Java 中,对线程参数的设置比较少,除了设置名字外,还可以设置如下参数:
- 设置线程是为 守护线程:在工具类或后台任务中使用守护线程,防止主线程退出后仍有非守护线程运行。
- 设置线程的 优先级:为核心任务设置更高优先级,确保资源优先分配。
除了设置线程的基础参数,其实还有一个扩展点,就是 给线程写一个异常处理器。我们可以写一个如下的线程工厂(匿名内部类):
new ThreadFactory() {/*** 计数器,用来记录当前创建的是第几个线程,从 0 开始*/private int counter = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(() -> {try {r.run();} catch (Throwable e) {System.err.println(Thread.currentThread().getName() + "抛出异常:" + e);}}, "thread-pool-" + counter++);}
};
使用线程工厂还可以让使用者配置这些东西,而不仅仅只能设置线程的名称,如果你能想出线程工厂的其他作用,也可以在评论讨论。
2.3 实现上的变化
Worker
只有构造方法发生了变化,变化如下:
public Worker(Runnable initialTask, Set<Worker> threadPool) {this.initialTask = initialTask;this.actuallyRunningThread = new Thread(this);threadPool.add(this);
}
->
public Worker(Runnable initialTask, Set<Worker> threadPool, ThreadFactory threadFactory) {this.initialTask = initialTask;this.actuallyRunningThread = threadFactory.newThread(this);threadPool.add(this);
}
3. 优化二
3.0 版本的 ThreadPool3_1
的构造方法有 6 个参数,加上 3.1 版本新加的参数,一共有 7 个参数,显得十分臃肿。如果只是想简单使用一下线程池,或者在特定场景下使用特定参数的线程池,还得写全 7 个参数,很麻烦,所以我们可以写一个工具类 ThreadPoolUtil
,它用来创建特定场景下专用的线程池。
首先我们想一想,都有什么特定的场景:
- 执行 CPU 密集型任务。
- 执行 I/O 密集型任务。
- 执行 处理时间长 的任务。
- 执行 处理时间短 的任务。
- 负载稳定。
- 负载波动较大。
- 任务需要 串行执行。
接下来,我们讨论这些场景适合的线程池:
- 执行 CPU 密集型任务:适合线程数量固定的线程池,线程数量可以等于 CPU 核心数。
- 执行 I/O 密集型任务:适合线程数量不固定的线程池。
- 执行 处理时间长 的任务:适合线程数量固定的线程池,避免创建太多线程。
- 执行 处理时间短 的任务:适合线程数量不固定的线程池,可以创建很多线程,因为这些线程短时间内就退出了。
- 负载稳定:线程数量固定的线程池仿佛就是为这个场景设置的。
- 负载波动较大:适合线程数量不固定的线程池。
- 任务需要 串行执行:只能使用一个线程的线程池。
然后,我们将这些场景适合的线程池总结一下:
- 线程数量固定的线程池。
- 线程数量不固定的线程池。
- 只有一个线程的线程池。
那么,ThreadPoolUtil
提供的方法就明确了:
3.1 newFixedThreadPool
newFixedThreadPool()
:创建一个全是核心线程的线程池。因为线程数量固定,直接让线程永不退出,只创建核心线程就行了。
- 核心线程数量:由传入的固定线程数量决定。
- 最大线程数量:由传入的固定线程数量决定。
keepAliveTime
和timeUnit
:0
和TimeUnit.SECONDS
。- 任务队列:使用
LinkedBlockingQueue
,因为它是 “无界” 的,不确定使用者到底能在任务队列中囤积多少任务。
3.2 newCachedThreadPool
newCachedThreadPool()
:创建一个全是临时线程的线程池。因为线程数量不固定,有时多、有时少,直接创建临时线程,让它们在空闲一段时间后退出即可。
- 核心线程数量:
0
,因为不用创建核心线程。 - 最大线程数量:
Integer.MAX_VALUE
,因为不确定使用者到底能提交多少任务。 keepAliveTime
和timeUnit
:1
和TimeUnit.SECONDS
,因为希望临时线程的空闲时间尽量短一点,只要空闲 1s,就直接退出。- 任务队列:使用
SynchronousQueue
,因为它 不保存任务,每个插入操作都需要等待取出操作,反之亦然。一般不会使用任务队列,如果要使用,说明到极端场景了,让提交任务的线程等待有线程空闲。
3.3 newSingleThreadPool
newSingleThreadPool()
:创建只有一个线程的线程池。
- 核心线程数量:
1
。 - 最大线程数量:
1
。 keepAliveTime
和timeUnit
:0
和TimeUnit.SECONDS
。- 任务队列:使用
LinkedBlockingQueue
,因为它是 “无界” 的,不确定使用者到底能在任务队列中囤积多少任务。
3.4 多个构造方法
此外,为了方便线程池的创建,我认为有些参数可以调成默认的,核心线程数量、最大线程数量、keepAliveTime
、timeUnit
、任务队列 这 5 个参数是核心参数,大部分情况下不同线程池都会有不同的选择,使用者一般不关注 拒绝策略 和 线程工厂 这 2 个参数,所以我们可以为这两个参数设置默认值。
说到默认值的设置,还是有点学问的,我们应该避免设置使用者无感的默认参数,因为这样运行失败后很难想到哪里出问题了。在线程池中,如果我们将拒绝策略默认设置成 直接丢弃,那就会出现使用者不知道为什么有一个任务无法执行的情况。但是,并不是说这个策略毫无意义,如果使用者 主动选择 直接丢弃 的拒绝策略,代表他知道可能会造成这样的情况。
这里我们可以把拒绝策略选择成 抛出异常 的拒绝策略,线程工厂直接给线程设置一个默认名字 thread-pool-?
即可。
当我们选择好默认参数后,最好把 默认参数写到构造方法的注释(JavaDoc)上。
4. 实现 3.1 版本
4.1 ThreadPool3_1
public class ThreadPool3_1 {/*** 线程池中核心线程的最大数量*/private final int corePoolSize;/*** 线程池中线程的最大数量*/private final int maxPoolSize;/*** 临时线程阻塞的最长时间(单位:ns),超过这个时间还没有领取到任务就直接退出*/private final long keepAliveTime;/*** 任务队列*/private final BlockingQueue<Runnable> taskQueue;/*** 拒绝策略,用于在无法执行任务的时候拒绝任务*/private final RejectPolicy rejectPolicy;/*** 线程工厂*/private final ThreadFactory threadFactory;/*** 默认的线程工厂*/private static final ThreadFactory DEFAULT_THREAD_FACTORY = new ThreadFactory() {/*** 计数器,用来记录当前创建的是第几个线程,从 0 开始*/private int counter = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "thread-pool-" + counter++);}};/*** 构造一个线程池,默认参数如下:* <ul>* <li>拒绝策略默认为抛出异常的拒绝策略</li>* <li>线程工厂默认为线程添加了简单的名字 thread-pool-?</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列*/public ThreadPool3_1(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, RejectPolicy.THROW_EXCEPTION);}/*** 构造一个线程池,默认参数如下:* <ul>* <li>拒绝策略默认为抛出异常的拒绝策略</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param threadFactory 线程工厂*/public ThreadPool3_1(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, ThreadFactory threadFactory) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, RejectPolicy.THROW_EXCEPTION, threadFactory);}/*** 构造一个线程池,默认参数如下:* <ul>* <li>线程工厂默认为线程添加了简单的名字 thread-pool-?</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param rejectPolicy 拒绝策略*/public ThreadPool3_1(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, rejectPolicy, DEFAULT_THREAD_FACTORY);}/*** 构造一个线程池** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param rejectPolicy 拒绝策略* @param threadFactory 线程工厂*/public ThreadPool3_1(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy, ThreadFactory threadFactory) {this.corePoolSize = corePoolSize;this.maxPoolSize = maxPoolSize;this.keepAliveTime = unit.toNanos(keepAliveTime);this.taskQueue = taskQueue;this.rejectPolicy = rejectPolicy;this.threadFactory = threadFactory;}/*** 存放线程的集合,使用 {@link Set} 是因为 {@link Set#remove(Object)} 性能更高*/private final Set<Worker> threadPool = new HashSet<>();/*** 线程池的管程* <p>* 用于保证 <strong>获取线程池大小</strong>、<strong>将线程放入线程池</strong>、<strong>从线程池中移除线程</strong> 的互斥性*/private final Object threadPoolMonitor = new Object();/*** <h3>核心线程执行的任务</h3>* {@link #getTask()} 方法会一直阻塞,直到有新任务*/public final class CoreWorker extends Worker {public CoreWorker(Runnable initialTask, Set<Worker> threadPool, ThreadFactory threadFactory) {super(initialTask, threadPool, threadFactory);}@Overrideprotected Runnable getTask() {try {return taskQueue.take();} catch (InterruptedException e) {throw new RuntimeException(e);}}@Overrideprotected void onWorkerExit() {// 在目前的代码中,发现核心线程并不会退出,所以这个方法先不实现}}/*** <h3>临时线程执行的任务</h3>* {@link #getTask()} 方法会在阻塞一定时间后如果还没有任务,则会返回 {@code null}*/public final class TempWorker extends Worker {public TempWorker(Runnable initialTask, Set<Worker> threadPool, ThreadFactory threadFactory) {super(initialTask, threadPool, threadFactory);}@Overrideprotected Runnable getTask() {try {return taskQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}}@Overrideprotected void onWorkerExit() {removeWorkerFromThreadPool(this);}}/*** 提交任务** @param task 待执行的任务*/public void submit(Runnable task) {// 如果 线程数量 小于 最大核心线程数量,则新建一个 核心线程 执行任务,然后直接返回synchronized (threadPoolMonitor) {if (threadPool.size() < corePoolSize) {CoreWorker coreWorker = new CoreWorker(task, threadPool, threadFactory);coreWorker.start();return;}}// 如果能够放到任务队列中,则直接返回if (taskQueue.offer(task)) {return;}// 如果 线程数量 小于 最大线程数量,则新建一个 临时线程 执行任务synchronized (threadPoolMonitor) {if (threadPool.size() < maxPoolSize) {TempWorker tempWorker = new TempWorker(task, threadPool, threadFactory);tempWorker.start();}}// 线程数量到达最大线程数量,任务队列已满,执行拒绝策略rejectPolicy.reject(this, task);}/*** 获取当前线程池中的线程数量** @return 当前线程池中的线程数量*/public int getCurrPoolSize() {synchronized (threadPoolMonitor) {return threadPool.size();}}/*** 丢弃任务队列 {@link #taskQueue} 中的最旧的任务(队头任务)** @return 任务队列中的最旧的任务(队头任务)*/public Runnable discardOldestTask() {return taskQueue.poll();}/*** 从 {@link #threadPool} 中移除指定的 {@link Worker} 对象** @param worker 待移除的 {@link Worker} 对象*/private void removeWorkerFromThreadPool(Worker worker) {synchronized (threadPoolMonitor) {threadPool.remove(worker);}}
}
4.2 ThreadPoolUtil
public class ThreadPoolUtil {/*** 创建一个全是核心线程的线程池** @param poolSize 核心线程的数量* @return 一个全是核心线程的线程池*/public static ThreadPool3_1 newFixedThreadPool(int poolSize) {return new ThreadPool3_1(poolSize, poolSize, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());}/*** 创建一个全是核心线程的线程池** @param poolSize 核心线程的数量* @param threadFactory 线程工厂* @return 一个全是核心线程的线程池*/public static ThreadPool3_1 newFixedThreadPool(int poolSize, ThreadFactory threadFactory) {return new ThreadPool3_1(poolSize, poolSize, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);}/*** 创建一个全是临时线程的线程池** @return 一个全是临时线程的线程池*/public static ThreadPool3_1 newCachedThreadPool() {return new ThreadPool3_1(0, Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<>());}/*** 创建一个全是临时线程的线程池** @param threadFactory 线程工厂* @return 一个全是临时线程的线程池*/public static ThreadPool3_1 newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPool3_1(0, Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory);}/*** 创建只有一个线程的线程池** @return 只有一个线程的线程池*/public static ThreadPool3_1 newSingleThreadPool() {return new ThreadPool3_1(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());}/*** 创建只有一个线程的线程池** @param threadFactory 线程工厂* @return 只有一个线程的线程池*/public static ThreadPool3_1 newSingleThreadPool(ThreadFactory threadFactory) {return new ThreadPool3_1(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);}
}
5. 测试程序
public class ThreadPool3_1Test {/*** 测试线程池 3.1 版本的基本功能*/@Testpublic void test() throws InterruptedException {final int taskSize = 3;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool3_1 threadPool = new ThreadPool3_1(1, 2, 3, TimeUnit.SECONDS,new ArrayBlockingQueue<>(3), RejectPolicy.THROW_EXCEPTION);LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");// 等待测试结束latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");}/*** 测试线程池 3.1 版本的 {@link ThreadPoolUtil} 创建全是核心线程的线程池 的功能*/@Testpublic void testNewFixedThreadPool() throws InterruptedException {final int taskSize = 5;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool3_1 threadPool = ThreadPoolUtil.newFixedThreadPool(2);LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");// 等待测试结束latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");}/*** 测试线程池 3.1 版本的 {@link ThreadPoolUtil} 创建全是临时线程的线程池 的功能*/@Testpublic void testNewCachedThreadPool() throws InterruptedException {final int taskSize = 5;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool3_1 threadPool = ThreadPoolUtil.newCachedThreadPool();LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");// 等待测试结束latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");}/*** 测试线程池 3.1 版本的 {@link ThreadPoolUtil} 创建只有一个线程的线程池 的功能*/@Testpublic void testNewSingleThreadPool() throws InterruptedException {final int taskSize = 3;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool3_1 threadPool = ThreadPoolUtil.newSingleThreadPool();LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");// 等待测试结束latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");}
}
6. 测试结果
6.1 test
新建的线程采用 thread-pool-? 的方式命名,这是默认线程工厂在起作用。
16:16:48 [ main] 提交任务前
16:16:48 [ main] 提交任务后
16:16:49 [thread-pool-0] 正在执行任务0
16:16:50 [thread-pool-0] 正在执行任务1
16:16:51 [thread-pool-0] 正在执行任务2
16:16:51 [ main] 任务执行完毕
6.2 testNewFixedThreadPool
线程只有固定的两个,thread-pool-0
和 thread-pool-1
。
16:20:20 [ main] 提交任务前
16:20:20 [ main] 提交任务后
16:20:21 [thread-pool-1] 正在执行任务1
16:20:21 [thread-pool-0] 正在执行任务0
16:20:22 [thread-pool-0] 正在执行任务3
16:20:22 [thread-pool-1] 正在执行任务2
16:20:23 [thread-pool-0] 正在执行任务4
16:20:23 [ main] 任务执行完毕
6.3 testNewCachedThreadPool
线程数量不固定,一个任务新建一个线程。
16:21:05 [ main] 提交任务前
16:21:05 [ main] 提交任务后
16:21:06 [thread-pool-4] 正在执行任务4
16:21:06 [thread-pool-3] 正在执行任务3
16:21:06 [thread-pool-0] 正在执行任务0
16:21:06 [thread-pool-2] 正在执行任务2
16:21:06 [thread-pool-1] 正在执行任务1
16:21:06 [ main] 任务执行完毕
6.4 testNewSingleThreadPool
只有一个线程,串行执行所有任务。
16:21:16 [ main] 提交任务前
16:21:16 [ main] 提交任务后
16:21:17 [thread-pool-0] 正在执行任务0
16:21:18 [thread-pool-0] 正在执行任务1
16:21:19 [thread-pool-0] 正在执行任务2
16:21:19 [ main] 任务执行完毕
7. 思考
- 在
ThreadPool3_1
中,默认的线程工厂中有一个count
字段,当多线程提交任务时,count
会不会有线程安全问题?如果有,应该怎么修改?如果没有,为什么? - 众所周知,资源有创建就有销毁,线程池也一样,需要一个停止线程运行的方法,你能独立设计并实现它吗?
8. 总结
这次我们实现了自定义线程池的 3.1 版本,提供了 线程工厂创建线程 和 工具类创建简单线程池 的功能,使线程池使用起来更加方便。不过得注意一点,在生产中一定不要使用工具类简单创建的线程池,最好通过压测得到线程池的各个参数。除此之外,本文还讲解了不同场景下的线程池选型,可以作为以后使用线程池的参考。