通用的动态定时任务系统
下面是一个通用的动态定时任务系统的完整实现。这套系统采用策略模式和Spring的依赖注入,使得添加新的任务类型变得更加便捷和模块化。以下内容包括项目结构、必要的依赖配置、实体类、仓库接口、任务处理器接口与实现、任务处理器工厂、服务层、控制器层、异常处理、定时任务调度配置以及应用启动与任务恢复等关键组件的完整代码。
目录
- 项目结构
- 依赖配置 (
pom.xml
) - 应用配置 (
application.properties
) - 实体类
- ScheduledTask.java
- 仓库接口
- ScheduledTaskRepository.java
- 任务处理器
- TaskHandler.java
- EmailTaskHandler.java
- DataProcessTaskHandler.java
- 任务处理器工厂
- TaskHandlerFactory.java
- 定时任务调度配置
- SchedulerConfig.java
- 服务层
- ScheduledTaskService.java
- StartupService.java
- CleanupService.java
- 控制器层
- ScheduledTaskController.java
- 异常处理
- GlobalExceptionHandler.java
- 主应用类
- TaskSchedulerApplication.java
1. 项目结构
以下是项目的包结构示意:
src
├── main
│ ├── java
│ │ └── com
│ │ └── example
│ │ └── taskscheduler
│ │ ├── TaskSchedulerApplication.java
│ │ ├── config
│ │ │ └── SchedulerConfig.java
│ │ ├── controller
│ │ │ └── ScheduledTaskController.java
│ │ ├── entity
│ │ │ └── ScheduledTask.java
│ │ ├── exception
│ │ │ └── GlobalExceptionHandler.java
│ │ ├── factory
│ │ │ └── TaskHandlerFactory.java
│ │ ├── handler
│ │ │ ├── DataProcessTaskHandler.java
│ │ │ ├── EmailTaskHandler.java
│ │ │ └── TaskHandler.java
│ │ ├── repository
│ │ │ └── ScheduledTaskRepository.java
│ │ └── service
│ │ ├── CleanupService.java
│ │ ├── ScheduledTaskService.java
│ │ └── StartupService.java
│ └── resources
│ └── application.properties
└── test
└── java
└── com
└── example
└── taskscheduler
└── TaskSchedulerApplicationTests.java
2. 依赖配置 (pom.xml
)
首先,确保在项目的 pom.xml
文件中添加必要的依赖。以下是一个示例配置:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>taskscheduler</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>TaskScheduler</name>
<description>通用的动态定时任务系统</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.1</version>
<relativePath/>
</parent>
<properties>
<java.version>17</java.version> <!-- 确保使用支持的Java版本 -->
</properties>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Starter Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- JSON处理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- 数据库驱动(以MySQL为例) -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Lombok(可选,用于简化代码) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<scope>provided</scope>
</dependency>
<!-- Spring Boot Starter Test(用于测试) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Maven Compiler Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<annotationProcessorPaths>
<!-- Lombok 注解处理器 -->
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>
说明:
- Spring Boot Starter Web:用于构建Web应用,包括REST API。
- Spring Boot Starter Data JPA:用于与数据库交互。
- Jackson Databind:用于JSON处理。
- MySQL Connector/J:如果使用MySQL数据库,需添加对应驱动。
- Lombok:用于简化Java代码(如生成getter/setter),可选但推荐。
- Spring Boot Starter Test:用于测试。
3. 应用配置 (application.properties
)
在 src/main/resources/
目录下创建 application.properties
文件,配置数据库连接和JPA设置。
# 数据库配置(以MySQL为例)
spring.datasource.url=jdbc:mysql://localhost:3306/taskscheduler?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=yourpassword
# JPA配置
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.format_sql=true
# 定时任务初始延迟和间隔(可选,根据需要调整)
# 我们将在服务层自定义这些,通常不需要在此配置
# Logging配置(可选)
logging.level.org.springframework=INFO
logging.level.com.example.taskscheduler=DEBUG
注意:
- 数据库URL:确保替换为您实际的数据库URL、用户名和密码。
spring.jpa.hibernate.ddl-auto=update
:让JPA自动创建或更新数据库表结构。对于生产环境,建议使用更严格的设置。- 日志级别:根据需要调整,以获取更多或更少的日志信息。
4. 实体类
ScheduledTask.java
package com.example.taskscheduler.entity;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
import java.time.LocalDateTime;
@Entity
@Table(name = "scheduled_tasks", uniqueConstraints = @UniqueConstraint(columnNames = "taskId"))
@Data
@NoArgsConstructor
public class ScheduledTask {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String taskId; // 业务上的唯一标识
@Column(nullable = false)
private String taskType; // 任务类型
@Column(columnDefinition = "TEXT")
private String parameters; // JSON格式的参数
@Column(nullable = false)
private LocalDateTime scheduledTime; // 任务执行时间
@Column(nullable = false, updatable = false)
private LocalDateTime createdAt = LocalDateTime.now();
private LocalDateTime updatedAt = LocalDateTime.now();
private boolean executed = false;
private String executionStatus;
@Column(columnDefinition = "TEXT")
private String executionResult;
// 构造函数
public ScheduledTask(String taskId, String taskType, String parameters, LocalDateTime scheduledTime) {
this.taskId = taskId;
this.taskType = taskType;
this.parameters = parameters;
this.scheduledTime = scheduledTime;
}
@PreUpdate
public void preUpdate() {
this.updatedAt = LocalDateTime.now();
}
}
说明:
- Lombok注解:
@Data
自动生成getter、setter、toString等方法;@NoArgsConstructor
生成无参构造函数。 - 字段解释:
taskId
:业务上的唯一标识,用于任务去重。taskType
:任务类型标识,如EMAIL
、DATA_PROCESS
。parameters
:任务执行所需的参数,以JSON字符串形式存储。scheduledTime
:任务的执行时间。executed
:任务是否已执行。executionStatus
和executionResult
:记录任务执行状态和结果。
5. 仓库接口
ScheduledTaskRepository.java
package com.example.taskscheduler.repository;
import com.example.taskscheduler.entity.ScheduledTask;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import java.util.Optional;
@Repository
public interface ScheduledTaskRepository extends JpaRepository<ScheduledTask, Long> {
Optional<ScheduledTask> findByTaskId(String taskId);
boolean existsByTaskId(String taskId);
}
说明:
- 提供根据
taskId
查询和验证任务是否存在的方法,以实现任务去重。
6. 任务处理器
采用策略模式,为每种任务类型创建对应的处理器,实现灵活和可扩展的任务执行逻辑。
6.1 TaskHandler.java
package com.example.taskscheduler.handler;
import java.util.Map;
public interface TaskHandler {
/**
* 执行任务的逻辑。
*
* @param parameters 任务参数,以键值对形式传递
* @throws Exception 执行过程中可能抛出的异常
*/
void handle(Map<String, Object> parameters) throws Exception;
/**
* 返回该处理器支持的任务类型。
*
* @return 任务类型的字符串标识
*/
String getTaskType();
}
6.2 EmailTaskHandler.java
package com.example.taskscheduler.handler;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class EmailTaskHandler implements TaskHandler {
@Override
public void handle(Map<String, Object> parameters) throws Exception {
String recipient = (String) parameters.get("recipient");
String subject = (String) parameters.get("subject");
String body = (String) parameters.get("body");
// 实现发送邮件的逻辑,例如使用JavaMailSender
System.out.println("Sending email to: " + recipient);
System.out.println("Subject: " + subject);
System.out.println("Body: " + body);
// 模拟邮件发送成功
// 在实际应用中,应集成邮件发送服务
}
@Override
public String getTaskType() {
return "EMAIL";
}
}
6.3 DataProcessTaskHandler.java
package com.example.taskscheduler.handler;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class DataProcessTaskHandler implements TaskHandler {
@Override
public void handle(Map<String, Object> parameters) throws Exception {
String dataSource = (String) parameters.get("dataSource");
String operation = (String) parameters.get("operation");
// 实现数据处理的逻辑
System.out.println("Processing data from: " + dataSource);
System.out.println("Operation: " + operation);
// 模拟数据处理成功
// 在实际应用中,应执行具体的数据处理任务
}
@Override
public String getTaskType() {
return "DATA_PROCESS";
}
}
扩展性:
- 添加新的任务类型时,只需创建新的
TaskHandler
实现类,并通过@Component
注解注册,即可自动被工厂识别和调用,无需修改现有代码。
7. 任务处理器工厂
负责根据任务类型查找并返回对应的 TaskHandler
实例。
TaskHandlerFactory.java
package com.example.taskscheduler.factory;
import com.example.taskscheduler.handler.TaskHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class TaskHandlerFactory {
private final List<TaskHandler> handlers;
private final Map<String, TaskHandler> handlerMap = new HashMap<>();
@Autowired
public TaskHandlerFactory(List<TaskHandler> handlers) {
this.handlers = handlers;
}
@PostConstruct
public void init() {
for (TaskHandler handler : handlers) {
handlerMap.put(handler.getTaskType(), handler);
}
}
/**
* 根据任务类型获取对应的处理器。
*
* @param taskType 任务类型的字符串标识
* @return 对应的 TaskHandler 实例
* @throws IllegalArgumentException 如果没有找到对应的处理器
*/
public TaskHandler getHandler(String taskType) {
TaskHandler handler = handlerMap.get(taskType);
if (handler == null) {
throw new IllegalArgumentException("No handler found for task type: " + taskType);
}
return handler;
}
}
说明:
- 自动注入所有实现了
TaskHandler
接口的Bean,并在@PostConstruct
阶段初始化handlerMap
,映射任务类型到处理器。 getHandler
方法根据任务类型获取对应的处理器实例。
8. 定时任务调度配置
配置 ThreadPoolTaskScheduler
Bean,用于动态调度任务。
SchedulerConfig.java
package com.example.taskscheduler.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
public class SchedulerConfig {
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10); // 根据需求调整
scheduler.setThreadNamePrefix("DynamicTaskScheduler-");
scheduler.setAwaitTerminationSeconds(60);
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.initialize();
return scheduler;
}
}
说明:
- 线程池大小:根据预期的并发任务量调整
poolSize
。 - 线程名称前缀:便于在日志中追踪任务执行。
- 关闭时等待任务完成:确保在应用关闭时,正在执行的任务能够完成。
9. 服务层
负责核心的任务管理逻辑,包括任务创建、调度、执行和取消。
9.1 ScheduledTaskService.java
package com.example.taskscheduler.service;
import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.factory.TaskHandlerFactory;
import com.example.taskscheduler.handler.TaskHandler;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
@Service
public class ScheduledTaskService {
private final ScheduledTaskRepository scheduledTaskRepository;
private final ThreadPoolTaskScheduler taskScheduler;
private final ObjectMapper objectMapper;
private final TaskHandlerFactory taskHandlerFactory;
// 用于管理 ScheduledFuture 对象,以支持任务取消等操作
private final Map<Long, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
@Autowired
public ScheduledTaskService(ScheduledTaskRepository scheduledTaskRepository,
ThreadPoolTaskScheduler taskScheduler,
ObjectMapper objectMapper,
TaskHandlerFactory taskHandlerFactory) {
this.scheduledTaskRepository = scheduledTaskRepository;
this.taskScheduler = taskScheduler;
this.objectMapper = objectMapper;
this.taskHandlerFactory = taskHandlerFactory;
}
/**
* 创建并调度一个新任务。
*
* @param taskId 业务上的唯一任务标识
* @param taskType 任务类型
* @param parameters 任务参数(Map<String, Object>)
* @param delayMinutes 延迟执行的分钟数
* @return 创建的 ScheduledTask 对象
*/
public ScheduledTask createAndScheduleTask(String taskId, String taskType, Map<String, Object> parameters, long delayMinutes) {
// 任务去重:检查是否已存在相同 taskId 的未执行任务
if (scheduledTaskRepository.existsByTaskId(taskId)) {
throw new IllegalArgumentException("Task with taskId " + taskId + " already exists.");
}
// 计算执行时间
LocalDateTime scheduledTime = LocalDateTime.now().plusMinutes(delayMinutes);
// 序列化参数为 JSON 字符串
String parametersJson;
try {
parametersJson = objectMapper.writeValueAsString(parameters);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize task parameters.", e);
}
// 创建 ScheduledTask 实体
ScheduledTask scheduledTask = new ScheduledTask(taskId, taskType, parametersJson, scheduledTime);
scheduledTaskRepository.save(scheduledTask);
// 创建并调度任务
Runnable taskRunnable = () -> executeTask(scheduledTask.getId());
ScheduledFuture<?> future = taskScheduler.schedule(taskRunnable, scheduledTime.atZone(java.time.ZoneId.systemDefault()).toInstant());
// 保存 ScheduledFuture 对象
scheduledFutures.put(scheduledTask.getId(), future);
return scheduledTask;
}
/**
* 执行任务逻辑,根据任务类型调用不同的处理方法。
*
* @param scheduledTaskId 数据库中的 ScheduledTask 的 ID
*/
public void executeTask(Long scheduledTaskId) {
Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findById(scheduledTaskId);
if (!optionalTask.isPresent()) {
// 任务记录不存在,可能已经被删除
return;
}
ScheduledTask task = optionalTask.get();
if (task.isExecuted()) {
// 任务已执行,跳过
return;
}
try {
// 反序列化参数
Map<String, Object> params = objectMapper.readValue(task.getParameters(), Map.class);
// 获取任务类型对应的处理器
TaskHandler handler = taskHandlerFactory.getHandler(task.getTaskType());
// 执行任务
handler.handle(params);
// 更新任务状态
task.setExecuted(true);
task.setExecutionStatus("SUCCESS");
task.setExecutionResult("Task executed successfully.");
scheduledTaskRepository.save(task);
} catch (Exception e) {
// 更新任务状态为失败
task.setExecuted(true);
task.setExecutionStatus("FAILED");
task.setExecutionResult(e.getMessage());
scheduledTaskRepository.save(task);
// 记录日志或采取其他措施
System.err.println("Failed to execute taskId: " + task.getTaskId() + ", Error: " + e.getMessage());
}
}
/**
* 取消指定任务。
*
* @param taskId 业务上的唯一任务标识
* @return true 如果任务成功取消,false 否则
*/
public boolean cancelTask(String taskId) {
Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findByTaskId(taskId);
if (!optionalTask.isPresent()) {
return false; // 任务不存在
}
ScheduledTask task = optionalTask.get();
if (task.isExecuted()) {
return false; // 任务已执行,无法取消
}
// 获取 ScheduledFuture 对象
ScheduledFuture<?> future = scheduledFutures.get(task.getId());
if (future != null && !future.isDone()) {
boolean cancelled = future.cancel(false);
if (cancelled) {
// 更新任务状态
task.setExecuted(true);
task.setExecutionStatus("CANCELLED");
task.setExecutionResult("Task was cancelled before execution.");
scheduledTaskRepository.save(task);
// 从 map 中移除
scheduledFutures.remove(task.getId());
return true;
}
}
return false;
}
/**
* 查找任务详情。
*
* @param taskId 业务上的唯一任务标识
* @return Optional<ScheduledTask>
*/
public Optional<ScheduledTask> findByTaskId(String taskId) {
return scheduledTaskRepository.findByTaskId(taskId);
}
}
9.2 StartupService.java
负责在应用启动时恢复未执行的任务,确保系统重启后任务不丢失。
package com.example.taskscheduler.service;
import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
public class StartupService {
private final ScheduledTaskRepository scheduledTaskRepository;
private final ScheduledTaskService scheduledTaskService;
private final ObjectMapper objectMapper;
@Autowired
public StartupService(ScheduledTaskRepository scheduledTaskRepository,
ScheduledTaskService scheduledTaskService,
ObjectMapper objectMapper) {
this.scheduledTaskRepository = scheduledTaskRepository;
this.scheduledTaskService = scheduledTaskService;
this.objectMapper = objectMapper;
}
@PostConstruct
public void init() {
List<ScheduledTask> pendingTasks = scheduledTaskRepository.findAll()
.stream()
.filter(task -> !task.isExecuted())
.collect(Collectors.toList());
for (ScheduledTask task : pendingTasks) {
// 计算剩余延迟时间
Duration delay = Duration.between(LocalDateTime.now(), task.getScheduledTime());
long delayInMinutes = delay.toMinutes();
if (delayInMinutes > 0) {
// 重新调度任务
try {
Map<String, Object> params = objectMapper.readValue(task.getParameters(), Map.class);
scheduledTaskService.createAndScheduleTask(task.getTaskId(),
task.getTaskType(),
params,
delayInMinutes);
} catch (Exception e) {
System.err.println("Failed to reschedule taskId: " + task.getTaskId() + ", Error: " + e.getMessage());
}
} else {
// 执行延迟已过的任务
scheduledTaskService.executeTask(task.getId());
}
}
System.out.println("Recovered and rescheduled " + pendingTasks.size() + " tasks.");
}
}
9.3 CleanupService.java
负责定期清理已执行或取消的任务,防止数据库膨胀。
package com.example.taskscheduler.service;
import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
@Service
public class CleanupService {
private final ScheduledTaskRepository scheduledTaskRepository;
private final ObjectMapper objectMapper;
@Autowired
public CleanupService(ScheduledTaskRepository scheduledTaskRepository,
ObjectMapper objectMapper) {
this.scheduledTaskRepository = scheduledTaskRepository;
this.objectMapper = objectMapper;
}
/**
* 定期清理已执行且超过保留期限的任务。
* 例如,保留已执行任务7天后删除。
*/
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
public void cleanUpExecutedTasks() {
LocalDateTime cutoff = LocalDateTime.now().minusDays(7);
List<ScheduledTask> tasksToDelete = scheduledTaskRepository.findAll()
.stream()
.filter(task -> task.isExecuted() &&
(task.getScheduledTime().isBefore(cutoff) ||
"CANCELLED".equals(task.getExecutionStatus()) ||
"FAILED".equals(task.getExecutionStatus())))
.collect(Collectors.toList());
scheduledTaskRepository.deleteAll(tasksToDelete);
System.out.println("Cleaned up " + tasksToDelete.size() + " tasks.");
}
}
说明:
@Scheduled(cron = "0 0 2 * * ?")
:设置任务每天凌晨2点执行。- 清理条件:
- 任务已执行且执行时间早于7天前。
- 任务状态为
CANCELLED
或FAILED
。
10. 控制器层
提供REST API端点,用于创建、取消和查询任务。
ScheduledTaskController.java
package com.example.taskscheduler.controller;
import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.service.ScheduledTaskService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.Optional;
@RestController
@RequestMapping("/api/tasks")
public class ScheduledTaskController {
private final ScheduledTaskService scheduledTaskService;
private final ObjectMapper objectMapper;
@Autowired
public ScheduledTaskController(ScheduledTaskService scheduledTaskService,
ObjectMapper objectMapper) {
this.scheduledTaskService = scheduledTaskService;
this.objectMapper = objectMapper;
}
/**
* 创建并调度新任务。
*
* @param taskRequest 包含 taskId, taskType, parameters, delayMinutes 的请求体
* @return 创建的任务信息
*/
@PostMapping("/schedule")
public ResponseEntity<?> scheduleTask(@RequestBody TaskRequest taskRequest) {
try {
ScheduledTask task = scheduledTaskService.createAndScheduleTask(
taskRequest.getTaskId(),
taskRequest.getTaskType(),
taskRequest.getParameters(),
taskRequest.getDelayMinutes()
);
return ResponseEntity.ok(task);
} catch (IllegalArgumentException ex) {
return ResponseEntity.badRequest().body(ex.getMessage());
} catch (Exception ex) {
return ResponseEntity.status(500).body("Failed to schedule task.");
}
}
/**
* 取消指定任务。
*
* @param taskId 业务上的唯一任务标识
* @return 取消结果
*/
@DeleteMapping("/cancel/{taskId}")
public ResponseEntity<?> cancelTask(@PathVariable String taskId) {
boolean cancelled = scheduledTaskService.cancelTask(taskId);
if (cancelled) {
return ResponseEntity.ok("Task cancelled successfully.");
} else {
return ResponseEntity.badRequest().body("Failed to cancel task. It may have been executed or does not exist.");
}
}
/**
* 查询任务详情。
*
* @param taskId 业务上的唯一任务标识
* @return 任务详情
*/
@GetMapping("/{taskId}")
public ResponseEntity<?> getTaskStatus(@PathVariable String taskId) {
Optional<ScheduledTask> optionalTask = scheduledTaskService.findByTaskId(taskId);
if (optionalTask.isPresent()) {
return ResponseEntity.ok(optionalTask.get());
} else {
return ResponseEntity.status(404).body("Task not found.");
}
}
// DTO 类用于接收请求体
public static class TaskRequest {
private String taskId;
private String taskType;
private Map<String, Object> parameters;
private long delayMinutes;
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public Map<String, Object> getParameters() {
return parameters;
}
public void setParameters(Map<String, Object> parameters) {
this.parameters = parameters;
}
public long getDelayMinutes() {
return delayMinutes;
}
public void setDelayMinutes(long delayMinutes) {
this.delayMinutes = delayMinutes;
}
}
}
说明:
- 创建任务:通过
POST /api/tasks/schedule
端点接收任务请求,包含taskId
、taskType
、parameters
和delayMinutes
。 - 取消任务:通过
DELETE /api/tasks/cancel/{taskId}
端点取消特定任务。 - 查询任务:通过
GET /api/tasks/{taskId}
端点获取任务详情。
11. 异常处理
统一的全局异常处理,确保API响应的一致性和信息的安全性。
GlobalExceptionHandler.java
package com.example.taskscheduler.exception;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@ControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler(IllegalArgumentException.class)
public ResponseEntity<String> handleBadRequest(Exception ex) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
}
@ExceptionHandler(UnsupportedOperationException.class)
public ResponseEntity<String> handleUnsupportedOperation(Exception ex) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
}
@ExceptionHandler(Exception.class)
public ResponseEntity<String> handleInternalError(Exception ex) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("An unexpected error occurred.");
}
}
说明:
@ControllerAdvice
:允许定义全局的异常处理规则。- 特定异常处理:为常见的业务异常(如
IllegalArgumentException
和UnsupportedOperationException
)提供定制化的响应。 - 通用异常处理:捕获所有未处理的异常,避免泄露敏感信息。
12. 主应用类
定义Spring Boot的主应用入口。
TaskSchedulerApplication.java
package com.example.taskscheduler;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling // 启用定时任务调度
public class TaskSchedulerApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSchedulerApplication.class, args);
}
}
说明:
@SpringBootApplication
:标识这是一个Spring Boot应用。@EnableScheduling
:启用Spring的定时任务调度功能,允许使用@Scheduled
注解的方法。
完整代码汇总
为了便于理解,以下是各个关键类的完整代码。
12.1. TaskHandler.java
package com.example.taskscheduler.handler;
import java.util.Map;
public interface TaskHandler {
void handle(Map<String, Object> parameters) throws Exception;
String getTaskType();
}
12.2. EmailTaskHandler.java
package com.example.taskscheduler.handler;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class EmailTaskHandler implements TaskHandler {
@Override
public void handle(Map<String, Object> parameters) throws Exception {
String recipient = (String) parameters.get("recipient");
String subject = (String) parameters.get("subject");
String body = (String) parameters.get("body");
// 实现发送邮件的逻辑,例如使用JavaMailSender
System.out.println("Sending email to: " + recipient);
System.out.println("Subject: " + subject);
System.out.println("Body: " + body);
// 模拟邮件发送成功
// 在实际应用中,应集成邮件发送服务
}
@Override
public String getTaskType() {
return "EMAIL";
}
}
12.3. DataProcessTaskHandler.java
package com.example.taskscheduler.handler;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class DataProcessTaskHandler implements TaskHandler {
@Override
public void handle(Map<String, Object> parameters) throws Exception {
String dataSource = (String) parameters.get("dataSource");
String operation = (String) parameters.get("operation");
// 实现数据处理的逻辑
System.out.println("Processing data from: " + dataSource);
System.out.println("Operation: " + operation);
// 模拟数据处理成功
// 在实际应用中,应执行具体的数据处理任务
}
@Override
public String getTaskType() {
return "DATA_PROCESS";
}
}
12.4. TaskHandlerFactory.java
package com.example.taskscheduler.factory;
import com.example.taskscheduler.handler.TaskHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class TaskHandlerFactory {
private final List<TaskHandler> handlers;
private final Map<String, TaskHandler> handlerMap = new HashMap<>();
@Autowired
public TaskHandlerFactory(List<TaskHandler> handlers) {
this.handlers = handlers;
}
@PostConstruct
public void init() {
for (TaskHandler handler : handlers) {
handlerMap.put(handler.getTaskType(), handler);
}
}
public TaskHandler getHandler(String taskType) {
TaskHandler handler = handlerMap.get(taskType);
if (handler == null) {
throw new IllegalArgumentException("No handler found for task type: " + taskType);
}
return handler;
}
}
12.5. SchedulerConfig.java
package com.example.taskscheduler.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
public class SchedulerConfig {
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10); // 根据需求调整
scheduler.setThreadNamePrefix("DynamicTaskScheduler-");
scheduler.setAwaitTerminationSeconds(60);
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.initialize();
return scheduler;
}
}
12.6. ScheduledTask.java
package com.example.taskscheduler.entity;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
import java.time.LocalDateTime;
@Entity
@Table(name = "scheduled_tasks", uniqueConstraints = @UniqueConstraint(columnNames = "taskId"))
@Data
@NoArgsConstructor
public class ScheduledTask {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String taskId; // 业务上的唯一标识
@Column(nullable = false)
private String taskType; // 任务类型
@Column(columnDefinition = "TEXT")
private String parameters; // JSON格式的参数
@Column(nullable = false)
private LocalDateTime scheduledTime; // 任务执行时间
@Column(nullable = false, updatable = false)
private LocalDateTime createdAt = LocalDateTime.now();
private LocalDateTime updatedAt = LocalDateTime.now();
private boolean executed = false;
private String executionStatus;
@Column(columnDefinition = "TEXT")
private String executionResult;
// 构造函数
public ScheduledTask(String taskId, String taskType, String parameters, LocalDateTime scheduledTime) {
this.taskId = taskId;
this.taskType = taskType;
this.parameters = parameters;
this.scheduledTime = scheduledTime;
}
@PreUpdate
public void preUpdate() {
this.updatedAt = LocalDateTime.now();
}
}
12.7. ScheduledTaskRepository.java
package com.example.taskscheduler.repository;
import com.example.taskscheduler.entity.ScheduledTask;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import java.util.Optional;
@Repository
public interface ScheduledTaskRepository extends JpaRepository<ScheduledTask, Long> {
Optional<ScheduledTask> findByTaskId(String taskId);
boolean existsByTaskId(String taskId);
}
12.8. ScheduledTaskService.java
package com.example.taskscheduler.service;
import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.factory.TaskHandlerFactory;
import com.example.taskscheduler.handler.TaskHandler;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
@Service
public class ScheduledTaskService {
private final ScheduledTaskRepository scheduledTaskRepository;
private final ThreadPoolTaskScheduler taskScheduler;
private final ObjectMapper objectMapper;
private final TaskHandlerFactory taskHandlerFactory;
// 用于管理 ScheduledFuture 对象,以支持任务取消等操作
private final Map<Long, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
@Autowired
public ScheduledTaskService(ScheduledTaskRepository scheduledTaskRepository,
ThreadPoolTaskScheduler taskScheduler,
ObjectMapper objectMapper,
TaskHandlerFactory taskHandlerFactory) {
this.scheduledTaskRepository = scheduledTaskRepository;
this.taskScheduler = taskScheduler;
this.objectMapper = objectMapper;
this.taskHandlerFactory = taskHandlerFactory;
}
/**
* 创建并调度一个新任务。
*
* @param taskId 业务上的唯一任务标识
* @param taskType 任务类型
* @param parameters 任务参数(Map<String, Object>)
* @param delayMinutes 延迟执行的分钟数
* @return 创建的 ScheduledTask 对象
*/
public ScheduledTask createAndScheduleTask(String taskId, String taskType, Map<String, Object> parameters, long delayMinutes) {
// 任务去重:检查是否已存在相同 taskId 的未执行任务
if (scheduledTaskRepository.existsByTaskId(taskId)) {
throw new IllegalArgumentException("Task with taskId " + taskId + " already exists.");
}
// 计算执行时间
LocalDateTime scheduledTime = LocalDateTime.now().plusMinutes(delayMinutes);
// 序列化参数为 JSON 字符串
String parametersJson;
try {
parametersJson = objectMapper.writeValueAsString(parameters);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize task parameters.", e);
}
// 创建 ScheduledTask 实体
ScheduledTask scheduledTask = new ScheduledTask(taskId, taskType, parametersJson, scheduledTime);
scheduledTaskRepository.save(scheduledTask);
// 创建并调度任务
Runnable taskRunnable = () -> executeTask(scheduledTask.getId());
ScheduledFuture<?> future = taskScheduler.schedule(taskRunnable, scheduledTime.atZone(java.time.ZoneId.systemDefault()).toInstant());
// 保存 ScheduledFuture 对象
scheduledFutures.put(scheduledTask.getId(), future);
return scheduledTask;
}
/**
* 执行任务逻辑,根据任务类型调用不同的处理方法。
*
* @param scheduledTaskId 数据库中的 ScheduledTask 的 ID
*/
public void executeTask(Long scheduledTaskId) {
Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findById(scheduledTaskId);
if (!optionalTask.isPresent()) {
// 任务记录不存在,可能已经被删除
return;
}
ScheduledTask task = optionalTask.get();
if (task.isExecuted()) {
// 任务已执行,跳过
return;
}
try {
// 反序列化参数
Map<String, Object> params = objectMapper.readValue(task.getParameters(), Map.class);
// 获取任务类型对应的处理器
TaskHandler handler = taskHandlerFactory.getHandler(task.getTaskType());
// 执行任务
handler.handle(params);
// 更新任务状态
task.setExecuted(true);
task.setExecutionStatus("SUCCESS");
task.setExecutionResult("Task executed successfully.");
scheduledTaskRepository.save(task);
} catch (Exception e) {
// 更新任务状态为失败
task.setExecuted(true);
task.setExecutionStatus("FAILED");
task.setExecutionResult(e.getMessage());
scheduledTaskRepository.save(task);
// 记录日志或采取其他措施
System.err.println("Failed to execute taskId: " + task.getTaskId() + ", Error: " + e.getMessage());
}
}
/**
* 取消指定任务。
*
* @param taskId 业务上的唯一任务标识
* @return true 如果任务成功取消,false 否则
*/
public boolean cancelTask(String taskId) {
Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findByTaskId(taskId);
if (!optionalTask.isPresent()) {
return false; // 任务不存在
}
ScheduledTask task = optionalTask.get();
if (task.isExecuted()) {
return false; // 任务已执行,无法取消
}
// 获取 ScheduledFuture 对象
ScheduledFuture<?> future = scheduledFutures.get(task.getId());
if (future != null && !future.isDone()) {
boolean cancelled = future.cancel(false);
if (cancelled) {
// 更新任务状态
task.setExecuted(true);
task.setExecutionStatus("CANCELLED");
task.setExecutionResult("Task was cancelled before execution.");
scheduledTaskRepository.save(task);
// 从 map 中移除
scheduledFutures.remove(task.getId());
return true;
}
}
return false;
}
/**
* 查找任务详情。
*
* @param taskId 业务上的唯一任务标识
* @return Optional<ScheduledTask>
*/
public Optional<ScheduledTask> findByTaskId(String taskId) {
return scheduledTaskRepository.findByTaskId(taskId);
}
}
9.4. StartupService.java
package com.example.taskscheduler.service;
import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
public class StartupService {
private final ScheduledTaskRepository scheduledTaskRepository;
private final ScheduledTaskService scheduledTaskService;
private final ObjectMapper objectMapper;
@Autowired
public StartupService(ScheduledTaskRepository scheduledTaskRepository,
ScheduledTaskService scheduledTaskService,
ObjectMapper objectMapper) {
this.scheduledTaskRepository = scheduledTaskRepository;
this.scheduledTaskService = scheduledTaskService;
this.objectMapper = objectMapper;
}
@PostConstruct
public void init() {
List<ScheduledTask> pendingTasks = scheduledTaskRepository.findAll()
.stream()
.filter(task -> !task.isExecuted())
.collect(Collectors.toList());
for (ScheduledTask task : pendingTasks) {
// 计算剩余延迟时间
Duration delay = Duration.between(LocalDateTime.now(), task.getScheduledTime());
long delayInMinutes = delay.toMinutes();
if (delayInMinutes > 0) {
// 重新调度任务
try {
Map<String, Object> params = objectMapper.readValue(task.getParameters(), Map.class);
scheduledTaskService.createAndScheduleTask(task.getTaskId(),
task.getTaskType(),
params,
delayInMinutes);
} catch (Exception e) {
System.err.println("Failed to reschedule taskId: " + task.getTaskId() + ", Error: " + e.getMessage());
}
} else {
// 执行延迟已过的任务
scheduledTaskService.executeTask(task.getId());
}
}
System.out.println("Recovered and rescheduled " + pendingTasks.size() + " tasks.");
}
}
9.5. CleanupService.java
package com.example.taskscheduler.service;
import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
@Service
public class CleanupService {
private final ScheduledTaskRepository scheduledTaskRepository;
@Autowired
public CleanupService(ScheduledTaskRepository scheduledTaskRepository) {
this.scheduledTaskRepository = scheduledTaskRepository;
}
/**
* 定期清理已执行且超过保留期限的任务。
* 例如,保留已执行任务7天后删除。
*/
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
public void cleanUpExecutedTasks() {
LocalDateTime cutoff = LocalDateTime.now().minusDays(7);
List<ScheduledTask> tasksToDelete = scheduledTaskRepository.findAll()
.stream()
.filter(task -> task.isExecuted() &&
(task.getScheduledTime().isBefore(cutoff) ||
"CANCELLED".equals(task.getExecutionStatus()) ||
"FAILED".equals(task.getExecutionStatus())))
.collect(Collectors.toList());
scheduledTaskRepository.deleteAll(tasksToDelete);
System.out.println("Cleaned up " + tasksToDelete.size() + " tasks.");
}
}
说明:
@Scheduled
注解用于定时执行清理任务。- 清理条件:
- 任务已执行且执行时间早于7天前。
- 任务状态为
CANCELLED
或FAILED
。
10. 控制器层
负责接收HTTP请求,管理任务的创建、取消和查询。
ScheduledTaskController.java
package com.example.taskscheduler.controller;
import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.service.ScheduledTaskService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.Optional;
@RestController
@RequestMapping("/api/tasks")
public class ScheduledTaskController {
private final ScheduledTaskService scheduledTaskService;
private final ObjectMapper objectMapper;
@Autowired
public ScheduledTaskController(ScheduledTaskService scheduledTaskService,
ObjectMapper objectMapper) {
this.scheduledTaskService = scheduledTaskService;
this.objectMapper = objectMapper;
}
/**
* 创建并调度新任务。
*
* @param taskRequest 包含 taskId, taskType, parameters, delayMinutes 的请求体
* @return 创建的任务信息
*/
@PostMapping("/schedule")
public ResponseEntity<?> scheduleTask(@RequestBody TaskRequest taskRequest) {
try {
ScheduledTask task = scheduledTaskService.createAndScheduleTask(
taskRequest.getTaskId(),
taskRequest.getTaskType(),
taskRequest.getParameters(),
taskRequest.getDelayMinutes()
);
return ResponseEntity.ok(task);
} catch (IllegalArgumentException ex) {
return ResponseEntity.badRequest().body(ex.getMessage());
} catch (Exception ex) {
return ResponseEntity.status(500).body("Failed to schedule task.");
}
}
/**
* 取消指定任务。
*
* @param taskId 业务上的唯一任务标识
* @return 取消结果
*/
@DeleteMapping("/cancel/{taskId}")
public ResponseEntity<?> cancelTask(@PathVariable String taskId) {
boolean cancelled = scheduledTaskService.cancelTask(taskId);
if (cancelled) {
return ResponseEntity.ok("Task cancelled successfully.");
} else {
return ResponseEntity.badRequest().body("Failed to cancel task. It may have been executed or does not exist.");
}
}
/**
* 查询任务详情。
*
* @param taskId 业务上的唯一任务标识
* @return 任务详情
*/
@GetMapping("/{taskId}")
public ResponseEntity<?> getTaskStatus(@PathVariable String taskId) {
Optional<ScheduledTask> optionalTask = scheduledTaskService.findByTaskId(taskId);
if (optionalTask.isPresent()) {
return ResponseEntity.ok(optionalTask.get());
} else {
return ResponseEntity.status(404).body("Task not found.");
}
}
// DTO 类用于接收请求体
public static class TaskRequest {
private String taskId;
private String taskType;
private Map<String, Object> parameters;
private long delayMinutes;
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public Map<String, Object> getParameters() {
return parameters;
}
public void setParameters(Map<String, Object> parameters) {
this.parameters = parameters;
}
public long getDelayMinutes() {
return delayMinutes;
}
public void setDelayMinutes(long delayMinutes) {
this.delayMinutes = delayMinutes;
}
}
}
说明:
- 创建任务:通过
POST /api/tasks/schedule
端点提交任务请求。 - 取消任务:通过
DELETE /api/tasks/cancel/{taskId}
端点取消指定任务。 - 查询任务:通过
GET /api/tasks/{taskId}
端点查询任务详情。 - DTO类:内部静态类
TaskRequest
用于接收和解析请求体。
13. 异常处理
统一处理不同类型的异常,返回一致的 API 响应。
GlobalExceptionHandler.java
package com.example.taskscheduler.exception;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@ControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler(IllegalArgumentException.class)
public ResponseEntity<String> handleBadRequest(Exception ex) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
}
@ExceptionHandler(UnsupportedOperationException.class)
public ResponseEntity<String> handleUnsupportedOperation(Exception ex) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
}
@ExceptionHandler(Exception.class)
public ResponseEntity<String> handleInternalError(Exception ex) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("An unexpected error occurred.");
}
}
说明:
- 业务异常处理:处理如
IllegalArgumentException
和UnsupportedOperationException
,返回400 Bad Request
。 - 通用异常处理:捕获所有其他异常,返回
500 Internal Server Error
,避免泄露敏感信息。
14. 主应用类
TaskSchedulerApplication.java
package com.example.taskscheduler;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling // 启用定时任务调度
public class TaskSchedulerApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSchedulerApplication.class, args);
}
}
说明:
@EnableScheduling
:启用Spring的定时任务调度功能,允许使用@Scheduled
注解的方法。
测试与验证
以下是如何测试和验证该动态定时任务系统的方法。
1. 启动应用程序
确保您的MySQL数据库已启动,并且application.properties
中的配置正确。然后运行主应用类。
使用Maven命令:
mvn spring-boot:run
或者在IDE(如 IntelliJ IDEA 或 Eclipse)中运行 TaskSchedulerApplication
类。
2. 创建任务
使用Postman或cURL发送HTTP请求来创建新任务。
示例1:发送EMAIL任务
请求:
POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json
{
"taskId": "TASK_EMAIL_001",
"taskType": "EMAIL",
"parameters": {
"recipient": "user@example.com",
"subject": "Welcome!",
"body": "Thank you for registering."
},
"delayMinutes": 1
}
响应:
{
"id": 1,
"taskId": "TASK_EMAIL_001",
"taskType": "EMAIL",
"parameters": "{\"recipient\":\"user@example.com\",\"subject\":\"Welcome!\",\"body\":\"Thank you for registering.\"}",
"scheduledTime": "2023-10-01T10:01:00",
"createdAt": "2023-10-01T10:00:00",
"updatedAt": "2023-10-01T10:00:00",
"executed": false,
"executionStatus": null,
"executionResult": null
}
控制台输出(在1分钟后):
Sending email to: user@example.com
Subject: Welcome!
Body: Thank you for registering.
示例2:发送DATA_PROCESS任务
请求:
POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json
{
"taskId": "TASK_DATA_001",
"taskType": "DATA_PROCESS",
"parameters": {
"dataSource": "database",
"operation": "backup"
},
"delayMinutes": 2
}
响应:
{
"id": 2,
"taskId": "TASK_DATA_001",
"taskType": "DATA_PROCESS",
"parameters": "{\"dataSource\":\"database\",\"operation\":\"backup\"}",
"scheduledTime": "2023-10-01T10:02:00",
"createdAt": "2023-10-01T10:00:00",
"updatedAt": "2023-10-01T10:00:00",
"executed": false,
"executionStatus": null,
"executionResult": null
}
控制台输出(在2分钟后):
Processing data from: database
Operation: backup
3. 任务去重测试
尝试创建具有相同 taskId
的任务,验证系统是否阻止重复创建。
请求:
POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json
{
"taskId": "TASK_EMAIL_001",
"taskType": "EMAIL",
"parameters": {
"recipient": "duplicate@example.com",
"subject": "Duplicate Task",
"body": "This should not be scheduled."
},
"delayMinutes": 3
}
响应:
"Task with taskId TASK_EMAIL_001 already exists."
4. 取消任务
创建一个任务并在其执行前取消。
创建任务请求:
POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json
{
"taskId": "TASK_CANCEL_001",
"taskType": "EMAIL",
"parameters": {
"recipient": "cancel@example.com",
"subject": "Cancel Task",
"body": "This task will be cancelled."
},
"delayMinutes": 5
}
取消任务请求:
DELETE http://localhost:8080/api/tasks/cancel/TASK_CANCEL_001
响应:
"Task cancelled successfully."
确认取消(查询任务状态):
GET http://localhost:8080/api/tasks/TASK_CANCEL_001
响应:
{
"id": 3,
"taskId": "TASK_CANCEL_001",
"taskType": "EMAIL",
"parameters": "{\"recipient\":\"cancel@example.com\",\"subject\":\"Cancel Task\",\"body\":\"This task will be cancelled.\"}",
"scheduledTime": "2023-10-01T10:05:00",
"createdAt": "2023-10-01T10:00:00",
"updatedAt": "2023-10-01T10:00:00",
"executed": true,
"executionStatus": "CANCELLED",
"executionResult": "Task was cancelled before execution."
}
说明:
- 任务已取消:在任务执行前取消后,系统不会执行该任务。
5. 任务恢复测试
- 创建多个任务,确保有未执行的任务(如延迟执行)。
- 重启应用程序。
- 确认未执行的任务是否已恢复并按预定时间执行。
说明:
StartupService
在应用启动时会扫描未执行的任务并重新调度,确保系统重启后任务不丢失。
6. 清理任务测试
- 创建并执行多个任务。
- 等待超过保留期限(如7天)。
- 确认已执行的任务是否已被
CleanupService
清理。
说明:
CleanupService
会定期清理已执行且超过保留期限的任务,防止数据库膨胀。
优化与扩展
1. 添加新的任务类型
示例:SMS任务
创建 SmsTaskHandler.java
:
package com.example.taskscheduler.handler;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class SmsTaskHandler implements TaskHandler {
@Override
public void handle(Map<String, Object> parameters) throws Exception {
String phoneNumber = (String) parameters.get("phoneNumber");
String message = (String) parameters.get("message");
// 实现发送短信的逻辑,例如使用第三方短信服务API
System.out.println("Sending SMS to: " + phoneNumber);
System.out.println("Message: " + message);
// 模拟短信发送成功
// 在实际应用中,应集成短信发送服务
}
@Override
public String getTaskType() {
return "SMS";
}
}
测试SMS任务:
请求:
POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json
{
"taskId": "TASK_SMS_001",
"taskType": "SMS",
"parameters": {
"phoneNumber": "+1234567890",
"message": "Your verification code is 123456."
},
"delayMinutes": 1
}
预期响应:
{
"id": 4,
"taskId": "TASK_SMS_001",
"taskType": "SMS",
"parameters": "{\"phoneNumber\":\"+1234567890\",\"message\":\"Your verification code is 123456.\"}",
"scheduledTime": "2023-10-01T10:06:00",
"createdAt": "2023-10-01T10:00:00",
"updatedAt": "2023-10-01T10:00:00",
"executed": false,
"executionStatus": null,
"executionResult": null
}
控制台输出(在1分钟后):
Sending SMS to: +1234567890
Message: Your verification code is 123456.
2. 集成邮件和短信发送服务
实际应用中,应集成真实的邮件和短信发送服务,如JavaMailSender
或第三方短信API。以下为集成 JavaMailSender
的示例。
2.1. 添加依赖
在 pom.xml
中添加Spring Boot Starter Mail依赖:
<!-- Spring Boot Starter Mail -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
2.2. 配置邮件属性
在 application.properties
中添加邮件服务器配置:
# 邮件服务器配置(示例使用Gmail)
spring.mail.host=smtp.gmail.com
spring.mail.port=587
spring.mail.username=your_email@gmail.com
spring.mail.password=your_email_password
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
注意:为了使用Gmail发送邮件,可能需要调整Google账户的安全设置,允许低安全性应用访问或使用App Passwords。
2.3. 修改 EmailTaskHandler.java
package com.example.taskscheduler.handler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class EmailTaskHandler implements TaskHandler {
private final JavaMailSender mailSender;
@Autowired
public EmailTaskHandler(JavaMailSender mailSender) {
this.mailSender = mailSender;
}
@Override
public void handle(Map<String, Object> parameters) throws Exception {
String recipient = (String) parameters.get("recipient");
String subject = (String) parameters.get("subject");
String body = (String) parameters.get("body");
// 创建简单邮件消息
SimpleMailMessage message = new SimpleMailMessage();
message.setTo(recipient);
message.setSubject(subject);
message.setText(body);
// 发送邮件
mailSender.send(message);
System.out.println("Email sent to: " + recipient);
}
@Override
public String getTaskType() {
return "EMAIL";
}
}
说明:
- 使用
JavaMailSender
发送邮件。 - 确保
application.properties
中的邮件配置正确。
3. 使用DTO类替代Map(提高类型安全)
虽然使用Map<String, Object>
提供了参数传递的灵活性,但缺乏类型安全和可读性。可以为不同的任务类型定义专用的DTO类。
示例:EmailTaskParameters.java
package com.example.taskscheduler.dto;
public class EmailTaskParameters {
private String recipient;
private String subject;
private String body;
// Getters and Setters
public String getRecipient() {
return recipient;
}
public void setRecipient(String recipient) {
this.recipient = recipient;
}
public String getSubject() {
return subject;
}
public void setSubject(String subject) {
this.subject = subject;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
}
修改 EmailTaskHandler.java
以使用DTO
package com.example.taskscheduler.handler;
import com.example.taskscheduler.dto.EmailTaskParameters;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
@Component
public class EmailTaskHandler implements TaskHandler {
private final JavaMailSender mailSender;
private final ObjectMapper objectMapper;
@Autowired
public EmailTaskHandler(JavaMailSender mailSender, ObjectMapper objectMapper) {
this.mailSender = mailSender;
this.objectMapper = objectMapper;
}
@Override
public void handle(Map<String, Object> parameters) throws Exception {
// 反序列化参数到DTO类
EmailTaskParameters emailParams = objectMapper.convertValue(parameters, EmailTaskParameters.class);
// 创建简单邮件消息
SimpleMailMessage message = new SimpleMailMessage();
message.setTo(emailParams.getRecipient());
message.setSubject(emailParams.getSubject());
message.setText(emailParams.getBody());
// 发送邮件
mailSender.send(message);
System.out.println("Email sent to: " + emailParams.getRecipient());
}
@Override
public String getTaskType() {
return "EMAIL";
}
}
优势:
- 类型安全:通过专用的DTO类,确保参数的类型正确,减少运行时错误。
- 可读性:代码更具可读性和可维护性。
扩展:
- 为每种任务类型定义对应的DTO类,修改相应的
TaskHandler
实现以使用这些DTO。
总结
通过以上设计和实现,您可以构建一个通用、灵活且可扩展的动态定时任务系统。该系统具备以下特点:
- 通用性:支持多种类型的任务,通过
TaskHandler
接口实现不同的任务逻辑。 - 灵活的参数传递:使用JSON格式存储任务参数,或通过专用的DTO类提高类型安全。
- 任务去重:通过
taskId
确保任务的唯一性,避免重复调度。 - 任务持久化与恢复:任务信息存储在数据库中,应用重启时自动恢复未执行的任务。
- 可扩展性:新增任务类型仅需创建新的
TaskHandler
实现类,无需修改核心逻辑。 - 任务管理:提供API端点用于创建、取消和查询任务。
- 定期清理:定时清理已执行或取消的任务,保持数据库整洁。
- 异常处理:统一的异常处理机制,确保API的一致性和安全性。
您可以根据具体业务需求进一步扩展和优化该系统,如:
- 任务重试机制:为失败的任务设置自动重试策略。
- 任务优先级:为任务设置优先级,调整调度逻辑以优先处理高优先级任务。
- 分布式调度:如果系统需要在多实例下运行,可考虑集成分布式调度框架(如Quartz集群)。
- 监控与报警:集成监控工具,实时监控任务执行状态,设置报警规则。
希望这个完整的实现能帮助您顺利构建所需的动态定时任务系统。