第9部分-性能优化、调试与并发设计模式
第9部分:性能优化、调试与并发设计模式
核心目标
构建高质量、可扩展的并发系统。
1. 性能测试与可视化工具
JMH(Java Microbenchmark Harness)
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
/**
 * JMH benchmark comparing three ways of incrementing a shared counter:
 * {@link AtomicInteger}, an intrinsic lock ({@code synchronized}) and {@link ReentrantLock}.
 *
 * <p>Every benchmark method returns the incremented value so the computation has an observable
 * result instead of being discarded.
 */
public class ConcurrencyBenchmark {

    private AtomicInteger atomicCounter;
    private int synchronizedCounter;
    private final Object lock = new Object();
    private ReentrantLock reentrantLock;
    private int reentrantLockCounter;

    /** Fixture method: creates fresh counters and a fresh lock for the benchmark state. */
    @Setup
    public void setup() {
        atomicCounter = new AtomicInteger(0);
        synchronizedCounter = 0;
        reentrantLock = new ReentrantLock();
        reentrantLockCounter = 0;
    }

    /**
     * Increments the counter with a single atomic operation.
     *
     * @return the counter value after the increment
     */
    @Benchmark
    public int atomicIncrement() {
        return atomicCounter.incrementAndGet();
    }

    /**
     * Increments the counter while holding the monitor of {@code lock}.
     *
     * @return the counter value after the increment
     */
    @Benchmark
    public int synchronizedIncrement() {
        synchronized (lock) {
            return ++synchronizedCounter;
        }
    }

    /**
     * Increments the counter while holding {@code reentrantLock}; the lock is always released in
     * {@code finally}.
     *
     * @return the counter value after the increment
     */
    @Benchmark
    public int reentrantLockIncrement() {
        reentrantLock.lock();
        try {
            return ++reentrantLockCounter;
        } finally {
            reentrantLock.unlock();
        }
    }

    /**
     * Runs the benchmarks of this class with 1 fork, 5 warm-up iterations and 10 measurement
     * iterations.
     *
     * @param args ignored
     * @throws RunnerException if the JMH runner fails
     */
    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(ConcurrencyBenchmark.class.getSimpleName())
                .forks(1)
                .warmupIterations(5)
                .measurementIterations(10)
                .build();
        new Runner(opt).run();
    }
}
VisualVM使用
/**
 * Workload generator meant to be observed with an external monitoring tool: it produces sleeping
 * threads, heap allocation, a CPU-bound loop and finally a deliberate deadlock.
 *
 * <p>Standalone compilation needs {@code java.util.ArrayList} and {@code java.util.List}.
 */
public class VisualVMExample {

    /**
     * Runs the four demonstrations in order. The last one deadlocks on purpose, so this method
     * is not expected to return once the deadlock has formed.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        // 1. Thread monitoring
        demonstrateThreadMonitoring();
        // 2. Memory monitoring
        demonstrateMemoryMonitoring();
        // 3. CPU monitoring
        demonstrateCPUMonitoring();
        // 4. Deadlock detection
        demonstrateDeadlockDetection();
    }

    /** Starts ten threads that each sleep for five seconds, then waits for all of them. */
    private static void demonstrateThreadMonitoring() throws InterruptedException {
        System.out.println("=== 线程监控 ===");

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int threadId = i;
            Thread thread = new Thread(() -> {
                try {
                    Thread.sleep(5000);
                    System.out.println("线程 " + threadId + " 完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(thread);
            thread.start();
        }

        // Wait for every worker to finish
        for (Thread thread : threads) {
            thread.join();
        }
    }

    /** Allocates 100,000 strings, requests a GC and prints total/used/free heap in MB. */
    private static void demonstrateMemoryMonitoring() {
        System.out.println("=== 内存监控 ===");

        // Allocate many objects so heap usage is visible
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            list.add("String " + i);
        }

        // Request a collection (the JVM may ignore the request)
        System.gc();

        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;

        System.out.println("总内存: " + totalMemory / 1024 / 1024 + "MB");
        System.out.println("已用内存: " + usedMemory / 1024 / 1024 + "MB");
        System.out.println("空闲内存: " + freeMemory / 1024 / 1024 + "MB");
    }

    /** Sums the integers below 100,000,000 and prints the elapsed wall-clock time. */
    private static void demonstrateCPUMonitoring() {
        System.out.println("=== CPU监控 ===");

        long start = System.currentTimeMillis();
        long sum = 0;
        for (int i = 0; i < 100000000; i++) {
            sum += i;
        }
        long time = System.currentTimeMillis() - start;

        System.out.println("CPU密集型任务耗时: " + time + "ms");
        System.out.println("计算结果: " + sum);
    }

    /**
     * Creates a lock-ordering deadlock: each thread takes one lock, sleeps 100 ms, then tries to
     * take the other. Once both threads hold their first lock neither can proceed, and the
     * {@code join()} calls below block indefinitely. That is intentional: the process stays
     * alive so the deadlock can be inspected from outside.
     */
    private static void demonstrateDeadlockDetection() throws InterruptedException {
        System.out.println("=== 死锁检测 ===");

        Object lock1 = new Object();
        Object lock2 = new Object();

        // Named threads are easier to locate in a thread dump
        Thread thread1 = new Thread(() -> {
            synchronized (lock1) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                synchronized (lock2) {
                    System.out.println("线程1获取两个锁");
                }
            }
        }, "deadlock-demo-1");

        Thread thread2 = new Thread(() -> {
            synchronized (lock2) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                synchronized (lock1) {
                    System.out.println("线程2获取两个锁");
                }
            }
        }, "deadlock-demo-2");

        thread1.start();
        thread2.start();

        thread1.join();
        thread2.join();
    }
}
Flight Recorder使用
import java.util.ArrayList;
import java.util.List;

import jdk.jfr.*;

/**
 * Produces activity that can be captured in a JDK Flight Recorder recording: custom events,
 * a short multi-threaded workload and a burst of heap allocation.
 */
public class FlightRecorderExample {

    /**
     * Custom JFR event. A custom event is declared by extending {@link Event} (it is a class,
     * not an annotation); that is also where {@code commit()} comes from.
     */
    public static class CustomEvent extends Event {
        @Label("Event Name")
        public String name;

        @Label("Event Value")
        public int value;
    }

    /**
     * Runs the three demonstrations in order.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while sleeping or joining
     */
    public static void main(String[] args) throws InterruptedException {
        // 1. Custom events
        demonstrateCustomEvents();
        // 2. Performance analysis
        demonstratePerformanceAnalysis();
        // 3. Memory analysis
        demonstrateMemoryAnalysis();
    }

    /** Commits 100 {@link CustomEvent}s, 10 ms apart, with {@code value} running from 0 to 99. */
    private static void demonstrateCustomEvents() throws InterruptedException {
        System.out.println("=== 自定义事件 ===");

        for (int i = 0; i < 100; i++) {
            CustomEvent event = new CustomEvent();
            event.name = "CustomEvent";
            event.value = i;
            event.commit();

            Thread.sleep(10);
        }
    }

    /** Runs five threads that each call {@code Math.random()} 1000 times, then joins them. */
    private static void demonstratePerformanceAnalysis() throws InterruptedException {
        System.out.println("=== 性能分析 ===");

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    // Simulated work; the result is intentionally unused
                    Math.random();
                }
            });
            threads.add(thread);
            thread.start();
        }

        for (Thread thread : threads) {
            thread.join();
        }
    }

    /** Allocates 100,000 strings and then requests a garbage collection. */
    private static void demonstrateMemoryAnalysis() {
        System.out.println("=== 内存分析 ===");

        List<String> list = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            list.add("String " + i);
        }

        // Request a collection (the JVM may ignore the request)
        System.gc();
    }
}
2. 线程池调优与背压控制
线程池参数调优
/**
 * Demonstrates the main {@link ThreadPoolExecutor} tuning knobs: pool sizing, work-queue choice,
 * rejection policy and submission-side backpressure.
 *
 * <p>Standalone compilation needs imports from {@code java.util} and
 * {@code java.util.concurrent}.
 */
public class ThreadPoolTuning {

    /**
     * Runs the four demonstrations in order.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting for tasks
     */
    public static void main(String[] args) throws InterruptedException {
        // 1. Core parameter tuning
        demonstrateCoreParameterTuning();
        // 2. Queue selection
        demonstrateQueueSelection();
        // 3. Rejection policies
        demonstrateRejectionPolicies();
        // 4. Backpressure control
        demonstrateBackpressureControl();
    }

    /**
     * Builds one pool sized for CPU-bound work (core = max = processor count, unbounded queue)
     * and one sized for I/O-bound work (core = 2x, max = 4x processors, queue of 100) and times
     * the same batch on both.
     */
    private static void demonstrateCoreParameterTuning() throws InterruptedException {
        System.out.println("=== 核心参数调优 ===");

        int processors = Runtime.getRuntime().availableProcessors();

        ThreadPoolExecutor cpuIntensivePool = new ThreadPoolExecutor(
                processors,                               // core pool size
                processors,                               // maximum pool size
                60L, TimeUnit.SECONDS,                    // keep-alive for idle threads
                new LinkedBlockingQueue<>(),              // unbounded queue
                new ThreadPoolExecutor.CallerRunsPolicy() // rejection policy
        );

        ThreadPoolExecutor ioIntensivePool = new ThreadPoolExecutor(
                processors * 2,                           // core pool size
                processors * 4,                           // maximum pool size
                60L, TimeUnit.SECONDS,                    // keep-alive for idle threads
                new LinkedBlockingQueue<>(100),           // bounded queue
                new ThreadPoolExecutor.CallerRunsPolicy() // rejection policy
        );

        try {
            testThreadPool(cpuIntensivePool, "CPU密集型");
            testThreadPool(ioIntensivePool, "I/O密集型");
        } finally {
            // Release the worker threads even if a batch was interrupted
            cpuIntensivePool.shutdown();
            ioIntensivePool.shutdown();
        }
    }

    /** Runs 100 sleeping tasks on {@code pool} and prints the elapsed time. */
    private static void testThreadPool(ThreadPoolExecutor pool, String type) throws InterruptedException {
        long time = runSleepingBatch(pool, type, 100);
        System.out.println(type + " 线程池耗时: " + time + "ms");
    }

    /** Compares an unbounded, a bounded and a synchronous hand-off queue on the same 2..4 pool. */
    private static void demonstrateQueueSelection() throws InterruptedException {
        System.out.println("=== 队列选择 ===");

        // 1. LinkedBlockingQueue (unbounded)
        ThreadPoolExecutor unboundedPool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadPoolExecutor.AbortPolicy());

        // 2. ArrayBlockingQueue (bounded)
        ThreadPoolExecutor boundedPool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 3. SynchronousQueue (direct hand-off, no capacity)
        ThreadPoolExecutor synchronousPool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        try {
            testQueue(unboundedPool, "无界队列");
            testQueue(boundedPool, "有界队列");
            testQueue(synchronousPool, "同步队列");
        } finally {
            unboundedPool.shutdown();
            boundedPool.shutdown();
            synchronousPool.shutdown();
        }
    }

    /** Runs 20 sleeping tasks on {@code pool} and prints the elapsed time. */
    private static void testQueue(ThreadPoolExecutor pool, String queueType) throws InterruptedException {
        long time = runSleepingBatch(pool, queueType, 20);
        System.out.println(queueType + " 耗时: " + time + "ms");
    }

    /**
     * Submits {@code taskCount} tasks that each sleep 100 ms, waits for all of them and returns
     * the elapsed wall-clock time in milliseconds.
     */
    private static long runSleepingBatch(ThreadPoolExecutor pool, String label, int taskCount)
            throws InterruptedException {
        long start = System.currentTimeMillis();

        List<Future<String>> futures = new ArrayList<>(taskCount);
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            futures.add(pool.submit(() -> {
                try {
                    Thread.sleep(100);
                    return label + " 任务 " + taskId + " 完成";
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return label + " 任务 " + taskId + " 中断";
                }
            }));
        }

        awaitAll(futures, false);
        return System.currentTimeMillis() - start;
    }

    /**
     * Waits for every future; optionally prints each result. A task failure is reported and does
     * not stop the remaining futures from being awaited.
     */
    private static void awaitAll(List<Future<String>> futures, boolean printResults)
            throws InterruptedException {
        for (Future<String> future : futures) {
            try {
                String result = future.get();
                if (printResults) {
                    System.out.println(result);
                }
            } catch (ExecutionException e) {
                System.out.println("任务执行失败: " + e.getCause());
            }
        }
    }

    /** Exercises each built-in rejection policy against a deliberately tiny pool. */
    private static void demonstrateRejectionPolicies() throws InterruptedException {
        System.out.println("=== 拒绝策略 ===");

        // 1. AbortPolicy (throws)
        testRejectionPolicy(new ThreadPoolExecutor.AbortPolicy(), "AbortPolicy");
        // 2. CallerRunsPolicy (runs the task on the submitting thread)
        testRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy(), "CallerRunsPolicy");
        // 3. DiscardPolicy (silently drops the task)
        testRejectionPolicy(new ThreadPoolExecutor.DiscardPolicy(), "DiscardPolicy");
        // 4. DiscardOldestPolicy (drops the oldest queued task)
        testRejectionPolicy(new ThreadPoolExecutor.DiscardOldestPolicy(), "DiscardOldestPolicy");
    }

    /**
     * Submits five one-second tasks to a pool with one thread and a queue of one, so most
     * submissions hit {@code handler}.
     */
    private static void testRejectionPolicy(RejectedExecutionHandler handler, String policyName)
            throws InterruptedException {
        System.out.println("测试 " + policyName + ":");

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), // tiny queue to force rejections
                handler);

        try {
            for (int i = 0; i < 5; i++) {
                final int taskId = i;
                // Catch per submission so one rejection does not skip the remaining tasks
                try {
                    pool.submit(() -> {
                        try {
                            Thread.sleep(1000);
                            System.out.println("任务 " + taskId + " 完成");
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    System.out.println("任务被拒绝: " + e.getMessage());
                }
            }
        } finally {
            pool.shutdown();
        }

        // Let accepted tasks finish so output of different policies does not interleave
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }

    /**
     * Limits the number of in-flight tasks to five. The permit is acquired by the submitting
     * thread before {@code submit}, so the producer itself blocks when five tasks are pending
     * or running; the task releases the permit when it ends.
     */
    private static void demonstrateBackpressureControl() throws InterruptedException {
        System.out.println("=== 背压控制 ===");

        Semaphore semaphore = new Semaphore(5); // at most 5 tasks in flight

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 10, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                final int taskId = i;

                semaphore.acquire(); // producer waits here when the limit is reached
                try {
                    futures.add(pool.submit(() -> {
                        try {
                            Thread.sleep(1000);
                            return "任务 " + taskId + " 完成";
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return "任务 " + taskId + " 中断";
                        } finally {
                            semaphore.release();
                        }
                    }));
                } catch (RejectedExecutionException e) {
                    // The task will never run, so give its permit back
                    semaphore.release();
                    throw e;
                }
            }

            awaitAll(futures, true);
        } finally {
            pool.shutdown();
        }
    }
}
自适应线程池
/**
 * Thread pool that periodically resizes its core pool based on queue length and the measured
 * average execution time of the tasks completed since the previous adjustment.
 *
 * <p>Standalone compilation needs imports from {@code java.util}, {@code java.util.concurrent}
 * and {@code java.util.concurrent.atomic}.
 */
public class AdaptiveThreadPool {

    /** Pause between two adjustment rounds. */
    private static final long ADJUSTMENT_INTERVAL_MS = 5000;

    private final ThreadPoolExecutor pool;
    /** Number of tasks that finished since the last adjustment. */
    private final AtomicInteger taskCount = new AtomicInteger(0);
    /** Sum of the run times (nanoseconds) of the tasks counted in {@code taskCount}. */
    private final AtomicLong totalExecutionTime = new AtomicLong(0);
    private final Thread adjustmentThread;

    /** Creates a pool with 2 core / 10 max threads and starts the background adjuster. */
    public AdaptiveThreadPool() {
        this.pool = new ThreadPoolExecutor(
                2, 10, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        this.adjustmentThread = new Thread(this::runAdjustmentLoop);
        this.adjustmentThread.setDaemon(true);
        this.adjustmentThread.start();
    }

    /** Re-evaluates the pool size every {@link #ADJUSTMENT_INTERVAL_MS} until shutdown. */
    private void runAdjustmentLoop() {
        while (!pool.isShutdown()) {
            try {
                Thread.sleep(ADJUSTMENT_INTERVAL_MS);
                adjustPoolSize();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Grows the core pool by two (capped at the maximum pool size) when more than 10 tasks are
     * queued and tasks average over one second; shrinks it by one (never below one) when fewer
     * than 2 tasks are queued and tasks average under half a second. Does nothing when no task
     * completed in the window.
     */
    private void adjustPoolSize() {
        // getAndSet reads and resets in one step so completions are not lost in between.
        // The two counters are read separately, so the average can be off by one task.
        int completed = taskCount.getAndSet(0);
        long elapsedNanos = totalExecutionTime.getAndSet(0);
        if (completed <= 0) {
            return;
        }

        long avgExecutionTime = TimeUnit.NANOSECONDS.toMillis(elapsedNanos / completed);
        int coreSize = pool.getCorePoolSize();
        int queueSize = pool.getQueue().size();

        if (queueSize > 10 && avgExecutionTime > 1000) {
            int newSize = Math.min(coreSize + 2, pool.getMaximumPoolSize());
            if (newSize != coreSize) {
                pool.setCorePoolSize(newSize);
                System.out.println("增加线程数到: " + newSize);
            }
        } else if (queueSize < 2 && avgExecutionTime < 500) {
            int newSize = Math.max(coreSize - 1, 1);
            if (newSize != coreSize) {
                pool.setCorePoolSize(newSize);
                System.out.println("减少线程数到: " + newSize);
            }
        }
    }

    /**
     * Submits a task and records how long it actually ran, whether it returns or throws.
     *
     * @param task the task to run; must not be {@code null}
     * @return a future for the task's result
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public Future<String> submit(Callable<String> task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        return pool.submit(() -> {
            long start = System.nanoTime();
            try {
                return task.call();
            } finally {
                totalExecutionTime.addAndGet(System.nanoTime() - start);
                taskCount.incrementAndGet();
            }
        });
    }

    /** Stops accepting new tasks and wakes the adjuster so it terminates promptly. */
    public void shutdown() {
        pool.shutdown();
        adjustmentThread.interrupt();
    }

    /**
     * Submits 100 tasks that each sleep between one and two seconds and prints their results.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        AdaptiveThreadPool adaptivePool = new AdaptiveThreadPool();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                final int taskId = i;
                futures.add(adaptivePool.submit(() -> {
                    try {
                        Thread.sleep(1000 + (int) (Math.random() * 1000));
                        return "任务 " + taskId + " 完成";
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return "任务 " + taskId + " 中断";
                    }
                }));
            }

            // Wait for every task and report failures instead of aborting
            for (Future<String> future : futures) {
                try {
                    System.out.println(future.get());
                } catch (ExecutionException e) {
                    System.out.println("任务执行失败: " + e.getCause());
                }
            }
        } finally {
            adaptivePool.shutdown();
        }
    }
}
3. 并发设计模式
生产者-消费者模式
/**
 * Producer-consumer demonstrations built on {@code BlockingQueue}: one producer and one
 * consumer, several of each, and a priority-ordered variant.
 *
 * <p>Standalone compilation needs imports from {@code java.util} and
 * {@code java.util.concurrent}.
 */
public class ProducerConsumerPattern {

    /**
     * Runs the three demonstrations in order.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        // 1. Basic producer-consumer
        demonstrateBasicProducerConsumer();
        // 2. Multiple producers and consumers
        demonstrateMultiProducerConsumer();
        // 3. Priority producer-consumer
        demonstratePriorityProducerConsumer();
    }

    /**
     * One producer puts 20 items (one every 100 ms) into a queue of capacity 10 while one
     * consumer takes 20 items (one every 150 ms).
     */
    private static void demonstrateBasicProducerConsumer() throws InterruptedException {
        System.out.println("=== 基本生产者-消费者 ===");

        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    queue.put("Item " + i);
                    System.out.println("生产: Item " + i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    String item = queue.take();
                    System.out.println("消费: " + item);
                    Thread.sleep(150);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }

    /**
     * Three producers put 10 items each and two consumers take 15 items each, so the 30 produced
     * items are exactly the 30 consumed ones.
     */
    private static void demonstrateMultiProducerConsumer() throws InterruptedException {
        System.out.println("=== 多生产者-多消费者 ===");

        BlockingQueue<String> queue = new LinkedBlockingQueue<>(20);

        List<Thread> producers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int producerId = i;
            producers.add(new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    try {
                        queue.put("Producer " + producerId + " - Item " + j);
                        System.out.println("生产者 " + producerId + " 生产: Item " + j);
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }));
        }

        List<Thread> consumers = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            final int consumerId = i;
            consumers.add(new Thread(() -> {
                for (int j = 0; j < 15; j++) {
                    try {
                        String item = queue.take();
                        System.out.println("消费者 " + consumerId + " 消费: " + item);
                        Thread.sleep(150);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }));
        }

        // Start everything
        for (Thread producer : producers) {
            producer.start();
        }
        for (Thread consumer : consumers) {
            consumer.start();
        }

        // Wait for everything
        for (Thread producer : producers) {
            producer.join();
        }
        for (Thread consumer : consumers) {
            consumer.join();
        }
    }

    /**
     * Same shape as the basic demo, but items carry a random priority in [0, 10) and the
     * consumer always receives the highest-priority item currently queued.
     */
    private static void demonstratePriorityProducerConsumer() throws InterruptedException {
        System.out.println("=== 优先级生产者-消费者 ===");

        PriorityBlockingQueue<PriorityItem> queue = new PriorityBlockingQueue<>();

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                int priority = (int) (Math.random() * 10);
                PriorityItem item = new PriorityItem("Item " + i, priority);
                queue.put(item);
                System.out.println("生产: " + item);

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    PriorityItem item = queue.take();
                    System.out.println("消费: " + item);
                    Thread.sleep(150);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }

    /** Immutable queue element ordered so that a larger {@code priority} sorts first. */
    static class PriorityItem implements Comparable<PriorityItem> {
        private final String name;
        private final int priority;

        public PriorityItem(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        /** Reversed numeric order: higher priority compares as "smaller", i.e. comes out first. */
        @Override
        public int compareTo(PriorityItem other) {
            return Integer.compare(other.priority, this.priority);
        }

        @Override
        public String toString() {
            return name + " (优先级: " + priority + ")";
        }
    }
}
Future模式
/**
 * Demonstrations of the Future pattern: plain futures, combining two asynchronous results and
 * bounding the wait with a timeout.
 *
 * <p>Standalone compilation needs imports from {@code java.util.concurrent}.
 */
public class FuturePattern {

    /**
     * Runs the three demonstrations in order.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting for a result
     */
    public static void main(String[] args) throws InterruptedException {
        // 1. Basic Future pattern
        demonstrateBasicFuture();
        // 2. Combining futures
        demonstrateComposedFuture();
        // 3. Timeout handling
        demonstrateTimeoutHandling();
    }

    /** Submits three sleeping tasks and prints their results in submission order. */
    private static void demonstrateBasicFuture() throws InterruptedException {
        System.out.println("=== 基本Future模式 ===");

        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            Future<String> future1 = executor.submit(() -> {
                Thread.sleep(1000);
                return "任务1完成";
            });
            Future<String> future2 = executor.submit(() -> {
                Thread.sleep(1500);
                return "任务2完成";
            });
            Future<String> future3 = executor.submit(() -> {
                Thread.sleep(800);
                return "任务3完成";
            });

            try {
                System.out.println(future1.get());
                System.out.println(future2.get());
                System.out.println(future3.get());
            } catch (ExecutionException e) {
                System.out.println("任务执行失败: " + e.getCause());
            }
        } finally {
            // Always release the worker threads, also when the wait is interrupted
            executor.shutdown();
        }
    }

    /**
     * Computes 10 and 20 asynchronously and adds them. The combination is expressed with
     * {@code thenCombine}, so no pool thread sits blocked in {@code get()} waiting for sibling
     * tasks (which can starve a small fixed pool).
     */
    private static void demonstrateComposedFuture() throws InterruptedException {
        System.out.println("=== 组合Future ===");

        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<Integer> first =
                    CompletableFuture.supplyAsync(() -> slowValue(1000, 10), executor);
            CompletableFuture<Integer> second =
                    CompletableFuture.supplyAsync(() -> slowValue(1500, 20), executor);

            CompletableFuture<Integer> combinedFuture = first.thenCombine(second, Integer::sum);

            try {
                System.out.println("组合结果: " + combinedFuture.get());
            } catch (ExecutionException e) {
                System.out.println("任务执行失败: " + e.getCause());
            }
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Sleeps for {@code millis} and then returns {@code value}. An interrupt restores the
     * interrupt flag and fails the surrounding stage with a {@link CompletionException}.
     */
    private static int slowValue(long millis, int value) {
        try {
            Thread.sleep(millis);
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }
    }

    /** Waits at most two seconds for a three-second task and cancels it on timeout. */
    private static void demonstrateTimeoutHandling() throws InterruptedException {
        System.out.println("=== 超时处理 ===");

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(3000);
                return "长时间任务完成";
            });

            try {
                String result = future.get(2, TimeUnit.SECONDS);
                System.out.println("结果: " + result);
            } catch (TimeoutException e) {
                System.out.println("任务超时,取消执行");
                future.cancel(true); // interrupts the sleeping worker
            } catch (ExecutionException e) {
                System.out.println("任务执行失败: " + e.getCause());
            }
        } finally {
            executor.shutdown();
        }
    }
}
工作窃取模式
/**
 * Demonstrations of work distribution: {@link ForkJoinPool}, a hand-written work-stealing
 * scheme with one deque per worker, and a round-robin load balancer.
 *
 * <p>Standalone compilation needs imports from {@code java.util}, {@code java.util.concurrent}
 * and {@code java.util.concurrent.atomic}.
 */
public class WorkStealingPattern {

    /**
     * Runs the three demonstrations in order.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        // 1. ForkJoinPool work stealing
        demonstrateForkJoinWorkStealing();
        // 2. Custom work stealing
        demonstrateCustomWorkStealing();
        // 3. Load balancing
        demonstrateLoadBalancing();
    }

    /** Submits 20 sleeping tasks to a {@link ForkJoinPool} and prints their results. */
    private static void demonstrateForkJoinWorkStealing() throws InterruptedException {
        System.out.println("=== ForkJoinPool工作窃取 ===");

        ForkJoinPool pool = new ForkJoinPool();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                final int taskId = i;
                futures.add(pool.submit(() -> {
                    try {
                        Thread.sleep(1000 + (int) (Math.random() * 1000));
                        return "任务 " + taskId + " 完成";
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return "任务 " + taskId + " 中断";
                    }
                }));
            }
            printAll(futures);
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Four workers each own a queue, but all 20 tasks are placed in worker 0's queue. Workers
     * drain their own queue first and steal from the others when it is empty. No task is added
     * after the workers start, so a worker may stop once every queue is empty.
     */
    private static void demonstrateCustomWorkStealing() throws InterruptedException {
        System.out.println("=== 自定义工作窃取 ===");

        final int workerCount = 4;
        List<WorkStealingQueue<String>> queues = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            queues.add(new WorkStealingQueue<>());
        }

        // Deliberately skewed load so the other workers have to steal
        for (int i = 0; i < 20; i++) {
            queues.get(0).add("任务 " + i);
        }

        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            final int workerId = i;
            workers.add(new Thread(() -> {
                WorkStealingQueue<String> own = queues.get(workerId);
                while (true) {
                    String task = own.take();
                    boolean stolen = false;
                    if (task == null) {
                        task = stealFromOthers(queues, workerId);
                        stolen = task != null;
                    }
                    if (task == null) {
                        break;
                    }

                    System.out.println("工作线程 " + workerId
                            + (stolen ? " 窃取并处理: " : " 处理: ") + task);
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }));
        }

        for (Thread worker : workers) {
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }

    /** Tries every other worker's queue once; returns a stolen task or {@code null}. */
    private static String stealFromOthers(List<WorkStealingQueue<String>> queues, int thiefId) {
        for (int offset = 1; offset < queues.size(); offset++) {
            String task = queues.get((thiefId + offset) % queues.size()).steal();
            if (task != null) {
                return task;
            }
        }
        return null;
    }

    /** Distributes 20 sleeping tasks round-robin over four small pools and prints the results. */
    private static void demonstrateLoadBalancing() throws InterruptedException {
        System.out.println("=== 负载均衡 ===");

        LoadBalancer loadBalancer = new LoadBalancer(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                final int taskId = i;
                futures.add(loadBalancer.submit(() -> {
                    try {
                        Thread.sleep(1000 + (int) (Math.random() * 1000));
                        return "任务 " + taskId + " 完成";
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return "任务 " + taskId + " 中断";
                    }
                }));
            }
            printAll(futures);
        } finally {
            loadBalancer.shutdown();
        }
    }

    /** Prints every result; a failed task is reported and the remaining futures are still read. */
    private static void printAll(List<Future<String>> futures) throws InterruptedException {
        for (Future<String> future : futures) {
            try {
                System.out.println(future.get());
            } catch (ExecutionException e) {
                System.out.println("任务执行失败: " + e.getCause());
            }
        }
    }

    /**
     * Double-ended task queue. The owner works at the tail ({@link #add}, {@link #take}) while
     * other workers {@link #steal} from the head, so owner and thieves touch opposite ends.
     */
    static class WorkStealingQueue<T> {
        private final Deque<T> deque = new ConcurrentLinkedDeque<>();

        /** Appends {@code item} at the tail. */
        public void add(T item) {
            deque.offerLast(item);
        }

        /** Owner side: removes the most recently added item, or returns {@code null} if empty. */
        public T take() {
            return deque.pollLast();
        }

        /** Thief side: removes the oldest item, or returns {@code null} if empty. */
        public T steal() {
            return deque.pollFirst();
        }
    }

    /** Round-robin dispatcher over a fixed set of two-thread pools. */
    static class LoadBalancer {
        private final List<ExecutorService> executors;
        private final AtomicInteger index = new AtomicInteger(0);

        /**
         * @param poolCount number of pools to create; must be positive
         * @throws IllegalArgumentException if {@code poolCount} is not positive
         */
        public LoadBalancer(int poolCount) {
            if (poolCount <= 0) {
                throw new IllegalArgumentException("poolCount must be positive: " + poolCount);
            }
            executors = new ArrayList<>(poolCount);
            for (int i = 0; i < poolCount; i++) {
                executors.add(Executors.newFixedThreadPool(2));
            }
        }

        /** Submits {@code task} to the next pool in round-robin order. */
        public Future<String> submit(Callable<String> task) {
            // floorMod keeps the slot non-negative after the int counter wraps around
            int currentIndex = Math.floorMod(index.getAndIncrement(), executors.size());
            return executors.get(currentIndex).submit(task);
        }

        /** Initiates an orderly shutdown of every pool. */
        public void shutdown() {
            for (ExecutorService executor : executors) {
                executor.shutdown();
            }
        }
    }
}
Actor模型
/**
 * Minimal actor model: actors are addressed through an {@link ActorRef}, receive messages
 * through a mailbox and process them one at a time in arrival order.
 *
 * <p>Standalone compilation needs imports from {@code java.util}, {@code java.util.concurrent}
 * and {@code java.util.concurrent.atomic}.
 */
public class ActorModel {

    /**
     * Runs the three demonstrations in order.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // 1. Basic actor
        demonstrateBasicActor();
        // 2. Actor communication
        demonstrateActorCommunication();
        // 3. Actor supervision
        demonstrateActorSupervision();
    }

    /** Sends two increments and one read request to a counter actor. */
    private static void demonstrateBasicActor() throws InterruptedException {
        System.out.println("=== 基本Actor ===");

        ActorSystem system = new ActorSystem();
        try {
            ActorRef counter = system.actorOf(CounterActor.class);

            counter.tell(new IncrementMessage());
            counter.tell(new IncrementMessage());
            counter.tell(new GetCountMessage());

            Thread.sleep(1000);
        } finally {
            system.shutdown();
        }
    }

    /** Asks a sender actor to forward a message to a receiver actor. */
    private static void demonstrateActorCommunication() throws InterruptedException {
        System.out.println("=== Actor通信 ===");

        ActorSystem system = new ActorSystem();
        try {
            ActorRef sender = system.actorOf(SenderActor.class);
            ActorRef receiver = system.actorOf(ReceiverActor.class);

            sender.tell(new SendMessage("Hello", receiver));

            Thread.sleep(1000);
        } finally {
            system.shutdown();
        }
    }

    /** Has a supervisor create a child and then forward a message to it. */
    private static void demonstrateActorSupervision() throws InterruptedException {
        System.out.println("=== Actor监督 ===");

        ActorSystem system = new ActorSystem();
        try {
            ActorRef supervisor = system.actorOf(SupervisorActor.class);

            supervisor.tell(new CreateChildMessage());
            supervisor.tell(new SendMessage("Test", null));

            Thread.sleep(1000);
        } finally {
            system.shutdown();
        }
    }

    // Message types

    static class IncrementMessage {}

    static class GetCountMessage {}

    static class SendMessage {
        private final String content;
        private final ActorRef target;

        public SendMessage(String content, ActorRef target) {
            this.content = content;
            this.target = target;
        }

        public String getContent() { return content; }

        public ActorRef getTarget() { return target; }
    }

    static class CreateChildMessage {}

    /**
     * Handle used to send messages to an actor. Messages are queued in a mailbox and drained by
     * at most one executor task at a time, so an actor never handles two messages concurrently
     * and sees messages from one sender in the order they were sent.
     */
    static class ActorRef {
        private final Actor actor;
        private final ExecutorService executor;
        private final Queue<Object> mailbox = new ConcurrentLinkedQueue<>();
        /** {@code true} while a drain task is queued or running. */
        private final AtomicBoolean scheduled = new AtomicBoolean(false);

        public ActorRef(Actor actor, ExecutorService executor) {
            this.actor = actor;
            this.executor = executor;
        }

        /**
         * Enqueues {@code message} for asynchronous delivery.
         *
         * @param message the message; must not be {@code null}
         * @throws NullPointerException if {@code message} is {@code null}
         * @throws RejectedExecutionException if the executor no longer accepts work
         */
        public void tell(Object message) {
            if (message == null) {
                throw new NullPointerException("message");
            }
            mailbox.offer(message);
            scheduleDrain();
        }

        /**
         * Wraps {@code child} in a reference that shares this actor's executor and wires the
         * child's {@code self} and {@code parent}.
         */
        ActorRef spawnChild(Actor child) {
            ActorRef childRef = new ActorRef(child, executor);
            child.setSelf(childRef);
            child.setParent(this);
            return childRef;
        }

        /** Queues a drain task unless the mailbox is empty or one is already pending. */
        private void scheduleDrain() {
            if (mailbox.isEmpty() || !scheduled.compareAndSet(false, true)) {
                return;
            }
            try {
                executor.execute(this::drain);
            } catch (RejectedExecutionException e) {
                scheduled.set(false);
                throw e;
            }
        }

        /** Delivers queued messages one by one; a failing message does not stop the actor. */
        private void drain() {
            try {
                Object message;
                while ((message = mailbox.poll()) != null) {
                    try {
                        actor.receive(message);
                    } catch (RuntimeException e) {
                        System.out.println("Actor处理消息失败: " + e);
                    }
                }
            } finally {
                scheduled.set(false);
                try {
                    // A message may have arrived between the last poll and the flag reset
                    scheduleDrain();
                } catch (RejectedExecutionException ignored) {
                    // Executor was shut down; late messages are dropped
                }
            }
        }
    }

    /** Base class of all actors. */
    static abstract class Actor {
        protected ActorRef self;
        protected ActorRef parent;

        public void setSelf(ActorRef self) {
            this.self = self;
        }

        public void setParent(ActorRef parent) {
            this.parent = parent;
        }

        /** Handles one message. */
        public abstract void receive(Object message);
    }

    /** Keeps a counter; increments on {@link IncrementMessage}, prints on {@link GetCountMessage}. */
    static class CounterActor extends Actor {
        private int count = 0;

        @Override
        public void receive(Object message) {
            if (message instanceof IncrementMessage) {
                count++;
                System.out.println("计数器增加到: " + count);
            } else if (message instanceof GetCountMessage) {
                System.out.println("当前计数: " + count);
            }
        }
    }

    /** Prints a {@link SendMessage} and forwards it to its target, if one is set. */
    static class SenderActor extends Actor {
        @Override
        public void receive(Object message) {
            if (message instanceof SendMessage msg) {
                System.out.println("发送消息: " + msg.getContent());
                if (msg.getTarget() != null) {
                    msg.getTarget().tell(msg);
                }
            }
        }
    }

    /** Prints the content of every {@link SendMessage} it receives. */
    static class ReceiverActor extends Actor {
        @Override
        public void receive(Object message) {
            if (message instanceof SendMessage msg) {
                System.out.println("接收消息: " + msg.getContent());
            }
        }
    }

    /**
     * Creates children on {@link CreateChildMessage} and forwards {@link SendMessage}s to the
     * first child. Requires {@code self} to be set (done by {@link ActorSystem#actorOf}).
     */
    static class SupervisorActor extends Actor {
        private final List<ActorRef> children = new ArrayList<>();

        @Override
        public void receive(Object message) {
            if (message instanceof CreateChildMessage) {
                // The child shares the supervisor's executor, so no extra thread pool is leaked
                ActorRef child = self.spawnChild(new ChildActor());
                children.add(child);
                System.out.println("创建子Actor");
            } else if (message instanceof SendMessage) {
                if (!children.isEmpty()) {
                    children.get(0).tell(message);
                }
            }
        }
    }

    /** Prints the content of every {@link SendMessage} it receives. */
    static class ChildActor extends Actor {
        @Override
        public void receive(Object message) {
            if (message instanceof SendMessage msg) {
                System.out.println("子Actor接收消息: " + msg.getContent());
            }
        }
    }

    /** Creates actors and owns the executor their mailboxes run on. */
    static class ActorSystem {
        private final ExecutorService executor = Executors.newCachedThreadPool();
        private final List<ActorRef> actors = new ArrayList<>();

        /**
         * Instantiates {@code actorClass} through its no-argument constructor and registers it.
         *
         * @return the reference to send messages to the new actor
         * @throws IllegalArgumentException if the class cannot be instantiated
         */
        public ActorRef actorOf(Class<? extends Actor> actorClass) {
            try {
                Actor actor = actorClass.getDeclaredConstructor().newInstance();
                ActorRef actorRef = new ActorRef(actor, executor);
                actor.setSelf(actorRef);
                actors.add(actorRef);
                return actorRef;
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException(
                        "Cannot instantiate actor " + actorClass.getName(), e);
            }
        }

        /** Stops accepting new work; messages already scheduled are still processed. */
        public void shutdown() {
            executor.shutdown();
        }
    }
}
4. 虚拟线程 + 平台线程的混合模型
混合模型设计
/**
 * Demonstrates mixing a fixed pool of platform threads (for CPU-bound work) with a
 * virtual-thread-per-task executor (for blocking work).
 *
 * <p>Standalone compilation needs imports from {@code java.util}, {@code java.util.concurrent}
 * and {@code java.util.concurrent.atomic}.
 */
public class HybridThreadModel {

    /**
     * Runs the three demonstrations in order.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        // 1. Basic hybrid model
        demonstrateBasicHybridModel();
        // 2. Task classification
        demonstrateTaskClassification();
        // 3. Performance optimization
        demonstratePerformanceOptimization();
    }

    /** Runs five summation tasks on platform threads and ten sleeping tasks on virtual threads. */
    private static void demonstrateBasicHybridModel() throws InterruptedException {
        System.out.println("=== 基本混合模型 ===");

        // try-with-resources closes both executors even if waiting is interrupted
        try (ExecutorService platformExecutor =
                     Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
             ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor()) {

            // CPU-bound tasks go to the platform pool
            List<Future<String>> cpuFutures = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int taskId = i;
                cpuFutures.add(platformExecutor.submit(() -> {
                    long sum = 0;
                    for (int j = 0; j < 1000000; j++) {
                        sum += j;
                    }
                    return "CPU任务 " + taskId + " 完成,结果: " + sum;
                }));
            }

            // Blocking tasks go to virtual threads
            List<Future<String>> ioFutures = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                final int taskId = i;
                ioFutures.add(virtualExecutor.submit(() -> {
                    try {
                        Thread.sleep(1000);
                        return "I/O任务 " + taskId + " 完成";
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return "I/O任务 " + taskId + " 中断";
                    }
                }));
            }

            printAll(cpuFutures);
            printAll(ioFutures);
        }
    }

    /** Routes CPU, I/O and mixed tasks through a {@link TaskClassifier}. */
    private static void demonstrateTaskClassification() throws InterruptedException {
        System.out.println("=== 任务分类处理 ===");

        TaskClassifier classifier = new TaskClassifier();
        try {
            List<Future<String>> futures = new ArrayList<>();

            // CPU-bound tasks
            futures.add(classifier.submit(new CPUTask("CPU任务1")));
            futures.add(classifier.submit(new CPUTask("CPU任务2")));

            // I/O-bound tasks
            futures.add(classifier.submit(new IOTask("I/O任务1")));
            futures.add(classifier.submit(new IOTask("I/O任务2")));
            futures.add(classifier.submit(new IOTask("I/O任务3")));

            // Mixed tasks
            futures.add(classifier.submit(new MixedTask("混合任务1")));
            futures.add(classifier.submit(new MixedTask("混合任务2")));

            printAll(futures);
        } finally {
            classifier.shutdown();
        }
    }

    /** Submits 100 short sleeping tasks through an {@link OptimizedHybridExecutor}. */
    private static void demonstratePerformanceOptimization() throws InterruptedException {
        System.out.println("=== 性能优化 ===");

        OptimizedHybridExecutor executor = new OptimizedHybridExecutor();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                final int taskId = i;
                futures.add(executor.submit(() -> {
                    try {
                        Thread.sleep(100);
                        return "任务 " + taskId + " 完成";
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return "任务 " + taskId + " 中断";
                    }
                }));
            }

            printAll(futures);
        } finally {
            executor.shutdown();
        }
    }

    /** Prints every result; a failed task is reported and the remaining futures are still read. */
    private static void printAll(List<Future<String>> futures) throws InterruptedException {
        for (Future<String> future : futures) {
            try {
                System.out.println(future.get());
            } catch (ExecutionException e) {
                System.out.println("任务执行失败: " + e.getCause());
            }
        }
    }

    /** Chooses an executor per task type. */
    static class TaskClassifier {
        // Built directly as a ThreadPoolExecutor because submit() reads its load figures
        private final ThreadPoolExecutor platformExecutor;
        private final ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();

        TaskClassifier() {
            int processors = Runtime.getRuntime().availableProcessors();
            this.platformExecutor = new ThreadPoolExecutor(
                    processors, processors, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        /**
         * CPU tasks run on platform threads and I/O tasks on virtual threads. Any other task
         * runs on the platform pool while it has an idle core thread, otherwise on a virtual
         * thread.
         */
        public Future<String> submit(Task task) {
            if (task instanceof CPUTask) {
                return platformExecutor.submit(task);
            }
            if (task instanceof IOTask) {
                return virtualExecutor.submit(task);
            }
            boolean platformHasCapacity =
                    platformExecutor.getActiveCount() < platformExecutor.getCorePoolSize();
            return platformHasCapacity
                    ? platformExecutor.submit(task)
                    : virtualExecutor.submit(task);
        }

        /** Initiates an orderly shutdown of both executors. */
        public void shutdown() {
            platformExecutor.shutdown();
            virtualExecutor.shutdown();
        }
    }

    /** Named unit of work producing a status string. */
    static abstract class Task implements Callable<String> {
        protected final String name;

        public Task(String name) {
            this.name = name;
        }
    }

    /** Sums the integers below 1,000,000. */
    static class CPUTask extends Task {
        public CPUTask(String name) {
            super(name);
        }

        @Override
        public String call() {
            long sum = 0;
            for (int i = 0; i < 1000000; i++) {
                sum += i;
            }
            return name + " 完成,结果: " + sum;
        }
    }

    /** Simulates blocking I/O with a one-second sleep. */
    static class IOTask extends Task {
        public IOTask(String name) {
            super(name);
        }

        @Override
        public String call() throws InterruptedException {
            Thread.sleep(1000);
            return name + " 完成";
        }
    }

    /** Sums the integers below 100,000 and then sleeps for half a second. */
    static class MixedTask extends Task {
        public MixedTask(String name) {
            super(name);
        }

        @Override
        public String call() throws InterruptedException {
            long sum = 0;
            for (int i = 0; i < 100000; i++) {
                sum += i;
            }
            Thread.sleep(500);
            return name + " 完成,结果: " + sum;
        }
    }

    /** Alternates submissions between a platform pool and virtual threads. */
    static class OptimizedHybridExecutor {
        private final ExecutorService platformExecutor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        private final ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
        private final AtomicInteger taskCount = new AtomicInteger(0);

        /** Even-numbered submissions go to the platform pool, odd-numbered to virtual threads. */
        public Future<String> submit(Callable<String> task) {
            // Use the value returned by the increment; a separate get() could observe another
            // thread's increment
            int sequence = taskCount.incrementAndGet();
            if (sequence % 2 == 0) {
                return platformExecutor.submit(task);
            }
            return virtualExecutor.submit(task);
        }

        /** Initiates an orderly shutdown of both executors. */
        public void shutdown() {
            platformExecutor.shutdown();
            virtualExecutor.shutdown();
        }
    }
}
5. Java并发未来趋势:结构化并发
结构化并发概念
/**
 * Demonstrates a structured style of concurrency: every task is submitted to an
 * executor that is opened in a try-with-resources block, and every result is
 * collected inside that same block.
 *
 * <p>The three demos cover plain fan-out/fan-in, handling of a failing task, and
 * per-task resource cleanup.
 */
public class StructuredConcurrency {

    /**
     * Runs the three demonstrations one after another.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting for a task
     */
    public static void main(String[] args) throws InterruptedException {
        // 1. Basic structured concurrency
        demonstrateBasicStructuredConcurrency();
        // 2. Exception handling
        demonstrateExceptionHandling();
        // 3. Resource management
        demonstrateResourceManagement();
    }

    /** Submits five sleeping tasks and prints each outcome in submission order. */
    private static void demonstrateBasicStructuredConcurrency() throws InterruptedException {
        System.out.println("=== 基本结构化并发 ===");
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Submit a group of related tasks.
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int taskId = i;
                Future<String> future = executor.submit(() -> {
                    try {
                        Thread.sleep(1000);
                        return "任务 " + taskId + " 完成";
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return "任务 " + taskId + " 中断";
                    }
                });
                futures.add(future);
            }
            // Wait for every task; Future.get() can throw the checked
            // ExecutionException, which describeOutcome turns into a message.
            for (Future<String> future : futures) {
                System.out.println(describeOutcome(future));
            }
        }
    }

    /** Submits five tasks, one of which (id 2) throws, and prints results and failures. */
    private static void demonstrateExceptionHandling() throws InterruptedException {
        System.out.println("=== 异常处理 ===");
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int taskId = i;
                Future<String> future = executor.submit(() -> {
                    if (taskId == 2) {
                        throw new RuntimeException("任务 " + taskId + " 失败");
                    }
                    try {
                        Thread.sleep(1000);
                        return "任务 " + taskId + " 完成";
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return "任务 " + taskId + " 中断";
                    }
                });
                futures.add(future);
            }
            // Print each result, or the failure message of a task that threw.
            for (Future<String> future : futures) {
                System.out.println(describeOutcome(future));
            }
        }
    }

    /** Submits three tasks that each open a {@link Resource} in try-with-resources. */
    private static void demonstrateResourceManagement() throws InterruptedException {
        System.out.println("=== 资源管理 ===");
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                final int taskId = i;
                Future<String> future = executor.submit(() -> {
                    // The resource is closed when the task body exits, on success or interruption.
                    try (Resource resource = new Resource("资源 " + taskId)) {
                        Thread.sleep(1000);
                        return "任务 " + taskId + " 完成,使用 " + resource.getName();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return "任务 " + taskId + " 中断";
                    }
                });
                futures.add(future);
            }
            for (Future<String> future : futures) {
                System.out.println(describeOutcome(future));
            }
        }
    }

    /**
     * Waits for a task and renders its outcome as one printable line.
     *
     * <p>Package-private rather than private so that it can be exercised directly
     * without running the sleeping demos.
     *
     * @param future the task to wait for
     * @return the task's result, or {@code "任务异常: <message>"} when the task threw
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    static String describeOutcome(Future<String> future) throws InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            return "任务异常: " + e.getCause().getMessage();
        }
    }

    /** A named resource that logs its creation and release. */
    static class Resource implements AutoCloseable {
        private final String name;

        /**
         * Creates the resource and prints a creation message.
         *
         * @param name label used in the log messages
         */
        public Resource(String name) {
            this.name = name;
            System.out.println("创建资源: " + name);
        }

        /** Returns the label given at construction. */
        public String getName() {
            return name;
        }

        /** Prints a release message; never throws. */
        @Override
        public void close() {
            System.out.println("释放资源: " + name);
        }
    }
}
结构化并发最佳实践
/**
 * Demonstrates three practices for tasks scoped to a try-with-resources executor:
 * grouping related tasks, bounding the total wait with a deadline, and cancelling
 * tasks that are still running.
 */
public class StructuredConcurrencyBestPractices {

    /**
     * Runs the three demonstrations one after another.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        // 1. Task grouping
        demonstrateTaskGrouping();
        // 2. Timeout control
        demonstrateTimeoutControl();
        // 3. Cancellation
        demonstrateCancellation();
    }

    /** Submits two groups of three tasks and prints the results group by group. */
    private static void demonstrateTaskGrouping() throws InterruptedException {
        System.out.println("=== 任务分组 ===");
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // First group
            List<Future<String>> group1 = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                final int taskId = i;
                Future<String> future = executor.submit(() -> {
                    Thread.sleep(1000);
                    return "组1任务 " + taskId + " 完成";
                });
                group1.add(future);
            }
            // Second group
            List<Future<String>> group2 = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                final int taskId = i;
                Future<String> future = executor.submit(() -> {
                    Thread.sleep(1500);
                    return "组2任务 " + taskId + " 完成";
                });
                group2.add(future);
            }
            // Wait for the first group
            System.out.println("第一组任务结果:");
            for (Future<String> future : group1) {
                printResult(future);
            }
            // Wait for the second group
            System.out.println("第二组任务结果:");
            for (Future<String> future : group2) {
                printResult(future);
            }
        }
    }

    /**
     * Submits five tasks and collects their results under one shared 3-second budget.
     * Once the budget is used up, every task that has not been collected yet is cancelled.
     */
    private static void demonstrateTimeoutControl() throws InterruptedException {
        System.out.println("=== 超时控制 ===");
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int taskId = i;
                Future<String> future = executor.submit(() -> {
                    Thread.sleep(2000);
                    return "任务 " + taskId + " 完成";
                });
                futures.add(future);
            }
            // Overall budget: 3 seconds, measured on the monotonic clock.
            long timeout = 3000;
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
            for (int i = 0; i < futures.size(); i++) {
                Future<String> future = futures.get(i);
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    System.out.println("超时,取消剩余任务");
                    // Cancel this task and all later ones, not just the current one.
                    cancelFrom(futures, i);
                    break;
                }
                try {
                    System.out.println(future.get(remaining, TimeUnit.NANOSECONDS));
                } catch (TimeoutException e) {
                    System.out.println("任务超时,取消执行");
                    future.cancel(true);
                } catch (ExecutionException e) {
                    System.out.println("任务异常: " + e.getCause().getMessage());
                }
            }
        }
    }

    /** Submits five long tasks, cancels them all after two seconds, and reports each state. */
    private static void demonstrateCancellation() throws InterruptedException {
        System.out.println("=== 取消操作 ===");
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int taskId = i;
                Future<String> future = executor.submit(() -> {
                    try {
                        Thread.sleep(5000);
                        return "任务 " + taskId + " 完成";
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return "任务 " + taskId + " 被取消";
                    }
                });
                futures.add(future);
            }
            // Let the tasks run for a while, then cancel all of them.
            Thread.sleep(2000);
            cancelFrom(futures, 0);
            // Report the state of each task.
            for (int i = 0; i < futures.size(); i++) {
                Future<String> future = futures.get(i);
                if (future.isCancelled()) {
                    System.out.println("任务 " + i + " 已取消");
                } else {
                    try {
                        System.out.println(future.get());
                    } catch (ExecutionException e) {
                        System.out.println("任务 " + i + " 执行异常: " + e.getCause().getMessage());
                    }
                }
            }
        }
    }

    /**
     * Requests cancellation, with interruption, of every future from {@code fromIndex}
     * to the end of the list.
     *
     * <p>Package-private rather than private so that it can be exercised directly
     * without running the sleeping demos.
     *
     * @param futures the tasks in submission order
     * @param fromIndex index of the first task to cancel; {@code futures.size()} cancels nothing
     */
    static void cancelFrom(List<? extends Future<?>> futures, int fromIndex) {
        for (Future<?> pending : futures.subList(fromIndex, futures.size())) {
            pending.cancel(true);
        }
    }

    /** Prints a task's result, or a failure line when the task threw. */
    private static void printResult(Future<String> future) throws InterruptedException {
        try {
            System.out.println(future.get());
        } catch (ExecutionException e) {
            System.out.println("任务异常: " + e.getCause().getMessage());
        }
    }
}
实践练习
练习1:性能监控系统
/**
 * Exercise 1: counts simulated requests, their outcomes and their latency, and
 * prints aggregate statistics.
 *
 * <p>All counters are {@link AtomicLong}s, so {@link #processRequest(String)} may be
 * called from many threads at once.
 */
public class PerformanceMonitoringSystem {
    private final AtomicLong totalRequests = new AtomicLong(0);
    private final AtomicLong successfulRequests = new AtomicLong(0);
    private final AtomicLong failedRequests = new AtomicLong(0);
    private final AtomicLong totalResponseTime = new AtomicLong(0);
    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

    /**
     * Simulates handling one request and records its outcome and latency.
     *
     * <p>The request sleeps 100-300 ms and then fails with roughly 10% probability.
     * If the thread is interrupted while sleeping, the request is counted as failed
     * and the interrupt flag is restored.
     *
     * @param requestId identifier used in the log output
     */
    public void processRequest(String requestId) {
        long startTime = System.currentTimeMillis();
        totalRequests.incrementAndGet();
        try {
            // Simulated processing time
            Thread.sleep(100 + (int) (Math.random() * 200));
            // Simulated success or failure
            if (Math.random() > 0.1) {
                successfulRequests.incrementAndGet();
                System.out.println("请求 " + requestId + " 处理成功");
            } else {
                failedRequests.incrementAndGet();
                System.out.println("请求 " + requestId + " 处理失败");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            failedRequests.incrementAndGet();
        } finally {
            long responseTime = System.currentTimeMillis() - startTime;
            totalResponseTime.addAndGet(responseTime);
        }
    }

    /**
     * Prints request counts, success rate and average response time.
     *
     * <p>The average is taken over finished requests (successful + failed), so
     * requests still in flight do not drag it down.
     */
    public void printStatistics() {
        long total = totalRequests.get();
        long successful = successfulRequests.get();
        long failed = failedRequests.get();
        long completed = successful + failed;
        long avgResponseTime = completed > 0 ? totalResponseTime.get() / completed : 0;
        System.out.println("=== 性能统计 ===");
        System.out.println("总请求数: " + total);
        System.out.println("成功请求数: " + successful);
        System.out.println("失败请求数: " + failed);
        System.out.println("成功率: " + (total > 0 ? (double) successful / total * 100 : 0) + "%");
        System.out.println("平均响应时间: " + avgResponseTime + "ms");
    }

    /** Stops accepting new work; already submitted requests keep running. */
    public void shutdown() {
        executor.shutdown();
    }

    /**
     * Stops accepting new work and waits for the submitted requests to finish.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return {@code true} if every submitted request finished within the timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        executor.shutdown();
        return executor.awaitTermination(timeout, unit);
    }

    /**
     * Submits 100 simulated requests, waits for them, and prints the statistics.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        PerformanceMonitoringSystem system = new PerformanceMonitoringSystem();
        // Simulate incoming requests
        for (int i = 0; i < 100; i++) {
            final int requestId = i;
            system.executor.submit(() -> {
                system.processRequest("REQ-" + requestId);
            });
        }
        // Wait for the requests themselves rather than sleeping for a fixed period,
        // so the statistics are not printed while work is still in flight.
        if (!system.awaitCompletion(5, TimeUnit.SECONDS)) {
            System.out.println("等待超时,部分请求尚未完成");
        }
        // Print the statistics
        system.printStatistics();
        system.shutdown();
    }
}
练习2:高并发缓存系统
/**
 * Exercise 2: a string cache with per-entry expiry and hit/miss counters, backed
 * by a {@link ConcurrentHashMap}.
 */
public class HighConcurrencyCacheSystem {
    private final ConcurrentHashMap<String, CacheEntry> cache = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
    private final AtomicLong hits = new AtomicLong(0);
    private final AtomicLong misses = new AtomicLong(0);

    /**
     * Looks up a value and records a hit or a miss.
     *
     * <p>An expired entry counts as a miss and is evicted, so it no longer occupies
     * the map or inflates the reported cache size.
     *
     * @param key cache key
     * @return the cached value, or {@code null} if the key is absent or its entry has expired
     */
    public String get(String key) {
        CacheEntry entry = cache.get(key);
        if (entry != null && !entry.isExpired()) {
            hits.incrementAndGet();
            return entry.getValue();
        }
        if (entry != null) {
            // Conditional remove: only drops this exact expired entry, so a fresh
            // value stored concurrently under the same key is left in place.
            cache.remove(key, entry);
        }
        misses.incrementAndGet();
        return null;
    }

    /**
     * Stores a value that expires {@code ttl} milliseconds from now.
     *
     * @param key cache key
     * @param value value to store
     * @param ttl time to live in milliseconds
     */
    public void put(String key, String value, long ttl) {
        CacheEntry entry = new CacheEntry(value, System.currentTimeMillis() + ttl);
        cache.put(key, entry);
    }

    /** Removes the entry for {@code key}, if any. */
    public void remove(String key) {
        cache.remove(key);
    }

    /** Removes all entries; the hit/miss counters are left unchanged. */
    public void clear() {
        cache.clear();
    }

    /** Prints the cache size, hit and miss counts, and the hit rate in percent. */
    public void printStatistics() {
        long total = hits.get() + misses.get();
        double hitRate = total > 0 ? (double) hits.get() / total * 100 : 0;
        System.out.println("=== 缓存统计 ===");
        System.out.println("缓存大小: " + cache.size());
        System.out.println("命中次数: " + hits.get());
        System.out.println("未命中次数: " + misses.get());
        System.out.println("命中率: " + hitRate + "%");
    }

    /** Stops the executor used by {@link #main(String[])} from accepting new work. */
    public void shutdown() {
        executor.shutdown();
    }

    /** An immutable cached value together with its absolute expiration time. */
    static class CacheEntry {
        private final String value;
        private final long expirationTime;

        /**
         * @param value the cached value
         * @param expirationTime expiry instant in milliseconds, on the
         *     {@link System#currentTimeMillis()} clock
         */
        public CacheEntry(String value, long expirationTime) {
            this.value = value;
            this.expirationTime = expirationTime;
        }

        /** Returns the cached value. */
        public String getValue() {
            return value;
        }

        /** Returns {@code true} once the current time is past the expiration time. */
        public boolean isExpired() {
            return System.currentTimeMillis() > expirationTime;
        }
    }

    /**
     * Runs 1000 concurrent read-then-populate tasks over 100 keys and prints the statistics.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting for a task
     */
    public static void main(String[] args) throws InterruptedException {
        HighConcurrencyCacheSystem cache = new HighConcurrencyCacheSystem();
        try {
            // Simulate concurrent access
            List<Future<Void>> futures = new ArrayList<>();
            for (int i = 0; i < 1000; i++) {
                final int taskId = i;
                Future<Void> future = cache.executor.submit(() -> {
                    String key = "key" + (taskId % 100);
                    String value = cache.get(key);
                    if (value == null) {
                        cache.put(key, "value" + taskId, 5000);
                    }
                    return null;
                });
                futures.add(future);
            }
            // Wait for all tasks; Future.get() reports a task failure through the
            // checked ExecutionException, which is rethrown with its cause preserved.
            for (Future<Void> future : futures) {
                try {
                    future.get();
                } catch (java.util.concurrent.ExecutionException e) {
                    throw new IllegalStateException("缓存访问任务执行失败", e.getCause());
                }
            }
            // Print the statistics
            cache.printStatistics();
        } finally {
            cache.shutdown();
        }
    }
}
总结
本部分深入介绍了性能优化、调试与并发设计模式:
- 性能测试工具:JMH、VisualVM、Flight Recorder的使用
- 线程池调优:参数优化、背压控制、自适应调整
- 并发设计模式:生产者-消费者、Future、工作窃取、Actor模型
- 混合模型:虚拟线程与平台线程的结合使用
- 结构化并发:Java并发的未来趋势(本文示例用 try-with-resources 管理的虚拟线程执行器来近似其作用域语义;JDK 21 起以预览特性提供的专用 API 是 StructuredTaskScope)
- 最佳实践:性能监控、缓存系统等实际应用
这些知识是构建高质量、高性能并发系统的关键,掌握了这些内容,就能够设计和实现优秀的并发应用程序。
