跟进 JDK25:将虚拟线程安全引入生产的权衡与实战
目录
一、为什么要更新成虚拟线程?
二、方案
(一)在 Filter 内部创建虚拟线程
(二)让 Tomcat 在创建请求线程时使用虚拟线程
三、结论
四、代码实现
(一)HttpsVirtualThreadConfig
(二)BaseContext
(三)VirtualThreadAsyncJwtFilter
一、为什么要更新成虚拟线程?
为了最大限度地利用 CPU,避免平台线程被 IO 阻塞,从而提高应用程序的整体吞吐量,尤其是在大量 IO 密集型请求的场景下可以显著提升并发处理能力。
有关虚拟线程相关的知识可以先看这篇博客:
带你轻松学习虚拟线程和StructuredTaskScope-CSDN博客
https://blog.csdn.net/2401_88959292/article/details/150288495?spm=1001.2014.3001.5501
注意:使用虚拟线程并不一定会降低单个请求的响应延迟,它提升的是并发处理能力(吞吐量 / QPS),而非单请求的延迟。
二、方案
(一)在 Filter 内部创建虚拟线程
一种思路是新建一个 Filter,在 Filter 中把请求处理包装成一个任务交给虚拟线程执行。
但直接开启裸的虚拟线程并不规范,例如:
Thread.ofVirtual().start(() -> {});
这样做的问题有两点:无法方便地监控任务生命周期,也不易获取任务返回值或处理异常。
于是常见的做法是使用Future:
Future<TaskResult> future = service.submit(() -> {});
TaskResult result = future.get();
但 future.get() 会阻塞当前线程——如果这个线程是平台线程,就又回到了阻塞平台线程的问题上。要利用虚拟线程的优势,必须避免阻塞平台线程。
改进方向包括使用 CompletableFuture 异步回调来避免直接阻塞平台线程,但回调式编程带来的问题是回调时机不可控、代码复杂且难以统一管理任务的生命周期。
因此更理想的是使用结构化并发来组织这些异步任务:它能提供更可控的生命周期管理、错误聚合与超时处理,且能和虚拟线程很好地配合。
不过JDK25中,结构化并发仍旧属于预览特性,所以用在线上项目极不安全,因此我们暂时放弃了该方案。
(二)让 Tomcat 在创建请求线程时使用虚拟线程
既然手动把工作包装成任务不够稳定,另一个做法是让Tomcat为每个请求直接使用虚拟线程执行,即为Tomcat的请求处理器配置一个虚拟线程,例如通过Executors.newThreadPerTaskExecutor来替换默认的线程池。这样,容器分配的执行线程本身就是虚拟线程,从而避免了过滤器里把平台线程包装后再转发的问题。
但是newThreadPerTaskExecutor并不是传统意义上的固定大小线程池,没有核心/最大线程数的界限,如果并发量失控,会对DB连接池等下游资源造成压力。
因此仍然需要在应用层做并发限制——例如在最外层加一个高优先级的信号量Semaphore的 Filter,用于控制最大并发数,避免耗尽DB连接或触发OOM。这个Filter只负责发放许可,并在请求完成后释放许可。
三、结论
不过我们的项目中肯定不少CPU密集型任务,这类逻辑并不适用虚拟线程。所以方案二这种全量变为虚拟线程的方案肯定还是不稳妥的。
最佳实践仍旧是方案一,待结构化并发正式发布后,我们应该使用方案一的思路,对于封装成虚拟线程的Filter的排除路径中加上CPU密集型任务接口的路径,让这些接口依旧使用平台线程。
最后对于Xxl-Job定时任务调度、MQ调度这种中间件自己调度的线程,它们依旧还是平台线程,因此我们可以写个自定义注解,在注解中使用结构化并发,将pjp.proceed()包装成一个任务交给结构化并发管理进行虚拟线程调度执行,这样灵活性也高,对于CPU密集型任务则不加该注解即可。
但是由于结构化并发仍旧为预览特性,而且对于Web项目的绝大多数接口来说都是以IO密集型任务为主,所以这里我们只展示方案二的代码实现。
四、代码实现
(一)HttpsVirtualThreadConfig
@Configuration
public class HttpsVirtualThreadConfig {@Value("${quick-ssl.http-port:8080}")private Integer httpPort;@Beanpublic TomcatServletWebServerFactory tomcatServletWebServerFactory() {TomcatServletWebServerFactory factory = new TomcatServletWebServerFactory();factory.addAdditionalTomcatConnectors(httpConnector());return factory;}@Beanpublic Connector httpConnector() {Connector connector = new Connector("org.apache.coyote.http11.Http11NioProtocol");connector.setPort(httpPort);// 给这个connector单独设置虚拟线程池connector.addLifecycleListener(event -> {// 在创建初期使用if ("after_start".equals(event.getType())) {var protocolHandler = connector.getProtocolHandler();ThreadFactory factory = Thread.ofVirtual().name("VirtualThread#", 1).factory();protocolHandler.setExecutor(Executors.newThreadPerTaskExecutor(factory));}});return connector;}}
(二)BaseContext
public final class BaseContext {public static final ScopedValue<Long> CURRENT_ID = ScopedValue.newInstance();// 允许抛受检异常@FunctionalInterfacepublic interface ThrowingRunnable {void run() throws Exception;}public static void runWithCurrentId(Long id, ThrowingRunnable action) throws Exception {Objects.requireNonNull(action);var scope = ScopedValue.where(CURRENT_ID, id);// Scope不允许抛受检异常,因此我们包装并在异常时向外抛final Exception[] thrown = new Exception[1];Runnable runner = () -> {try {action.run();} catch (Exception e) {thrown[0] = e;throw new RuntimeException(e);}};try {scope.run(runner);} catch (RuntimeException re) {// 如果是我们包装抛出的受检异常则展开if (thrown[0] != null) {throw thrown[0];} else {throw re;}}}// 获取当前线程作用域中的 idpublic static Long getCurrentId() {if (!CURRENT_ID.isBound()){return null;}return CURRENT_ID.get();}// 判断当前线程作用域中是否有 idpublic static boolean hasCurrentId() {return CURRENT_ID.isBound();}
}
(三)VirtualThreadAsyncJwtFilter
@Component
@RequiredArgsConstructor
@Slf4j
public class VirtualThreadAsyncJwtFilter implements Filter {// 最大并发数private static final Semaphore semaphore = new Semaphore(5000);private static final long acquireTimeoutMs = 1000L;@Overridepublic void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) {boolean acquired = false;try {// 尝试获取信号量许可,超时返回 503acquired = semaphore.tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();sendError((HttpServletResponse) response, HttpServletResponse.SC_SERVICE_UNAVAILABLE, "SERVER_BUSY", "服务器繁忙");return;}if (!acquired) {sendError((HttpServletResponse) response, HttpServletResponse.SC_SERVICE_UNAVAILABLE, "SERVER_BUSY", "服务器繁忙");return;}// 传递请求链try {chain.doFilter(request, response);} catch (ServletException | IOException e) {throw new RuntimeException(e);} finally {semaphore.release();}}/*** 统一发送 JSON 错误响应*/private void sendError(HttpServletResponse response, int status, String error, String message) {try {response.setStatus(status);response.setContentType(MediaType.APPLICATION_JSON_VALUE);response.setCharacterEncoding("UTF-8");long timestamp = System.currentTimeMillis();String safeError = escapeJson(error);String safeMessage = escapeJson(message);String json = String.format("{\"status\": %d, \"error\": \"%s\", \"message\": \"%s\", \"timestamp\": %d}",status, safeError, safeMessage, timestamp);response.getWriter().write(json);} catch (IOException e) {e.printStackTrace();}}/*** 简单 JSON 字符串转义*/private String escapeJson(String s) {if (s == null) return "";StringBuilder sb = new StringBuilder();for (char ch : s.toCharArray()) {switch (ch) {case '\\': sb.append("\\\\"); break;case '"': sb.append("\\\""); break;case '\b': sb.append("\\b"); break;case '\f': sb.append("\\f"); break;case '\n': sb.append("\\n"); break;case '\r': sb.append("\\r"); break;case '\t': sb.append("\\t"); break;default:if (ch <= 0x1F) sb.append(String.format("\\u%04x", (int) ch));else sb.append(ch);}}return sb.toString();}
}
~码文不易,留个赞再走吧~