当前位置: 首页 > news >正文

跟进 JDK25:将虚拟线程安全引入生产的权衡与实战

目录

一、为什么要更新成虚拟线程?

二、方案

(一)在 Filter 内部创建虚拟线程

(二)让 Tomcat 在创建请求线程时使用虚拟线程

三、结论

四、代码实现

(一)HttpsVirtualThreadConfig

(二)BaseContext

(三)VirtualThreadAsyncJwtFilter


一、为什么要更新成虚拟线程?

为了最大限度地利用 CPU,避免平台线程被 IO 阻塞,从而提高应用程序的整体吞吐量,尤其是在大量 IO 密集型请求的场景下可以显著提升并发处理能力。

有关虚拟线程相关的知识可以先看这篇博客:

带你轻松学习虚拟线程和StructuredTaskScope-CSDN博客https://blog.csdn.net/2401_88959292/article/details/150288495?spm=1001.2014.3001.5501

注意:使用虚拟线程并不一定会降低单个请求的响应延迟,它提升的是并发处理能力(吞吐量 / QPS),而非单请求的延迟。


二、方案

(一)在 Filter 内部创建虚拟线程

一种思路是新建一个 Filter,在 Filter 中把请求处理包装成一个任务交给虚拟线程执行。
但直接开启裸的虚拟线程并不规范,例如:

Thread.ofVirtual().start(() -> {});

这样做的问题有两点:无法方便地监控任务生命周期,也不易获取任务返回值或处理异常。

于是常见的做法是使用Future:

Future<TaskResult> future = service.submit(() -> {});
TaskResult result = future.get();

但 future.get() 会阻塞当前线程——如果这个线程是平台线程,就又回到了阻塞平台线程的问题上。要利用虚拟线程的优势,必须避免阻塞平台线程。

改进方向包括使用 CompletableFuture 异步回调来避免直接阻塞平台线程,但回调式编程带来的问题是回调时机不可控、代码复杂且难以统一管理任务的生命周期。

因此更理想的是使用结构化并发来组织这些异步任务:它能提供更可控的生命周期管理、错误聚合与超时处理,且能和虚拟线程很好地配合。

不过JDK25中,结构化并发仍旧属于预览特性,所以用在线上项目极不安全,因此我们暂时放弃了该方案。

(二)让 Tomcat 在创建请求线程时使用虚拟线程

既然手动把工作包装成任务不够稳定,另一个做法是让Tomcat为每个请求直接使用虚拟线程执行,即为Tomcat的请求处理器配置一个虚拟线程,例如通过Executors.newThreadPerTaskExecutor来替换默认的线程池。这样,容器分配的执行线程本身就是虚拟线程,从而避免了过滤器里把平台线程包装后再转发的问题。

但是newThreadPerTaskExecutor并不是传统意义上的固定大小线程池,没有核心/最大线程数的界限,如果并发量失控,会对DB连接池等下游资源造成压力。

因此仍然需要在应用层做并发限制——例如在最外层加一个高优先级的信号量Semaphore的 Filter,用于控制最大并发数,避免耗尽DB连接或触发OOM。这个Filter只负责发放许可,并在请求完成后释放许可。


三、结论

不过我们的项目中肯定不少CPU密集型任务,这类逻辑并不适用虚拟线程。所以方案二这种全量变为虚拟线程的方案肯定还是不稳妥的。

最佳实践仍旧是方案一,待结构化并发正式发布后,我们应该使用方案一的思路,对于封装成虚拟线程的Filter的排除路径中加上CPU密集型任务接口的路径,让这些接口依旧使用平台线程。

最后对于Xxl-Job定时任务调度、MQ调度这种中间件自己调度的线程,它们依旧还是平台线程,因此我们可以写个自定义注解,在注解中使用结构化并发,将pjp.proceed()包装成一个任务交给结构化并发管理进行虚拟线程调度执行,这样灵活性也高,对于CPU密集型任务则不加该注解即可。

但是由于结构化并发仍旧为预览特性,而且对于Web项目的绝大多数接口来说都是以IO密集型任务为主,所以这里我们只展示方案二的代码实现。


四、代码实现

(一)HttpsVirtualThreadConfig

@Configuration
public class HttpsVirtualThreadConfig {@Value("${quick-ssl.http-port:8080}")private Integer httpPort;@Beanpublic TomcatServletWebServerFactory tomcatServletWebServerFactory() {TomcatServletWebServerFactory factory = new TomcatServletWebServerFactory();factory.addAdditionalTomcatConnectors(httpConnector());return factory;}@Beanpublic Connector httpConnector() {Connector connector = new Connector("org.apache.coyote.http11.Http11NioProtocol");connector.setPort(httpPort);// 给这个connector单独设置虚拟线程池connector.addLifecycleListener(event -> {// 在创建初期使用if ("after_start".equals(event.getType())) {var protocolHandler = connector.getProtocolHandler();ThreadFactory factory = Thread.ofVirtual().name("VirtualThread#", 1).factory();protocolHandler.setExecutor(Executors.newThreadPerTaskExecutor(factory));}});return connector;}}

(二)BaseContext

public final class BaseContext {public static final ScopedValue<Long> CURRENT_ID = ScopedValue.newInstance();// 允许抛受检异常@FunctionalInterfacepublic interface ThrowingRunnable {void run() throws Exception;}public static void runWithCurrentId(Long id, ThrowingRunnable action) throws Exception {Objects.requireNonNull(action);var scope = ScopedValue.where(CURRENT_ID, id);// Scope不允许抛受检异常,因此我们包装并在异常时向外抛final Exception[] thrown = new Exception[1];Runnable runner = () -> {try {action.run();} catch (Exception e) {thrown[0] = e;throw new RuntimeException(e);}};try {scope.run(runner);} catch (RuntimeException re) {// 如果是我们包装抛出的受检异常则展开if (thrown[0] != null) {throw thrown[0];} else {throw re;}}}// 获取当前线程作用域中的 idpublic static Long getCurrentId() {if (!CURRENT_ID.isBound()){return null;}return CURRENT_ID.get();}// 判断当前线程作用域中是否有 idpublic static boolean hasCurrentId() {return CURRENT_ID.isBound();}
}

(三)VirtualThreadAsyncJwtFilter

@Component
@RequiredArgsConstructor
@Slf4j
public class VirtualThreadAsyncJwtFilter implements Filter {// 最大并发数private static final Semaphore semaphore = new Semaphore(5000);private static final long acquireTimeoutMs = 1000L;@Overridepublic void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) {boolean acquired = false;try {// 尝试获取信号量许可,超时返回 503acquired = semaphore.tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();sendError((HttpServletResponse) response, HttpServletResponse.SC_SERVICE_UNAVAILABLE, "SERVER_BUSY", "服务器繁忙");return;}if (!acquired) {sendError((HttpServletResponse) response, HttpServletResponse.SC_SERVICE_UNAVAILABLE, "SERVER_BUSY", "服务器繁忙");return;}// 传递请求链try {chain.doFilter(request, response);} catch (ServletException | IOException e) {throw new RuntimeException(e);} finally {semaphore.release();}}/*** 统一发送 JSON 错误响应*/private void sendError(HttpServletResponse response, int status, String error, String message) {try {response.setStatus(status);response.setContentType(MediaType.APPLICATION_JSON_VALUE);response.setCharacterEncoding("UTF-8");long timestamp = System.currentTimeMillis();String safeError = escapeJson(error);String safeMessage = escapeJson(message);String json = String.format("{\"status\": %d, \"error\": \"%s\", \"message\": \"%s\", \"timestamp\": %d}",status, safeError, safeMessage, timestamp);response.getWriter().write(json);} catch (IOException e) {e.printStackTrace();}}/*** 简单 JSON 字符串转义*/private String escapeJson(String s) {if (s == null) return "";StringBuilder sb = new StringBuilder();for (char ch : s.toCharArray()) {switch (ch) {case '\\': sb.append("\\\\"); break;case '"': sb.append("\\\""); break;case '\b': sb.append("\\b"); break;case '\f': sb.append("\\f"); break;case '\n': sb.append("\\n"); break;case '\r': sb.append("\\r"); break;case '\t': sb.append("\\t"); break;default:if (ch <= 0x1F) sb.append(String.format("\\u%04x", (int) ch));else sb.append(ch);}}return sb.toString();}
}

~码文不易,留个赞再走吧~

http://www.dtcms.com/a/407706.html

相关文章:

  • 网站浏览历史怎么查看欧美网站建设公司排名
  • 四平网站建设哪家效果好wordpress 中文博客主题
  • 小灯题目!!
  • RK3588:MIPI底层驱动学习——入门第三篇(IIC与V4L2如何共存?)
  • 去哪儿面经
  • 网站建设和维护合同iis7 网站打不开
  • EPLAN2022部件库导入步骤
  • 微信网站建设报价表网页设计文案
  • < 自用文 备份 script :intar.sh> 使用 tar cvfz 命令打包成 .tar.gz 文件来备份多目标(目录,文件)
  • 网站建设asp做家具的网站
  • 高校网站建设的时效性浅谈网站建设
  • 卫星互联网商用落地的三个条件
  • 网站分析流程全美网站建设公司
  • 中讯高科网站建设襄阳网站推广优化技巧
  • 中值滤波器原理及C++实现
  • 【 GUI自动化测试】pywinauto 常见操作
  • SQL SELECT 语句怎么用?COMPANY 表查询案例(含条件 / 模糊 / 分页)
  • 北京网站建设公司降龙手机创建个人网站 免费
  • 朝阳企业网站建设方案高端办公室装修公司
  • 写网站开发代码注册公司地址可以是住宅吗
  • 网站 底部网站建设做得好
  • 老字号传承,达尔优AE6电竞鼠标!熟悉的味道,时代的配方
  • 通过你的自有服务器代理网址
  • 智能手机背面缺陷检测数据集VOC+YOLO格式5203张5类别
  • 太乙笔记全文
  • 成功网站管理系统个人网页设计硬件需求
  • 淄博网站排名优化公司网站建设进度总结
  • 想建设一个网站济南建站
  • 网站地区词优化智能网站推广优化
  • python做网站的多吗企业做网站价格