篇章六 系统性能优化——资源优化——CPU优化(3)
5.容错与监控
5.1 线程池监控指标
1.Micrometer 监控
Micrometer 是一个现代的监控库,用于在 Spring Boot 应用中收集和报告监控指标。以下是一个示例,展示如何使用 Micrometer 监控线程池的关键指标。
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;import java.util.concurrent.ThreadPoolExecutor;public class ThreadPoolMetrics {public static void registerThreadPoolMetrics(ThreadPoolExecutor threadPool, MeterRegistry meterRegistry) {// 监控活跃线程数Gauge.builder("thread.pool.active", threadPool::getActiveCount).tag("name", "api-pool").register(meterRegistry);// 监控线程池大小Gauge.builder("thread.pool.size", threadPool::getPoolSize).tag("name", "api-pool").register(meterRegistry);// 监控队列大小Gauge.builder("thread.pool.queue.size", threadPool.getQueue()::size).tag("name", "api-pool").register(meterRegistry);// 监控最大线程数Gauge.builder("thread.pool.max.size", threadPool::getMaximumPoolSize).tag("name", "api-pool").register(meterRegistry);// 监控核心线程数Gauge.builder("thread.pool.core.size", threadPool::getCorePoolSize).tag("name", "api-pool").register(meterRegistry);}
}
2.关键报警规则
-
活跃线程 > maxPoolSize * 0.8 持续 1 分钟:
-
如果活跃线程数超过最大线程数的 80% 持续 1 分钟,触发报警。
-
-
队列积压 > queueCapacity * 0.7:
-
如果队列中的任务数量超过队列容量的 70%,触发报警。
-
6.故障处理策略
6.1自定义拒绝策略
自定义拒绝策略可以在线程池拒绝任务时执行特定的逻辑。以下是一个示例,展示如何实现自定义拒绝策略:
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {// 1. 记录日志System.err.println("Task rejected: " + r.toString());System.err.println("Thread pool state: " + e.toString());// 2. 转存到 KafkasendToKafka(r);// 3. 触发弹性扩容triggerScaling();// 4. 返回 429 状态码throw new RejectedExecutionException("Too many requests");}private void sendToKafka(Runnable r) {// 模拟将任务发送到 KafkaSystem.out.println("Sending task to Kafka: " + r.toString());}private void triggerScaling() {// 模拟触发弹性扩容System.out.println("Triggering scaling...");}
}
6.2 配置线程池
在配置线程池时,可以使用自定义的拒绝策略:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configuration
public class ThreadPoolConfig {@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(50);executor.setQueueCapacity(100);executor.setThreadNamePrefix("api-worker-");executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());executor.initialize();return executor;}
}
6.3 具体例子
假设你有一个 Spring Boot 应用,需要处理高并发的 HTTP 请求。你可以使用 ThreadPoolTaskExecutor
配置线程池,并使用自定义的拒绝策略来处理任务拒绝的情况。
1.监控指标
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configuration
public class MonitoringConfig {@Autowiredprivate MeterRegistry meterRegistry;@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(50);executor.setQueueCapacity(100);executor.setThreadNamePrefix("api-worker-");executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());executor.initialize();// 注册监控指标registerThreadPoolMetrics(executor, meterRegistry);return executor;}private void registerThreadPoolMetrics(ThreadPoolTaskExecutor threadPool, MeterRegistry meterRegistry) {Gauge.builder("thread.pool.active", threadPool::getActiveCount).tag("name", "api-pool").register(meterRegistry);Gauge.builder("thread.pool.size", threadPool::getPoolSize).tag("name", "api-pool").register(meterRegistry);Gauge.builder("thread.pool.queue.size", threadPool.getQueue()::size).tag("name", "api-pool").register(meterRegistry);Gauge.builder("thread.pool.max.size", threadPool::getMaximumPoolSize).tag("name", "api-pool").register(meterRegistry);Gauge.builder("thread.pool.core.size", threadPool::getCorePoolSize).tag("name", "api-pool").register(meterRegistry);}
}
2.自定义拒绝策略
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {// 1. 记录日志System.err.println("Task rejected: " + r.toString());System.err.println("Thread pool state: " + e.toString());// 2. 转存到 KafkasendToKafka(r);// 3. 触发弹性扩容triggerScaling();// 4. 返回 429 状态码throw new RejectedExecutionException("Too many requests");}private void sendToKafka(Runnable r) {// 模拟将任务发送到 KafkaSystem.out.println("Sending task to Kafka: " + r.toString());}private void triggerScaling() {// 模拟触发弹性扩容System.out.println("Triggering scaling...");}
}
6.4 总结
-
线程池监控指标:
-
使用 Micrometer 监控线程池的关键指标,如活跃线程数、线程池大小、队列大小等。
-
设置关键报警规则,如活跃线程数超过最大线程数的 80% 持续 1 分钟,队列积压超过队列容量的 70%。
-
-
故障处理策略:
-
实现自定义拒绝策略,在线程池拒绝任务时执行特定的逻辑,如记录日志、转存到 Kafka、触发弹性扩容、返回 429 状态码。
-
通过合理配置和监控线程池,可以显著提高系统的容错能力和性能。
7.实施——基准测试
Meter 是一个开源的性能测试工具,主要用于测试软件和应用程序的性能。它最初是为测试 Web 应用程序而设计的,但随着时间的推移,已经扩展到支持多种协议和接口的性能测试。
7.1 JMeter 的主要功能
-
性能测试:
-
负载测试:模拟多个用户同时访问应用程序,以测试其在高负载下的表现。
-
压力测试:逐步增加负载,以确定应用程序的性能瓶颈。
-
耐力测试:长时间运行测试,以检查应用程序的稳定性和资源泄漏。
-
-
协议支持:
-
HTTP/HTTPS:测试 Web 应用程序。
-
FTP:测试文件传输服务。
-
JDBC:测试数据库性能。
-
JMS:测试消息中间件。
-
SOAP/REST:测试 Web 服务。
-
其他协议:如 LDAP、TCP、UDP 等。
-
-
测试计划:
-
线程组:模拟多个用户(线程)同时访问应用程序。
-
采样器:定义要测试的资源,如 HTTP 请求、数据库查询等。
-
监听器:收集和显示测试结果,如响应时间、吞吐量等。
-
断言:验证测试结果是否符合预期。
-
配置元件:设置测试环境,如 HTTP 请求头、数据库连接等。
-
前置处理器和后置处理器:在发送请求前或接收响应后执行特定操作。
-
-
报告和分析:
-
图形化报告:生成各种图表,如响应时间图、吞吐量图等。
-
CSV 报告:生成详细的测试结果文件,便于进一步分析。
-
7.2 JMeter 的工作原理
-
创建测试计划:
-
在 JMeter 中创建一个测试计划,定义测试的结构和逻辑。
-
-
配置线程组:
-
设置线程组的参数,如线程数(用户数)、循环次数、启动延迟等。
-
-
添加采样器:
-
添加具体的测试资源,如 HTTP 请求、数据库查询等。
-
-
添加监听器:
-
添加监听器以收集和显示测试结果。
-
-
运行测试:
-
启动测试,JMeter 会根据配置的线程组和采样器,模拟多个用户同时访问应用程序。
-
-
分析结果:
-
查看监听器生成的报告,分析应用程序的性能表现。
-
7.3使用 JMeter 的步骤
1. 安装 JMeter
-
下载 JMeter:从 Apache JMeter 官方网站 下载最新版本。
-
解压下载的文件到指定目录。
2. 创建测试计划
-
启动 JMeter:双击
jmeter.bat
(Windows)或jmeter.sh
(Linux/Mac)。 -
创建测试计划:在 JMeter 界面中,右键点击 “测试计划”,选择 “添加” -> “线程组”。
-
配置线程组:设置线程数、循环次数等参数。
3. 添加采样器
-
添加 HTTP 请求:右键点击线程组,选择 “添加” -> “采样器” -> “HTTP 请求”。
-
配置 HTTP 请求:设置协议、服务器名称、端口、路径等参数。
4. 添加监听器
-
添加监听器:右键点击线程组,选择 “添加” -> “监听器” -> “查看结果树”。
-
运行测试:点击 “启动” 按钮,开始测试。
5. 分析结果
-
查看结果:在 “查看结果树” 监听器中查看详细的测试结果。
-
生成报告:右键点击监听器,选择 “保存为 CSV 文件”,生成详细的测试报告。
7.4 示例:测试一个 Web 应用程序
假设你要测试一个 Web 应用程序的性能,以下是具体的步骤:
-
创建测试计划:
-
启动 JMeter。
-
右键点击 “测试计划”,选择 “添加” -> “线程组”。
-
设置线程数为 100,循环次数为 1。
-
-
添加 HTTP 请求:
-
右键点击线程组,选择 “添加” -> “采样器” -> “HTTP 请求”。
-
设置协议为
http
,服务器名称为example.com
,端口为80
,路径为/api/test
。
-
-
添加监听器:
-
右键点击线程组,选择 “添加” -> “监听器” -> “查看结果树”。
-
右键点击线程组,选择 “添加” -> “监听器” -> “聚合报告”。
-
-
运行测试:
-
点击 “启动” 按钮,开始测试。
-
-
分析结果:
-
在 “查看结果树” 监听器中查看详细的测试结果。
-
在 “聚合报告” 监听器中查看汇总的测试结果。
-
7.4 总结
JMeter 是一个功能强大的性能测试工具,支持多种协议和接口的性能测试。通过创建测试计划、配置线程组、添加采样器和监听器,可以模拟多个用户同时访问应用程序,从而测试其性能表现。JMeter 提供了丰富的报告和分析功能,帮助你优化应用程序的性能。
8.IO密集型——虚拟线程
虚拟线程是Java 21正式引入的一种轻量级线程,由JVM直接调度,而不是由操作系统内核线程直接管理。与传统线程(平台线程)相比,虚拟线程具有以下特点:
-
极致轻量:单个虚拟线程的内存占用仅几百字节,可以创建数百万个虚拟线程。
-
高效并发:在执行阻塞操作(如I/O操作)时,虚拟线程会自动挂起并释放资源,由JVM将其从当前平台线程卸载,当阻塞操作完成后再继续执行。
-
简化编程模型:支持同步代码风格,避免了复杂的异步回调。
8.1 创建虚拟线程
可以通过以下方式创建虚拟线程:
Thread virtualThread = Thread.startVirtualThread(() -> {System.out.println("Running in virtual thread: " + Thread.currentThread());
});
virtualThread.join(); // 等待线程完成
此外,还可以通过Executors.newVirtualThreadPerTaskExecutor()
创建一个虚拟线程池。
8.3 虚拟线程的优势
-
轻量级:创建和管理虚拟线程的开销极低。
-
简化并发编程:可以使用同步代码风格编写并发程序,避免了回调地狱。
-
提高可扩展性:适合高并发场景,能够显著提升系统的吞吐量。
-
与现有代码兼容:虚拟线程继承自
java.lang.Thread
,支持线程池、锁和同步机制。
8.4 应用场景
-
高并发的I/O密集型应用:如Web服务器、消息队列、API网关等,虚拟线程可以显著提高系统的吞吐量和响应能力。
-
微服务架构:在微服务中处理大量并发请求时,虚拟线程可以提升请求处理的吞吐量。
-
任务调度:将任务分配给多个虚拟线程并行执行,提高任务的执行效率。
8.5 注意事项
-
虽然虚拟线程在高并发场景下表现出色,但在CPU密集型任务中可能并不总是最优选择,因为虚拟线程的上下文切换也有一定开销。
-
虚拟线程并不改变Java的基本并发模型,传统线程仍然会继续存在。
8.6 总结
虚拟线程是Java 21引入的一项重要特性,它通过轻量级的线程模型和高效的调度机制,极大地简化了并发编程的复杂性,特别适合高并发的I/O密集型应用。在实际开发中,建议根据具体场景评估是否使用虚拟线程,以充分发挥其优势
9.CPU密集型
在处理CPU密集型任务时,虚拟线程(Virtual Threads)可能并不是最佳选择,因为虚拟线程的优势主要体现在高并发的I/O密集型场景中。对于CPU密集型任务,我们需要采用不同的策略来优化性能。以下是一些针对CPU密集型任务的优化建议:
9.1使用传统线程(平台线程)
对于CPU密集型任务,传统线程(平台线程)通常更适合,因为它们直接映射到操作系统的内核线程,能够充分利用多核CPU的计算能力。
示例代码:
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (int i = 0; i < 10; i++) {executor.submit(() -> {// CPU密集型任务for (long j = 0; j < 1000000000; j++) {// 模拟计算任务}System.out.println("Task completed by: " + Thread.currentThread().getName());});
}
executor.shutdown();
优点:
-
充分利用多核CPU:通过创建与CPU核心数相匹配的线程池,可以最大化利用多核CPU的计算能力。
-
避免上下文切换开销:传统线程的上下文切换开销相对较小,适合CPU密集型任务。
缺点:
-
线程数量受限:传统线程的资源占用较高,不适合创建大量线程。
9.2 任务拆分与并行处理
将CPU密集型任务拆分成多个子任务,并行执行,以充分利用多核CPU的计算能力。
示例代码:
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<Long>> futures = new ArrayList<>();// 将任务拆分成多个子任务
for (int i = 0; i < 10; i++) {int start = i * 100000000;int end = (i + 1) * 100000000;futures.add(executor.submit(() -> {long sum = 0;for (long j = start; j < end; j++) {sum += j;}return sum;}));
}// 合并结果
long totalSum = 0;
for (Future<Long> future : futures) {try {totalSum += future.get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
}System.out.println("Total sum: " + totalSum);
executor.shutdown();
优点:
-
充分利用多核CPU:通过并行处理,可以显著提高任务的执行效率。
-
可扩展性强:可以根据CPU核心数动态调整线程池大小。
缺点:
-
任务拆分复杂:需要合理拆分任务,确保每个子任务的计算量相对均衡。
9.3 使用Fork/Join框架
Fork/Join框架是Java提供的一个用于并行处理的框架,特别适合处理可分解的递归任务。
示例代码:
class SumTask extends RecursiveTask<Long> {private final long start;private final long end;public SumTask(long start, long end) {this.start = start;this.end = end;}@Overrideprotected Long compute() {if (end - start <= 10000000) {long sum = 0;for (long i = start; i < end; i++) {sum += i;}return sum;} else {long mid = (start + end) / 2;SumTask leftTask = new SumTask(start, mid);SumTask rightTask = new SumTask(mid, end);leftTask.fork();long rightResult = rightTask.compute();long leftResult = leftTask.join();return leftResult + rightResult;}}
}public class ForkJoinExample {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool();SumTask task = new SumTask(0, 1000000000L);long result = pool.invoke(task);System.out.println("Total sum: " + result);}
}
优点:
-
自动任务拆分:Fork/Join框架会自动将任务拆分成多个子任务,并行执行。
-
高效利用多核CPU:通过递归任务的并行处理,可以充分利用多核CPU的计算能力。
缺点:
-
学习曲线较陡:需要理解Fork/Join框架的工作原理和递归任务的拆分逻辑。
-
适用场景有限:更适合可分解的递归任务。
9.4 使用并行流(Parallel Streams)
Java 8引入了并行流(Parallel Streams),可以方便地将集合操作并行化。
示例代码:
import java.util.stream.IntStream;public class ParallelStreamExample {public static void main(String[] args) {long sum = IntStream.rangeClosed(1, 1000000000).parallel().sum();System.out.println("Total sum: " + sum);}
}
优点:
-
简单易用:只需在流操作中添加
.parallel()
,即可实现并行化。 -
自动任务拆分:并行流会自动将任务拆分成多个子任务,并行执行。
缺点:
-
适用场景有限:更适合简单的集合操作,对于复杂的任务拆分可能不够灵活。
-
性能开销:并行流的开销可能较大,不适合小规模数据。
9.5 优化算法和数据结构
在处理CPU密集型任务时,优化算法和数据结构可以显著提高性能。
示例:
-
使用更高效的算法(如快速排序代替冒泡排序)。
-
使用更高效的数据结构(如HashMap代替ArrayList)。
优点:
-
性能提升显著:优化算法和数据结构可以直接减少计算量,提高任务的执行效率。
-
适用范围广:适用于任何类型的任务。
缺点:
-
需要专业知识:需要对算法和数据结构有深入的了解。
9.6 总结
对于CPU密集型任务,建议优先考虑以下策略:
-
使用传统线程(平台线程)并创建与CPU核心数相匹配的线程池。
-
将任务拆分成多个子任务,并行处理。
-
使用Fork/Join框架或并行流(Parallel Streams)来简化并行处理。
-
优化算法和数据结构,减少计算量。
虚拟线程更适合I/O密集型任务,但在CPU密集型任务中,传统线程和并行处理机制通常能够提供更好的性能。