医疗设备设计seo服务商技术好的公司
以下是基于 Java 的异步任务管理器的设计与实现方案,涵盖核心功能、工具推荐和代码示例:
1. 核心功能实现
1.1 基础版本(使用 ExecutorService
和 Future
)
import java.util.concurrent.*;public class AsyncTaskManager {private final ExecutorService executor;private final ConcurrentMap<String, Future<?>> taskMap = new ConcurrentHashMap<>();public AsyncTaskManager(int poolSize) {this.executor = Executors.newFixedThreadPool(poolSize);}// 添加任务public String addTask(String taskId, Runnable task) {Future<?> future = executor.submit(task);taskMap.put(taskId, future);return taskId;}// 获取任务状态public String getTaskStatus(String taskId) {Future<?> future = taskMap.get(taskId);if (future == null) return "Not Found";if (future.isDone()) {try {future.get(); // 检查是否异常return "Completed";} catch (Exception e) {return "Failed";}}return "Running";}// 关闭管理器public void shutdown() {executor.shutdown();}
}
1.2 使用 CompletableFuture
(支持异步链式调用)
public class AdvancedAsyncTaskManager {private final ExecutorService executor = Executors.newCachedThreadPool();private final ConcurrentMap<String, CompletableFuture<?>> taskMap = new ConcurrentHashMap<>();public String submitTask(String taskId, Supplier<?> task) {CompletableFuture<?> future = CompletableFuture.supplyAsync(task, executor);taskMap.put(taskId, future);future.whenComplete((result, ex) -> {if (ex != null) {System.err.println("Task failed: " + ex.getMessage());}});return taskId;}public <T> T getResult(String taskId) throws ExecutionException, InterruptedException {CompletableFuture<?> future = taskMap.get(taskId);return (T) (future != null ? future.get() : null);}
}
2. 高级功能扩展
2.1 重试机制(使用 Resilience4j 或 Spring Retry)
// 依赖:io.github.resilience4j:resilience4j-retry
RetryConfig config = RetryConfig.custom().maxAttempts(3).waitDuration(Duration.ofMillis(1000)).retryOnException(e -> e instanceof IOException).build();Retry retry = Retry.of("taskRetry", config);Runnable retryableTask = Retry.decorateRunnable(retry, () -> {// 可能会失败的任务if (Math.random() > 0.5) throw new IOException("模拟失败");
});new AsyncTaskManager(4).addTask("task1", retryableTask);
2.2 任务超时控制
CompletableFuture.supplyAsync(() -> {// 长时间任务return "Result";
}).completeOnTimeout("Timeout Fallback", 5, TimeUnit.SECONDS);
2.3 任务依赖(DAG 调度)
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> task2 = task1.thenApplyAsync(result -> "Task2 uses " + result);
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2);
allTasks.join();
3. 分布式异步任务(扩展思路)
3.1 消息队列集成(如 RabbitMQ/Kafka)
// 使用 RabbitMQ 分发任务(示例代码)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare("task_queue", true, false, false, null);// 发布任务channel.basicPublish("", "task_queue", null, "Task Data".getBytes());
}
3.2 分布式任务框架
- PowerJob:轻量级分布式任务调度框架(GitHub)。
- Elastic-Job:基于 ZooKeeper 的弹性分布式任务调度(官网)。
- Quartz Cluster:集群化定时任务管理。
4. 完整示例(Spring Boot 集成)
4.1 配置异步任务管理器
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(50);executor.setQueueCapacity(100);executor.setThreadNamePrefix("AsyncTask-");executor.initialize();return executor;}
}
4.2 定义异步任务
@Service
public class EmailService {@Asyncpublic CompletableFuture<String> sendEmailAsync(String to) {// 模拟耗时操作Thread.sleep(2000);return CompletableFuture.completedFuture("Email sent to " + to);}
}
4.3 监控任务状态
@RestController
public class TaskController {@Autowiredprivate EmailService emailService;private final Map<String, CompletableFuture<String>> tasks = new ConcurrentHashMap<>();@PostMapping("/sendEmail")public String triggerEmailTask(@RequestParam String email) {CompletableFuture<String> future = emailService.sendEmailAsync(email);String taskId = UUID.randomUUID().toString();tasks.put(taskId, future);return taskId;}@GetMapping("/taskStatus")public String getTaskStatus(@RequestParam String taskId) {CompletableFuture<String> future = tasks.get(taskId);if (future == null) return "Not Found";if (future.isDone()) {try {return "Completed: " + future.get();} catch (Exception e) {return "Failed: " + e.getMessage();}}return "Running";}
}
5. 推荐工具与框架
场景 | 工具推荐 |
---|---|
单机任务 | ExecutorService 、CompletableFuture 、Guava 的 ListenableFuture |
定时任务 | Quartz、Spring @Scheduled |
分布式任务 | PowerJob、Elastic-Job、XXL-JOB |
消息队列 | RabbitMQ、Kafka、Redis Stream |
监控 | Micrometer + Prometheus、Spring Boot Actuator |
6. 最佳实践
- 线程池调优:根据任务类型(CPU 密集型 vs I/O 密集型)配置线程池参数。
- 优雅关闭:调用
shutdown()
或shutdownNow()
确保任务安全终止。 - 异常处理:通过
Future.get()
或CompletableFuture.exceptionally()
捕获异常。 - 任务持久化:使用数据库(如 MySQL)记录任务元数据,避免内存丢失。
如果需要更具体的实现(如分布式任务分片、动态扩缩容)或某个框架的详细用法,请进一步说明!