不依赖第三方,不销毁重建,loveqq 框架如何原生实现动态线程池?
动态调整线程池参数是一个比较常见的需求,由此也衍生出一些专门的优秀框架,例如:dynamic-tp 框架。
但是总是要额外的引入第三方依赖,出现问题还要看源码,甚至咨询原作者,对于一些想要轻量级的用户来说略显麻烦了。
本次为大家介绍如何使用 loveqq-framework 框架,实现原生的动态线程池,而且不是销毁再重建的那种,而且是直接基于 java 原生线程池操作,代码简单清晰,出现问题易于排查。
1、创建简单的自定义线程池
@Component
@RefreshScope
public static class AsyncThreadPoolExecutor extends ThreadPoolExecutor {public AsyncThreadPoolExecutor() {super(1, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new DiscardPolicy());}
}
只是简单的继承即可,给一个默认的无参构造方法。并且加上 @RefreshScope 注解,表示配置文件刷新时,刷新该 bean 实例。
但是,默认的刷新策略是,将 bean 销毁,并且重建,这种操作对于线程池来说是不合适的,于是 loveqq-framework 提供了 ScopeRefreshed 接口,实现了该接口的非单例 bean,当其作用域变更时,将回调该接口,由开发者决定如何刷新 bean。
因此,需要做如下改造:
@Component
@RefreshScope
public static class AsyncThreadPoolExecutor extends ThreadPoolExecutor implements PropertyContextContextAware, ScopeRefreshed {/*** 属性上下文*/private GenericPropertiesContext propertiesContext;public AsyncThreadPoolExecutor() {super(1, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new DiscardPolicy());}@Overridepublic void setPropertyContext(GenericPropertiesContext propertiesContext) {this.propertiesContext = propertiesContext;}/*** 实现该接口,当作用域刷新时,将回调该接口,而不是销毁线程池再重建** @param applicationContext 应用上下文*/@Overridepublic void onRefreshed(ApplicationContext applicationContext) {this.setCorePoolSize(propertiesContext.getProperty("async.core", int.class));this.setMaximumPoolSize(propertiesContext.getProperty("async.max", int.class));}
}
2、编写线程池监控任务
@Component
public static class AsyncService implements InitializingBean {/*** 是否暂停提交任务*/private boolean pause;@Autowiredprivate ThreadPoolExecutor threadPoolExecutor;public void swatchPause() {this.pause = !this.pause;}@Overridepublic void afterPropertiesSet() {new Thread(new AsyncTask()).start();}/*** 用于监控线程池状态*/private class AsyncTask implements Runnable {@Overridepublic void run() {while (true) {if (!pause) {for (int i = 0; i < 150; i++) {threadPoolExecutor.execute(() -> CommonUtil.sleep(200));}}System.out.println(threadPoolExecutor); // 打印线程状态CommonUtil.sleep(1000);}}}
}
该 bean 主要目的是向线程池提交任务,并打印线程池执行状态
3、编写控制器,查看动态线程池效果
@RestController
public class Main {@Autowiredprivate AsyncService asyncService;/*** 暂停提交任务* 可观察线程池运行状态*/@GetMappingpublic String swatchPause() {asyncService.swatchPause();return "ok";}public static void main(String[] args) throws Exception {K.run(Main.class, args);}
}
到这里,如果使用的是 nacos 配置中心,那么就可以修改 nacos 配置,然后观察线程运行状态了。
如果没有使用 nacos 配置中心,那么也可以开放api,由接口控制,控制器如下:
@Autowiredprivate PropertyContext propertyContext;/*** 更新线程池核心数,最大线程数*/@GetMapping@SuppressWarnings("unchecked")public String updatePool(int core, int max) {propertyContext.setRefreshProperty(new Pair<>("async.core", String.valueOf(core)),new Pair<>("async.max", String.valueOf(max)));return "ok";}
最后就可以启动 main 方法,分别调用:
http://localhost:8080/updatePool?core=2&max=3http://localhost:8080/swatchPause
查看线程运行状态了
完整的代码如下:
package com.kfyty.demo;import com.kfyty.loveqq.framework.boot.K;
import com.kfyty.loveqq.framework.core.autoconfig.ApplicationContext;
import com.kfyty.loveqq.framework.core.autoconfig.InitializingBean;
import com.kfyty.loveqq.framework.core.autoconfig.annotation.Autowired;
import com.kfyty.loveqq.framework.core.autoconfig.annotation.BootApplication;
import com.kfyty.loveqq.framework.core.autoconfig.annotation.Component;
import com.kfyty.loveqq.framework.core.autoconfig.annotation.RefreshScope;
import com.kfyty.loveqq.framework.core.autoconfig.aware.PropertyContextContextAware;
import com.kfyty.loveqq.framework.core.autoconfig.env.GenericPropertiesContext;
import com.kfyty.loveqq.framework.core.autoconfig.env.PropertyContext;
import com.kfyty.loveqq.framework.core.autoconfig.scope.ScopeRefreshed;
import com.kfyty.loveqq.framework.core.support.Pair;
import com.kfyty.loveqq.framework.core.utils.CommonUtil;
import com.kfyty.loveqq.framework.web.core.annotation.GetMapping;
import com.kfyty.loveqq.framework.web.core.annotation.RestController;
import com.kfyty.loveqq.framework.web.core.autoconfig.annotation.EnableWebMvc;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Slf4j
@EnableWebMvc
@RestController
@BootApplication
public class Main {@Autowiredprivate PropertyContext propertyContext;@Autowiredprivate AsyncService asyncService;/*** 更新线程池核心数,最大线程数*/@GetMapping@SuppressWarnings("unchecked")public String updatePool(int core, int max) {propertyContext.setRefreshProperty(new Pair<>("async.core", String.valueOf(core)),new Pair<>("async.max", String.valueOf(max)));return "ok";}/*** 暂停提交任务* 可观察线程池运行状态*/@GetMappingpublic String swatchPause() {asyncService.swatchPause();return "ok";}@Component@RefreshScopepublic static class AsyncThreadPoolExecutor extends ThreadPoolExecutor implements PropertyContextContextAware, ScopeRefreshed {/*** 属性上下文*/private GenericPropertiesContext propertiesContext;public AsyncThreadPoolExecutor() {super(1, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new DiscardPolicy());}@Overridepublic void setPropertyContext(GenericPropertiesContext propertiesContext) {this.propertiesContext = propertiesContext;}/*** 实现该接口,当作用域刷新时,将回调该接口,而不是销毁线程池再重建** @param applicationContext 应用上下文*/@Overridepublic void onRefreshed(ApplicationContext applicationContext) {this.setCorePoolSize(propertiesContext.getProperty("async.core", int.class));this.setMaximumPoolSize(propertiesContext.getProperty("async.max", int.class));}}@Componentpublic static class AsyncService implements InitializingBean {/*** 是否暂停提交任务*/private boolean pause;@Autowiredprivate ThreadPoolExecutor threadPoolExecutor;public void swatchPause() {this.pause = !this.pause;}@Overridepublic void afterPropertiesSet() {new Thread(new AsyncTask()).start();}/*** 用于监控线程池状态*/private class AsyncTask implements Runnable {@Overridepublic void run() {while (true) {if (!pause) {for (int i = 0; i < 150; i++) {threadPoolExecutor.execute(() -> CommonUtil.sleep(200));}}System.out.println(threadPoolExecutor); // 打印线程状态CommonUtil.sleep(1000);}}}}public static void main(String[] args) throws Exception {K.run(Main.class, args);}
}
感兴趣的同学可以 gitee/gitcode/github 看一下