SpringBoot自定义EndPoint实现线程池动态管理
在 Spring Boot 中,@Endpoint
注解用于创建自定义的管理EndPoint(Actuator Endpoint)。Actuator EndPoint是 Spring Boot 提供的一种功能,用于暴露应用程序的不同信息,例如应用程序的健康状况、配置信息、环境属性等。通过自定义的Actuator EndPoint,可以向应用程序添加自定义的管理功能。
依赖引入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
自定义EndPoint
@EndPoint 注解
自定义 EndPoint 可以声明一个类,用 @EndPoint
注解进行修饰,并声明 id 信息。@Endpoint
声明该类是一个 Actuator EndPoint。
EndPoint 的 id
id
是一个 Actuator EndPoint 的标识, id 的定义需要符合 EndpointId 的规则。
- 不可为空
- 只能包含字母和数字
- 不能以数字开头
- 不能以大写字母开头
- 可以包含
.
和-
,但是会输出警告日志 - 忽略大小写和语法字符
private static final Pattern VALID_PATTERN = Pattern.compile("[a-zA-Z0-9.-]+");private static final Pattern WARNING_PATTERN = Pattern.compile("[.-]+");private EndpointId(String value) {//不可为空Assert.hasText(value, "Value must not be empty");//必须符合a-z,A-Z,0-9,-,.Assert.isTrue(VALID_PATTERN.matcher(value).matches(), "Value must only contain valid chars");//不能以数字开头Assert.isTrue(!Character.isDigit(value.charAt(0)), "Value must not start with a number");//不能以大写字母开头Assert.isTrue(!Character.isUpperCase(value.charAt(0)), "Value must not start with an uppercase letter");//可以包含. - 但是会输出警告日志if (WARNING_PATTERN.matcher(value).find()) {logWarning(value);}this.value = value;//转换成小写,其实是忽略大小写this.lowerCaseValue = value.toLowerCase(Locale.ENGLISH);this.lowerCaseAlphaNumeric = getAlphaNumerics(this.lowerCaseValue);
}
操作注解
@ReadOperation
:用来标记 EndPoint 方法仅支持只读操作。
@WriteOperation
:用于标记 EndPoint 方法支持写操作。写操作是指可能会对应用程序状态进行更改的操作,例如重启应用程序、清除缓存等。
@DeleteOperation
:用于标记 EndPoint 方法支持删除操作,通常用于清除缓存、删除临时文件等。
@EndpointWebExtension
:用于扩展 Web EndPoint的功能,通常与@Endpoint
注解一起使用,并且方法可以使用HTTP请求的其他部分(例如,路径变量)来接收输入参数。
@EndpointJmxExtension
:用于扩展JMX EndPoint的功能,类似于@EndpointWebExtension
,但是用于JMX EndPoint。
代码示例
以下是我定义的一个线程池管理的 EndPoint,主要通过 ThreadPoolRegistry 获取注册在 ThreadPoolRegistry 中的所有线程池的信息,并可以针对线程池进行查询、调整核心线程和最大线程数、关闭线程池等操作。
import com.moon.cloud.threadpool.endpoint.dto.ThreadPoolInfoDTO;
import com.moon.cloud.threadpool.registry.ThreadPoolRegistry;
import jakarta.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.actuate.endpoint.annotation.*;
import org.springframework.stereotype.Component;import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;@Slf4j
@Endpoint(id = "threadpools")
public class MoonThreadPoolEndpoint {/*** 获取所有的线程池信息* @return*/@ReadOperationpublic Map<String, Object> threadPools() {try {Set<String> poolNames = ThreadPoolRegistry.getAllPoolNames();List<ThreadPoolInfoDTO> poolInfos = poolNames.stream().map(poolName -> {ThreadPoolExecutor executor = ThreadPoolRegistry.getExecutor(poolName);return executor != null ? buildThreadPoolInfoDTO(poolName, executor) : null;}).filter(Objects::nonNull).collect(Collectors.toList());Map<String, Object> result = new HashMap<>();result.put("total", poolInfos.size());result.put("pools", poolInfos);return result;} catch (Exception e) {log.error("获取线程池列表失败", e);Map<String, Object> result = new HashMap<>();result.put("msg", "获取线程池列表失败");result.put("error", e.getMessage());return null;}}/*** 构建线程池信息DTO* @param poolName 线程池名称* @param executor 线程池执行器* @return 线程池信息DTO*/private ThreadPoolInfoDTO buildThreadPoolInfoDTO(String poolName, ThreadPoolExecutor executor) {ThreadPoolInfoDTO info = new ThreadPoolInfoDTO();info.setPoolName(poolName);info.setCorePoolSize(executor.getCorePoolSize());info.setMaximumPoolSize(executor.getMaximumPoolSize());info.setActiveCount(executor.getActiveCount());info.setPoolSize(executor.getPoolSize());info.setLargestPoolSize(executor.getLargestPoolSize());info.setTaskCount(executor.getTaskCount());info.setCompletedTaskCount(executor.getCompletedTaskCount());info.setQueueSize(executor.getQueue().size());info.setQueueRemainingCapacity(executor.getQueue().remainingCapacity());info.setShutdown(executor.isShutdown());info.setTerminated(executor.isTerminated());info.setTerminating(executor.isTerminating());// 计算线程池使用率double utilizationRate = executor.getMaximumPoolSize() > 0 ?(double) executor.getActiveCount() / executor.getMaximumPoolSize() * 100 : 0;info.setUtilizationRate(String.format("%.2f%%", utilizationRate));// 计算队列使用率int totalQueueCapacity = executor.getQueue().size() + executor.getQueue().remainingCapacity();double queueUtilizationRate = totalQueueCapacity > 0 ?(double) executor.getQueue().size() / totalQueueCapacity * 100 : 0;info.setQueueUtilizationRate(String.format("%.2f%%", queueUtilizationRate));return info;}/*** 获取线程池信息* @param poolName* @return*/@ReadOperationpublic Map<String, Object> threadPool(@Selector @Nullable String poolName) {Map<String, Object> result = new HashMap<>();try {if (poolName == null) {result.put("msg", "线程池名称不能为空");return result;}ThreadPoolExecutor executor = ThreadPoolRegistry.getExecutor(poolName);if (executor == null) {result.put("msg", String.format("线程池[%s]不存在", poolName));return result;}ThreadPoolInfoDTO info = buildThreadPoolInfoDTO(poolName, executor);result.put("info", info);return result;} catch (Exception e) {log.error("获取线程池详情失败: {}", poolName, e);result.put("msg", String.format("获取线程池[%s]详情失败", poolName));result.put("error", e.getMessage());return result;}}/*** 调整线程池参数* @param poolName* @param corePoolSize* @param maximumPoolSize* @return*/@WriteOperationpublic Map<String, Object> adjustThreadPool(@Selector @Nullable String poolName,@Selector @Nullable Integer corePoolSize,@Selector @Nullable Integer maximumPoolSize) {Map<String, Object> result = new HashMap<>();// 动态调整线程池参数try {if (poolName == null || corePoolSize == null || maximumPoolSize == null) {result.put("msg", "参数不能为空");return result;}ThreadPoolExecutor executor = ThreadPoolRegistry.getExecutor(poolName);if (executor == null) {result.put("msg", String.format("线程池[%s]不存在", poolName));return result;}if (corePoolSize <= 0) {result.put("msg", "核心线程数必须大于0");return result;}if (corePoolSize > executor.getMaximumPoolSize()) {result.put("msg", "核心线程数不能大于最大线程数");return result;}int oldCorePoolSize = executor.getCorePoolSize();executor.setCorePoolSize(corePoolSize);log.info("线程池 {} 核心线程数已从 {} 调整为 {}", poolName, oldCorePoolSize, corePoolSize);int oldMaximumPoolSize = executor.getMaximumPoolSize();executor.setMaximumPoolSize(maximumPoolSize);log.info("线程池 {} 最大线程数已从 {} 调整为 {}", poolName, oldMaximumPoolSize, maximumPoolSize);result.put("poolName", poolName);result.put("oldCorePoolSize", oldCorePoolSize);result.put("newCorePoolSize", corePoolSize);result.put("oldMaximumPoolSize", oldMaximumPoolSize);result.put("newMaximumPoolSize", maximumPoolSize);result.put("msg", "线程核心线程数和最大线程数调整成功!");return result;} catch (Exception e) {log.error("调整线程池{}核心线程数失败: ", poolName, e);result.put("msg", "调整核心线程数失败");result.put("error", e.getMessage());return result;}}/*** 关闭线程* @param poolName* @return*/@DeleteOperationpublic Map<String, Object> shutdownThreadPool(@Selector @Nullable String poolName) {Map<String, Object> result = new HashMap<>();try {if (poolName == null) {result.put("msg", "线程池名称不能为空");return result;}ThreadPoolExecutor executor = ThreadPoolRegistry.getExecutor(poolName);if (executor == null) {result.put("msg", String.format("线程池[%s]不存在", poolName));return result;}if (executor.isShutdown()) {result.put("msg", String.format("线程池[%s]已经关闭", poolName));return result;}ThreadPoolRegistry.shutdown(executor);result.put("poolName", poolName);log.info("线程池 {} 已开始关闭", poolName);return result;} catch (Exception e) {result.put("msg", "关闭线程池失败");result.put("error", e.getMessage());return result;}}
}
注册监控端点
项目中我将EndPoint封装成了 SpringBoot Starter,可以直接在项目中引入。
@Slf4j
@Configuration
@ConditionalOnClass({ThreadPoolRegistry.class})
@ConditionalOnProperty(prefix = "management.endpoint.threadpools", name = "enabled", havingValue = "true", matchIfMissing = true
)
public class ThreadPoolActuatorConfiguration {/*** 注册线程池监控端点* * @return MoonThreadPoolEndpoint*/@Bean@ConditionalOnMissingBean@ConditionalOnAvailableEndpointpublic MoonThreadPoolEndpoint moonThreadPoolEndpoint() {log.info("启用 Moon 线程池 Actuator Endpoint: /actuator/threadpools");return new MoonThreadPoolEndpoint();}
}
在 META-INF.spring 目录下新增 org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件,文件内容
:
# Moon Thread Pool Auto Configuration
com.moon.cloud.threadpool.config.ThreadPoolActuatorConfiguration
EndPoint信息暴露
management.endpoints.web.exposure.include
可以设置需要暴露的 Actuator EndPoint,如 health、info 等信息,如果需要将所有 EndPoint都开放,则可以将其设置为 *
。
或者指定特定 EndPoint也可以。
# 开放所有 EndPoint
management.endpoints.web.exposure.include=*
# 开放特定 EndPoint
management.endpoints.web.exposure.include=health,info,threadpools
如果某些 EndPoint包含了敏感信息,可以通过 Spring Security 等进行授权控制。
EndPoint授权控制
添加 Spring Security 依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId>
</dependency>
在 application.properties 中配置安全规则
spring.security.user.name=admin
spring.security.user.password=admin-password
或者自定义安全规则
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {@Overrideprotected void configure(HttpSecurity http) throws Exception {http.authorizeRequests().antMatchers("/actuator/**").hasRole("ADMIN") // 这里保护所有Actuator EndPoint.anyRequest().authenticated().and().httpBasic();}
}
访问 EndPoint
类似 http://127.0.0.1:8080/actuator/threadpools , actuator 后面加上 EndPoint的 id 即可。