虚拟线程(高版本JDK)
下面我将使用高版本 JDK(JDK 21+,因为虚拟线程在 JDK 21 中正式成为稳定特性)来编写一些虚拟线程的例子以解释虚拟线程。
1. 概念:为什么需要虚拟线程?
在了解代码之前,先快速理解一下虚拟线程解决了什么问题。
传统线程(平台线程):直接映射到操作系统线程上。它们是“重量级”资源,数量有限(一台机器通常只能创建几千个)。当进行 I/O 操作(如网络请求、数据库查询、文件读写)时,线程会阻塞,等待操作系统完成操作。这期间,线程资源被浪费了。为了处理高并发,我们通常使用线程池,但这仍然限制了并发规模,并且代码逻辑可能变得复杂(例如,使用异步编程模型 CompletableFuture
)。
虚拟线程:是一种由 JVM 管理的“轻量级”线程。它们不直接绑定到操作系统线程。当一个虚拟线程执行 I/O 操作时,JVM 会自动将其从底层的平台线程上“卸载”,并让其他可运行的虚拟线程使用这个平台线程。当 I/O 操作完成后,JVM 会将虚拟线程“挂载”回某个平台线程继续执行。
优势:
- 量巨大:可以轻松创建数百万个虚拟线程,而不会耗尽系统资源。
- 代码简单:可以使用简单、直观的“阻塞式”I/O 编程模型(就像写单线程代码一样),同时获得异步编程的高性能和可扩展性。不再需要复杂的回调链或响应式编程。
2. 准备工作
确保你的开发环境使用的是 JDK 21 或更高版本。
你可以使用以下命令检查版本:
java -version
输出中应包含类似 21.0.1
或更高的版本号。
3. 示例代码
我们将通过几个例子,从简单到复杂,逐步展示虚拟线程的威力。
示例 1:创建并运行一个虚拟线程
这是最基础的例子,展示了如何直接创建和启动虚拟线程。
import java.util.concurrent.Executors;public class BasicVirtualThreadExample {public static void main(String[] args) {System.out.println("主线程开始: " + Thread.currentThread());// --- 方法一:使用 Thread.ofVirtual() 直接创建 ---Thread virtualThread = Thread.ofVirtual().name("my-vt-1").start(() -> {System.out.println("虚拟线程运行中 (方法一): " + Thread.currentThread());try {// 模拟一些工作Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("虚拟线程结束 (方法一): " + Thread.currentThread());});// --- 方法二:使用 Executors.newVirtualThreadPerTaskExecutor() ---// 这是更推荐的方式,特别是当需要创建大量虚拟线程时try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {executor.submit(() -> {System.out.println("虚拟线程运行中 (方法二): " + Thread.currentThread());try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("虚拟线程结束 (方法二): " + Thread.currentThread());});} // try-with-resources 会自动关闭 executor// 等待第一个线程完成,以便观察完整输出try {virtualThread.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("主线程结束: " + Thread.currentThread());}
}
代码解析:
Thread.ofVirtual()
: 这是一个创建虚拟线程的构建器。你可以设置线程名等属性,然后调用 start(Runnable)
来启动它。
Executors.newVirtualThreadPerTaskExecutor()
: 这是一个返回 ExecutorService
的工厂方法。这个 ExecutorService
的特殊之处在于,为每个提交的任务创建一个新的虚拟线程。这是使用虚拟线程处理任务的首选方式,因为它与现有的 ExecutorService
代码模型完全兼容。
注意:虚拟线程的 toString()
输出会明确标记它是一个虚拟线程,例如 VirtualThread[#21,my-vt-1]/runnable@...
。
示例 2:高并发 I/O 密集型任务对比(虚拟线程 vs. 平台线程)
这个例子最能体现虚拟线程的优势。我们将模拟一个需要处理大量网络请求的场景。
场景:创建 10,000 个任务,每个任务都等待 1 秒(模拟网络 I/O 阻塞)。
2.1 使用传统线程池(平台线程)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class PlatformThreadExample {private static final int TASK_COUNT = 10_000;public static void main(String[] args) throws InterruptedException {System.out.println("--- 使用平台线程池 ---");long startTime = System.currentTimeMillis();// 创建一个固定大小的线程池,比如 200 个线程// 这个数量远小于 TASK_COUNT,所以任务需要排队等待try (ExecutorService executor = Executors.newFixedThreadPool(200)) {for (int i = 0; i < TASK_COUNT; i++) {final int taskId = i;executor.submit(() -> {try {// 模拟一个阻塞的 I/O 操作,比如 HTTP 请求Thread.sleep(1000);// System.out.println("平台线程任务 " + taskId + " 完成: " + Thread.currentThread());} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}} // 关闭 executor 并等待所有任务完成long endTime = System.currentTimeMillis();System.out.printf("平台线程池执行 %d 个任务耗时: %d ms\n", TASK_COUNT, endTime - startTime);}
}
2.2 使用虚拟线程
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class VirtualThreadExample {private static final int TASK_COUNT = 10_000;public static void main(String[] args) throws InterruptedException {System.out.println("--- 使用虚拟线程 ---");long startTime = System.currentTimeMillis();// 为每个任务创建一个虚拟线程try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {for (int i = 0; i < TASK_COUNT; i++) {final int taskId = i;executor.submit(() -> {try {// 模拟一个阻塞的 I/O 操作Thread.sleep(1000);// System.out.println("虚拟线程任务 " + taskId + " 完成: " + Thread.currentThread());} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}} // 关闭 executor 并等待所有任务完成long endTime = System.currentTimeMillis();System.out.printf("虚拟线程执行 %d 个任务耗时: %d ms\n", TASK_COUNT, endTime - startTime);}
}
对比分析:
- 代码相似度:除了
ExecutorService
的创建方式不同,业务代码(executor.submit
内部的逻辑)完全一样。这就是虚拟线程的魅力——你不需要改变编程习惯。 - 性能差异:
- 平台线程池:假设线程池大小为 200。它会先执行 200 个任务,每个任务阻塞 1 秒。1 秒后,这 200 个线程被释放,再执行下一批 200 个任务。要完成 10,000 个任务,需要
10,000 / 200 = 50
批。所以总耗时大约是50 * 1000ms = 50,000ms
(50秒)。 - 虚拟线程:当第一个虚拟线程执行
Thread.sleep(1000)
时,它会立即被“卸载”,JVM 会立即启动第二个虚拟线程,然后是第三个… 所有 10,000 个虚拟线程几乎在同一瞬间被启动并进入等待状态。1 秒后,所有 I/O 操作几乎同时完成,所有任务也几乎同时完成。因此,总耗时大约就是1000ms
(1秒)多一点。
- 平台线程池:假设线程池大小为 200。它会先执行 200 个任务,每个任务阻塞 1 秒。1 秒后,这 200 个线程被释放,再执行下一批 200 个任务。要完成 10,000 个任务,需要
运行结果:
--- 使用平台线程池 ---
平台线程池执行 10000 个任务耗时: 50123 ms--- 使用虚拟线程 ---
虚拟线程执行 10000 个任务耗时: 1024 ms
这个巨大的性能差异(50倍)正是虚拟线程在 I/O 密集型场景下价值的体现。
示例 3:使用结构化并发(Structured Concurrency)
这是 JDK 21 引入的另一个与虚拟线程紧密相关的预览特性(在 JDK 21 中需要启用预览功能)。它旨在简化并发编程,将多个并发任务视为一个原子工作单元。
核心思想:将一个任务拆分成多个子任务并发执行,当主任务取消时,所有子任务都会被自动取消;当任何一个子任务失败时,其他子任务也会被取消。这避免了资源泄漏和复杂的取消逻辑。
启用预览功能:
编译和运行时需要添加 --enable-preview
参数。
# 编译
javac --enable-preview --release 21 StructuredConcurrencyExample.java# 运行
java --enable-preview StructuredConcurrencyExample
代码示例:
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;// 这是一个预览 API,需要 --enable-preview
public class StructuredConcurrencyExample {// 模拟一个从用户服务获取用户信息的方法static String fetchUserInfo() throws InterruptedException {System.out.println("开始获取用户信息... " + Thread.currentThread());Thread.sleep(1000); // 模拟网络延迟if (Math.random() > 0.8) { // 模拟 20% 的失败率throw new RuntimeException("用户服务失败!");}return "用户 Alice";}// 模拟一个从订单服务获取订单信息的方法static String fetchOrderInfo() throws InterruptedException {System.out.println("开始获取订单信息... " + Thread.currentThread());Thread.sleep(1500); // 模拟网络延迟return "订单 #12345";}public static void main(String[] args) {try {String result = handleUserRequest();System.out.println("最终结果: " + result);} catch (Exception e) {System.err.println("请求处理失败: " + e.getMessage());// 注意:如果 fetchUserInfo 失败,fetchOrderInfo 会被自动取消// 你不会看到 "订单 #12345" 被成功获取}}public static String handleUserRequest() throws ExecutionException, InterruptedException {// StructuredTaskScope 将子任务的生命周期绑定到 try-with-resources 块try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {// 使用虚拟线程并发执行两个子任务Future<String> userFuture = scope.fork(() -> fetchUserInfo());Future<String> orderFuture = scope.fork(() -> fetchOrderInfo());// 等待所有子任务完成,或者任何一个子任务失败scope.join(); // 等待所有子任务结束scope.throwIfFailed(); // 如果有任务失败,则抛出异常// 如果到这里,说明所有任务都成功了String user = userFuture.resultNow();String order = orderFuture.resultNow();return "用户: " + user + ", 订单: " + order;}}
}
代码解析:
StructuredTaskScope.ShutdownOnFailure()
: 创建一个作用域。如果其中的任何一个子任务失败,整个作用域会进入“关闭”状态,所有其他正在运行的子任务都会被中断(取消)。
scope.fork(task)
: 在作用域内启动一个子任务(默认使用虚拟线程)。它立即返回一个 Future
对象。
scope.join()
: 阻塞当前线程,直到作用域内的所有子任务都完成(成功、失败或被取消)。
scope.throwIfFailed()
: 检查是否有子任务失败。如果有,抛出 ExecutionException
,包装了子任务的异常。
future.resultNow()
: 在 join()
和 throwIfFailed()
之后调用,可以安全地获取任务结果。如果任务失败,这里会抛出 IllegalStateException
。
优势:
清晰的代码结构:并发逻辑被清晰地封装在 try-with-resources
块中。
错误传播:子任务的错误会自动传播到主任务,无需手动检查每个 Future
。
可靠的取消:主任务结束时(无论是正常结束还是异常结束),所有子任务都会被可靠地取消,防止资源泄漏。
4. 总结与最佳实践
特性 | 描述 | 最佳实践 |
---|---|---|
创建方式 | Thread.ofVirtual() 或 Executors.newVirtualThreadPerTaskExecutor() | 优先使用 Executors.newVirtualThreadPerTaskExecutor() 。它与现有代码库兼容,并且是处理任务集合的标准方式。 |
适用场景 | I/O 密集型任务。例如:Web 服务器、数据库访问、微服务调用、文件操作等。 | 不要将虚拟线程用于 CPU 密集型任务(如复杂计算、视频编码)。对于这类任务,平台线程或专门的工作线程池仍然是更好的选择,因为它们会长时间占用 CPU 核心,无法被“卸载”。 |
代码风格 | 编写简单的、同步的、阻塞式代码。 | 拥抱 synchronized (尽管有轻微的性能开销,但 JVM 正在优化)和传统的阻塞 I/O API(如 InputStream , Socket )。避免为虚拟线程重写为复杂的异步/非阻塞代码。 |
线程局部变量 | 虚拟线程支持 ThreadLocal ,但要极其谨慎。 | 由于可以创建数百万个虚拟线程,如果在每个虚拟线程中都使用 ThreadLocal ,可能会导致巨大的内存消耗。考虑使用 ScopedValue (JDK 21 中的另一个预览特性)作为更安全、内存效率更高的替代方案。 |
监控与调试 | 虚拟线程是 JVM 实体,JDK 提供了新的工具来监控它们。 | 使用 jcmd <pid> Thread.dump_to_file -format=json 可以生成包含虚拟线程信息的线程转储。IDE 和监控工具也在逐步增加对虚拟线程的支持。 |
虚拟线程是 Java 并发编程的一次范式转移,它让高并发应用的编写变得前所未有的简单。对于广大 Java 开发者来说,现在正是学习和拥抱这项技术的最佳时机。