当前位置: 首页 > news >正文

不依赖第三方,不销毁重建,loveqq 框架如何原生实现动态线程池?

动态调整线程池参数是一个比较常见的需求,由此也衍生出一些专门的优秀框架,例如:dynamic-tp 框架。

但是总是要额外的引入第三方依赖,出现问题还要看源码,甚至咨询原作者,对于一些想要轻量级的用户来说略显麻烦了。

本次为大家介绍如何使用 loveqq-framework 框架,实现原生的动态线程池,而且不是销毁再重建的那种,而且是直接基于 java 原生线程池操作,代码简单清晰,出现问题易于排查。

1、创建简单的自定义线程池

@Component
@RefreshScope
public static class AsyncThreadPoolExecutor extends ThreadPoolExecutor {public AsyncThreadPoolExecutor() {super(1, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new DiscardPolicy());}
}

只是简单的继承即可,给一个默认的无参构造方法。并且加上 @RefreshScope 注解,表示配置文件刷新时,刷新该 bean 实例。

但是,默认的刷新策略是,将 bean 销毁,并且重建,这种操作对于线程池来说是不合适的,于是 loveqq-framework 提供了 ScopeRefreshed 接口,实现了该接口的非单例 bean,当其作用域变更时,将回调该接口,由开发者决定如何刷新 bean。

因此,需要做如下改造:

@Component
@RefreshScope
public static class AsyncThreadPoolExecutor extends ThreadPoolExecutor implements PropertyContextContextAware, ScopeRefreshed {/*** 属性上下文*/private GenericPropertiesContext propertiesContext;public AsyncThreadPoolExecutor() {super(1, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new DiscardPolicy());}@Overridepublic void setPropertyContext(GenericPropertiesContext propertiesContext) {this.propertiesContext = propertiesContext;}/*** 实现该接口,当作用域刷新时,将回调该接口,而不是销毁线程池再重建** @param applicationContext 应用上下文*/@Overridepublic void onRefreshed(ApplicationContext applicationContext) {this.setCorePoolSize(propertiesContext.getProperty("async.core", int.class));this.setMaximumPoolSize(propertiesContext.getProperty("async.max", int.class));}
}

2、编写线程池监控任务

@Component
public static class AsyncService implements InitializingBean {/*** 是否暂停提交任务*/private boolean pause;@Autowiredprivate ThreadPoolExecutor threadPoolExecutor;public void swatchPause() {this.pause = !this.pause;}@Overridepublic void afterPropertiesSet() {new Thread(new AsyncTask()).start();}/*** 用于监控线程池状态*/private class AsyncTask implements Runnable {@Overridepublic void run() {while (true) {if (!pause) {for (int i = 0; i < 150; i++) {threadPoolExecutor.execute(() -> CommonUtil.sleep(200));}}System.out.println(threadPoolExecutor);                                     // 打印线程状态CommonUtil.sleep(1000);}}}
}

该 bean 主要目的是向线程池提交任务,并打印线程池执行状态

3、编写控制器,查看动态线程池效果

@RestController
public class Main {@Autowiredprivate AsyncService asyncService;/*** 暂停提交任务* 可观察线程池运行状态*/@GetMappingpublic String swatchPause() {asyncService.swatchPause();return "ok";}public static void main(String[] args) throws Exception {K.run(Main.class, args);}
}

到这里,如果使用的是 nacos 配置中心,那么就可以修改 nacos 配置,然后观察线程运行状态了。

如果没有使用 nacos 配置中心,那么也可以开放api,由接口控制,控制器如下:

    @Autowiredprivate PropertyContext propertyContext;/*** 更新线程池核心数,最大线程数*/@GetMapping@SuppressWarnings("unchecked")public String updatePool(int core, int max) {propertyContext.setRefreshProperty(new Pair<>("async.core", String.valueOf(core)),new Pair<>("async.max", String.valueOf(max)));return "ok";}

最后就可以启动 main 方法,分别调用:

http://localhost:8080/updatePool?core=2&max=3http://localhost:8080/swatchPause

查看线程运行状态了

完整的代码如下:

package com.kfyty.demo;import com.kfyty.loveqq.framework.boot.K;
import com.kfyty.loveqq.framework.core.autoconfig.ApplicationContext;
import com.kfyty.loveqq.framework.core.autoconfig.InitializingBean;
import com.kfyty.loveqq.framework.core.autoconfig.annotation.Autowired;
import com.kfyty.loveqq.framework.core.autoconfig.annotation.BootApplication;
import com.kfyty.loveqq.framework.core.autoconfig.annotation.Component;
import com.kfyty.loveqq.framework.core.autoconfig.annotation.RefreshScope;
import com.kfyty.loveqq.framework.core.autoconfig.aware.PropertyContextContextAware;
import com.kfyty.loveqq.framework.core.autoconfig.env.GenericPropertiesContext;
import com.kfyty.loveqq.framework.core.autoconfig.env.PropertyContext;
import com.kfyty.loveqq.framework.core.autoconfig.scope.ScopeRefreshed;
import com.kfyty.loveqq.framework.core.support.Pair;
import com.kfyty.loveqq.framework.core.utils.CommonUtil;
import com.kfyty.loveqq.framework.web.core.annotation.GetMapping;
import com.kfyty.loveqq.framework.web.core.annotation.RestController;
import com.kfyty.loveqq.framework.web.core.autoconfig.annotation.EnableWebMvc;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Slf4j
@EnableWebMvc
@RestController
@BootApplication
public class Main {@Autowiredprivate PropertyContext propertyContext;@Autowiredprivate AsyncService asyncService;/*** 更新线程池核心数,最大线程数*/@GetMapping@SuppressWarnings("unchecked")public String updatePool(int core, int max) {propertyContext.setRefreshProperty(new Pair<>("async.core", String.valueOf(core)),new Pair<>("async.max", String.valueOf(max)));return "ok";}/*** 暂停提交任务* 可观察线程池运行状态*/@GetMappingpublic String swatchPause() {asyncService.swatchPause();return "ok";}@Component@RefreshScopepublic static class AsyncThreadPoolExecutor extends ThreadPoolExecutor implements PropertyContextContextAware, ScopeRefreshed {/*** 属性上下文*/private GenericPropertiesContext propertiesContext;public AsyncThreadPoolExecutor() {super(1, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new DiscardPolicy());}@Overridepublic void setPropertyContext(GenericPropertiesContext propertiesContext) {this.propertiesContext = propertiesContext;}/*** 实现该接口,当作用域刷新时,将回调该接口,而不是销毁线程池再重建** @param applicationContext 应用上下文*/@Overridepublic void onRefreshed(ApplicationContext applicationContext) {this.setCorePoolSize(propertiesContext.getProperty("async.core", int.class));this.setMaximumPoolSize(propertiesContext.getProperty("async.max", int.class));}}@Componentpublic static class AsyncService implements InitializingBean {/*** 是否暂停提交任务*/private boolean pause;@Autowiredprivate ThreadPoolExecutor threadPoolExecutor;public void swatchPause() {this.pause = !this.pause;}@Overridepublic void afterPropertiesSet() {new Thread(new AsyncTask()).start();}/*** 用于监控线程池状态*/private class AsyncTask implements Runnable {@Overridepublic void run() {while (true) {if (!pause) {for (int i = 0; i < 150; i++) {threadPoolExecutor.execute(() -> CommonUtil.sleep(200));}}System.out.println(threadPoolExecutor);                                     // 打印线程状态CommonUtil.sleep(1000);}}}}public static void main(String[] args) throws Exception {K.run(Main.class, args);}
}

感兴趣的同学可以 gitee/gitcode/github 看一下


文章转载自:

http://Pqgx853t.pbzgj.cn
http://2qy7XzfK.pbzgj.cn
http://5z8Xp9aL.pbzgj.cn
http://dFsKIWta.pbzgj.cn
http://o6xR6j23.pbzgj.cn
http://R4nrYYxj.pbzgj.cn
http://HcxxVdJ4.pbzgj.cn
http://4In9KOZQ.pbzgj.cn
http://UbLFQnZy.pbzgj.cn
http://qFZ62Rr2.pbzgj.cn
http://BLLlNw0Q.pbzgj.cn
http://resQI11D.pbzgj.cn
http://KzhRF4d3.pbzgj.cn
http://txaxCQgK.pbzgj.cn
http://SNQ4Tfwx.pbzgj.cn
http://J09M1s73.pbzgj.cn
http://1vuz9P2O.pbzgj.cn
http://IXu2HFkj.pbzgj.cn
http://o4H2DJf4.pbzgj.cn
http://1cypxRAw.pbzgj.cn
http://2SjUZna8.pbzgj.cn
http://syEEBFpX.pbzgj.cn
http://D9WoKTWN.pbzgj.cn
http://iD3z2mKw.pbzgj.cn
http://j4T498VY.pbzgj.cn
http://c2Tg9bIb.pbzgj.cn
http://ayXcU8zk.pbzgj.cn
http://KJkDO18I.pbzgj.cn
http://jjvcUj7e.pbzgj.cn
http://EFytsNOi.pbzgj.cn
http://www.dtcms.com/a/388410.html

相关文章:

  • Python中正则的三个基础方法
  • 最外层的项目没有父pom配置文件,有很多子模块(maven项目)导入idea中,左侧模块显示不全问题解决
  • 前端将一个 DOM 元素滚动到视口顶部
  • 前端-防重复点击/防抖的方案
  • doris数据库问题
  • PyQt5中实现只读QLineEdit控件的完整指南
  • 金融工程vs金融数学:谁更贴近量化交易?
  • LeetCode 167.两数之和 II - 输入有序数组
  • 小杰机器学习高级(one)——激活函数——sigmoid、tanh、Relu、Leaky Relu、Prelu、ELU、softmax
  • OpenAI原生调用 vs LangChain调用方式的关系
  • [Token剪枝]Token Cropr: 针对众多任务的更快ViT, CVPR2025
  • NW725NW743美光固态闪存NW727NW734
  • 【Linux】归档、压缩、用户管理
  • Lattice FPGA 开发流程(以 ECP5 为例)
  • 大模型实战应用指南:从GPT-4.5到LLaMA的行业解决方案
  • 告别人工标注瓶颈!Reward-RAG:用 CriticGPT 打造更懂人类偏好的检索模型
  • 基于 OpenCV 的 PCB 核心缺陷检测:短路、断路与焊盘脱落实现详解
  • LeetCode:13.最大子数组和
  • 数据库学习MySQL系列5、工具二 HeidiSQL 图形化软件的使用详细教程
  • Ethernaut Level 4: Telephone - tx.origin vs msg.sender 身份验证绕过
  • RWA开启数字时代的文化价值新纪元
  • 【Redis】-- 分布式锁
  • 分布式拜占庭容错算法——实现工作量证明(PoW)算法详解
  • 基础介绍(Solidity、Polkadot)
  • 【Axure高保真原型】智慧水利可视化分析案例
  • oracle的sql语句中 a=b(+),代表什么意思
  • 联邦学习论文分享:
  • Linux渗透中group的利用
  • Linux:基础开发工具
  • 数据结构----链表