当前位置：首页 > news > 正文

分布式任务调度系统中的线程池使用详解

分布式任务调度系统中的线程池使用详解

1. 线程池参数设置的讲究

1.1 核心线程数、最大线程数、队列大小等关键参数设置

线程池的核心参数直接影响系统的性能、资源利用率和稳定性。以下是关键参数的设置原则及代码实现：

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** 线程池工厂类,提供不同场景下的线程池创建方法*/
/**
 * Factory for {@link ThreadPoolExecutor}s sized for different workload types.
 *
 * <p>Every pool created here uses a bounded or direct-handoff queue (so queued work cannot grow
 * without limit) and a {@link CustomThreadFactory} that embeds the pool name in thread names,
 * which makes thread dumps easier to read.
 */
public class ThreadPoolFactory {

    /** Idle time after which threads above the core size are reclaimed, in seconds. */
    private static final long KEEP_ALIVE_SECONDS = 60L;

    /**
     * Creates a pool for CPU-bound tasks.
     *
     * <p>Core and maximum size are both {@code availableProcessors() + 1}. Excess work waits in a
     * bounded queue of 1000 entries; once that is full the submitting thread runs the task itself
     * ({@link ThreadPoolExecutor.CallerRunsPolicy}), which throttles producers.
     *
     * @param poolName name embedded in worker thread names
     * @return a new thread pool
     */
    public static ThreadPoolExecutor createCpuIntensivePool(String poolName) {
        // CPU-bound work gains little from more threads than cores; "+1" is the common heuristic.
        int poolSize = Runtime.getRuntime().availableProcessors() + 1;
        return new ThreadPoolExecutor(
                poolSize,
                poolSize,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000),
                new CustomThreadFactory(poolName),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Creates a pool for IO-bound tasks.
     *
     * <p>Core size is {@code 2 * cores} and maximum size {@code 4 * cores}. Tasks are buffered in a
     * bounded queue of 5000 entries; threads above the core size are only started once that queue
     * is full, and submissions beyond that fail fast with {@link RejectedExecutionException}
     * ({@link ThreadPoolExecutor.AbortPolicy}) so overload is visible to the caller.
     *
     * @param poolName name embedded in worker thread names
     * @return a new thread pool
     */
    public static ThreadPoolExecutor createIoIntensivePool(String poolName) {
        // IO-bound threads spend most of their time blocked, so more threads than cores help.
        // Rule of thumb: cores * (1 + averageWaitTime / averageComputeTime).
        int cpus = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpus * 2,
                cpus * 4,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(5000),
                new CustomThreadFactory(poolName),
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Creates a pool for mixed workloads whose task count and task duration vary widely.
     *
     * <p>Core size is {@code 2 * cores} and maximum size {@code 8 * cores}. A
     * {@link SynchronousQueue} hands each task directly to a thread without buffering, so bursts
     * grow the pool up to its maximum; beyond that {@link LogAndRetryPolicy} applies.
     *
     * @param poolName name embedded in worker thread names
     * @return a new thread pool
     */
    public static ThreadPoolExecutor createMixedWorkloadPool(String poolName) {
        int cpus = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpus * 2,
                cpus * 8,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new CustomThreadFactory(poolName),
                new LogAndRetryPolicy());
    }

    /**
     * Thread factory producing non-daemon, normal-priority threads named
     * {@code pool-<poolName>-thread-<n>} (n starts at 1), to make thread dumps traceable.
     */
    static class CustomThreadFactory implements ThreadFactory {
        private final ThreadGroup group;
        private final String namePrefix;
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        CustomThreadFactory(String poolName) {
            // SecurityManager is deprecated for removal (JEP 411), so use the creating thread's
            // group directly instead of asking System.getSecurityManager() for one.
            this.group = Thread.currentThread().getThreadGroup();
            this.namePrefix = "pool-" + poolName + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            // New threads inherit daemon status and priority from whichever thread triggered their
            // creation; normalize both so pool threads behave the same regardless of the caller.
            t.setDaemon(false);
            t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

    /**
     * Rejection handler that logs the rejection, gives the pool one short chance to accept the
     * task, and otherwise runs it on the submitting thread.
     *
     * <p>If the executor has been shut down the task is not run and a
     * {@link RejectedExecutionException} is thrown instead.
     */
    static class LogAndRetryPolicy implements RejectedExecutionHandler {
        /** How long to wait for the pool to accept the task on retry, in milliseconds. */
        private static final long RETRY_WAIT_MS = 100L;

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                // Never run work on behalf of a pool that has been shut down.
                throw new RejectedExecutionException("Task " + r + " rejected from " + executor);
            }
            System.err.println("Task rejected, retrying: " + r);
            try {
                // Retry by handing the task straight to the work queue rather than calling
                // executor.execute()/submit(): a second rejection there would re-enter this
                // handler (not throw), recursing until StackOverflowError. With a SynchronousQueue
                // the timed offer succeeds only if a worker becomes idle within the wait.
                if (executor.getQueue().offer(r, RETRY_WAIT_MS, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Task " + r + " rejected from " + executor, e);
            }
            // Still no capacity: run in the caller thread so the task is not lost.
            System.err.println("Task rejected again, running in caller thread: " + r);
            r.run();
        }
    }
}

1.2 不同场景下的参数调优策略

不同的业务场景需要不同的线程池配置策略:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** 线程池参数调优示例*/
/**
 * Example {@link ThreadPoolExecutor} configurations for typical workload profiles.
 */
public class ThreadPoolTuningExample {

    /**
     * Queue capacity of {@link #createAsyncIoPool()}: large enough to absorb bursts, but bounded so
     * the pool can grow past its core size and its rejection policy can throttle producers.
     */
    static final int ASYNC_IO_QUEUE_CAPACITY = 10_000;

    /**
     * Creates a pool for large numbers of short tasks that need a fast response.
     *
     * <p>{@code 2 * cores} core threads, up to {@code 4 * cores} during bursts; idle extra threads
     * are reclaimed after 30 s. A {@link SynchronousQueue} hands tasks straight to threads, and when
     * all threads are busy the caller runs the task itself, so no task is dropped.
     *
     * @return a new thread pool
     */
    public static ThreadPoolExecutor createFastResponsePool() {
        int cpus = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpus * 2,
                cpus * 4,
                30, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadFactoryBuilder().setNameFormat("fast-task-%d").build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Creates a pool for long-running batch work such as data processing or ETL.
     *
     * <p>Fixed at {@code cores} threads to limit contention. At most 500 tasks may wait; further
     * submissions fail fast with {@link RejectedExecutionException}.
     *
     * @return a new thread pool
     */
    public static ThreadPoolExecutor createBatchProcessingPool() {
        int cpus = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpus,
                cpus,
                // core == max and core threads do not time out by default, so keep-alive is unused.
                0, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(500),
                new ThreadFactoryBuilder().setNameFormat("batch-task-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Creates a pool with {@code cores} threads for periodic and delayed tasks.
     *
     * <p>{@link ScheduledThreadPoolExecutor} keeps tasks in an unbounded delay queue, so its
     * rejection handler only applies to tasks submitted after shutdown;
     * {@link ThreadPoolExecutor.DiscardPolicy} drops those silently.
     *
     * @return a new scheduled thread pool
     */
    public static ScheduledThreadPoolExecutor createScheduledTaskPool() {
        return new ScheduledThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors(),
                new ThreadFactoryBuilder().setNameFormat("scheduled-task-%d").build(),
                new ThreadPoolExecutor.DiscardPolicy());
    }

    /**
     * Creates a pool for asynchronous network or file IO.
     *
     * <p>{@code 8 * cores} core threads, up to {@code 16 * cores}, idle extra threads reclaimed
     * after 60 s. The queue is bounded ({@value #ASYNC_IO_QUEUE_CAPACITY} entries): with an
     * unbounded queue a {@link ThreadPoolExecutor} never starts threads beyond its core size and
     * never invokes its rejection policy, so the maximum size and
     * {@link ThreadPoolExecutor.CallerRunsPolicy} would have no effect. When queue and threads are
     * both saturated the caller runs the task, which throttles producers instead of letting the
     * queue grow until the heap is exhausted.
     *
     * @return a new thread pool
     */
    public static ThreadPoolExecutor createAsyncIoPool() {
        int cpus = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpus * 8,
                cpus * 16,
                60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(ASYNC_IO_QUEUE_CAPACITY),
                new ThreadFactoryBuilder().setNameFormat("async-io-%d").build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Creates a pool that runs queued tasks in priority order.
     *
     * <p>Caveats callers must respect:
     * <ul>
     *   <li>{@link PriorityBlockingQueue} uses natural ordering here, so every task that gets queued
     *       must implement {@link Comparable}. Submit such tasks with {@code execute}:
     *       {@code submit} wraps them in a {@link FutureTask}, which is not {@code Comparable}, and
     *       queuing it fails with {@link ClassCastException}.</li>
     *   <li>The queue is unbounded, so the pool never grows beyond its core size
     *       ({@code 2 * cores}) and the rejection policy only applies after shutdown.</li>
     * </ul>
     *
     * @return a new thread pool
     */
    public static ThreadPoolExecutor createPriorityTaskPool() {
        int cpus = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpus * 2,
                cpus * 4,
                60, TimeUnit.SECONDS,
                new PriorityBlockingQueue<>(),
                new ThreadFactoryBuilder().setNameFormat("priority-task-%d").build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Minimal builder for {@link ThreadFactory} instances with a name pattern, daemon flag and
     * priority.
     */
    static class ThreadFactoryBuilder {
        private String nameFormat = null;
        private boolean daemon = false;
        private int priority = Thread.NORM_PRIORITY;

        /**
         * Sets the {@link String#format} pattern for thread names. The pattern receives one int
         * argument: a per-factory counter starting at 1. {@code null} keeps the JVM default names.
         *
         * @param nameFormat name pattern, e.g. {@code "worker-%d"}
         * @return this builder
         */
        public ThreadFactoryBuilder setNameFormat(String nameFormat) {
            this.nameFormat = nameFormat;
            return this;
        }

        /**
         * Sets whether created threads are daemon threads.
         *
         * @param daemon daemon flag
         * @return this builder
         */
        public ThreadFactoryBuilder setDaemon(boolean daemon) {
            this.daemon = daemon;
            return this;
        }

        /**
         * Sets the priority of created threads.
         *
         * @param priority thread priority
         * @return this builder
         * @throws IllegalArgumentException if {@code priority} is outside
         *     [{@link Thread#MIN_PRIORITY}, {@link Thread#MAX_PRIORITY}]; checked here so a bad
         *     value fails at configuration time instead of inside a pool's thread creation
         */
        public ThreadFactoryBuilder setPriority(int priority) {
            if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
                throw new IllegalArgumentException("Thread priority out of range ["
                        + Thread.MIN_PRIORITY + ", " + Thread.MAX_PRIORITY + "]: " + priority);
            }
            this.priority = priority;
            return this;
        }

        /**
         * Builds a factory from a snapshot of the current settings. Later changes to this builder
         * do not affect factories that were already built.
         *
         * @return a new thread factory
         */
        public ThreadFactory build() {
            // Copy the settings: the factory must not observe later mutations of this builder.
            final String format = nameFormat;
            final boolean isDaemon = daemon;
            final int threadPriority = priority;
            final AtomicInteger count = new AtomicInteger(1);
            return r -> {
                Thread thread = new Thread(r);
                if (format != null) {
                    thread.setName(String.format(format, count.getAndIncrement()));
                }
                thread.setDaemon(isDaemon);
                thread.setPriority(threadPriority);
                return thread;
            };
        }
    }
}

1.3 参数设置对系统性能的影响

线程池参数设置对系统性能有显著影响。以下是一个性能测试与监控类，用于评估不同参数配置下的性能表现：

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;/*** 线程池性能测试与监控工具*/
/**
 * Benchmarks {@link ThreadPoolExecutor} configurations and prints live and summary statistics.
 */
public class ThreadPoolPerformanceMonitor {

    /**
     * Runs {@code taskCount} sleep-based tasks on a freshly created pool and prints statistics.
     *
     * <p>While the test runs, a monitor thread prints the pool's active/pool/queue sizes once per
     * second. Tasks refused with {@link RejectedExecutionException} (e.g. by
     * {@link ThreadPoolExecutor.AbortPolicy}) are counted as rejected instead of aborting the run.
     * The pool and the monitor executor are always shut down before this method returns.
     *
     * <p>NOTE: a rejection policy that silently drops tasks (such as
     * {@link ThreadPoolExecutor.DiscardPolicy}) cannot be detected from here; this method would
     * then wait forever for the dropped tasks.
     *
     * @param testName label used in the printed output
     * @param poolSupplier creates the pool under test; the pool is shut down afterwards
     * @param taskCount number of tasks to submit
     * @param taskDurationMs how long each task sleeps, in milliseconds
     */
    public static void testPoolPerformance(String testName, Supplier<ThreadPoolExecutor> poolSupplier,
            int taskCount, int taskDurationMs) {
        ThreadPoolExecutor pool = poolSupplier.get();
        AtomicInteger completedTasks = new AtomicInteger(0);
        AtomicInteger rejectedTasks = new AtomicInteger(0);
        CountDownLatch latch = new CountDownLatch(taskCount);
        ScheduledExecutorService monitorService = Executors.newSingleThreadScheduledExecutor();
        try {
            monitorService.scheduleAtFixedRate(
                    () -> System.out.println(String.format(
                            "[%s] Active: %d, Pool: %d, Queue: %d, Completed: %d/%d",
                            testName,
                            pool.getActiveCount(),
                            pool.getPoolSize(),
                            pool.getQueue().size(),
                            completedTasks.get(),
                            taskCount)),
                    0, 1, TimeUnit.SECONDS);

            long startNanos = System.nanoTime();
            for (int i = 0; i < taskCount; i++) {
                final int taskId = i;
                try {
                    pool.submit(() -> {
                        try {
                            // Simulated work.
                            Thread.sleep(taskDurationMs);
                            completedTasks.incrementAndGet();
                            return taskId;
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return -1;
                        } finally {
                            latch.countDown();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // A rejected task never runs, so release its latch slot here or await() hangs.
                    rejectedTasks.incrementAndGet();
                    latch.countDown();
                }
            }

            latch.await();
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            // Stop the periodic status line before printing the summary.
            monitorService.shutdown();
            printResults(testName, pool, elapsedMs, completedTasks.get(), rejectedTasks.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Always stop both executors: the monitor thread is non-daemon (and the pool's threads
            // may be), so leaking either would keep the JVM alive.
            monitorService.shutdown();
            pool.shutdown();
        }
    }

    /** Prints the summary block for one finished test run. */
    private static void printResults(String testName, ThreadPoolExecutor pool, long elapsedMs,
            int completed, int rejected) {
        // Clamp the divisor: a very fast run can finish within the same millisecond.
        double throughput = 1000.0 * completed / Math.max(1L, elapsedMs);
        System.out.println(String.format("\n--- %s Performance Results ---", testName));
        System.out.println(String.format("Total time: %d ms", elapsedMs));
        System.out.println(String.format("Throughput: %.2f tasks/second", throughput));
        System.out.println(String.format("Max pool size reached: %d", pool.getLargestPoolSize()));
        System.out.println(String.format("Task rejection count: %d", rejected));
        System.out.println("-------------------------------\n");
    }

    /**
     * Compares four pool configurations on the same workload.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int taskCount = 1000;
        final int taskDurationMs = 50; // each task sleeps 50 ms

        // Test 1: small fixed pool, large queue.
        testPoolPerformance("Small Core Pool + Large Queue",
                () -> new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(2000),
                        new ThreadPoolExecutor.AbortPolicy()),
                taskCount, taskDurationMs);

        // Test 2: large fixed pool, small queue (capacity 32 + 100, so many tasks are rejected;
        // they are reported in the summary rather than aborting the run).
        testPoolPerformance("Large Core Pool + Small Queue",
                () -> new ThreadPoolExecutor(32, 32, 60, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(100),
                        new ThreadPoolExecutor.AbortPolicy()),
                taskCount, taskDurationMs);

        // Test 3: small core, large maximum, direct handoff via SynchronousQueue.
        testPoolPerformance("Dynamic Pool + SynchronousQueue",
                () -> new ThreadPoolExecutor(4, 100, 60, TimeUnit.SECONDS,
                        new SynchronousQueue<>(),
                        new ThreadPoolExecutor.CallerRunsPolicy()),
                taskCount, taskDurationMs);

        // Test 4: fixed pool with an unbounded queue.
        testPoolPerformance("Fixed Pool + Unbounded Queue",
                () -> new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(),
                        new ThreadPoolExecutor.AbortPolicy()),
                taskCount, taskDurationMs);
    }
}

2. 线程池如何动态变化

2.1 如何实现线程池的动态扩缩容

在分布式系统中，工作负载可能会随时间变化，因此需要动态调整线程池参数。以下是一个动态线程池的实现：

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;/*** 动态可调整的线程池实现*/
/**
 * {@link ThreadPoolExecutor} whose core and maximum sizes can be changed at runtime, with a
 * minimum interval ("cooldown") between two successful resizes.
 */
public class DynamicThreadPool extends ThreadPoolExecutor {
    private final ReentrantLock resizeLock = new ReentrantLock();
    /** Wall-clock time (ms) of the last successful resize; 0 before the first one. */
    private final AtomicLong lastResizeTime = new AtomicLong(0);
    /** Minimum interval between two successful resizes, in milliseconds. */
    private final long resizeCooldownMs;

    /**
     * Creates a dynamic thread pool. Core threads are allowed to time out, so shrinking the core
     * size later actually releases idle threads.
     *
     * @param corePoolSize initial core size
     * @param maximumPoolSize initial maximum size
     * @param keepAliveTime idle timeout for all threads (core threads included)
     * @param unit unit of {@code keepAliveTime}
     * @param workQueue queue holding tasks before they run
     * @param threadFactory factory for new threads
     * @param handler handler for rejected tasks
     * @param resizeCooldownMs minimum interval between two successful {@link #resize} calls, in ms
     * @throws IllegalArgumentException if the arguments are invalid for {@link ThreadPoolExecutor},
     *     or if {@code keepAliveTime <= 0} (required for core-thread timeout)
     */
    public DynamicThreadPool(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler,
                             long resizeCooldownMs) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.resizeCooldownMs = resizeCooldownMs;
        // Call the superclass implementation directly so the constructor never dispatches to an
        // override in a (not yet initialized) subclass.
        super.allowCoreThreadTimeOut(true);
    }

    /**
     * Changes the core and maximum pool size.
     *
     * <p>The call is refused (returns {@code false}) while the cooldown since the last successful
     * resize is still running, or while another thread is resizing.
     *
     * @param newCoreSize new core size, {@code >= 0}
     * @param newMaxSize new maximum size, {@code >= 1} and {@code >= newCoreSize}
     * @return {@code true} if the pool was resized
     * @throws IllegalArgumentException if the sizes are invalid; the pool is left unchanged
     */
    public boolean resize(int newCoreSize, int newMaxSize) {
        // Validate everything ThreadPoolExecutor will check (including max >= 1) up front, so that
        // setCorePoolSize can never succeed and setMaximumPoolSize then throw mid-resize.
        if (newCoreSize < 0 || newMaxSize <= 0 || newMaxSize < newCoreSize) {
            throw new IllegalArgumentException(
                    "Invalid pool sizes: core=" + newCoreSize + ", max=" + newMaxSize);
        }
        // Cheap early exit; re-checked under the lock below.
        if (isCoolingDown(System.currentTimeMillis())) {
            return false;
        }
        if (!resizeLock.tryLock()) {
            System.out.println("Resize operation rejected: another resize in progress");
            return false;
        }
        try {
            // Re-check while holding the lock: another caller may have completed a resize between
            // the check above and acquiring the lock.
            long now = System.currentTimeMillis();
            if (isCoolingDown(now)) {
                return false;
            }
            int oldCoreSize = getCorePoolSize();
            int oldMaxSize = getMaximumPoolSize();
            // ThreadPoolExecutor rejects core > max at every step, so when growing raise the
            // maximum first, and when shrinking lower the core first.
            if (newMaxSize > oldMaxSize) {
                setMaximumPoolSize(newMaxSize);
                setCorePoolSize(newCoreSize);
            } else {
                setCorePoolSize(newCoreSize);
                setMaximumPoolSize(newMaxSize);
            }
            lastResizeTime.set(now);
            System.out.println("Thread pool resized: "
                    + "core [" + oldCoreSize + " -> " + newCoreSize + "], "
                    + "max [" + oldMaxSize + " -> " + newMaxSize + "]");
            return true;
        } finally {
            resizeLock.unlock();
        }
    }

    /** Returns true (and logs why) if a resize at {@code now} would violate the cooldown. */
    private boolean isCoolingDown(long now) {
        long elapsed = now - lastResizeTime.get();
        if (elapsed < resizeCooldownMs) {
            System.out.println("Resize operation rejected: cooling down ("
                    + elapsed + "/" + resizeCooldownMs + "ms)");
            return true;
        }
        return false;
    }

    /**
     * Returns a snapshot of the pool's sizes and task counters. The values are read one after
     * another, not atomically, so they may be slightly inconsistent with each other.
     *
     * @return current status snapshot
     */
    public ThreadPoolStatus getStatus() {
        return new ThreadPoolStatus(
                getCorePoolSize(),
                getMaximumPoolSize(),
                getPoolSize(),
                getActiveCount(),
                getQueue().size(),
                getCompletedTaskCount(),
                getTaskCount());
    }

    /** Immutable snapshot of a {@link DynamicThreadPool}'s state. */
    public static class ThreadPoolStatus {
        private final int corePoolSize;
        private final int maximumPoolSize;
        private final int currentPoolSize;
        private final int activeThreads;
        private final int queueSize;
        private final long completedTasks;
        private final long totalTasks;

        public ThreadPoolStatus(int corePoolSize,
                                int maximumPoolSize,
                                int currentPoolSize,
                                int activeThreads,
                                int queueSize,
                                long completedTasks,
                                long totalTasks) {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.currentPoolSize = currentPoolSize;
            this.activeThreads = activeThreads;
            this.queueSize = queueSize;
            this.completedTasks = completedTasks;
            this.totalTasks = totalTasks;
        }

        @Override
        public String toString() {
            return "ThreadPoolStatus{"
                    + "coreSize=" + corePoolSize
                    + ", maxSize=" + maximumPoolSize
                    + ", currentSize=" + currentPoolSize
                    + ", active=" + activeThreads
                    + ", queued=" + queueSize
                    + ", completed=" + completedTasks
                    + ", total=" + totalTasks
                    + ", pending=" + (totalTasks - completedTasks)
                    + '}';
        }

        public int getCorePoolSize() { return corePoolSize; }
        public int getMaximumPoolSize() { return maximumPoolSize; }
        public int getCurrentPoolSize() { return currentPoolSize; }
        public int getActiveThreads() { return activeThreads; }
        public int getQueueSize() { return queueSize; }
        public long getCompletedTasks() { return completedTasks; }
        public long getTotalTasks() { return totalTasks; }
        /** Tasks submitted but not yet completed (queued or running). */
        public long getPendingTasks() { return totalTasks - completedTasks; }
    }
}

2.2 基于负载的自适应调整机制

以下是一个基于负载的自适应线程池调整器的实现：

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** 自适应线程池调整器* 根据系统负载和线程池指标自动调整线程池参数*/
public class AdaptiveThreadPoolTuner {private final DynamicThreadPool threadPool;private final ScheduledExecutorService scheduler;// 调整参数private final int minCoreSize;private final int maxCoreSize;private final int minMaxSize;private final int maxMaxSize;// 负载阈值private final double highLoadThreshold;  // 高负载阈值private final double lowLoadThreshold;   // 低负载阈值// 调整步长private final int corePoolSizeStep;private final int maxPoolSizeStep;// 统计数据private final AtomicInteger consecutiveHighLoads = new AtomicInteger(0);private final AtomicInteger consecutiveLowLoads = new AtomicInteger(0);/*** 创建自适应线程池调整器*/public AdaptiveThreadPoolTuner(DynamicThreadPool threadPool,int minCoreSize,int maxCoreSize,int minMaxSize,int maxMaxSize,double highLoadThreshold,double lowLoadThreshold,int corePoolSizeStep,int maxPoolSizeStep,long tuningIntervalMs) {this.threadPool = threadPool;this.minCoreSize = minCoreSize;this.maxCoreSize = maxCoreSize;this.minMaxSize = minMaxSize;this.maxMaxSize = maxMaxSize;this.highLoadThreshold = highLoadThreshold;this.lowLoadThreshold = lowLoadThreshold;this.corePoolSizeStep = corePoolSizeStep;this.maxPoolSizeStep = maxPoolSizeStep;// 创建调度器this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {Thread t = new Thread(r, "thread-pool-tuner");t.setDaemon(true);return t;});// 启动定期调整任务scheduler.scheduleAtFixedRate(this::tune, tuningIntervalMs, tuningIntervalMs, TimeUnit.MILLISECONDS);}/*** 执行线程池参数调整*/private void tune() {try {DynamicThreadPool.ThreadPoolStatus status = threadPool.getStatus();// 计算负载指标double activeRatio = calculateActiveRatio(status);double queueLoadRatio = calculateQueueLoadRatio(status);System.out.println("Current load - Active ratio: " + activeRatio + ", Queue load ratio: " + queueLoadRatio);// 根据负载决定扩容或缩容if (isHighLoad(activeRatio, queueLoadRatio)) {handleHighLoad(status);} else if (isLowLoad(activeRatio, queueLoadRatio)) {handleLowLoad(status);} else {// 负载适中,重置计数器consecutiveHighLoads.set(0);consecutiveLowLoads.set(0);}} catch 
(Exception e) {System.err.println("Error during thread pool tuning: " + e.getMessage());e.printStackTrace();}}/*** 计算活跃线程比例*/private double calculateActiveRatio(DynamicThreadPool.ThreadPoolStatus status) {int currentSize = status.getCurrentPoolSize();if (currentSize == 0) return 0;return (double) status.getActiveThreads() / currentSize;}/*** 计算队列负载比例*/private double calculateQueueLoadRatio(DynamicThreadPool.ThreadPoolStatus status) {BlockingQueue<Runnable> queue = threadPool.getQueue();if (queue instanceof ArrayBlockingQueue) {int capacity = ((ArrayBlockingQueue<Runnable>) queue).remainingCapacity() + queue.size();if (capacity == 0) return 0;return (double) queue.size() / capacity;}// 对于无界队列,使用任务处理速率作为指标long completed = status.getCompletedTasks();long total = status.getTotalTasks();if (total == 0) return 0;return (double) (total - completed) / total;}/*** 判断是否为高负载*/private boolean isHighLoad(double activeRatio, double queueLoadRatio) {return activeRatio > highLoadThreshold || queueLoadRatio > highLoadThreshold;}/*** 判断是否为低负载*/private boolean isLowLoad(double activeRatio, double queueLoadRatio) {return activeRatio < lowLoadThreshold && queueLoadRatio < lowLoadThreshold;}/*** 处理高负载情况*/private void handleHighLoad(DynamicThreadPool.ThreadPoolStatus status) {int consecutiveCount = consecutiveHighLoads.incrementAndGet();consecutiveLowLoads.set(0);// 连续多次高负载才扩容,避免短期波动导致频繁调整if (consecutiveCount >= 3) {int currentCore = status.getCorePoolSize();int currentMax = status.getMaximumPoolSize();int newCoreSize = Math.min(currentCore + corePoolSizeStep, maxCoreSize);int newMaxSize = Math.min(currentMax + maxPoolSizeStep, maxMaxSize);if (newCoreSize > currentCore || newMaxSize > currentMax) {System.out.println("High load detected for " + consecutiveCount + " consecutive checks, scaling up thread pool");threadPool.resize(newCoreSize, newMaxSize);consecutiveHighLoads.set(0);}}}/*** 处理低负载情况*/private void handleLowLoad(DynamicThreadPool.ThreadPoolStatus status) {int consecutiveCount = 
consecutiveLowLoads.incrementAndGet();consecutiveHighLoads.set(0);// 连续多次低负载才缩容,避免短期波动导致频繁调整if (consecutiveCount >= 5) {  // 缩容需要更多的连续低负载检测int currentCore = status.getCorePoolSize();int currentMax = status.getMaximumPoolSize();int newCoreSize = Math.max(currentCore - corePoolSizeStep, minCoreSize);int newMaxSize = Math.max(currentMax - maxPoolSizeStep, minMaxSize);if (newCoreSize < currentCore || newMaxSize < currentMax) {System.out.println("Low load detected for " + consecutiveCount + " consecutive checks, scaling down thread pool");threadPool.resize(newCoreSize, newMaxSize);consecutiveLowLoads.set(0);}}}/*** 关闭调整器*/public void shutdown() {scheduler.shutdown();}/*** 使用示例*/public static void main(String[] args) throws InterruptedException {// 创建动态线程池DynamicThreadPool pool = new DynamicThreadPool(4,  // 初始核心线程数8,  // 初始最大线程数60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000),new ThreadFactory() {private final AtomicInteger counter = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "adaptive-pool-" + counter.getAndIncrement());}},new ThreadPoolExecutor.CallerRunsPolicy(),5000  // 5秒调整冷却时间);// 创建自适应调整器AdaptiveThreadPoolTuner tuner = new AdaptiveThreadPoolTuner(pool,2,    // 最小核心线程数32,   // 最大核心线程数4,    // 最小最大线程数64,   // 最大最大线程数0.7,  // 高负载阈值 (70%)0.3,  // 低负载阈值 (30%)2,    // 核心线程数调整步长4,    // 最大线程数调整步长10000 // 10秒调整间隔);// 模拟负载变化simulateWorkload(pool);// 等待一段时间后关闭Thread.sleep(5 * 60 * 1000);  // 5分钟tuner.shutdown();pool.shutdown();}/*** 模拟工作负载变化*/private static void simulateWorkload(ExecutorService pool) {// 创建负载生成器线程Thread loadGenerator = new Thread(() -> {try {// 模拟波动的工作负载while (!Thread.currentThread().isInterrupted()) {// 高负载阶段System.out.println("Generating high workload...");generateTasks(pool, 500, 50, 100);Thread.sleep(30000);  // 30秒高负载// 中等负载阶段System.out.println("Generating medium workload...");generateTasks(pool, 200, 30, 80);Thread.sleep(30000);  // 30秒中等负载// 低负载阶段System.out.println("Generating low 
workload...");generateTasks(pool, 50, 20, 50);Thread.sleep(30000);  // 30秒低负载}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});loadGenerator.setDaemon(true);loadGenerator.start();}/*** 生成指定数量的任务*/private static void generateTasks(ExecutorService pool, int taskCount, int minDurationMs, int maxDurationMs) {for (int i = 0; i < taskCount; i++) {pool.submit(() -> {try {// 随机任务执行时间int duration = minDurationMs + ThreadLocalRandom.current().nextInt(maxDurationMs - minDurationMs);Thread.sleep(duration);return duration;} catch (InterruptedException e) {Thread.currentThread().interrupt();return -1;}});// 控制任务提交速率try {Thread.sleep(ThreadLocalRandom.current().nextInt(10, 50));} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}
}

2.3 监控指标与调整策略的关联

以下是一个线程池监控系统的实现，它收集关键指标并提供可视化和告警功能：

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;/*** 线程池监控系统* 收集线程池指标,提供监控和告警功能*/
public class ThreadPoolMonitor {

    /** Monitored pools, keyed by the name they were registered under. */
    private final Map<String, MonitoredThreadPool> monitoredPools = new ConcurrentHashMap<>();

    /** Single daemon thread that drives periodic metrics collection. */
    private final ScheduledExecutorService scheduler;

    /** Listeners notified with every metrics snapshot; may be added to while collecting. */
    private final List<ThreadPoolMetricsListener> listeners = new CopyOnWriteArrayList<>();

    /** Interval between two metrics collections, in milliseconds. */
    private final long metricsIntervalMs;

    /**
     * Creates the monitor and starts collecting metrics every {@code metricsIntervalMs}.
     *
     * @param metricsIntervalMs collection interval in milliseconds; must be positive
     * @throws IllegalArgumentException if {@code metricsIntervalMs <= 0}
     *     (rejected by {@link ScheduledExecutorService#scheduleAtFixedRate})
     */
    public ThreadPoolMonitor(long metricsIntervalMs) {
        this.metricsIntervalMs = metricsIntervalMs;
        this.scheduler = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, "thread-pool-monitor");
            // Daemon so a monitor that is never shut down does not keep the JVM alive.
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(this::collectMetrics,
                metricsIntervalMs,
                metricsIntervalMs,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Starts monitoring {@code pool} under {@code name}.
     *
     * <p>Only tasks submitted through the returned executor are timed. Tasks submitted directly
     * to {@code pool} still run but are not measured. Registering another pool under an existing
     * name replaces the earlier registration.
     *
     * @param name display name used in metrics and alerts
     * @param pool the pool that actually runs the tasks
     * @param <T> unused; kept for source compatibility with existing callers
     * @return an executor that forwards to {@code pool} while recording metrics
     */
    public <T extends Runnable> ThreadPoolExecutor register(String name, ThreadPoolExecutor pool) {
        MonitoredThreadPool monitoredPool = new MonitoredThreadPool(name, pool);
        monitoredPools.put(name, monitoredPool);
        return monitoredPool;
    }

    /** Adds a listener that receives every future metrics snapshot. */
    public void addListener(ThreadPoolMetricsListener listener) {
        listeners.add(listener);
    }

    /**
     * Collects one snapshot per registered pool and hands it to every listener.
     *
     * <p>All failures are caught here. An exception escaping a {@code scheduleAtFixedRate} task
     * would silently cancel all further collections.
     */
    private void collectMetrics() {
        LocalDateTime timestamp = LocalDateTime.now();
        for (MonitoredThreadPool pool : monitoredPools.values()) {
            ThreadPoolMetrics metrics;
            try {
                metrics = pool.collectMetrics();
            } catch (RuntimeException e) {
                System.err.println("Error collecting metrics for " + pool.poolName + ": " + e.getMessage());
                continue;
            }
            metrics.setTimestamp(timestamp);

            for (ThreadPoolMetricsListener listener : listeners) {
                try {
                    listener.onMetrics(metrics);
                } catch (Exception e) {
                    System.err.println("Error notifying metrics listener: " + e.getMessage());
                }
            }
        }
    }

    /** Stops periodic collection. Registered pools are not shut down. */
    public void shutdown() {
        scheduler.shutdown();
    }

    /** Callback receiving one metrics snapshot per pool per collection round. */
    public interface ThreadPoolMetricsListener {
        void onMetrics(ThreadPoolMetrics metrics);
    }

    /** Mutable snapshot of one pool's metrics at one point in time. */
    public static class ThreadPoolMetrics {
        /** Shared formatter; DateTimeFormatter is immutable and thread-safe. */
        private static final DateTimeFormatter TIMESTAMP_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        private String poolName;
        private LocalDateTime timestamp;
        private int corePoolSize;
        private int maximumPoolSize;
        private int poolSize;
        private int activeThreads;
        private int queueSize;
        private int queueRemainingCapacity;
        private long completedTaskCount;
        private long totalTaskCount;
        private double taskRejectionRate;
        private double averageTaskWaitTimeMs;
        private double averageTaskExecutionTimeMs;
        private double throughputTasksPerSecond;

        public ThreadPoolMetrics(String poolName) {
            this.poolName = poolName;
        }

        /** One-line summary. Prints {@code n/a} for the time if no timestamp was set yet. */
        @Override
        public String toString() {
            String time = timestamp == null ? "n/a" : timestamp.format(TIMESTAMP_FORMATTER);
            return String.format("[%s] %s - Pool: %d/%d/%d (active/size/max), " +
                            "Queue: %d/%d, Tasks: %d/%d (completed/total), " +
                            "Throughput: %.2f tasks/sec, Avg Wait: %.2f ms, Avg Exec: %.2f ms",
                    poolName,
                    time,
                    activeThreads, poolSize, maximumPoolSize,
                    queueSize, queueSize + queueRemainingCapacity,
                    completedTaskCount, totalTaskCount,
                    throughputTasksPerSecond,
                    averageTaskWaitTimeMs,
                    averageTaskExecutionTimeMs);
        }

        // Getters and setters
        public String getPoolName() { return poolName; }
        public void setPoolName(String poolName) { this.poolName = poolName; }

        public LocalDateTime getTimestamp() { return timestamp; }
        public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }

        public int getCorePoolSize() { return corePoolSize; }
        public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; }

        public int getMaximumPoolSize() { return maximumPoolSize; }
        public void setMaximumPoolSize(int maximumPoolSize) { this.maximumPoolSize = maximumPoolSize; }

        public int getPoolSize() { return poolSize; }
        public void setPoolSize(int poolSize) { this.poolSize = poolSize; }

        public int getActiveThreads() { return activeThreads; }
        public void setActiveThreads(int activeThreads) { this.activeThreads = activeThreads; }

        public int getQueueSize() { return queueSize; }
        public void setQueueSize(int queueSize) { this.queueSize = queueSize; }

        public int getQueueRemainingCapacity() { return queueRemainingCapacity; }
        public void setQueueRemainingCapacity(int queueRemainingCapacity) { this.queueRemainingCapacity = queueRemainingCapacity; }

        public long getCompletedTaskCount() { return completedTaskCount; }
        public void setCompletedTaskCount(long completedTaskCount) { this.completedTaskCount = completedTaskCount; }

        public long getTotalTaskCount() { return totalTaskCount; }
        public void setTotalTaskCount(long totalTaskCount) { this.totalTaskCount = totalTaskCount; }

        public double getTaskRejectionRate() { return taskRejectionRate; }
        public void setTaskRejectionRate(double taskRejectionRate) { this.taskRejectionRate = taskRejectionRate; }

        public double getAverageTaskWaitTimeMs() { return averageTaskWaitTimeMs; }
        public void setAverageTaskWaitTimeMs(double averageTaskWaitTimeMs) { this.averageTaskWaitTimeMs = averageTaskWaitTimeMs; }

        public double getAverageTaskExecutionTimeMs() { return averageTaskExecutionTimeMs; }
        public void setAverageTaskExecutionTimeMs(double averageTaskExecutionTimeMs) { this.averageTaskExecutionTimeMs = averageTaskExecutionTimeMs; }

        public double getThroughputTasksPerSecond() { return throughputTasksPerSecond; }
        public void setThroughputTasksPerSecond(double throughputTasksPerSecond) { this.throughputTasksPerSecond = throughputTasksPerSecond; }
    }

    /**
     * Executor facade that forwards every task to {@code delegate} and records timing metrics.
     *
     * <p>It extends ThreadPoolExecutor only so that it can be returned as one. Its own worker
     * machinery is never used. State queries and mutators are forwarded to the delegate so
     * callers see the real pool's state.
     */
    private static class MonitoredThreadPool extends ThreadPoolExecutor {
        private final String poolName;
        private final ThreadPoolExecutor delegate;

        private final AtomicLong totalTasksSubmitted = new AtomicLong(0);
        // Counts only rejections that surface as RejectedExecutionException; handlers such as
        // CallerRunsPolicy or DiscardPolicy do not throw and are therefore not counted.
        private final AtomicLong totalTasksRejected = new AtomicLong(0);
        private final Queue<Long> recentWaitTimes = new ConcurrentLinkedQueue<>();
        private final Queue<Long> recentExecutionTimes = new ConcurrentLinkedQueue<>();
        private final int maxRecentTimeSamples = 100;

        // Read and written only by the monitor's single collector thread.
        private long lastCollectionTimeMs = System.currentTimeMillis();
        private long lastCompletedTaskCount = 0;

        MonitoredThreadPool(String poolName, ThreadPoolExecutor delegate) {
            super(delegate.getCorePoolSize(),
                    delegate.getMaximumPoolSize(),
                    delegate.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS,
                    delegate.getQueue(),
                    delegate.getThreadFactory(),
                    delegate.getRejectedExecutionHandler());
            this.poolName = poolName;
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable command) {
            if (command == null) {
                throw new NullPointerException("command");
            }
            totalTasksSubmitted.incrementAndGet();
            try {
                delegate.execute(wrapTask(command, System.currentTimeMillis()));
            } catch (RejectedExecutionException e) {
                totalTasksRejected.incrementAndGet();
                throw e;
            }
        }

        /**
         * Wraps {@code command} so that its queue wait and run time are sampled.
         *
         * <p>The submit time is captured in the closure rather than in a shared map. This means
         * tasks that are silently discarded leave nothing behind, and the same Runnable
         * submitted twice does not collide.
         */
        private Runnable wrapTask(Runnable command, long submitTimeMs) {
            return () -> {
                long startTimeMs = System.currentTimeMillis();
                addRecentSample(recentWaitTimes, startTimeMs - submitTimeMs);
                try {
                    command.run();
                } finally {
                    addRecentSample(recentExecutionTimes, System.currentTimeMillis() - startTimeMs);
                }
            };
        }

        /** Appends a sample and trims the queue to the most recent {@code maxRecentTimeSamples}. */
        private synchronized void addRecentSample(Queue<Long> samples, long valueMs) {
            samples.add(valueMs);
            while (samples.size() > maxRecentTimeSamples) {
                samples.poll();
            }
        }

        /** Builds a metrics snapshot. Throughput is measured since the previous call. */
        ThreadPoolMetrics collectMetrics() {
            ThreadPoolMetrics metrics = new ThreadPoolMetrics(poolName);

            metrics.setCorePoolSize(delegate.getCorePoolSize());
            metrics.setMaximumPoolSize(delegate.getMaximumPoolSize());
            metrics.setPoolSize(delegate.getPoolSize());
            metrics.setActiveThreads(delegate.getActiveCount());

            BlockingQueue<Runnable> queue = delegate.getQueue();
            metrics.setQueueSize(queue.size());
            metrics.setQueueRemainingCapacity(queue.remainingCapacity());

            long currentCompletedTasks = delegate.getCompletedTaskCount();
            metrics.setCompletedTaskCount(currentCompletedTasks);
            metrics.setTotalTaskCount(delegate.getTaskCount());

            long totalSubmitted = totalTasksSubmitted.get();
            long totalRejected = totalTasksRejected.get();
            metrics.setTaskRejectionRate(totalSubmitted > 0 ? (double) totalRejected / totalSubmitted : 0);

            metrics.setAverageTaskWaitTimeMs(calculateAverage(recentWaitTimes));
            metrics.setAverageTaskExecutionTimeMs(calculateAverage(recentExecutionTimes));

            long currentTimeMs = System.currentTimeMillis();
            long timeDiffMs = currentTimeMs - lastCollectionTimeMs;
            if (timeDiffMs > 0) {
                long completedDiff = currentCompletedTasks - lastCompletedTaskCount;
                metrics.setThroughputTasksPerSecond((double) completedDiff * 1000 / timeDiffMs);
            }
            lastCollectionTimeMs = currentTimeMs;
            lastCompletedTaskCount = currentCompletedTasks;

            return metrics;
        }

        /**
         * Averages the samples in one pass. Sum and count come from the same traversal, so
         * concurrent additions cannot skew the result.
         */
        private double calculateAverage(Queue<Long> values) {
            long sum = 0;
            int count = 0;
            for (Long value : values) {
                sum += value;
                count++;
            }
            return count == 0 ? 0 : (double) sum / count;
        }

        // ---- Lifecycle: forwarded to the real pool ----
        @Override
        public void shutdown() { delegate.shutdown(); }

        @Override
        public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }

        @Override
        public boolean isShutdown() { return delegate.isShutdown(); }

        @Override
        public boolean isTerminating() { return delegate.isTerminating(); }

        @Override
        public boolean isTerminated() { return delegate.isTerminated(); }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return delegate.awaitTermination(timeout, unit);
        }

        // ---- Configuration: forwarded so reads reflect writes ----
        @Override
        public void setCorePoolSize(int corePoolSize) { delegate.setCorePoolSize(corePoolSize); }

        @Override
        public int getCorePoolSize() { return delegate.getCorePoolSize(); }

        @Override
        public void setMaximumPoolSize(int maximumPoolSize) { delegate.setMaximumPoolSize(maximumPoolSize); }

        @Override
        public int getMaximumPoolSize() { return delegate.getMaximumPoolSize(); }

        @Override
        public void setKeepAliveTime(long time, TimeUnit unit) { delegate.setKeepAliveTime(time, unit); }

        @Override
        public long getKeepAliveTime(TimeUnit unit) { return delegate.getKeepAliveTime(unit); }

        @Override
        public void allowCoreThreadTimeOut(boolean value) { delegate.allowCoreThreadTimeOut(value); }

        @Override
        public boolean allowsCoreThreadTimeOut() { return delegate.allowsCoreThreadTimeOut(); }

        @Override
        public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
            delegate.setRejectedExecutionHandler(handler);
        }

        @Override
        public RejectedExecutionHandler getRejectedExecutionHandler() { return delegate.getRejectedExecutionHandler(); }

        @Override
        public void setThreadFactory(ThreadFactory threadFactory) { delegate.setThreadFactory(threadFactory); }

        @Override
        public ThreadFactory getThreadFactory() { return delegate.getThreadFactory(); }

        // ---- Statistics: the facade itself never runs workers, so report the real pool ----
        @Override
        public int getPoolSize() { return delegate.getPoolSize(); }

        @Override
        public int getActiveCount() { return delegate.getActiveCount(); }

        @Override
        public int getLargestPoolSize() { return delegate.getLargestPoolSize(); }

        @Override
        public long getTaskCount() { return delegate.getTaskCount(); }

        @Override
        public long getCompletedTaskCount() { return delegate.getCompletedTaskCount(); }

        // Prestarting on the facade would spawn workers that drain the shared queue unwrapped.
        @Override
        public boolean prestartCoreThread() { return delegate.prestartCoreThread(); }

        @Override
        public int prestartAllCoreThreads() { return delegate.prestartAllCoreThreads(); }
    }

    /** Listener that prints every snapshot to standard output. */
    public static class ConsoleMetricsListener implements ThreadPoolMetricsListener {
        @Override
        public void onMetrics(ThreadPoolMetrics metrics) {
            System.out.println(metrics);
        }
    }

    /** Listener that prints an alert to standard error when a configured threshold is reached. */
    public static class AlertingMetricsListener implements ThreadPoolMetricsListener {
        private final double highUtilizationThreshold;
        private final double highQueueUtilizationThreshold;
        private final double highRejectionRateThreshold;

        /**
         * @param highUtilizationThreshold alert when active/pool-size reaches this ratio
         * @param highQueueUtilizationThreshold alert when queued/queue-capacity reaches this ratio
         * @param highRejectionRateThreshold alert when rejected/submitted reaches this ratio
         */
        public AlertingMetricsListener(double highUtilizationThreshold,
                                       double highQueueUtilizationThreshold,
                                       double highRejectionRateThreshold) {
            this.highUtilizationThreshold = highUtilizationThreshold;
            this.highQueueUtilizationThreshold = highQueueUtilizationThreshold;
            this.highRejectionRateThreshold = highRejectionRateThreshold;
        }

        @Override
        public void onMetrics(ThreadPoolMetrics metrics) {
            // Thread utilization; an empty pool (no workers yet) has nothing to report.
            if (metrics.getPoolSize() > 0) {
                double utilization = (double) metrics.getActiveThreads() / metrics.getPoolSize();
                if (utilization >= highUtilizationThreshold) {
                    System.err.println(String.format("ALERT: High thread pool utilization (%.2f) for %s",
                            utilization, metrics.getPoolName()));
                }
            }

            // Queue utilization; skipped for zero-capacity queues such as SynchronousQueue.
            int totalQueueCapacity = metrics.getQueueSize() + metrics.getQueueRemainingCapacity();
            if (totalQueueCapacity > 0) {
                double queueUtilization = (double) metrics.getQueueSize() / totalQueueCapacity;
                if (queueUtilization >= highQueueUtilizationThreshold) {
                    System.err.println(String.format("ALERT: High queue utilization (%.2f) for %s",
                            queueUtilization, metrics.getPoolName()));
                }
            }

            if (metrics.getTaskRejectionRate() >= highRejectionRateThreshold) {
                System.err.println(String.format("ALERT: High task rejection rate (%.2f) for %s",
                        metrics.getTaskRejectionRate(), metrics.getPoolName()));
            }
        }
    }

    /** Usage example: monitors two pools under synthetic load for one minute. */
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool1 = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                new ThreadPoolExecutor.AbortPolicy());
        ThreadPoolExecutor pool2 = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(50),
                new ThreadPoolExecutor.CallerRunsPolicy());

        ThreadPoolMonitor monitor = new ThreadPoolMonitor(5000);  // collect every 5 seconds

        monitor.addListener(new ConsoleMetricsListener());
        monitor.addListener(new AlertingMetricsListener(0.8, 0.7, 0.1));

        ThreadPoolExecutor monitoredPool1 = monitor.register("service-pool", pool1);
        ThreadPoolExecutor monitoredPool2 = monitor.register("background-pool", pool2);

        simulateWorkload(monitoredPool1, monitoredPool2);

        Thread.sleep(60000);  // 1 minute
        monitor.shutdown();
        monitoredPool1.shutdown();
        monitoredPool2.shutdown();
    }

    /** Starts a daemon thread that submits short service tasks and longer background tasks every 2 s. */
    private static void simulateWorkload(ExecutorService servicePool, ExecutorService backgroundPool) {
        Thread loadGenerator = new Thread(() -> {
            try {
                Random random = new Random();
                while (!Thread.currentThread().isInterrupted()) {
                    for (int i = 0; i < 20; i++) {
                        final int taskId = i;
                        servicePool.submit(() -> {
                            try {
                                Thread.sleep(100 + random.nextInt(200));
                                return "Service task " + taskId + " completed";
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                return null;
                            }
                        });
                    }

                    for (int i = 0; i < 5; i++) {
                        final int taskId = i;
                        backgroundPool.submit(() -> {
                            try {
                                // Background tasks run noticeably longer than service tasks.
                                Thread.sleep(500 + random.nextInt(1000));
                                return "Background task " + taskId + " completed";
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                return null;
                            }
                        });
                    }

                    Thread.sleep(2000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        loadGenerator.setDaemon(true);
        loadGenerator.start();
    }
}

3. 如何获取任务的执行结果

3.1 同步与异步结果获取机制

在分布式任务调度系统中,获取任务执行结果是一个关键功能。以下是同步和异步结果获取的实现:

import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;/*** 任务结果获取工具类* 提供同步和异步获取任务结果的方法*/
public class TaskResultFetcher {

    /**
     * Blocks until the task completes or the timeout elapses and returns its result.
     *
     * @param future the task's Future
     * @param timeout maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @param <T> result type
     * @return the task result
     * @throws TimeoutException if the wait timed out
     * @throws ExecutionException if the task threw
     * @throws InterruptedException if the calling thread was interrupted while waiting
     */
    public static <T> T getResultSync(Future<T> future, long timeout, TimeUnit timeUnit)
            throws TimeoutException, ExecutionException, InterruptedException {
        return future.get(timeout, timeUnit);
    }

    /**
     * Like {@link #getResultSync} but returns {@code defaultValue} on any failure.
     *
     * <p>If the wait was interrupted, the thread's interrupt flag is restored before returning.
     * Otherwise the interruption would be silently lost.
     *
     * @param future the task's Future
     * @param timeout maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @param defaultValue value returned on timeout, task failure, cancellation or interruption
     * @param <T> result type
     * @return the task result, or {@code defaultValue}
     */
    public static <T> T getResultSyncWithDefault(Future<T> future, long timeout, TimeUnit timeUnit, T defaultValue) {
        try {
            return future.get(timeout, timeUnit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Error getting result: " + e.getMessage());
            return defaultValue;
        } catch (Exception e) {
            System.err.println("Error getting result: " + e.getMessage());
            return defaultValue;
        }
    }

    /**
     * Waits for the task on a background thread (CompletableFuture's default async executor),
     * then calls {@code callback}.
     *
     * <p>{@code errorHandler} receives whatever {@code future.get()} throws, including the
     * wrapping {@link ExecutionException}, as well as any exception thrown by {@code callback}.
     *
     * @param future the task's Future
     * @param callback receives the result on success
     * @param errorHandler receives the failure
     * @param <T> result type
     */
    public static <T> void getResultAsync(Future<T> future, Consumer<T> callback, Consumer<Throwable> errorHandler) {
        CompletableFuture.runAsync(() -> {
            try {
                T result = future.get();
                callback.accept(result);
            } catch (Throwable e) {
                restoreInterruptIfNeeded(e);
                errorHandler.accept(e);
            }
        });
    }

    /**
     * Like {@link #getResultAsync} but gives up after {@code timeout}.
     * On timeout, {@code errorHandler} receives a {@link TimeoutException}.
     *
     * @param future the task's Future
     * @param timeout maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @param callback receives the result on success
     * @param errorHandler receives the failure
     * @param <T> result type
     */
    public static <T> void getResultAsyncWithTimeout(Future<T> future, long timeout, TimeUnit timeUnit,
                                                     Consumer<T> callback, Consumer<Throwable> errorHandler) {
        CompletableFuture.runAsync(() -> {
            try {
                T result = future.get(timeout, timeUnit);
                callback.accept(result);
            } catch (Throwable e) {
                restoreInterruptIfNeeded(e);
                errorHandler.accept(e);
            }
        });
    }

    /**
     * Converts each Future into a CompletableFuture that waits at most {@code timeout} for it.
     *
     * @param futures the tasks' Futures
     * @param timeout per-task maximum wait
     * @param timeUnit unit of {@code timeout}
     * @param <T> result type
     * @return an array parallel to {@code futures}. Failures surface as a
     *     {@link CompletionException} wrapping the original exception.
     */
    public static <T> CompletableFuture<T>[] getResultsAsync(Future<T>[] futures, long timeout, TimeUnit timeUnit) {
        @SuppressWarnings("unchecked") // generic array creation; elements are all CompletableFuture<T>
        CompletableFuture<T>[] completableFutures = new CompletableFuture[futures.length];

        for (int i = 0; i < futures.length; i++) {
            final Future<T> future = futures[i];
            completableFutures[i] = CompletableFuture.supplyAsync(() -> {
                try {
                    return future.get(timeout, timeUnit);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CompletionException(e);
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            });
        }

        return completableFutures;
    }

    /** Re-sets the current thread's interrupt flag when {@code e} reports an interruption. */
    private static void restoreInterruptIfNeeded(Throwable e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /** Usage example. */
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        Future<String> future1 = executor.submit(() -> {
            Thread.sleep(1000);
            return "Task 1 Result";
        });

        Future<String> future2 = executor.submit(() -> {
            Thread.sleep(2000);
            return "Task 2 Result";
        });

        Future<String> future3 = executor.submit(() -> {
            Thread.sleep(500);
            throw new RuntimeException("Task 3 Failed");
        });

        // Synchronous wait
        try {
            String result1 = getResultSync(future1, 2000, TimeUnit.MILLISECONDS);
            System.out.println("Sync result 1: " + result1);
        } catch (Exception e) {
            System.err.println("Error getting result 1: " + e.getMessage());
        }

        // Synchronous wait with a fallback value (task 2 outlives the 1 s timeout)
        String result2 = getResultSyncWithDefault(future2, 1000, TimeUnit.MILLISECONDS, "Default Result");
        System.out.println("Sync result 2 (with default): " + result2);

        // Asynchronous callbacks
        getResultAsync(future3,
                result -> System.out.println("Async result 3: " + result),
                error -> System.err.println("Async error 3: " + error.getMessage()));

        // Give the async callback time to run
        Thread.sleep(1000);

        // Batch retrieval
        @SuppressWarnings("unchecked")
        Future<String>[] futures = new Future[]{future1, future2, future3};
        CompletableFuture<String>[] results = getResultsAsync(futures, 3000, TimeUnit.MILLISECONDS);

        // Wait for all; failures are reported per element below
        CompletableFuture.allOf(results).exceptionally(ex -> null).join();

        for (int i = 0; i < results.length; i++) {
            CompletableFuture<String> result = results[i];
            try {
                System.out.println("Batch result " + (i + 1) + ": " + result.join());
            } catch (Exception e) {
                System.err.println("Batch error " + (i + 1) + ": " + e.getMessage());
            }
        }

        executor.shutdown();
    }
}

3.2 Future、CompletableFuture 等结果获取方式的实现

以下是使用 Future 和 CompletableFuture 获取任务结果的更高级实现:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;/*** 高级任务结果获取工具* 提供基于Future和CompletableFuture的结果获取方法*/
public class AdvancedTaskResultFetcher {

    /**
     * Adapts a plain Future to a CompletableFuture by blocking on it in CompletableFuture's
     * default async executor.
     *
     * @param future the original Future
     * @param <T> result type
     * @return a CompletableFuture that fails with the task's own exception (unwrapped from
     *     {@link ExecutionException}) when the task fails
     */
    public static <T> CompletableFuture<T> fromFuture(Future<T> future) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            } catch (ExecutionException e) {
                throw new CompletionException(e.getCause());
            }
        });
    }

    /**
     * Like {@link #fromFuture} but waits at most {@code timeout}. On timeout the original task is
     * cancelled with interruption and the result fails with a {@link TimeoutException}.
     *
     * @param future the original Future
     * @param timeout maximum wait
     * @param unit unit of {@code timeout}
     * @param <T> result type
     * @return the adapted CompletableFuture
     */
    public static <T> CompletableFuture<T> fromFutureWithTimeout(Future<T> future, long timeout, TimeUnit unit) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return future.get(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            } catch (ExecutionException e) {
                throw new CompletionException(e.getCause());
            } catch (TimeoutException e) {
                future.cancel(true);
                throw new CompletionException(e);
            }
        });
    }

    /**
     * Adapts every Future with {@link #fromFuture}.
     *
     * @param futures the original Futures
     * @param <T> result type
     * @return adapted futures in the same order
     */
    public static <T> List<CompletableFuture<T>> fromFutures(List<Future<T>> futures) {
        return futures.stream()
                .map(AdvancedTaskResultFetcher::fromFuture)
                .collect(Collectors.toList());
    }

    /**
     * Waits for every future and collects the successful results in list order.
     * Failed futures are skipped.
     *
     * @param futures the futures to wait for
     * @param <T> result type
     * @return the successful results
     */
    public static <T> CompletableFuture<List<T>> allSuccessful(List<CompletableFuture<T>> futures) {
        return CompletableFuture.supplyAsync(() -> {
            List<T> results = new ArrayList<>();
            for (CompletableFuture<T> future : futures) {
                try {
                    results.add(future.join());
                } catch (Exception ignored) {
                    // Failed tasks are intentionally left out of the result.
                }
            }
            return results;
        });
    }

    /**
     * Completes with the first successful result.
     *
     * <p>If every future fails, the result fails with a {@link CompletionException} whose cause
     * is one of the failures. An empty list fails immediately. Previously, either case left the
     * returned future pending forever.
     *
     * @param futures the candidate futures
     * @param <T> result type
     * @return the first successful result
     */
    public static <T> CompletableFuture<T> anySuccessful(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.completeExceptionally(new CompletionException("No tasks were supplied", null));
            return result;
        }

        // Fast path: the first success wins; later completions are no-ops.
        for (CompletableFuture<T> future : futures) {
            future.thenAccept(result::complete);
        }

        // Fallback once everything has settled: pick a success if one exists, otherwise fail.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .whenComplete((ignored, allOfFailure) -> completeFromSettled(futures, result));
        return result;
    }

    /**
     * Completes {@code result} from already-finished futures. It uses the first successful value
     * if any exists, and fails with "All tasks failed" otherwise.
     */
    private static <T> void completeFromSettled(List<CompletableFuture<T>> futures, CompletableFuture<T> result) {
        if (result.isDone()) {
            return;
        }
        Throwable lastFailure = null;
        for (CompletableFuture<T> future : futures) {
            try {
                // Every future is done here, so join() does not block.
                result.complete(future.join());
                return;
            } catch (CompletionException | CancellationException e) {
                lastFailure = e;
            }
        }
        result.completeExceptionally(new CompletionException("All tasks failed", lastFailure));
    }

    /**
     * Runs {@code callable} on a dedicated thread and fails with a {@link TimeoutException} if it
     * has not finished within {@code timeout}. On timeout the task thread is interrupted.
     *
     * <p>The timer thread is released as soon as the result is known. Previously it lingered as a
     * non-daemon thread until the deadline even when the task finished early.
     *
     * @param callable the task
     * @param timeout maximum run time
     * @param unit unit of {@code timeout}
     * @param <T> result type
     * @return the task's result
     */
    public static <T> CompletableFuture<T> withTimeout(Callable<T> callable, long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<>();

        Thread taskThread = new Thread(() -> {
            try {
                T value = callable.call();
                result.complete(value);
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });

        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        // shutdownNow (not shutdown) also discards the still-pending delayed timeout task,
        // which a plain shutdown() would run anyway under the default policy.
        result.whenComplete((value, ex) -> scheduler.shutdownNow());
        scheduler.schedule(() -> {
            if (!result.isDone()) {
                result.completeExceptionally(new TimeoutException("Task timed out after " + timeout + " " + unit));
                taskThread.interrupt();
            }
            scheduler.shutdown();
        }, timeout, unit);

        taskThread.start();
        return result;
    }

    /**
     * Runs {@code initialTask}, then feeds each result into the next step in order.
     *
     * @param initialTask the first task
     * @param nextTasks the following steps
     * @param <T> result type shared by all steps
     * @return the result of the last step
     */
    @SafeVarargs
    public static <T> CompletableFuture<T> chainTasks(Callable<T> initialTask,
                                                      Function<T, CompletableFuture<T>>... nextTasks) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
            try {
                return initialTask.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        });

        for (Function<T, CompletableFuture<T>> nextTask : nextTasks) {
            future = future.thenCompose(nextTask);
        }

        return future;
    }

    /** Usage example. */
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        Future<String> future1 = executor.submit(() -> {
            Thread.sleep(1000);
            return "Task 1 Result";
        });

        Future<String> future2 = executor.submit(() -> {
            Thread.sleep(2000);
            return "Task 2 Result";
        });

        Future<String> future3 = executor.submit(() -> {
            Thread.sleep(500);
            throw new RuntimeException("Task 3 Failed");
        });

        // Adapt Futures to CompletableFutures
        CompletableFuture<String> cf1 = fromFuture(future1);
        CompletableFuture<String> cf2 = fromFutureWithTimeout(future2, 3000, TimeUnit.MILLISECONDS);
        CompletableFuture<String> cf3 = fromFuture(future3);

        List<CompletableFuture<String>> futures = new ArrayList<>();
        futures.add(cf1);
        futures.add(cf2);
        futures.add(cf3);

        CompletableFuture<List<String>> allSuccessful = allSuccessful(futures);
        allSuccessful.thenAccept(results -> {
            System.out.println("All successful results: " + results);
        });

        CompletableFuture<String> anySuccessful = anySuccessful(futures);
        anySuccessful.thenAccept(result -> {
            System.out.println("First successful result: " + result);
        });

        CompletableFuture<String> timeoutTask = withTimeout(() -> {
            Thread.sleep(2000);
            return "Timeout Task Result";
        }, 1500, TimeUnit.MILLISECONDS);

        timeoutTask.handle((result, ex) -> {
            if (ex != null) {
                System.err.println("Timeout task error: " + ex.getMessage());
            } else {
                System.out.println("Timeout task result: " + result);
            }
            return null;
        });

        CompletableFuture<String> chainedTask = chainTasks(
                () -> "Initial Result",
                result -> CompletableFuture.supplyAsync(() -> result + " -> Step 1"),
                result -> CompletableFuture.supplyAsync(() -> result + " -> Step 2"),
                result -> CompletableFuture.supplyAsync(() -> result + " -> Final")
        );

        chainedTask.thenAccept(result -> {
            System.out.println("Chained task result: " + result);
        });

        // Let the asynchronous callbacks finish
        Thread.sleep(5000);
        executor.shutdown();
    }
}

3.3 结果聚合与处理的代码实现

以下是任务结果聚合与处理的实现:

import java.util.*;
import java.util.concurrent.*;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;/*** 任务结果聚合处理器* 提供各种结果聚合和处理方法*/
public class TaskResultAggregator {

    /**
     * Folds the results of all successful futures, in list order, into one value.
     * Failed futures, and aggregator calls that throw, are skipped.
     *
     * @param futures the futures to aggregate
     * @param aggregator combines the running value with one result
     * @param initialValue starting value of the fold
     * @param <T> task result type
     * @param <R> aggregate type
     * @return the aggregate
     */
    public static <T, R> CompletableFuture<R> aggregateResults(List<CompletableFuture<T>> futures,
                                                               BiFunction<R, T, R> aggregator,
                                                               R initialValue) {
        return CompletableFuture.supplyAsync(() -> {
            R result = initialValue;
            for (CompletableFuture<T> future : futures) {
                try {
                    T value = future.join();
                    result = aggregator.apply(result, value);
                } catch (Exception ignored) {
                    // Failed tasks are intentionally skipped.
                }
            }
            return result;
        });
    }

    /**
     * Like {@link #aggregateResults} but stops waiting once {@code timeout} has elapsed. Futures
     * that are not yet done by then are left out. Failed futures are skipped rather than aborting
     * the aggregation.
     *
     * <p>The timer thread is released as soon as the aggregation finishes.
     *
     * @param futures the futures to aggregate
     * @param aggregator combines the running value with one result
     * @param initialValue starting value of the fold
     * @param timeout overall deadline for waiting
     * @param unit unit of {@code timeout}
     * @param <T> task result type
     * @param <R> aggregate type
     * @return the aggregate of results available before the deadline
     */
    public static <T, R> CompletableFuture<R> aggregateResultsWithTimeout(List<CompletableFuture<T>> futures,
                                                                          BiFunction<R, T, R> aggregator,
                                                                          R initialValue,
                                                                          long timeout,
                                                                          TimeUnit unit) {
        // Completes (with null) once the deadline passes; used as a race partner below.
        CompletableFuture<Object> timeoutFuture = new CompletableFuture<>();
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.schedule(() -> {
            timeoutFuture.complete(null);
            scheduler.shutdown();
        }, timeout, unit);

        CompletableFuture<R> aggregationFuture = CompletableFuture.supplyAsync(() -> {
            R result = initialValue;
            for (CompletableFuture<T> future : futures) {
                try {
                    // Wait for this task or the deadline, whichever comes first. A failed task
                    // makes anyOf fail too, which is caught below.
                    CompletableFuture.anyOf(future, timeoutFuture).join();
                    if (future.isDone() && !future.isCompletedExceptionally()) {
                        result = aggregator.apply(result, future.join());
                    }
                } catch (Exception ignored) {
                    // Failed tasks are skipped, matching aggregateResults.
                }
            }
            return result;
        });

        // shutdownNow also drops the pending timeout task; the timer is no longer needed.
        aggregationFuture.whenComplete((value, ex) -> scheduler.shutdownNow());
        return aggregationFuture;
    }

    /**
     * Groups successful results by {@code classifier}. Failed futures are skipped.
     *
     * @param futures the futures to group
     * @param classifier maps a result to its group key
     * @param <T> task result type
     * @param <K> key type
     * @return results per key, each list in input order
     */
    public static <T, K> CompletableFuture<Map<K, List<T>>> groupResults(List<CompletableFuture<T>> futures,
                                                                         Function<T, K> classifier) {
        return CompletableFuture.supplyAsync(() -> {
            Map<K, List<T>> groups = new HashMap<>();
            for (CompletableFuture<T> future : futures) {
                try {
                    T value = future.join();
                    K key = classifier.apply(value);
                    groups.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
                } catch (Exception ignored) {
                    // Failed tasks are intentionally skipped.
                }
            }
            return groups;
        });
    }

    /**
     * Keeps the successful results that satisfy {@code predicate}. Failed futures are skipped.
     *
     * @param futures the futures to filter
     * @param predicate inclusion test
     * @param <T> task result type
     * @return matching results in input order
     */
    public static <T> CompletableFuture<List<T>> filterResults(List<CompletableFuture<T>> futures,
                                                               java.util.function.Predicate<T> predicate) {
        return CompletableFuture.supplyAsync(() -> {
            List<T> filteredResults = new ArrayList<>();
            for (CompletableFuture<T> future : futures) {
                try {
                    T value = future.join();
                    if (predicate.test(value)) {
                        filteredResults.add(value);
                    }
                } catch (Exception ignored) {
                    // Failed tasks are intentionally skipped.
                }
            }
            return filteredResults;
        });
    }

    /**
     * Applies {@code mapper} to every successful result. Failed futures are skipped.
     *
     * @param futures the futures to map
     * @param mapper transformation
     * @param <T> task result type
     * @param <R> mapped type
     * @return mapped results in input order
     */
    public static <T, R> CompletableFuture<List<R>> mapResults(List<CompletableFuture<T>> futures,
                                                               Function<T, R> mapper) {
        return CompletableFuture.supplyAsync(() -> {
            List<R> mappedResults = new ArrayList<>();
            for (CompletableFuture<T> future : futures) {
                try {
                    T value = future.join();
                    R mappedValue = mapper.apply(value);
                    mappedResults.add(mappedValue);
                } catch (Exception ignored) {
                    // Failed tasks are intentionally skipped.
                }
            }
            return mappedResults;
        });
    }

    /**
     * Completes with the first successful result. If every future fails, it fails with
     * "All tasks failed". An empty list fails immediately instead of never completing.
     *
     * @param futures the candidate futures
     * @param <T> task result type
     * @return the first successful result
     */
    public static <T> CompletableFuture<T> firstSuccessful(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.completeExceptionally(new CompletionException("No tasks were supplied", null));
            return result;
        }

        // Fully qualified: java.util.concurrent.* does not cover the atomic sub-package.
        java.util.concurrent.atomic.AtomicInteger failureCount = new java.util.concurrent.atomic.AtomicInteger(0);

        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, ex) -> {
                if (ex == null) {
                    result.complete(value);
                } else if (failureCount.incrementAndGet() == futures.size()) {
                    // The last failure decides: nobody succeeded.
                    result.completeExceptionally(new CompletionException("All tasks failed", ex));
                }
            });
        }

        return result;
    }

    /** Usage example. */
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int value = i;
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(100 * value);
                    if (value == 7) {
                        throw new RuntimeException("Task " + value + " failed");
                    }
                    return value;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CompletionException(e);
                }
            }, executor);
            futures.add(future);
        }

        // Sum of all successful results
        CompletableFuture<Integer> sum = aggregateResults(futures, (acc, value) -> acc + value, 0);
        sum.thenAccept(result -> {
            System.out.println("Sum of all successful tasks: " + result);
        });

        // Group by parity
        CompletableFuture<Map<String, List<Integer>>> groups = groupResults(futures,
                value -> value % 2 == 0 ? "Even" : "Odd");
        groups.thenAccept(result -> {
            System.out.println("Grouped results: " + result);
        });

        // Keep results greater than 5
        CompletableFuture<List<Integer>> filtered = filterResults(futures, value -> value > 5);
        filtered.thenAccept(result -> {
            System.out.println("Filtered results (> 5): " + result);
        });

        // Double every result
        CompletableFuture<List<Integer>> mapped = mapResults(futures, value -> value * 2);
        mapped.thenAccept(result -> {
            System.out.println("Mapped results (x2): " + result);
        });

        // First successful result
        CompletableFuture<Integer> first = firstSuccessful(futures);
        first.thenAccept(result -> {
            System.out.println("First successful result: " + result);
        });

        // Let the asynchronous callbacks finish
        Thread.sleep(2000);
        executor.shutdown();
    }
}

4. 任务失败时的处理机制

4.1 异常捕获与处理流程

在分布式任务调度系统中,任务失败是不可避免的,因此需要有完善的异常处理机制:

import java.util.concurrent.*;
import java.util.function.Function;/*** 任务异常处理器* 提供各种异常处理策略*/
public class TaskExceptionHandler {

    /**
     * Wraps a task so that any exception it throws is reported on stderr and then rethrown unchanged.
     *
     * @param task the original task; must not be {@code null}
     * @param <T>  result type
     * @return a task that delegates to {@code task}
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public static <T> Callable<T> withExceptionHandling(Callable<T> task) {
        java.util.Objects.requireNonNull(task, "task");
        return () -> {
            try {
                return task.call();
            } catch (Exception e) {
                System.err.println("Task failed: " + e.getMessage());
                throw e;
            }
        };
    }

    /**
     * Wraps a task so that any exception it throws is reported on stderr and replaced by a fallback value.
     *
     * @param task         the original task; must not be {@code null}
     * @param defaultValue value returned when {@code task} throws
     * @param <T>          result type
     * @return a task that never throws {@link Exception}
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public static <T> Callable<T> withDefaultValue(Callable<T> task, T defaultValue) {
        java.util.Objects.requireNonNull(task, "task");
        return () -> {
            try {
                return task.call();
            } catch (Exception e) {
                System.err.println("Task failed, using default value: " + e.getMessage());
                return defaultValue;
            }
        };
    }

    /**
     * Wraps a task so that any exception it throws is translated by {@code exceptionMapper}.
     *
     * <p>If the mapper returns {@code null}, the original exception is rethrown. Without that
     * guard the wrapper would execute {@code throw null}, surfacing an unrelated
     * NullPointerException and losing the real failure.
     *
     * @param task            the original task; must not be {@code null}
     * @param exceptionMapper converts the caught exception into the one to throw; must not be {@code null}
     * @param <T>             result type
     * @return a task that throws the mapped exception on failure
     * @throws NullPointerException if {@code task} or {@code exceptionMapper} is {@code null}
     */
    public static <T> Callable<T> withExceptionMapping(Callable<T> task,
                                                       Function<Exception, ? extends Exception> exceptionMapper) {
        java.util.Objects.requireNonNull(task, "task");
        java.util.Objects.requireNonNull(exceptionMapper, "exceptionMapper");
        return () -> {
            try {
                return task.call();
            } catch (Exception e) {
                Exception mapped = exceptionMapper.apply(e);
                throw mapped != null ? mapped : e;
            }
        };
    }

    /**
     * Wraps a task so that any exception it throws is passed to {@code logger} and then rethrown unchanged.
     *
     * @param task   the original task; must not be {@code null}
     * @param logger receives each exception before it is rethrown; must not be {@code null}
     * @param <T>    result type
     * @return a task that delegates to {@code task}
     * @throws NullPointerException if {@code task} or {@code logger} is {@code null}
     */
    public static <T> Callable<T> withLogging(Callable<T> task, ExceptionLogger logger) {
        java.util.Objects.requireNonNull(task, "task");
        java.util.Objects.requireNonNull(logger, "logger");
        return () -> {
            try {
                return task.call();
            } catch (Exception e) {
                logger.log(e);
                throw e;
            }
        };
    }

    /**
     * Callback that receives the exceptions observed by {@link #withLogging}.
     */
    @FunctionalInterface
    public interface ExceptionLogger {
        void log(Exception e);
    }

    /**
     * Usage example: runs one randomly failing task through each wrapper.
     */
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // A task that fails roughly 70% of the time.
        Callable<String> failingTask = () -> {
            if (Math.random() < 0.7) {
                throw new RuntimeException("Task randomly failed");
            }
            return "Task succeeded";
        };

        Callable<String> handledTask = withExceptionHandling(failingTask);
        Callable<String> taskWithDefault = withDefaultValue(failingTask, "Default Result");
        Callable<String> taskWithMappedException = withExceptionMapping(failingTask,
                e -> new BusinessException("Business operation failed", e));
        Callable<String> taskWithLogging = withLogging(failingTask,
                e -> System.err.println("LOGGED ERROR: " + e.getMessage()));

        try {
            Future<String> future1 = executor.submit(handledTask);
            try {
                String result1 = future1.get();
                System.out.println("Task 1 result: " + result1);
            } catch (Exception e) {
                System.err.println("Task 1 failed: " + e.getMessage());
            }

            Future<String> future2 = executor.submit(taskWithDefault);
            String result2 = future2.get();
            System.out.println("Task 2 result: " + result2);

            Future<String> future3 = executor.submit(taskWithMappedException);
            try {
                String result3 = future3.get();
                System.out.println("Task 3 result: " + result3);
            } catch (Exception e) {
                System.err.println("Task 3 failed: " + e.getMessage());
            }

            Future<String> future4 = executor.submit(taskWithLogging);
            try {
                String result4 = future4.get();
                System.out.println("Task 4 result: " + result4);
            } catch (Exception e) {
                System.err.println("Task 4 failed: " + e.getMessage());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of silently dropping it.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Example checked business exception used by {@link #main}.
     */
    static class BusinessException extends Exception {
        public BusinessException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}

4.2 重试策略的实现

以下是任务重试策略的实现:

import java.util.concurrent.*;
import java.util.function.Predicate;/*** 任务重试器* 提供各种重试策略*/
public class TaskRetryExecutor {

    /**
     * Runs {@code task}, retrying with a fixed pause between attempts.
     *
     * <p>The task is attempted at most {@code maxRetries + 1} times. An interruption, whether it
     * hits the pause or is thrown by the task as {@link InterruptedException}, aborts retrying
     * immediately instead of being counted as an ordinary failure.
     *
     * @param task         the task to run
     * @param maxRetries   number of retries after the first attempt
     * @param retryDelayMs pause before each retry, in milliseconds
     * @param <T>          result type
     * @return the first successful result
     * @throws InterruptedException if interrupted while pausing, or if the task throws it
     * @throws Exception            "All retry attempts failed", caused by the last failure
     */
    public static <T> T executeWithFixedRetries(Callable<T> task, int maxRetries, long retryDelayMs) throws Exception {
        int retries = 0;
        Exception lastException = null;
        while (retries <= maxRetries) {
            if (retries > 0) {
                System.out.println("Retry attempt " + retries + " of " + maxRetries);
                // Outside the try: an interruption must not be treated as a retryable failure.
                Thread.sleep(retryDelayMs);
            }
            try {
                return task.call();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                lastException = e;
                System.err.println("Attempt " + retries + " failed: " + e.getMessage());
                retries++;
            }
        }
        throw new Exception("All retry attempts failed", lastException);
    }

    /**
     * Runs {@code task}, retrying with an exponentially growing pause between attempts.
     *
     * <p>The first pause is {@code initialDelayMs}, clamped to {@code maxDelayMs}. Each later
     * pause doubles, capped at {@code maxDelayMs}. Interruption aborts retrying, as in
     * {@link #executeWithFixedRetries}.
     *
     * @param task           the task to run
     * @param maxRetries     number of retries after the first attempt
     * @param initialDelayMs pause before the first retry, in milliseconds
     * @param maxDelayMs     upper bound for every pause, in milliseconds
     * @param <T>            result type
     * @return the first successful result
     * @throws InterruptedException if interrupted while pausing, or if the task throws it
     * @throws Exception            "All retry attempts failed", caused by the last failure
     */
    public static <T> T executeWithExponentialBackoff(Callable<T> task, int maxRetries, long initialDelayMs,
                                                      long maxDelayMs) throws Exception {
        int retries = 0;
        long delay = Math.min(initialDelayMs, maxDelayMs);
        Exception lastException = null;
        while (retries <= maxRetries) {
            if (retries > 0) {
                System.out.println("Retry attempt " + retries + " of " + maxRetries + " (delay: " + delay + "ms)");
                Thread.sleep(delay);
                // Double for the next round without risking long overflow; never exceed maxDelayMs.
                delay = delay > maxDelayMs / 2 ? maxDelayMs : delay * 2;
            }
            try {
                return task.call();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                lastException = e;
                System.err.println("Attempt " + retries + " failed: " + e.getMessage());
                retries++;
            }
        }
        throw new Exception("All retry attempts failed", lastException);
    }

    /**
     * Runs {@code task}, retrying only failures accepted by {@code retryPredicate}.
     *
     * <p>A failure rejected by the predicate is rethrown immediately. Interruption aborts
     * retrying, as in {@link #executeWithFixedRetries}.
     *
     * @param task           the task to run
     * @param maxRetries     number of retries after the first attempt
     * @param retryDelayMs   pause before each retry, in milliseconds
     * @param retryPredicate decides whether a given failure may be retried
     * @param <T>            result type
     * @return the first successful result
     * @throws InterruptedException if interrupted while pausing, or if the task throws it
     * @throws Exception            a non-retryable failure as-is, or "All retry attempts failed"
     */
    public static <T> T executeWithConditionalRetry(Callable<T> task, int maxRetries, long retryDelayMs,
                                                    Predicate<Exception> retryPredicate) throws Exception {
        int retries = 0;
        Exception lastException = null;
        while (retries <= maxRetries) {
            if (retries > 0) {
                System.out.println("Retry attempt " + retries + " of " + maxRetries);
                Thread.sleep(retryDelayMs);
            }
            try {
                return task.call();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                lastException = e;
                System.err.println("Attempt " + retries + " failed: " + e.getMessage());
                if (!retryPredicate.test(e)) {
                    System.err.println("Exception does not qualify for retry, aborting");
                    throw e;
                }
                retries++;
            }
        }
        throw new Exception("All retry attempts failed", lastException);
    }

    /**
     * Runs {@code task} asynchronously, rescheduling failed attempts on a private scheduler.
     *
     * <p>Attempts run via {@link CompletableFuture#runAsync(Runnable)}. The private scheduler is
     * shut down once the returned future completes for any reason, including cancellation by the
     * caller. After that, no further attempts start.
     *
     * @param task         the task to run
     * @param maxRetries   number of retries after the first attempt
     * @param retryDelayMs pause before each retry, in milliseconds
     * @param <T>          result type
     * @return a future completed with the first successful result, or exceptionally
     */
    public static <T> CompletableFuture<T> executeWithAsyncRetry(Callable<T> task, int maxRetries, long retryDelayMs) {
        CompletableFuture<T> future = new CompletableFuture<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        executeAsyncWithRetry(task, maxRetries, retryDelayMs, 0, scheduler, future);
        future.whenComplete((result, ex) -> scheduler.shutdown());
        return future;
    }

    /**
     * Runs one asynchronous attempt. On failure, schedules the next attempt or completes
     * {@code future} exceptionally.
     */
    private static <T> void executeAsyncWithRetry(Callable<T> task, int maxRetries, long retryDelayMs,
                                                  int currentRetry,
                                                  ScheduledExecutorService scheduler,
                                                  CompletableFuture<T> future) {
        CompletableFuture.runAsync(() -> {
            if (future.isDone()) {
                // Already cancelled or completed by the caller: stop retrying.
                return;
            }
            try {
                future.complete(task.call());
            } catch (InterruptedException e) {
                // Interruption means "stop", not "retry".
                Thread.currentThread().interrupt();
                future.completeExceptionally(e);
            } catch (Exception e) {
                System.err.println("Attempt " + currentRetry + " failed: " + e.getMessage());
                if (currentRetry < maxRetries) {
                    System.out.println("Scheduling retry " + (currentRetry + 1) + " of " + maxRetries + " after " + retryDelayMs + "ms");
                    try {
                        scheduler.schedule(
                                () -> executeAsyncWithRetry(task, maxRetries, retryDelayMs, currentRetry + 1, scheduler, future),
                                retryDelayMs,
                                TimeUnit.MILLISECONDS);
                    } catch (RejectedExecutionException rejected) {
                        // The scheduler was shut down (the future completed concurrently).
                        future.completeExceptionally(rejected);
                    }
                } else {
                    future.completeExceptionally(new Exception("All retry attempts failed", e));
                }
            } catch (Throwable t) {
                // Errors must still complete the future. Otherwise callers wait forever and
                // the scheduler thread is never shut down.
                future.completeExceptionally(t);
            }
        });
    }

    /**
     * Usage example: exercises each retry strategy against a randomly failing task.
     */
    public static void main(String[] args) throws Exception {
        java.util.concurrent.atomic.AtomicInteger counter = new java.util.concurrent.atomic.AtomicInteger(0);
        Callable<String> unreliableTask = () -> {
            int attempt = counter.incrementAndGet();
            System.out.println("Executing task, attempt: " + attempt);
            if (Math.random() < 0.7) {
                throw new java.io.IOException("Network error occurred");
            }
            return "Task succeeded on attempt " + attempt;
        };

        try {
            counter.set(0);
            String result1 = executeWithFixedRetries(unreliableTask, 3, 100);
            System.out.println("Fixed retry result: " + result1);
        } catch (Exception e) {
            System.err.println("Fixed retry failed: " + e.getMessage());
        }

        try {
            counter.set(0);
            String result2 = executeWithExponentialBackoff(unreliableTask, 4, 100, 1000);
            System.out.println("Exponential backoff result: " + result2);
        } catch (Exception e) {
            System.err.println("Exponential backoff failed: " + e.getMessage());
        }

        try {
            counter.set(0);
            String result3 = executeWithConditionalRetry(unreliableTask, 3, 100,
                    e -> e instanceof java.io.IOException  // only retry I/O failures
            );
            System.out.println("Conditional retry result: " + result3);
        } catch (Exception e) {
            System.err.println("Conditional retry failed: " + e.getMessage());
        }

        counter.set(0);
        CompletableFuture<String> future = executeWithAsyncRetry(unreliableTask, 5, 200);
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("Async retry failed: " + ex.getMessage());
            } else {
                System.out.println("Async retry result: " + result);
            }
        });

        // Give the asynchronous attempts time to finish.
        Thread.sleep(5000);
    }
}

4.3 失败任务的恢复机制

以下是失败任务的恢复机制实现:

import java.io.*;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;/*** 任务恢复管理器* 提供失败任务的持久化和恢复功能*/
public class TaskRecoveryManager<T, R> {

    /** Failed tasks keyed by task ID, mirrored on disk as {@code <taskId>.recovery} files. */
    private final ConcurrentMap<String, FailedTask<T>> failedTasks = new ConcurrentHashMap<>();
    private final ExecutorService executor;
    private final Function<T, R> taskProcessor;
    private final String recoveryDir;
    private final ScheduledExecutorService recoveryScheduler;

    /**
     * Creates a task recovery manager.
     *
     * <p>Previously persisted failures in {@code recoveryDir} are loaded first. Then a periodic
     * job is scheduled that resubmits eligible failed tasks every {@code recoveryIntervalMinutes}.
     *
     * @param executor                pool that runs tasks, including recovered ones
     * @param taskProcessor           function that processes one task's data
     * @param recoveryDir             directory holding the {@code .recovery} files; created if missing
     * @param recoveryIntervalMinutes initial delay and period of the recovery job, in minutes; must be positive
     * @throws IllegalArgumentException if {@code recoveryIntervalMinutes <= 0}
     */
    public TaskRecoveryManager(ExecutorService executor,
                               Function<T, R> taskProcessor,
                               String recoveryDir,
                               int recoveryIntervalMinutes) {
        if (recoveryIntervalMinutes <= 0) {
            // Validate before creating the scheduler so a bad argument does not leak its thread.
            throw new IllegalArgumentException("recoveryIntervalMinutes must be positive: " + recoveryIntervalMinutes);
        }
        this.executor = executor;
        this.taskProcessor = taskProcessor;
        this.recoveryDir = recoveryDir;

        File dir = new File(recoveryDir);
        if (!dir.exists() && !dir.mkdirs()) {
            System.err.println("Failed to create recovery directory: " + dir.getAbsolutePath());
        }

        // Load persisted failures before the periodic recovery job can run.
        loadFailedTasks();

        this.recoveryScheduler = Executors.newSingleThreadScheduledExecutor();
        recoveryScheduler.scheduleAtFixedRate(this::recoverFailedTasks,
                recoveryIntervalMinutes,
                recoveryIntervalMinutes,
                TimeUnit.MINUTES);
    }

    /**
     * Submits a task for execution on the manager's executor.
     *
     * <p>On success, any earlier failure record for {@code taskId} is removed from memory and
     * disk. On failure, the task is recorded and persisted for later recovery.
     *
     * @param taskId   task identifier; also used as the recovery file name
     * @param taskData input passed to the task processor
     * @return a future completed with the processor's result, or exceptionally with a
     *         {@link CompletionException} wrapping the failure
     * @throws RejectedExecutionException if the executor rejects the task (e.g. it was shut down)
     */
    public CompletableFuture<R> submitTask(String taskId, T taskData) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                R result = taskProcessor.apply(taskData);
                failedTasks.remove(taskId);
                deleteRecoveryFile(taskId);
                return result;
            } catch (Exception e) {
                recordFailedTask(taskId, taskData, e);
                throw new CompletionException(e);
            }
        }, executor);
    }

    /**
     * Records a failure: bumps the failure count, stamps the time and exception, and persists the record.
     */
    private void recordFailedTask(String taskId, T taskData, Exception exception) {
        FailedTask<T> failedTask = failedTasks.computeIfAbsent(taskId,
                id -> new FailedTask<>(id, taskData));
        failedTask.incrementFailureCount();
        failedTask.setLastFailureTime(LocalDateTime.now());
        failedTask.setLastException(exception);
        persistFailedTask(failedTask);
    }

    /**
     * Serializes a failed task to {@code <recoveryDir>/<taskId>.recovery}.
     *
     * <p>Best effort: I/O errors, including non-serializable task data, are reported on stderr only.
     */
    private void persistFailedTask(FailedTask<T> failedTask) {
        // NOTE(review): taskId is used verbatim as a file name; IDs containing path separators
        // could write outside recoveryDir. Validate IDs if they can come from untrusted input.
        String filePath = recoveryDir + File.separator + failedTask.getTaskId() + ".recovery";
        try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(filePath))) {
            oos.writeObject(failedTask);
        } catch (IOException e) {
            System.err.println("Failed to persist task: " + e.getMessage());
        }
    }

    /**
     * Deletes the recovery file for {@code taskId}, if present.
     */
    private void deleteRecoveryFile(String taskId) {
        String filePath = recoveryDir + File.separator + taskId + ".recovery";
        File file = new File(filePath);
        if (file.exists()) {
            file.delete();
        }
    }

    /**
     * Loads every {@code .recovery} file in {@code recoveryDir} into the in-memory failure map.
     */
    @SuppressWarnings("unchecked")
    private void loadFailedTasks() {
        File dir = new File(recoveryDir);
        File[] files = dir.listFiles((d, name) -> name.endsWith(".recovery"));
        if (files == null) {
            return;
        }
        for (File file : files) {
            // SECURITY NOTE: this is Java native deserialization. It is only safe if recoveryDir
            // is writable solely by this application; otherwise consider an ObjectInputFilter.
            try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(file))) {
                FailedTask<T> failedTask = (FailedTask<T>) ois.readObject();
                failedTasks.put(failedTask.getTaskId(), failedTask);
                System.out.println("Loaded failed task: " + failedTask.getTaskId());
            } catch (Exception e) {
                System.err.println("Failed to load task from " + file.getName() + ": " + e.getMessage());
            }
        }
    }

    /**
     * Resubmits every failed task whose back-off period has elapsed.
     *
     * <p>This runs under {@code scheduleAtFixedRate}, where an exception escaping one run
     * suppresses all later runs. Resubmission errors, such as a shut-down executor throwing
     * {@link RejectedExecutionException}, are therefore caught and reported per task.
     */
    private void recoverFailedTasks() {
        System.out.println("Starting recovery of failed tasks...");
        LocalDateTime now = LocalDateTime.now();
        for (FailedTask<T> failedTask : failedTasks.values()) {
            if (!shouldRetry(failedTask, now)) {
                continue;
            }
            System.out.println("Recovering task: " + failedTask.getTaskId());
            try {
                submitTask(failedTask.getTaskId(), failedTask.getTaskData())
                        .whenComplete((result, ex) -> {
                            if (ex != null) {
                                System.err.println("Recovery failed for task " + failedTask.getTaskId() + ": " + ex.getMessage());
                            } else {
                                System.out.println("Successfully recovered task: " + failedTask.getTaskId());
                            }
                        });
            } catch (RuntimeException e) {
                System.err.println("Could not resubmit task " + failedTask.getTaskId() + ": " + e);
            }
        }
    }

    /**
     * Decides whether a failed task is due for another attempt.
     *
     * <p>Back-off is {@code failureCount * 5} minutes, capped at 60 minutes. A task with no
     * recorded failure time is always due.
     */
    private boolean shouldRetry(FailedTask<T> failedTask, LocalDateTime now) {
        LocalDateTime lastFailureTime = failedTask.getLastFailureTime();
        if (lastFailureTime == null) {
            return true;
        }
        int failureCount = failedTask.getFailureCount();
        int delayMinutes = Math.min(failureCount * 5, 60);  // at most one hour
        return lastFailureTime.plusMinutes(delayMinutes).isBefore(now);
    }

    /**
     * Returns a snapshot of all currently recorded failed tasks.
     */
    public List<FailedTask<T>> getFailedTasks() {
        return new ArrayList<>(failedTasks.values());
    }

    /**
     * Runs a recovery pass immediately on the calling thread.
     */
    public void triggerRecovery() {
        recoverFailedTasks();
    }

    /**
     * Stops the periodic recovery job. The task executor is owned by the caller and is not shut down.
     */
    public void shutdown() {
        recoveryScheduler.shutdown();
    }

    /**
     * Serializable record of a task that failed at least once.
     */
    public static class FailedTask<T> implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String taskId;
        private final T taskData;
        private int failureCount;
        private LocalDateTime lastFailureTime;
        private Exception lastException;

        public FailedTask(String taskId, T taskData) {
            this.taskId = taskId;
            this.taskData = taskData;
            this.failureCount = 0;
        }

        public String getTaskId() {
            return taskId;
        }

        public T getTaskData() {
            return taskData;
        }

        public int getFailureCount() {
            return failureCount;
        }

        public void incrementFailureCount() {
            this.failureCount++;
        }

        public LocalDateTime getLastFailureTime() {
            return lastFailureTime;
        }

        public void setLastFailureTime(LocalDateTime lastFailureTime) {
            this.lastFailureTime = lastFailureTime;
        }

        public Exception getLastException() {
            return lastException;
        }

        public void setLastException(Exception lastException) {
            this.lastException = lastException;
        }

        @Override
        public String toString() {
            return "FailedTask{" +
                    "taskId='" + taskId + '\'' +
                    ", failureCount=" + failureCount +
                    ", lastFailureTime=" + lastFailureTime +
                    ", lastException=" + (lastException != null ? lastException.getMessage() : "null") +
                    '}';
        }
    }

    /**
     * Usage example: submits randomly failing tasks, then triggers a manual recovery pass.
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        Function<String, Integer> taskProcessor = data -> {
            System.out.println("Processing task: " + data);
            // Simulate random failures.
            if (Math.random() < 0.5) {
                throw new RuntimeException("Random task failure");
            }
            return data.length();
        };

        TaskRecoveryManager<String, Integer> recoveryManager = new TaskRecoveryManager<>(
                executor,
                taskProcessor,
                "recovery_data",
                1  // one-minute recovery interval
        );

        for (int i = 0; i < 10; i++) {
            final String taskId = "task-" + i;
            final String taskData = "Task data for " + i;
            recoveryManager.submitTask(taskId, taskData)
                    .whenComplete((result, ex) -> {
                        if (ex != null) {
                            System.err.println("Task " + taskId + " failed: " + ex.getMessage());
                        } else {
                            System.out.println("Task " + taskId + " succeeded with result: " + result);
                        }
                    });
        }

        Thread.sleep(2000);

        List<TaskRecoveryManager.FailedTask<String>> failedTasks = recoveryManager.getFailedTasks();
        System.out.println("\nFailed tasks: " + failedTasks.size());
        for (TaskRecoveryManager.FailedTask<String> task : failedTasks) {
            System.out.println(task);
        }

        System.out.println("\nTriggering manual recovery...");
        recoveryManager.triggerRecovery();

        Thread.sleep(2000);

        failedTasks = recoveryManager.getFailedTasks();
        System.out.println("\nFailed tasks after recovery: " + failedTasks.size());
        for (TaskRecoveryManager.FailedTask<String> task : failedTasks) {
            System.out.println(task);
        }

        recoveryManager.shutdown();
        executor.shutdown();
    }
}

4.4 死锁与资源泄漏的防范措施

以下是防范死锁和资源泄漏的实现:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** 死锁和资源泄漏防范工具*/
public class DeadlockPreventionUtil {

    /**
     * Tries to acquire {@code lock}, giving up after the timeout instead of blocking forever.
     *
     * @param lock    the lock to acquire
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the lock was acquired; the caller must then unlock it
     * @throws InterruptedException if interrupted while waiting
     */
    public static boolean acquireLockWithTimeout(Lock lock, long timeout, TimeUnit unit) throws InterruptedException {
        return lock.tryLock(timeout, unit);
    }

    /**
     * Tries to take one permit from {@code semaphore}, giving up after the timeout.
     *
     * @param semaphore the semaphore guarding the resource
     * @param timeout   maximum time to wait
     * @param unit      unit of {@code timeout}
     * @return {@code true} if a permit was acquired; the caller must then release it
     * @throws InterruptedException if interrupted while waiting
     */
    public static boolean acquireResourceWithTimeout(Semaphore semaphore, long timeout, TimeUnit unit) throws InterruptedException {
        return semaphore.tryAcquire(timeout, unit);
    }

    /**
     * Runs {@code task} on a dedicated single-thread executor and waits at most {@code timeout} for it.
     *
     * <p>The executor is always shut down with {@code shutdownNow()}, which interrupts a task
     * that is still running after the timeout.
     *
     * @param task    the task to run
     * @param timeout maximum time to wait for the result
     * @param unit    unit of {@code timeout}
     * @param <T>     result type
     * @return the task's result
     * @throws TimeoutException     if the task does not finish in time
     * @throws ExecutionException   if the task throws
     * @throws InterruptedException if the caller is interrupted while waiting
     */
    public static <T> T executeWithTimeout(Callable<T> task, long timeout, TimeUnit unit)
            throws TimeoutException, ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(task);
            return future.get(timeout, unit);
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Reference-counted wrapper that closes the underlying resource when the count drops to zero.
     *
     * <p>The count starts at 1. Each {@link #acquire()} adds one and each {@link #close()}
     * removes one.
     *
     * @param <T> resource type
     */
    public static class ResourceGuard<T extends AutoCloseable> implements AutoCloseable {
        private final T resource;
        private final AtomicInteger referenceCount = new AtomicInteger(1);

        public ResourceGuard(T resource) {
            this.resource = resource;
        }

        /**
         * Returns the wrapped resource.
         *
         * @throws IllegalStateException if the guard has already been fully released
         */
        public T getResource() {
            if (referenceCount.get() <= 0) {
                throw new IllegalStateException("Resource already closed");
            }
            return resource;
        }

        /**
         * Adds a reference and returns this guard.
         *
         * @throws IllegalStateException if the guard has already been fully released
         */
        public ResourceGuard<T> acquire() {
            if (referenceCount.incrementAndGet() <= 1) {
                // The count was already zero (or below): undo the increment and refuse.
                referenceCount.decrementAndGet();
                throw new IllegalStateException("Resource already closed");
            }
            return this;
        }

        /**
         * Drops a reference; closes the resource exactly when the count reaches zero.
         */
        @Override
        public void close() {
            if (referenceCount.decrementAndGet() == 0) {
                try {
                    resource.close();
                } catch (Exception e) {
                    throw new RuntimeException("Error closing resource", e);
                }
            }
        }
    }

    /**
     * Periodically asks the JVM's {@code ThreadMXBean} for deadlocked threads and dumps them to stderr.
     */
    public static class DeadlockDetector {
        private final java.lang.management.ThreadMXBean threadMXBean;
        private final ScheduledExecutorService scheduler;
        private final long detectionIntervalMs;

        public DeadlockDetector(long detectionIntervalMs) {
            this.threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
            this.scheduler = Executors.newSingleThreadScheduledExecutor();
            this.detectionIntervalMs = detectionIntervalMs;
        }

        /**
         * Starts periodic detection on the detector's own scheduler thread.
         */
        public void start() {
            scheduler.scheduleAtFixedRate(this::checkForDeadlocks,
                    detectionIntervalMs,
                    detectionIntervalMs,
                    TimeUnit.MILLISECONDS);
        }

        /**
         * One detection run. Exceptions are reported rather than propagated, because an
         * exception escaping a {@code scheduleAtFixedRate} task cancels all later runs.
         */
        private void checkForDeadlocks() {
            try {
                reportDeadlocks();
            } catch (RuntimeException e) {
                System.err.println("Deadlock detection failed: " + e);
            }
        }

        /** Prints every currently deadlocked thread and its full stack trace to stderr. */
        private void reportDeadlocks() {
            long[] deadlockedThreadIds = threadMXBean.findDeadlockedThreads();
            if (deadlockedThreadIds == null || deadlockedThreadIds.length == 0) {
                return;
            }
            System.err.println("DEADLOCK DETECTED!");
            java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(deadlockedThreadIds, true, true);
            for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
                if (threadInfo == null) {
                    // The thread terminated between findDeadlockedThreads() and getThreadInfo().
                    continue;
                }
                System.err.println(threadInfo);
                // ThreadInfo.toString() truncates the stack, so print all frames explicitly.
                for (StackTraceElement element : threadInfo.getStackTrace()) {
                    System.err.println("\tat " + element);
                }
                System.err.println();
            }
            // Hook point for alerting or other handling.
        }

        /**
         * Stops periodic detection.
         */
        public void stop() {
            scheduler.shutdown();
        }
    }

    /**
     * Reports tracked resources that have not been released within {@code resourceMaxAgeMs}.
     *
     * <p>A leaked resource is reported on every detection run until it is released.
     *
     * @param <T> resource type; used as a hash-map key
     */
    public static class ResourceLeakDetector<T> {
        private final ConcurrentMap<T, LeakInfo> trackedResources = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduler;
        private final long detectionIntervalMs;
        private final long resourceMaxAgeMs;

        public ResourceLeakDetector(long detectionIntervalMs, long resourceMaxAgeMs) {
            this.scheduler = Executors.newSingleThreadScheduledExecutor();
            this.detectionIntervalMs = detectionIntervalMs;
            this.resourceMaxAgeMs = resourceMaxAgeMs;
        }

        /**
         * Starts tracking {@code resource}, capturing the current stack as its allocation site.
         */
        public void trackResource(T resource) {
            trackedResources.put(resource, new LeakInfo(new Exception("Resource allocation stack trace")));
        }

        /**
         * Stops tracking {@code resource}.
         */
        public void releaseResource(T resource) {
            trackedResources.remove(resource);
        }

        /**
         * Starts periodic leak detection on the detector's own scheduler thread.
         */
        public void start() {
            scheduler.scheduleAtFixedRate(this::checkForLeaks,
                    detectionIntervalMs,
                    detectionIntervalMs,
                    TimeUnit.MILLISECONDS);
        }

        /**
         * One detection run. Exceptions are reported rather than propagated so later runs still happen.
         */
        private void checkForLeaks() {
            try {
                long now = System.currentTimeMillis();
                for (java.util.Map.Entry<T, LeakInfo> entry : trackedResources.entrySet()) {
                    LeakInfo info = entry.getValue();
                    long age = now - info.allocationTime;
                    if (age > resourceMaxAgeMs) {
                        System.err.println("RESOURCE LEAK DETECTED: Resource " + entry.getKey() + " has been allocated for " + age + "ms");
                        System.err.println("Allocation stack trace:");
                        info.allocationStack.printStackTrace(System.err);
                        // Hook point for alerting or other handling.
                    }
                }
            } catch (RuntimeException e) {
                System.err.println("Leak detection failed: " + e);
            }
        }

        /**
         * Stops periodic leak detection.
         */
        public void stop() {
            scheduler.shutdown();
        }

        /** When and where a tracked resource was registered. */
        private static class LeakInfo {
            final long allocationTime;
            final Exception allocationStack;

            LeakInfo(Exception allocationStack) {
                this.allocationTime = System.currentTimeMillis();
                this.allocationStack = allocationStack;
            }
        }
    }

    /**
     * Usage example: runs the detectors alongside normal use, a deliberate leak, a timed lock
     * and a timed task.
     */
    public static void main(String[] args) throws Exception {
        DeadlockDetector deadlockDetector = new DeadlockDetector(1000);
        deadlockDetector.start();

        ResourceLeakDetector<Connection> leakDetector = new ResourceLeakDetector<>(1000, 5000);
        leakDetector.start();

        ExecutorService executor = Executors.newFixedThreadPool(10);

        // Normal resource use: the guard closes the connection.
        executor.submit(() -> {
            try (ResourceGuard<Connection> guard = new ResourceGuard<>(createConnection())) {
                Connection connection = guard.getResource();
                System.out.println("Using connection: " + connection);
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Deliberate leak: tracked but never closed.
        executor.submit(() -> {
            try {
                Connection leakedConnection = createConnection();
                leakDetector.trackResource(leakedConnection);
                System.out.println("Leaking connection: " + leakedConnection);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Lock acquisition with a timeout.
        executor.submit(() -> {
            Lock lock = new ReentrantLock();
            try {
                if (acquireLockWithTimeout(lock, 1, TimeUnit.SECONDS)) {
                    try {
                        System.out.println("Lock acquired");
                        Thread.sleep(500);
                    } finally {
                        lock.unlock();
                    }
                } else {
                    System.out.println("Failed to acquire lock within timeout");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Task execution with a timeout.
        executor.submit(() -> {
            try {
                String result = executeWithTimeout(() -> {
                    Thread.sleep(500);
                    return "Task completed";
                }, 1, TimeUnit.SECONDS);
                System.out.println("Task result: " + result);
            } catch (TimeoutException e) {
                System.err.println("Task timed out");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        Thread.sleep(10000);

        executor.shutdown();
        deadlockDetector.stop();
        leakDetector.stop();
    }

    /**
     * Creates a fake connection that records whether it has been closed.
     */
    private static Connection createConnection() {
        return new Connection() {
            private boolean closed = false;

            @Override
            public void close() {
                closed = true;
                System.out.println("Connection closed");
            }

            @Override
            public String toString() {
                return "Connection@" + Integer.toHexString(hashCode()) + (closed ? " (closed)" : "");
            }
        };
    }

    /**
     * Minimal connection type whose {@code close()} throws no checked exception.
     */
    private interface Connection extends AutoCloseable {
        @Override
        void close();
    }
}
http://www.dtcms.com/a/424011.html

相关文章:

  • pc开奖网站开发濮阳建网站
  • JWT token 简要介绍以及使用场景和案例
  • 网站在线留言怎么做行政法规
  • 语义网络(Semantic Net)对人工智能中自然语言处理的深层语义分析的影响与启示
  • 南通网站建设优化网站建设服务器配置
  • “AI+“行动下的可控智能体:GPT-5 与 GPT-OSS 高性能推理 安全可控 产业落地 GPT-OSS 一可控AI目前全球唯一开源解决方案
  • 零基础网站建设视频想做一个自己的网站怎么做
  • UWB实操:使用litepoint测试FCC 1ms PSD功率谱密度
  • 企业网站自己可以做吗工作室做什么项目好
  • 商丘做网站的电话怎样做自己公司的网站
  • 复旦华为提出首个空间理解和生成统一框架UniUGG,支持参考图像和任意视图变换的 3D 场景生成和空间视觉问答 (VQA) 任务。
  • InnoDB与MySQL复制的关键注意事项
  • 【python】五个容器
  • 网站后台信息发布这样做利用html做博客网站
  • SSM数字图书馆on33n(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • 网站开发项目团队人员易车网汽车之家
  • 个人建站公司好看个人网页模板
  • 高端网站建设 源码wordpress去log
  • 杭州网站建设哪里好牡丹江有做网站的人吗
  • PHP转JAVA入门知识解析 (指南一)
  • Django之APPEND_SLASH配置爬坑
  • 连连建设跨境电商网站在线名片设计
  • dw和vs做网站跨境电商怎么注册开店
  • Meep 和 MPB 软件的关系:姊妹软件
  • 动易网站风格免费下载广州天河区必去的地方
  • Vue3 全局 API 转移详解
  • Vue中的计算属性和监视属性
  • Element UI 组件样式自定义详解与最佳实践
  • Kali Linux 2025.3 正式发布:更贴近前沿的安全平台
  • 做卡盟网站教程拓者网室内设计官网app