JAVA将一个同步方法改为异步执行
目的:
这么做的目的就是为了使一个高频率执行的方法能不阻塞整个程序,将该方法丢入到线程池中让线程去做异步执行,既提高了程序整体运行速度,也使得在高并发环境下程序能够更加健壮(同步执行可能会使得请求堆积以致系统瘫痪,而异步的话,并发过高消耗完默认线程最大值后,后面来的请求直接执行拒绝策略就可以了)
背景:
项目中有一个监控推送的方法,每5分钟执行一次,且随着堆积数据量越来越大,推送时做的一些数据库交互逻辑耗时越来越长,平台统计的该组件交易耗时就越来越高。
老大肯定是不希望每周平台周报公布时自己管理的组件数据不好看的,且这是一个监控功能,不影响业务,那就更不该喧宾夺主了。综上所述决定将其方法由同步改成异步。
代码:
先定义一个单例模式的线程池管理器MonitorExecutor类。
/*** class MonitorExecutor*/
public class MonitorExecutor implements Disposable {private static final Logger log = LoggerFactory.getLogger(MonitorExecutor.class);private static final Object MONITOR_LOCK = new Object();private volatile ExecutorService monitorProcessor;private static class SingletonHolder {private static final MonitorExecutor INSTANCE = new MonitorExecutor();}public static MonitorExecutor getInstance() {return SingletonHolder.INSTANCE;}private ExecutorService getMonitorProcessor() {if (monitorProcessor == null) {synchronized (MONITOR_LOCK) {if (monitorProcessor == null) {MonitorPoolConfig config = Configuration.loader(MonitorPoolConfig.class);int minPoolSize = config.getMinPoolSize();int maxPoolSize = config.getMaxPoolSize();monitorProcessor = new ThreadPoolExecutor(minPoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,new SynchronousQueue<>(),new NamedThreadFactory("monitor-processing-thread"),new ThreadPoolExecutor.CallerRunsPolicy());ShutdownHook.getInstance().addDisposable(this);log.info("Initialize the monitor processing thread pool, minPoolSize {}, maxPoolSize {}", minPoolSize, maxPoolSize);}}}return monitorProcessor;}private static ExecutorService getExecutor(){return getInstance().getMonitorProcessor();}public static void execute(Runnable cmd) {getExecutor().execute(cmd);}@Overridepublic void destroy() {if (monitorProcessor != null) {monitorProcessor.shutdown();}}
}
定义一个线程池大小配置类MonitorPoolConfig,可用默认值也可在application里动态设置。关于@Config动态读取配置文件请参见我另一篇文章。
/*** class MonitorPoolConfig*/
@Data
@ConfigPrefix(ConfKeyConstants.SERVER_MONITOR_POOL)
public class MonitorPoolConfig {@ConfigMapping(key = "minPoolSize", def = "5")private Integer minPoolSize;@ConfigMapping(key = "maxPoolSize", def = "200")private Integer maxPoolSize;}
OK准备工作已做完,现在先来看一下优化前的方法代码:
public boolean save(MonitorInfoDTO dto) {if (dto == null) {return false;}// 推送的四个步骤,我想将前三个步骤改成异步,最后一个推送错误告警的依然同步progressDoService.saveProgress(converter.toProgressEntities(dto.getProgresses()));seqGenStatusDoService.saveStatus(converter.toSeqGenStatusEntities(dto.getSeqGenstatuses()));workerIdDoService.saveWorkerIds(converter.toWorkerIdEntities(dto.getWorkerIdInfos()));failedSeqDoService.saveFailedSeqs(converter.toFailedSeqEntities(dto.getFailedSeqs()));return true;}
优化改造后:
// 将异步执行抽象出一个方法private <T> void saveAsync(List<T> list, Function<List<T>, Boolean> func) {MonitorExecutor.execute(() -> {func.apply(list);});}public boolean save(MonitorInfoDTO dto) {if (dto == null) {return false;}saveAsync(converter.toProgressEntities(dto.getProgresses()),progresses -> progressDoService.saveProgress(progresses));saveAsync(converter.toSeqGenStatusEntities(dto.getSeqGenstatuses()),statuses -> seqGenStatusDoService.saveStatus(statuses));saveAsync(converter.toWorkerIdEntities(dto.getWorkerIdInfos()),workerIdInfos -> workerIdDoService.saveWorkerIds(workerIdInfos));failedSeqDoService.saveFailedSeqs(converter.toFailedSeqEntities(dto.getFailedSeqs()));return true;}
在saveAsync里可以随意定制异步执行时的共用操作,就像切面思维一样,例如:
// 将异步执行抽象出一个方法private <T> void saveAsync(List<T> list, Function<List<T>, Boolean> func) {final String systemId = SystemContext.getSystemId();final String tenantId = SystemContext.getTenantId();MonitorExecutor.execute(() -> {SystemContext.setSystemId(systemId);SystemContext.setTenantId(tenantId);try {func.apply(list);} finally {SystemContext.clear();}});}
优化完毕。
线程池管理器的AI解析说明:
这段代码定义了一个名为 MonitorExecutor 的类,它是一个单例模式的线程池管理器,用于处理监控任务。以下是对代码的详细分析:
- 类和接口
类名:MonitorExecutor
实现接口:Disposable,表示该类实现了资源释放接口,用于在程序关闭时释放资源。 - 日志记录
使用了 org.slf4j.Logger 和 LoggerFactory 来记录日志,便于调试和监控。 - 单例模式
使用了双重检查锁定(Double-Check Locking)的单例模式实现:
静态内部类 SingletonHolder 包含一个静态实例 INSTANCE,确保线程安全。
getInstance() 方法返回单例实例。 - 线程池初始化
线程池的初始化:
使用了 ThreadPoolExecutor 来创建线程池。
配置参数从 MonitorPoolConfig 类中加载,包括最小线程数 minPoolSize 和最大线程数 maxPoolSize。
使用了 SynchronousQueue 作为任务队列,这意味着任务会直接提交给线程执行,如果没有空闲线程,则会拒绝任务。
使用了 NamedThreadFactory 来为线程池中的线程命名,便于调试和监控。
使用了 CallerRunsPolicy 作为拒绝策略,当任务被拒绝时,会在调用线程中执行任务。 - 线程池的生命周期管理
初始化:
在 getMonitorProcessor() 方法中,如果 monitorProcessor 为 null,则会初始化线程池。
初始化后,将当前实例添加到 ShutdownHook 中,确保在程序关闭时能够正确释放资源。
销毁:
实现了 Disposable 接口的 destroy() 方法,在程序关闭时调用 shutdown() 方法来关闭线程池。 - 任务提交
提供了 execute(Runnable cmd) 方法,用于提交任务到线程池中执行。 - 代码结构
线程安全:使用了 synchronized 块和双重检查锁定来确保线程池的初始化是线程安全的。
日志记录:在初始化线程池时记录了日志,便于监控线程池的初始化状态。
资源管理:通过实现 Disposable 接口,确保在程序关闭时能够正确释放线程池资源。 - 改进建议
线程池配置:
当前使用了 SynchronousQueue,这可能导致任务提交时的阻塞。如果任务提交频率较高,可以考虑使用其他类型的阻塞队列,如 LinkedBlockingQueue。
日志记录:
在 destroy() 方法中添加日志记录,记录线程池的销毁状态。
异常处理:
在 execute(Runnable cmd) 方法中添加异常处理逻辑,确保任务提交失败时能够记录错误信息。 - 示例用法
public class Main {public static void main(String[] args) {// 提交任务到线程池MonitorExecutor.execute(() -> {System.out.println("Task is running in thread: " + Thread.currentThread().getName());});// 程序关闭时释放资源Runtime.getRuntime().addShutdownHook(new Thread(() -> {MonitorExecutor.getInstance().destroy();}));}
}
通过这段代码,你可以更好地理解和使用 MonitorExecutor 类来管理监控任务的线程池。
OK码完收工,THX!