基于 Java 的异步任务管理器的设计与实现方案
以下是基于 Java 的异步任务管理器的设计与实现方案,涵盖核心功能、工具推荐和代码示例:
1. 核心功能实现
1.1 基础版本(使用 ExecutorService
和 Future
)
import java.util.concurrent.*;
public class AsyncTaskManager {
private final ExecutorService executor;
private final ConcurrentMap<String, Future<?>> taskMap = new ConcurrentHashMap<>();
public AsyncTaskManager(int poolSize) {
this.executor = Executors.newFixedThreadPool(poolSize);
}
// 添加任务
public String addTask(String taskId, Runnable task) {
Future<?> future = executor.submit(task);
taskMap.put(taskId, future);
return taskId;
}
// 获取任务状态
public String getTaskStatus(String taskId) {
Future<?> future = taskMap.get(taskId);
if (future == null) return "Not Found";
if (future.isDone()) {
try {
future.get(); // 检查是否异常
return "Completed";
} catch (Exception e) {
return "Failed";
}
}
return "Running";
}
// 关闭管理器
public void shutdown() {
executor.shutdown();
}
}
1.2 使用 CompletableFuture
(支持异步链式调用)
public class AdvancedAsyncTaskManager {
private final ExecutorService executor = Executors.newCachedThreadPool();
private final ConcurrentMap<String, CompletableFuture<?>> taskMap = new ConcurrentHashMap<>();
public String submitTask(String taskId, Supplier<?> task) {
CompletableFuture<?> future = CompletableFuture.supplyAsync(task, executor);
taskMap.put(taskId, future);
future.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("Task failed: " + ex.getMessage());
}
});
return taskId;
}
public <T> T getResult(String taskId) throws ExecutionException, InterruptedException {
CompletableFuture<?> future = taskMap.get(taskId);
return (T) (future != null ? future.get() : null);
}
}
2. 高级功能扩展
2.1 重试机制(使用 Resilience4j 或 Spring Retry)
// 依赖:io.github.resilience4j:resilience4j-retry
RetryConfig config = RetryConfig.custom()
.maxAttempts(3)
.waitDuration(Duration.ofMillis(1000))
.retryOnException(e -> e instanceof IOException)
.build();
Retry retry = Retry.of("taskRetry", config);
Runnable retryableTask = Retry.decorateRunnable(retry, () -> {
// 可能会失败的任务
if (Math.random() > 0.5) throw new IOException("模拟失败");
});
new AsyncTaskManager(4).addTask("task1", retryableTask);
2.2 任务超时控制
CompletableFuture.supplyAsync(() -> {
// 长时间任务
return "Result";
}).completeOnTimeout("Timeout Fallback", 5, TimeUnit.SECONDS);
2.3 任务依赖(DAG 调度)
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> task2 = task1.thenApplyAsync(result -> "Task2 uses " + result);
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2);
allTasks.join();
3. 分布式异步任务(扩展思路)
3.1 消息队列集成(如 RabbitMQ/Kafka)
// 使用 RabbitMQ 分发任务(示例代码)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare("task_queue", true, false, false, null);
// 发布任务
channel.basicPublish("", "task_queue", null, "Task Data".getBytes());
}
3.2 分布式任务框架
- PowerJob:轻量级分布式任务调度框架(GitHub)。
- Elastic-Job:基于 ZooKeeper 的弹性分布式任务调度(官网)。
- Quartz Cluster:集群化定时任务管理。
4. 完整示例(Spring Boot 集成)
4.1 配置异步任务管理器
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("AsyncTask-");
executor.initialize();
return executor;
}
}
4.2 定义异步任务
@Service
public class EmailService {
@Async
public CompletableFuture<String> sendEmailAsync(String to) {
// 模拟耗时操作
Thread.sleep(2000);
return CompletableFuture.completedFuture("Email sent to " + to);
}
}
4.3 监控任务状态
@RestController
public class TaskController {
@Autowired
private EmailService emailService;
private final Map<String, CompletableFuture<String>> tasks = new ConcurrentHashMap<>();
@PostMapping("/sendEmail")
public String triggerEmailTask(@RequestParam String email) {
CompletableFuture<String> future = emailService.sendEmailAsync(email);
String taskId = UUID.randomUUID().toString();
tasks.put(taskId, future);
return taskId;
}
@GetMapping("/taskStatus")
public String getTaskStatus(@RequestParam String taskId) {
CompletableFuture<String> future = tasks.get(taskId);
if (future == null) return "Not Found";
if (future.isDone()) {
try {
return "Completed: " + future.get();
} catch (Exception e) {
return "Failed: " + e.getMessage();
}
}
return "Running";
}
}
5. 推荐工具与框架
场景 | 工具推荐 |
---|---|
单机任务 | ExecutorService 、CompletableFuture 、Guava 的 ListenableFuture |
定时任务 | Quartz、Spring @Scheduled |
分布式任务 | PowerJob、Elastic-Job、XXL-JOB |
消息队列 | RabbitMQ、Kafka、Redis Stream |
监控 | Micrometer + Prometheus、Spring Boot Actuator |
6. 最佳实践
- 线程池调优:根据任务类型(CPU 密集型 vs I/O 密集型)配置线程池参数。
- 优雅关闭:调用
shutdown()
或shutdownNow()
确保任务安全终止。 - 异常处理:通过
Future.get()
或CompletableFuture.exceptionally()
捕获异常。 - 任务持久化:使用数据库(如 MySQL)记录任务元数据,避免内存丢失。
如果需要更具体的实现(如分布式任务分片、动态扩缩容)或某个框架的详细用法,请进一步说明!