当前位置: 首页 > news >正文

Java大师成长计划之第16天:高级并发工具类

📢 友情提示:

本文由银河易创AI(https://ai.eaigx.com)平台gpt-4o-mini模型辅助创作完成,旨在提供灵感参考与技术分享,文中关键数据、代码与结论建议通过官方渠道验证。

在现代Java应用中,处理并发问题是确保系统性能和可扩展性的关键。在前面的学习中,我们已经了解了Java中的基本线程操作和线程安全机制。本篇文章将深入探讨Java的高级并发工具类,包括Executor框架、Future和Fork/Join框架,以帮助你更好地管理并发任务。

一. Executor框架

1.1 什么是Executor框架?

Executor框架是Java在java.util.concurrent包中提供的一套用于管理并发任务的工具。它是基于生产者—消费者模式设计的,主要目的是通过线程池来有效管理线程的生命周期,简化线程的创建、管理和调度。与传统的创建和管理线程的方式不同,Executor框架让开发者不需要手动管理线程,而是通过提交任务来进行调度和执行。通过合理使用线程池,Executor框架能够显著提升应用程序的性能,减少线程的创建和销毁开销,避免资源的浪费。

在没有线程池的情况下,每次需要执行任务时都要手动创建线程,这会导致创建大量线程的开销和管理困难。而Executor框架提供了一种统一的方式来提交任务并执行,开发者只需关心任务的提交和管理,而不必关注具体的线程创建与销毁的细节。

1.2 Executor框架的核心接口

Executor框架主要由以下几种接口组成:

  • Executor接口:它是Executor框架的顶层接口,定义了一个方法execute(Runnable command),用于提交一个Runnable任务。通过这个接口,线程池能够执行传递给它的任务。

  • ExecutorService接口:它继承了Executor接口,并添加了更多与任务生命周期相关的方法,比如任务的提交、任务的取消等。ExecutorService接口提供了更强大的功能,支持提交Callable任务和获取任务的执行结果。它还包含了管理线程池生命周期的方法,如shutdown()

  • ScheduledExecutorService接口:它继承了ExecutorService接口,并提供了对定时任务和周期性任务的支持。开发者可以使用它调度任务在指定时间执行,或定期执行。

1.3 Executor的实现类

Java提供了多个Executor接口的实现类,最常用的实现类有:

  • ThreadPoolExecutor:这是Executor框架中最强大且灵活的线程池实现类。它支持动态调整线程池的大小,可以根据需要创建和管理线程。ThreadPoolExecutor提供了丰富的配置选项,适合于大多数并发任务的管理。

  • ScheduledThreadPoolExecutor:这是ScheduledExecutorService的实现类,支持定时和周期性执行任务。它比Timer类更加灵活,并能更好地处理多线程场景中的异常。

  • Executors:这是一个工厂类,提供了静态方法来创建不同类型的线程池,如固定大小线程池、单线程池和缓存线程池等。通过这个类,开发者能够更轻松地创建和配置线程池。

1.4 使用Executor框架创建线程池

Executor框架通过线程池来管理和调度任务。线程池的使用能够有效减少线程的创建和销毁开销,且能提供任务调度、执行、监控等功能。Java提供了一个工厂类Executors,通过它可以快速创建不同类型的线程池。

1.4.1 创建固定大小线程池
import java.util.concurrent.*;public class ExecutorDemo {public static void main(String[] args) {// 创建一个固定大小的线程池ExecutorService executor = Executors.newFixedThreadPool(2);// 提交任务for (int i = 0; i < 5; i++) {executor.submit(() -> {System.out.println(Thread.currentThread().getName() + " is executing a task.");});}// 关闭线程池executor.shutdown();}
}

在这个例子中,我们创建了一个固定大小的线程池,池中的线程数为2。即使提交了5个任务,线程池中只有两个线程可以同时执行任务。任务的其余部分将被排队等待线程空闲。

1.4.2 创建单线程池
import java.util.concurrent.*;public class SingleThreadExecutorDemo {public static void main(String[] args) {// 创建一个只有一个线程的线程池ExecutorService executor = Executors.newSingleThreadExecutor();// 提交任务for (int i = 0; i < 5; i++) {executor.submit(() -> {System.out.println(Thread.currentThread().getName() + " is executing a task.");});}// 关闭线程池executor.shutdown();}
}

newSingleThreadExecutor()方法创建一个只包含一个线程的线程池。这种类型的线程池常用于需要保证任务按顺序执行的场景,线程池中的任务会顺序执行,并且不会并行执行。

1.4.3 创建可缓存线程池
import java.util.concurrent.*;public class CachedThreadPoolDemo {public static void main(String[] args) {// 创建一个可缓存的线程池ExecutorService executor = Executors.newCachedThreadPool();// 提交任务for (int i = 0; i < 5; i++) {executor.submit(() -> {System.out.println(Thread.currentThread().getName() + " is executing a task.");});}// 关闭线程池executor.shutdown();}
}

newCachedThreadPool()方法创建一个可缓存的线程池。线程池会根据任务的数量动态调整线程池的大小。如果有空闲线程,线程池会复用这些线程,否则会创建新线程来执行任务。当线程池中的线程长时间没有任务执行时,它们会被回收。

1.4.4 创建定时任务线程池
import java.util.concurrent.*;public class ScheduledExecutorServiceDemo {public static void main(String[] args) {// 创建一个定时任务线程池ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);// 提交定时任务:延迟1秒后执行executor.schedule(() -> {System.out.println(Thread.currentThread().getName() + " is executing a scheduled task.");}, 1, TimeUnit.SECONDS);// 提交周期性任务:每2秒执行一次executor.scheduleAtFixedRate(() -> {System.out.println(Thread.currentThread().getName() + " is executing a periodic task.");}, 0, 2, TimeUnit.SECONDS);}
}

newScheduledThreadPool()方法用于创建一个可以执行定时任务和周期性任务的线程池。schedule()方法用于提交延迟任务,而scheduleAtFixedRate()方法用于提交周期性任务。

1.5 线程池的管理

线程池在执行任务时需要合理配置其参数,特别是线程池的大小、任务队列的类型、线程的最大空闲时间等。在使用线程池时,我们通常会面临以下几个问题:

  • 线程池的大小:线程池的大小应根据系统的负载和任务的特点来合理配置。一般情况下,线程池的大小可以通过CPU核心数来确定。例如,Runtime.getRuntime().availableProcessors()方法可以返回当前系统的CPU核心数。对于I/O密集型任务,线程池可以配置为稍大于CPU核心数;而对于CPU密集型任务,线程池的大小通常不需要超过CPU核心数。

  • 任务队列:线程池的任务队列用于存储等待执行的任务。常见的任务队列有:

    • 无界队列(LinkedBlockingQueue):当线程池中的线程数达到最大值时,任务会被无限制地加入队列。
    • 有界队列(ArrayBlockingQueue):有界队列限制了队列的大小,任务队列满时,新的任务将被拒绝。
    • 优先级队列(PriorityBlockingQueue):任务会根据优先级进行排序。
  • 拒绝策略:当线程池中的线程和任务队列都被占满时,可以选择任务的拒绝策略。常见的拒绝策略有:

    • AbortPolicy:直接抛出异常(默认策略)。
    • CallerRunsPolicy:由调用者线程来执行该任务。
    • DiscardPolicy:直接丢弃任务。
    • DiscardOldestPolicy:丢弃队列中最旧的任务。
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, // corePoolSize4, // maximumPoolSize60, // keepAliveTimeTimeUnit.SECONDS, // time unitnew ArrayBlockingQueue<>(10), // work queuenew ThreadPoolExecutor.CallerRunsPolicy() // rejection policy
);

1.6 总结

Executor框架提供了一个非常强大而灵活的机制来管理并发任务。通过使用不同类型的线程池,开发者可以根据任务的特点,灵活地调度和执行任务。同时,Executor框架不仅简化了线程管理,也有效避免了手动管理线程池时的资源浪费和复杂性。随着你对线程池及其配置的不断熟悉,你将能够更好地应对多线程编程中的各种挑战。

二. Future接口

2.1 什么是Future?

Future接口是Java中的一个重要接口,位于java.util.concurrent包中,代表一个异步计算的结果。它提供了一种机制,使得开发者可以在任务执行完成后获取结果,同时还可以检查任务的完成状态或处理异常。Future接口的设计使得多线程编程变得更加灵活和高效。

在并行编程中,通常会遇到需要在某个任务执行完成后获取其计算结果的场景。使用Future接口,开发者不仅可以提交并行执行的任务,还可以在需要的时候安全地获取这些任务的结果或处理异常。

2.2 Future接口的主要方法

Future接口定义了一些关键的方法,帮助开发者管理异步任务的执行。以下是Future接口中最常用的方法:

  • boolean cancel(boolean mayInterruptIfRunning):尝试取消任务的执行。如果任务已被执行或已完成,则无法取消。mayInterruptIfRunning参数指示是否中断正在执行的任务。

  • boolean isCancelled():检查任务是否已被取消。

  • boolean isDone():检查任务是否已完成。任务可能是通过正常完成、异常或被取消而完成。

  • V get() throws InterruptedException, ExecutionException:获取任务的结果。如果任务尚未完成,该方法会阻塞,直到结果可用。

  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException:获取任务的结果,并设置超时。如果在指定时间内未完成,则抛出TimeoutException

2.3 提交任务并获取结果

在Executor框架中,使用submit()方法可以提交一个Callable任务,并返回一个Future对象。通过这个Future对象,可以在任务执行完成后获取结果或处理异常。

示例:使用Future获取任务结果
import java.util.concurrent.*;public class FutureResultExample {public static void main(String[] args) {ExecutorService executor = Executors.newFixedThreadPool(1);// 提交一个Callable任务Future<Integer> future = executor.submit(() -> {// 模拟长时间任务Thread.sleep(2000);return 42; // 返回结果});// 在这里可以执行其他操作System.out.println("Task submitted, doing other things...");try {// 获取任务结果(阻塞式)Integer result = future.get(); // 这会等待任务完成System.out.println("Task result: " + result);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();} finally {executor.shutdown(); // 关闭线程池}}
}

在这个示例中,我们提交了一个Callable任务,计算结果的值为42。调用future.get()时,如果任务未完成,当前线程会被阻塞,直到任务完成并返回结果。在这段时间内,我们仍然可以执行其他操作。

2.4 Future的常用用途

Future接口可以用于多种场景,以下是一些典型的用途:

  1. 异步任务执行Future非常适合于需要在后台线程中执行耗时操作的场景,比如文件下载、网络请求等。

  2. 批量处理:在需要处理大量数据时,可以使用Future提交多个异步任务,并在所有任务完成后获取结果。

  3. 任务取消:通过Future.cancel()方法,可以尝试取消正在执行的任务,这在处理长时间运行的操作时非常有用。

  4. 超时控制:使用get(long timeout, TimeUnit unit)方法,可以指定获取结果的超时时间,从而避免线程长时间阻塞。

2.5 示例:取消Future任务

import java.util.concurrent.*;public class CancelFutureExample {public static void main(String[] args) throws InterruptedException {ExecutorService executor = Executors.newFixedThreadPool(1);// 提交一个长时间运行的任务Future<Integer> future = executor.submit(() -> {try {Thread.sleep(5000); // 模拟长时间任务} catch (InterruptedException e) {System.out.println("Task was interrupted");return null; // 返回 null 表示任务中断}return 42; // 返回结果});// 等待一段时间后尝试取消任务Thread.sleep(1000);boolean canceled = future.cancel(true); // 请求取消System.out.println("Task canceled: " + canceled);try {// 尝试获取任务结果Integer result = future.get(); // 可能会抛出异常System.out.println("Task result: " + result);} catch (CancellationException e) {System.out.println("Task was canceled: " + e.getMessage());} catch (ExecutionException e) {System.out.println("Task encountered an exception: " + e.getCause());}executor.shutdown(); // 关闭线程池}
}

在这个示例中,我们提交了一个模拟长时间运行的任务。我们在1秒后尝试取消这个任务。调用future.cancel(true)会请求取消任务,如果任务正在执行且可以中断,任务将在下一个检查点被中断。我们可以通过异常处理来捕获任务取消或执行中的异常。

2.6 总结

Future接口为Java中的异步计算提供了强大的支持,允许开发者轻松管理和获取异步任务的结果。通过合理使用Future,可以提高程序的响应能力和性能,特别是在处理耗时的操作或需要并发执行多个任务的场合。理解Future的使用和特点,将使你在并发编程中游刃有余。

三. Fork/Join框架

3.1 什么是Fork/Join框架?

Fork/Join框架是Java 7引入的一种并行编程模型,旨在充分利用多核处理器的能力。该框架采用了分而治之的设计思想,将一个复杂的任务递归地分解成多个小任务并行执行,最终合并结果。Fork/Join框架位于java.util.concurrent包中,主要由ForkJoinPoolForkJoinTask及其子类RecursiveTaskRecursiveAction构成。

通过Fork/Join框架,开发者可以高效地处理需要大量计算的任务,特别是在处理大规模的数据集或计算问题时。由于其底层实现了工作窃取算法,因此它能够在任务执行过程中动态地调整线程的利用率,从而提高程序的执行效率。

3.2 ForkJoinPool

ForkJoinPool是Fork/Join框架的核心组件,负责管理和调度Fork/Join任务。与传统的线程池不同,Fork/Join框架采用了工作窃取算法,这意味着空闲的工作线程可以从其他忙碌线程的任务队列中“窃取”任务以执行。这种设计极大地提高了多核CPU的利用率,避免了线程的长时间空闲。

3.2.1 ForkJoinPool的基本特性
  • 工作窃取:每个工作线程都有一个双端队列,用于存储待处理的任务。当线程的任务队列为空时,它会从其他线程的队列中窃取任务执行。
  • 分层结构:Fork/Join框架支持递归任务的分层结构,能够自动调整任务的分配和执行策略。
  • 可调的并行度:通过设置并行度,开发者可以控制Fork/Join框架的性能表现,适应不同的应用场景。

3.3 ForkJoinTask

ForkJoinTask是Fork/Join框架中的抽象类,表示可以在ForkJoinPool中执行的任务。它是异步计算的基础,主要有两种具体的实现:

  • RecursiveTask<V>:用于在任务执行后返回结果的子类,适合有结果返回的计算任务。
  • RecursiveAction:用于不返回结果的子类,适合只执行操作的任务。

3.4 使用RecursiveTask进行任务分解

在Fork/Join框架中,通常使用RecursiveTask来实现可分解的计算任务。开发者需要重写compute()方法,在该方法中定义任务的分解逻辑和结果合并的过程。

示例:使用Fork/Join框架计算斐波那契数
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;public class Fibonacci extends RecursiveTask<Integer> {private final int n;public Fibonacci(int n) {this.n = n;}@Overrideprotected Integer compute() {if (n <= 1) {return n;}// 将任务分解为两个子任务Fibonacci f1 = new Fibonacci(n - 1);Fibonacci f2 = new Fibonacci(n - 2);// 异步计算第一个子任务f1.fork();// 计算第二个子任务int resultF2 = f2.compute();// 等待第一个子任务完成并获取结果int resultF1 = f1.join();// 返回最终结果return resultF1 + resultF2;}public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool();int n = 10; // 计算第10个斐波那契数int result = pool.invoke(new Fibonacci(n)); // 提交任务System.out.println("Fibonacci of " + n + " is " + result);}
}

在这个例子中,我们实现了一个计算斐波那契数的RecursiveTaskcompute()方法中,当输入参数n小于等于1时,直接返回结果;否则,将任务分解为两个子任务,然后异步计算第一个子任务,最后计算第二个子任务,并合并结果。

3.4.1 任务分解与合并

Fork/Join框架的核心思想是“分而治之”,即将一个大任务分解为多个小任务并行处理。在执行compute()时,开发者需要决定何时分解任务以及如何合并结果。

  • 任务分解:通过递归地创建子任务,任务可以被逐步分解到足够小且易于计算的程度。
  • 结果合并:使用join()方法等待子任务完成并获取结果,合并所有子任务的结果以得到最终的答案。

3.5 使用RecursiveAction进行并行操作

当任务不需要返回结果时,可以使用RecursiveAction类。RecursiveAction的使用方式与RecursiveTask类似,但不涉及结果的返回。

示例:使用Fork/Join框架进行数组求和
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ForkJoinPool;public class ArraySum extends RecursiveAction {private final int[] array;private final int start;private final int end;private static final int THRESHOLD = 10; // 阈值public ArraySum(int[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected void compute() {if (end - start <= THRESHOLD) {// 直接计算小块的和int sum = 0;for (int i = start; i < end; i++) {sum += array[i];}System.out.println("Sum from " + start + " to " + end + " is: " + sum);} else {// 分解任务int mid = (start + end) / 2;ArraySum leftTask = new ArraySum(array, start, mid);ArraySum rightTask = new ArraySum(array, mid, end);invokeAll(leftTask, rightTask); // 同时执行两个子任务}}public static void main(String[] args) {int[] array = new int[100];for (int i = 0; i < array.length; i++) {array[i] = i + 1; // 填充数组}ForkJoinPool pool = new ForkJoinPool();pool.invoke(new ArraySum(array, 0, array.length)); // 提交任务}
}

在这个例子中,我们实现了一个RecursiveAction来计算数组的和。通过设置一个阈值,当任务的大小小于等于THRESHOLD时,直接计算和;否则,将任务分解为两个子任务并并行处理。

3.6 Fork/Join框架的优势

  • 高效利用多核处理器:Fork/Join框架的工作窃取算法能够充分利用多核CPU的处理能力,使得任务执行更加高效。
  • 简化并行编程:通过提供简单的API,Fork/Join框架最大限度地简化了并行编程的复杂性,允许开发者专注于任务的分解和合并。
  • 动态调整任务:由于采用工作窃取算法,Fork/Join框架能够动态调整并发任务的执行,从而提高系统的响应能力和处理效率。

3.7 总结

Fork/Join框架为Java开发者提供了一种强大的工具,能够以简洁的方式实现高效的并行处理。通过合理使用ForkJoinPoolRecursiveTaskRecursiveAction,开发者可以将复杂的计算任务分解为小任务并行执行,从而显著提高程序的性能和响应能力。掌握Fork/Join框架将使你在处理大规模计算问题时游刃有余,充分发挥多核处理器的优势。

四. 总结

在本篇文章中,我们深入探讨了Java中的高级并发工具类,包括Executor框架、Future接口和Fork/Join框架。通过合理使用这些工具,我们可以更高效地管理线程和并发任务,提高应用程序的性能和响应能力。随着对并发编程的深入理解,你将能够设计出更加高效和可靠的Java应用程序。

继续关注我们的Java大师成长计划,接下来的内容将为你带来更多有趣和实用的技术!

相关文章:

  • 通过.sh脚本设置java环境变量
  • LeetCode:101、对称二叉树
  • 分治算法-leetcode148题
  • Linux云计算训练营笔记day05(Rocky Linux中的命令:管道操作 |、wc、find、vim)
  • Godot4.3类星露谷游戏开发之【昼夜循环】
  • 【软件设计师:数据】17.数据安全
  • 人力资源管理系统如何有效提高招聘效率?
  • Navicat 17最新保姆级安装教程(附安装包+永久使用方法)
  • 软件设计师教程——第一章 计算机系统知识(下)
  • 不同渲染任务,用CPU还是GPU?
  • upload文件上传
  • MySQL 的锁机制
  • Webug4.0靶场通关笔记24- 第29关Webshell爆破
  • Linux 大于2T磁盘分区
  • opencv中的图像特征提取
  • RK3588 Ubuntu安装Qt6
  • 从代码学习深度学习 - 区域卷积神经网络(R-CNN)系列 PyTorch版
  • levelDB的数据查看(非常详细)
  • 【面板数据】各省双向FDI协调发展水平数据集(2005-2022年)
  • 并发 vs 并行编程详解
  • 东洋学人|滨田青陵:近代日本考古学第一人
  • 正荣地产:董事会主席、行政总裁辞任,拟投入更多精力推动境内债重组等工作
  • 央行:增加科技创新和技术改造再贷款额度3000亿元
  • 云南省司法厅党委书记、厅长茶忠旺主动投案,正接受审查调查
  • 罗马尼亚临时总统博洛让任命普雷多尤为看守政府总理
  • 今年五一档电影票房已破7亿