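Below is a complete implementation of a general-purpose dynamic scheduled-task system. It combines the strategy pattern with Spring's dependency injection, so adding a new task type only requires adding one self-contained class. The walkthrough covers the project structure, dependency configuration, entity class, repository interface, task handler interface and implementations, task handler factory, service layer, controller layer, exception handling, scheduler configuration, and application startup with task recovery, with full code for each component.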
Contents
- Project structure
- Dependency configuration (pom.xml)
- Application configuration (application.properties)
- Entity class
  - ScheduledTask.java
- Repository interface
  - ScheduledTaskRepository.java
- Task handlers
  - TaskHandler.java
  - EmailTaskHandler.java
  - DataProcessTaskHandler.java
- Task handler factory
  - TaskHandlerFactory.java
- Scheduler configuration
  - SchedulerConfig.java
- Service layer
  - ScheduledTaskService.java
  - StartupService.java
  - CleanupService.java
- Controller layer
  - ScheduledTaskController.java
- Exception handling
  - GlobalExceptionHandler.java
- Main application class
  - TaskSchedulerApplication.java
1. Project structure
The package layout of the project is as follows:
src
├── main
│   ├── java
│   │   └── com
│   │       └── example
│   │           └── taskscheduler
│   │               ├── TaskSchedulerApplication.java
│   │               ├── config
│   │               │   └── SchedulerConfig.java
│   │               ├── controller
│   │               │   └── ScheduledTaskController.java
│   │               ├── entity
│   │               │   └── ScheduledTask.java
│   │               ├── exception
│   │               │   └── GlobalExceptionHandler.java
│   │               ├── factory
│   │               │   └── TaskHandlerFactory.java
│   │               ├── handler
│   │               │   ├── DataProcessTaskHandler.java
│   │               │   ├── EmailTaskHandler.java
│   │               │   └── TaskHandler.java
│   │               ├── repository
│   │               │   └── ScheduledTaskRepository.java
│   │               └── service
│   │                   ├── CleanupService.java
│   │                   ├── ScheduledTaskService.java
│   │                   └── StartupService.java
│   └── resources
│       └── application.properties
└── test
    └── java
        └── com
            └── example
                └── taskscheduler
                    └── TaskSchedulerApplicationTests.java
2. Dependency configuration (pom.xml)
First, make sure the project's pom.xml declares the required dependencies. A sample configuration:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>taskscheduler</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>TaskScheduler</name>
    <description>General-purpose dynamic scheduled-task system</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.1</version>
        <relativePath/>
    </parent>

    <properties>
        <!-- Spring Boot 3.x requires Java 17 or later -->
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot Starter Data JPA -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!-- JSON processing -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <!-- Database driver (MySQL as an example); Spring Boot 3.x manages the com.mysql coordinates -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- Lombok (optional, reduces boilerplate) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>
        <!-- Spring Boot Starter Test (for tests) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.10.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <annotationProcessorPaths>
                        <!-- Lombok annotation processor -->
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                            <version>1.18.26</version>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
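Notes:
- Spring Boot Starter Web: builds the web application, including the REST API.
- Spring Boot Starter Data JPA: handles database access.
- Jackson Databind: handles JSON serialization and deserialization.
- MySQL Connector/J: the JDBC driver, needed only if you use MySQL.
- Lombok: reduces Java boilerplate (for example, generated getters and setters); optional but recommended.
- Spring Boot Starter Test: used for tests.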
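3. Application configuration (application.properties)
Create application.properties under src/main/resources/ and configure the database connection and JPA settings.
# Database configuration (MySQL as an example)
spring.datasource.url=jdbc:mysql://localhost:3306/taskscheduler?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=yourpassword

# JPA configuration
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.format_sql=true

# Initial delay and interval for scheduled tasks (optional, adjust as needed)
# These are defined in the service layer, so nothing normally needs to be configured here

# Logging configuration (optional)
logging.level.org.springframework=INFO
logging.level.com.example.taskscheduler=DEBUG
Notes:
- Database URL: replace the URL, username, and password with the values for your own database.
- spring.jpa.hibernate.ddl-auto=update: lets JPA create or update the table structure automatically. For production, use a stricter setting such as validate.
- Log levels: adjust them to get more or less log output.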
4. Entity class
ScheduledTask.java
package com.example.taskscheduler.entity;

import jakarta.persistence.*;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

// Spring Boot 3.x uses the jakarta.persistence namespace (not javax.persistence)
@Entity
@Table(name = "scheduled_tasks", uniqueConstraints = @UniqueConstraint(columnNames = "taskId"))
@Data
@NoArgsConstructor
public class ScheduledTask {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String taskId; // business-level unique identifier

    @Column(nullable = false)
    private String taskType; // task type

    @Column(columnDefinition = "TEXT")
    private String parameters; // parameters as a JSON string

    @Column(nullable = false)
    private LocalDateTime scheduledTime; // when the task should run

    @Column(nullable = false, updatable = false)
    private LocalDateTime createdAt = LocalDateTime.now();

    private LocalDateTime updatedAt = LocalDateTime.now();

    private boolean executed = false;

    private String executionStatus;

    @Column(columnDefinition = "TEXT")
    private String executionResult;

    // Convenience constructor
    public ScheduledTask(String taskId, String taskType, String parameters, LocalDateTime scheduledTime) {
        this.taskId = taskId;
        this.taskType = taskType;
        this.parameters = parameters;
        this.scheduledTime = scheduledTime;
    }

    @PreUpdate
    public void preUpdate() {
        this.updatedAt = LocalDateTime.now();
    }
}
Notes:
- Lombok annotations: @Data generates getters, setters, toString, and related methods; @NoArgsConstructor generates the no-argument constructor that JPA requires.
- Fields:
  - taskId: business-level unique identifier, used to deduplicate tasks.
  - taskType: task type identifier, such as EMAIL or DATA_PROCESS.
  - parameters: the parameters the task needs, stored as a JSON string.
  - scheduledTime: when the task should run.
  - executed: whether the task has been processed. Note that the service layer also sets it to true for failed and cancelled tasks.
  - executionStatus and executionResult: record the status and result of the execution.
5. Repository interface
ScheduledTaskRepository.java
package com.example.taskscheduler.repository;

import com.example.taskscheduler.entity.ScheduledTask;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.util.Optional;

@Repository
public interface ScheduledTaskRepository extends JpaRepository<ScheduledTask, Long> {

    Optional<ScheduledTask> findByTaskId(String taskId);

    boolean existsByTaskId(String taskId);
}
Notes:
- Provides lookup by taskId and an existence check on taskId, which the service layer uses to deduplicate tasks.
6. Task handlers
The strategy pattern is used here: each task type gets its own handler, which keeps the execution logic flexible and extensible.
6.1 TaskHandler.java
package com.example.taskscheduler.handler;

import java.util.Map;

public interface TaskHandler {

    /**
     * Runs the task logic.
     *
     * @param parameters task parameters as key-value pairs
     * @throws Exception any exception raised during execution
     */
    void handle(Map<String, Object> parameters) throws Exception;

    /**
     * Returns the task type this handler supports.
     *
     * @return the string identifier of the task type
     */
    String getTaskType();
}
6.2 EmailTaskHandler.java
package com.example.taskscheduler.handler;

import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class EmailTaskHandler implements TaskHandler {

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        String recipient = (String) parameters.get("recipient");
        String subject = (String) parameters.get("subject");
        String body = (String) parameters.get("body");

        // Placeholder for the real sending logic, for example via JavaMailSender.
        // Here the send is only simulated; a real application should integrate a mail service.
        System.out.println("Sending email to: " + recipient);
        System.out.println("Subject: " + subject);
        System.out.println("Body: " + body);
    }

    @Override
    public String getTaskType() {
        return "EMAIL";
    }
}
6.3 DataProcessTaskHandler.java
package com.example.taskscheduler.handler;

import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class DataProcessTaskHandler implements TaskHandler {

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        String dataSource = (String) parameters.get("dataSource");
        String operation = (String) parameters.get("operation");

        // Placeholder for the real data-processing logic.
        // Here the processing is only simulated; a real application should run the actual job.
        System.out.println("Processing data from: " + dataSource);
        System.out.println("Operation: " + operation);
    }

    @Override
    public String getTaskType() {
        return "DATA_PROCESS";
    }
}
Extensibility:
- To add a new task type, create a new TaskHandler implementation and register it with @Component. The factory discovers and dispatches to it automatically, with no changes to existing code.
7. Task handler factory
Looks up and returns the TaskHandler instance that matches a given task type.
TaskHandlerFactory.java
package com.example.taskscheduler.factory;

import com.example.taskscheduler.handler.TaskHandler;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
public class TaskHandlerFactory {

    private final List<TaskHandler> handlers;
    private final Map<String, TaskHandler> handlerMap = new HashMap<>();

    // Spring injects every bean that implements TaskHandler
    @Autowired
    public TaskHandlerFactory(List<TaskHandler> handlers) {
        this.handlers = handlers;
    }

    // Spring Boot 3.x uses jakarta.annotation.PostConstruct (not javax.annotation)
    @PostConstruct
    public void init() {
        for (TaskHandler handler : handlers) {
            handlerMap.put(handler.getTaskType(), handler);
        }
    }

    /**
     * Returns the handler for the given task type.
     *
     * @param taskType the string identifier of the task type
     * @return the matching TaskHandler instance
     * @throws IllegalArgumentException if no handler is registered for the type
     */
    public TaskHandler getHandler(String taskType) {
        TaskHandler handler = handlerMap.get(taskType);
        if (handler == null) {
            throw new IllegalArgumentException("No handler found for task type: " + taskType);
        }
        return handler;
    }
}
Notes:
- All beans implementing the TaskHandler interface are injected automatically, and handlerMap is populated in the @PostConstruct phase to map each task type to its handler.
- getHandler returns the handler instance for a task type.
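One detail the factory leaves open is what happens when two handlers report the same task type: a plain Map.put keeps whichever handler was registered last. The following is a minimal plain-Java sketch (no Spring) of the same list-to-map registration that fails fast on duplicates instead. The Handler interface, the HandlerRegistrySketch class, and the SMS type are hypothetical names used only for illustration; they are not part of the project above.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class HandlerRegistrySketch {

    /** Hypothetical stand-in for the TaskHandler interface. */
    interface Handler {
        String taskType();
        String handle(Map<String, Object> parameters);
    }

    /** Small helper that declares a handler inline for the demo. */
    static Handler handler(String type) {
        return new Handler() {
            @Override public String taskType() { return type; }
            @Override public String handle(Map<String, Object> parameters) {
                return type + " handled " + parameters.size() + " parameter(s)";
            }
        };
    }

    /**
     * Builds the type-to-handler map. Collectors.toMap throws
     * IllegalStateException when two handlers share a task type.
     */
    static Map<String, Handler> register(List<Handler> handlers) {
        return handlers.stream()
                .collect(Collectors.toMap(Handler::taskType, Function.identity()));
    }

    /** Looks up a handler and rejects unknown task types. */
    static Handler lookup(Map<String, Handler> registry, String taskType) {
        Handler found = registry.get(taskType);
        if (found == null) {
            throw new IllegalArgumentException("No handler found for task type: " + taskType);
        }
        return found;
    }

    public static void main(String[] args) {
        Map<String, Handler> registry = register(List.of(handler("EMAIL"), handler("SMS")));
        System.out.println(lookup(registry, "SMS").handle(Map.of("to", "someone")));
    }
}
```

Failing at startup is a design choice: a duplicate task type is almost always a configuration mistake, and it is easier to diagnose there than when a task is silently routed to the wrong handler.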
8. Scheduler configuration
Configures the ThreadPoolTaskScheduler bean that is used to schedule tasks dynamically.
SchedulerConfig.java
package com.example.taskscheduler.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class SchedulerConfig {

    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10); // adjust to the expected load
        scheduler.setThreadNamePrefix("DynamicTaskScheduler-");
        scheduler.setAwaitTerminationSeconds(60);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        scheduler.initialize();
        return scheduler;
    }
}
Notes:
- Pool size: set poolSize according to the expected number of concurrent tasks.
- Thread name prefix: makes task execution easy to trace in the logs.
- Wait for tasks on shutdown: gives running tasks up to 60 seconds to finish when the application shuts down.
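To make the three settings above more concrete, here is a rough plain-JDK analogue built on ScheduledThreadPoolExecutor. It is a sketch under assumptions, not a description of how Spring implements ThreadPoolTaskScheduler internally; the class SchedulerPoolSketch and its method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SchedulerPoolSketch {

    /** Creates a scheduler with a fixed pool size whose threads are named prefix + counter. */
    static ScheduledThreadPoolExecutor newScheduler(int poolSize, String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory factory = runnable -> new Thread(runnable, prefix + counter.getAndIncrement());
        return new ScheduledThreadPoolExecutor(poolSize, factory);
    }

    /** Runs one delayed task and returns the name of the thread that executed it. */
    static String runOnce(ScheduledThreadPoolExecutor executor, long delayMillis) {
        Callable<String> task = () -> Thread.currentThread().getName();
        ScheduledFuture<String> future = executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Task did not complete", e);
        }
    }

    /** Stops accepting new tasks, then waits up to the timeout for the pool to drain. */
    static boolean shutdownGracefully(ScheduledThreadPoolExecutor executor, long timeoutSeconds) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = newScheduler(10, "DynamicTaskScheduler-");
        System.out.println("Ran on " + runOnce(executor, 100));
        System.out.println("Terminated cleanly: " + shutdownGracefully(executor, 60));
    }
}
```

The thread name returned by runOnce starts with the configured prefix, which is what makes scheduler threads easy to spot in log output.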
9. Service layer
Contains the core task-management logic: creating, scheduling, executing, and cancelling tasks.
9.1 ScheduledTaskService.java
package com.example.taskscheduler.service;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.factory.TaskHandlerFactory;
import com.example.taskscheduler.handler.TaskHandler;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

@Service
public class ScheduledTaskService {

    private final ScheduledTaskRepository scheduledTaskRepository;
    private final ThreadPoolTaskScheduler taskScheduler;
    private final ObjectMapper objectMapper;
    private final TaskHandlerFactory taskHandlerFactory;

    // Tracks ScheduledFuture objects so that pending tasks can be cancelled
    private final Map<Long, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();

    @Autowired
    public ScheduledTaskService(ScheduledTaskRepository scheduledTaskRepository,
                                ThreadPoolTaskScheduler taskScheduler,
                                ObjectMapper objectMapper,
                                TaskHandlerFactory taskHandlerFactory) {
        this.scheduledTaskRepository = scheduledTaskRepository;
        this.taskScheduler = taskScheduler;
        this.objectMapper = objectMapper;
        this.taskHandlerFactory = taskHandlerFactory;
    }

    /**
     * Creates and schedules a new task.
     *
     * @param taskId       business-level unique task identifier
     * @param taskType     task type
     * @param parameters   task parameters
     * @param delayMinutes delay before execution, in minutes
     * @return the created ScheduledTask
     */
    public ScheduledTask createAndScheduleTask(String taskId, String taskType,
                                               Map<String, Object> parameters, long delayMinutes) {
        // Deduplication: reject the request if any task with this taskId is already stored
        if (scheduledTaskRepository.existsByTaskId(taskId)) {
            throw new IllegalArgumentException("Task with taskId " + taskId + " already exists.");
        }

        // Compute the execution time
        LocalDateTime scheduledTime = LocalDateTime.now().plusMinutes(delayMinutes);

        // Serialize the parameters to a JSON string
        String parametersJson;
        try {
            parametersJson = objectMapper.writeValueAsString(parameters);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize task parameters.", e);
        }

        // Persist the task
        ScheduledTask scheduledTask = new ScheduledTask(taskId, taskType, parametersJson, scheduledTime);
        scheduledTaskRepository.save(scheduledTask);

        // Schedule the task and keep its future for later cancellation
        Runnable taskRunnable = () -> executeTask(scheduledTask.getId());
        ScheduledFuture<?> future = taskScheduler.schedule(
                taskRunnable, scheduledTime.atZone(ZoneId.systemDefault()).toInstant());
        scheduledFutures.put(scheduledTask.getId(), future);

        return scheduledTask;
    }

    /**
     * Executes a task by dispatching to the handler for its task type.
     *
     * @param scheduledTaskId database ID of the ScheduledTask
     */
    public void executeTask(Long scheduledTaskId) {
        // The task is starting, so its future no longer needs to be tracked
        scheduledFutures.remove(scheduledTaskId);

        Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findById(scheduledTaskId);
        if (optionalTask.isEmpty()) {
            // The record no longer exists; it may have been deleted
            return;
        }

        ScheduledTask task = optionalTask.get();
        if (task.isExecuted()) {
            // Already processed, skip
            return;
        }

        try {
            // Deserialize the parameters with full generic type information
            Map<String, Object> params = objectMapper.readValue(
                    task.getParameters(), new TypeReference<Map<String, Object>>() {});

            // Look up the handler for this task type and run it
            TaskHandler handler = taskHandlerFactory.getHandler(task.getTaskType());
            handler.handle(params);

            // Record success
            task.setExecuted(true);
            task.setExecutionStatus("SUCCESS");
            task.setExecutionResult("Task executed successfully.");
            scheduledTaskRepository.save(task);
        } catch (Exception e) {
            // Record failure
            task.setExecuted(true);
            task.setExecutionStatus("FAILED");
            task.setExecutionResult(e.getMessage());
            scheduledTaskRepository.save(task);
            // Log the error or take other measures
            System.err.println("Failed to execute taskId: " + task.getTaskId() + ", Error: " + e.getMessage());
        }
    }

    /**
     * Cancels a task.
     *
     * @param taskId business-level unique task identifier
     * @return true if the task was cancelled, false otherwise
     */
    public boolean cancelTask(String taskId) {
        Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findByTaskId(taskId);
        if (optionalTask.isEmpty()) {
            return false; // no such task
        }

        ScheduledTask task = optionalTask.get();
        if (task.isExecuted()) {
            return false; // already processed, cannot be cancelled
        }

        ScheduledFuture<?> future = scheduledFutures.get(task.getId());
        if (future != null && !future.isDone()) {
            boolean cancelled = future.cancel(false);
            if (cancelled) {
                // Record the cancellation and stop tracking the future
                task.setExecuted(true);
                task.setExecutionStatus("CANCELLED");
                task.setExecutionResult("Task was cancelled before execution.");
                scheduledTaskRepository.save(task);
                scheduledFutures.remove(task.getId());
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up a task.
     *
     * @param taskId business-level unique task identifier
     * @return the task, if it exists
     */
    public Optional<ScheduledTask> findByTaskId(String taskId) {
        return scheduledTaskRepository.findByTaskId(taskId);
    }
}
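The cancellation support in the service relies on one idea: keep each pending task's future in a concurrent map keyed by ID, and remove the entry once the task starts or is cancelled. A minimal plain-JDK sketch of that bookkeeping follows. FutureRegistrySketch and its methods are hypothetical names, and the sketch assumes each ID is scheduled at most once.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class FutureRegistrySketch {

    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    private final Map<Long, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    /** Schedules a one-shot task and remembers its future under the given id. */
    void schedule(Long id, Runnable task, long delayMillis) {
        // compute() stores the future atomically for this key, so the remove()
        // inside the task cannot run before the entry exists.
        futures.compute(id, (key, previous) -> executor.schedule(() -> {
            futures.remove(id); // the task is starting, so it is no longer cancellable
            task.run();
        }, delayMillis, TimeUnit.MILLISECONDS));
    }

    /** Cancels a pending task; returns false if it is unknown, started, or already cancelled. */
    boolean cancel(Long id) {
        ScheduledFuture<?> future = futures.remove(id);
        return future != null && !future.isDone() && future.cancel(false);
    }

    /** Number of tasks that are still waiting to run. */
    int pendingCount() {
        return futures.size();
    }

    /** Stops the executor and discards anything still queued. */
    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        FutureRegistrySketch registry = new FutureRegistrySketch();
        registry.schedule(1L, () -> System.out.println("task 1 ran"), 60_000);
        System.out.println("cancelled: " + registry.cancel(1L));
        System.out.println("cancelled again: " + registry.cancel(1L));
        registry.shutdown();
    }
}
```

Passing false to Future.cancel means a task that is already running is not interrupted, which matches the cancel call used in the service.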
9.2 StartupService.java
Restores unprocessed tasks when the application starts, so that tasks are not lost across a restart. Tasks whose scheduled time is still in the future are scheduled again; overdue tasks are executed immediately.
package com.example.taskscheduler.service;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;

@Service
public class StartupService {

    private final ScheduledTaskRepository scheduledTaskRepository;
    private final ScheduledTaskService scheduledTaskService;
    private final ThreadPoolTaskScheduler taskScheduler;

    @Autowired
    public StartupService(ScheduledTaskRepository scheduledTaskRepository,
                          ScheduledTaskService scheduledTaskService,
                          ThreadPoolTaskScheduler taskScheduler) {
        this.scheduledTaskRepository = scheduledTaskRepository;
        this.scheduledTaskService = scheduledTaskService;
        this.taskScheduler = taskScheduler;
    }

    @PostConstruct
    public void init() {
        List<ScheduledTask> pendingTasks = scheduledTaskRepository.findAll().stream()
                .filter(task -> !task.isExecuted())
                .toList();

        LocalDateTime now = LocalDateTime.now();
        for (ScheduledTask task : pendingTasks) {
            Long id = task.getId();
            if (task.getScheduledTime().isAfter(now)) {
                // Still in the future: schedule the existing record at its original time.
                // createAndScheduleTask cannot be reused here because it rejects a taskId
                // that is already stored. This future is not registered with
                // ScheduledTaskService, so cancelTask cannot cancel a recovered task.
                taskScheduler.schedule(
                        () -> scheduledTaskService.executeTask(id),
                        task.getScheduledTime().atZone(ZoneId.systemDefault()).toInstant());
            } else {
                // The scheduled time has passed: execute right away
                scheduledTaskService.executeTask(id);
            }
        }
        System.out.println("Recovered " + pendingTasks.size() + " tasks.");
    }
}
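When recovering a task, the remaining time is best kept at full precision: Duration.toMinutes() truncates toward zero, so a task due in 45 seconds reports zero whole minutes left and could be mistaken for an overdue one. The sketch below shows the calculation in isolation; RecoveryDelaySketch and its method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;

final class RecoveryDelaySketch {

    /** Time left until scheduledTime, clamped to zero for overdue tasks. */
    static Duration remainingDelay(LocalDateTime now, LocalDateTime scheduledTime) {
        Duration delay = Duration.between(now, scheduledTime);
        return delay.isNegative() ? Duration.ZERO : delay;
    }

    /** True when the task should run immediately on recovery. */
    static boolean isOverdue(LocalDateTime now, LocalDateTime scheduledTime) {
        return !scheduledTime.isAfter(now);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 1, 1, 12, 0, 0);
        LocalDateTime soon = now.plusSeconds(45);

        // Whole minutes lose the 45 seconds entirely
        System.out.println(remainingDelay(now, soon).toMinutes()); // prints 0
        // Milliseconds keep the full remaining time
        System.out.println(remainingDelay(now, soon).toMillis());  // prints 45000
        System.out.println(isOverdue(now, soon));                  // prints false
    }
}
```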
9.3 CleanupService.java
Periodically removes executed and cancelled tasks so the table does not grow without bound.
package com.example.taskscheduler.service;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.List;

@Service
public class CleanupService {

    private final ScheduledTaskRepository scheduledTaskRepository;

    @Autowired
    public CleanupService(ScheduledTaskRepository scheduledTaskRepository) {
        this.scheduledTaskRepository = scheduledTaskRepository;
    }

    /**
     * Periodically deletes processed tasks that are past their retention period.
     * For example, executed tasks are kept for 7 days and then deleted.
     */
    @Scheduled(cron = "0 0 2 * * ?") // every day at 02:00
    public void cleanUpExecutedTasks() {
        LocalDateTime cutoff = LocalDateTime.now().minusDays(7);
        List<ScheduledTask> tasksToDelete = scheduledTaskRepository.findAll().stream()
                .filter(task -> task.isExecuted()
                        && (task.getScheduledTime().isBefore(cutoff)
                        || "CANCELLED".equals(task.getExecutionStatus())
                        || "FAILED".equals(task.getExecutionStatus())))
                .toList();
        scheduledTaskRepository.deleteAll(tasksToDelete);
        System.out.println("Cleaned up " + tasksToDelete.size() + " tasks.");
    }
}
Notes:
- @Scheduled(cron = "0 0 2 * * ?"): runs the cleanup every day at 02:00.
- Cleanup conditions (a processed task is deleted if either one holds):
  - Its scheduled time is more than 7 days in the past.
  - Its status is CANCELLED or FAILED, regardless of age.
10. Controller layer
Exposes REST API endpoints for creating, cancelling, and querying tasks.
ScheduledTaskController.java
package com.example.taskscheduler.controller;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.service.ScheduledTaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.Map;
import java.util.Optional;

@RestController
@RequestMapping("/api/tasks")
public class ScheduledTaskController {

    private final ScheduledTaskService scheduledTaskService;

    @Autowired
    public ScheduledTaskController(ScheduledTaskService scheduledTaskService) {
        this.scheduledTaskService = scheduledTaskService;
    }

    /**
     * Creates and schedules a new task.
     *
     * @param taskRequest request body with taskId, taskType, parameters, and delayMinutes
     * @return the created task
     */
    @PostMapping("/schedule")
    public ResponseEntity<?> scheduleTask(@RequestBody TaskRequest taskRequest) {
        try {
            ScheduledTask task = scheduledTaskService.createAndScheduleTask(
                    taskRequest.getTaskId(),
                    taskRequest.getTaskType(),
                    taskRequest.getParameters(),
                    taskRequest.getDelayMinutes());
            return ResponseEntity.ok(task);
        } catch (IllegalArgumentException ex) {
            return ResponseEntity.badRequest().body(ex.getMessage());
        } catch (Exception ex) {
            return ResponseEntity.status(500).body("Failed to schedule task.");
        }
    }

    /**
     * Cancels a task.
     *
     * @param taskId business-level unique task identifier
     * @return the result of the cancellation
     */
    @DeleteMapping("/cancel/{taskId}")
    public ResponseEntity<?> cancelTask(@PathVariable String taskId) {
        boolean cancelled = scheduledTaskService.cancelTask(taskId);
        if (cancelled) {
            return ResponseEntity.ok("Task cancelled successfully.");
        } else {
            return ResponseEntity.badRequest()
                    .body("Failed to cancel task. It may have been executed or does not exist.");
        }
    }

    /**
     * Returns the details of a task.
     *
     * @param taskId business-level unique task identifier
     * @return the task details
     */
    @GetMapping("/{taskId}")
    public ResponseEntity<?> getTaskStatus(@PathVariable String taskId) {
        Optional<ScheduledTask> optionalTask = scheduledTaskService.findByTaskId(taskId);
        if (optionalTask.isPresent()) {
            return ResponseEntity.ok(optionalTask.get());
        } else {
            return ResponseEntity.status(404).body("Task not found.");
        }
    }

    // DTO for the request body
    public static class TaskRequest {
        private String taskId;
        private String taskType;
        private Map<String, Object> parameters;
        private long delayMinutes;

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        public String getTaskType() {
            return taskType;
        }

        public void setTaskType(String taskType) {
            this.taskType = taskType;
        }

        public Map<String, Object> getParameters() {
            return parameters;
        }

        public void setParameters(Map<String, Object> parameters) {
            this.parameters = parameters;
        }

        public long getDelayMinutes() {
            return delayMinutes;
        }

        public void setDelayMinutes(long delayMinutes) {
            this.delayMinutes = delayMinutes;
        }
    }
}
Notes:
- Create a task: POST /api/tasks/schedule accepts a request containing taskId, taskType, parameters, and delayMinutes.
- Cancel a task: DELETE /api/tasks/cancel/{taskId} cancels the given task.
- Query a task: GET /api/tasks/{taskId} returns the task details.
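To show what calling these endpoints might look like from a client, here is a sketch that builds the requests with the JDK's java.net.http API. It only constructs the requests and does not send them. The base URL http://localhost:8080, the sample field values, and the class ScheduleRequestSketch are assumptions for illustration, and the hand-written JSON assumes the values contain no characters that need escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class ScheduleRequestSketch {

    /** Builds the JSON body for POST /api/tasks/schedule (values are not JSON-escaped). */
    static String jsonBody(String taskId, String taskType, String recipient, long delayMinutes) {
        return String.format(
                "{\"taskId\":\"%s\",\"taskType\":\"%s\",\"parameters\":{\"recipient\":\"%s\"},\"delayMinutes\":%d}",
                taskId, taskType, recipient, delayMinutes);
    }

    /** Request that creates and schedules a task. */
    static HttpRequest scheduleRequest(String baseUrl, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/tasks/schedule"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    /** Request that cancels a task by its business-level taskId. */
    static HttpRequest cancelRequest(String baseUrl, String taskId) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/tasks/cancel/" + taskId))
                .DELETE()
                .build();
    }

    public static void main(String[] args) {
        String baseUrl = "http://localhost:8080"; // assumed local deployment
        HttpRequest create = scheduleRequest(baseUrl, jsonBody("task-1", "EMAIL", "user@example.com", 5));
        HttpRequest cancel = cancelRequest(baseUrl, "task-1");
        System.out.println(create.method() + " " + create.uri());
        System.out.println(cancel.method() + " " + cancel.uri());
    }
}
```

With the application running, either request could be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).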
11. Exception handling
A single global exception handler keeps API responses consistent and avoids exposing internal details.
GlobalExceptionHandler.java
package com.example.taskscheduler.exception;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@ControllerAdvice
public class GlobalExceptionHandler {

    @ExceptionHandler(IllegalArgumentException.class)
    public ResponseEntity<String> handleBadRequest(Exception ex) {
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
    }

    @ExceptionHandler(UnsupportedOperationException.class)
    public ResponseEntity<String> handleUnsupportedOperation(Exception ex) {
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
    }

    @ExceptionHandler(Exception.class)
    public ResponseEntity<String> handleInternalError(Exception ex) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("An unexpected error occurred.");
    }
}
Notes:
- @ControllerAdvice: allows exception-handling rules to be defined globally.
- Specific exceptions: common business exceptions such as IllegalArgumentException and UnsupportedOperationException get a tailored 400 response.
- Generic fallback: catches every otherwise unhandled exception and returns a fixed message, so sensitive information is not leaked.
12. Main application class
Defines the entry point of the Spring Boot application.
TaskSchedulerApplication.java
package com.example.taskscheduler;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // enables scheduled task support
public class TaskSchedulerApplication {

    public static void main(String[] args) {
        SpringApplication.run(TaskSchedulerApplication.class, args);
    }
}
Notes:
- @SpringBootApplication: marks this class as a Spring Boot application.
- @EnableScheduling: enables Spring's scheduling support, which is what makes methods annotated with @Scheduled run.
Complete code summary
For easier reference, the complete code of the key classes is collected below.
12.1. TaskHandler.java
package com.example.taskscheduler.handler;import java.util.Map;public interface TaskHandler {void handle(Map<String, Object> parameters) throws Exception;String getTaskType();
}
12.2. EmailTaskHandler.java
package com.example.taskscheduler.handler;import org.springframework.stereotype.Component;import java.util.Map;@Component
public class EmailTaskHandler implements TaskHandler {@Overridepublic void handle(Map<String, Object> parameters) throws Exception {String recipient = (String) parameters.get("recipient");String subject = (String) parameters.get("subject");String body = (String) parameters.get("body");// 实现发送邮件的逻辑,例如使用JavaMailSenderSystem.out.println("Sending email to: " + recipient);System.out.println("Subject: " + subject);System.out.println("Body: " + body);// 模拟邮件发送成功// 在实际应用中,应集成邮件发送服务}@Overridepublic String getTaskType() {return "EMAIL";}
}
12.3. DataProcessTaskHandler.java
package com.example.taskscheduler.handler;import org.springframework.stereotype.Component;import java.util.Map;@Component
public class DataProcessTaskHandler implements TaskHandler {@Overridepublic void handle(Map<String, Object> parameters) throws Exception {String dataSource = (String) parameters.get("dataSource");String operation = (String) parameters.get("operation");// 实现数据处理的逻辑System.out.println("Processing data from: " + dataSource);System.out.println("Operation: " + operation);// 模拟数据处理成功// 在实际应用中,应执行具体的数据处理任务}@Overridepublic String getTaskType() {return "DATA_PROCESS";}
}
12.4. TaskHandlerFactory.java
package com.example.taskscheduler.factory;import com.example.taskscheduler.handler.TaskHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Component
public class TaskHandlerFactory {private final List<TaskHandler> handlers;private final Map<String, TaskHandler> handlerMap = new HashMap<>();@Autowiredpublic TaskHandlerFactory(List<TaskHandler> handlers) {this.handlers = handlers;}@PostConstructpublic void init() {for (TaskHandler handler : handlers) {handlerMap.put(handler.getTaskType(), handler);}}public TaskHandler getHandler(String taskType) {TaskHandler handler = handlerMap.get(taskType);if (handler == null) {throw new IllegalArgumentException("No handler found for task type: " + taskType);}return handler;}
}
12.5. SchedulerConfig.java
package com.example.taskscheduler.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;@Configuration
public class SchedulerConfig {@Beanpublic ThreadPoolTaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(10); // 根据需求调整scheduler.setThreadNamePrefix("DynamicTaskScheduler-");scheduler.setAwaitTerminationSeconds(60);scheduler.setWaitForTasksToCompleteOnShutdown(true);scheduler.initialize();return scheduler;}
}
12.6. ScheduledTask.java
package com.example.taskscheduler.entity;import lombok.Data;
import lombok.NoArgsConstructor;import javax.persistence.*;
import java.time.LocalDateTime;@Entity
@Table(name = "scheduled_tasks", uniqueConstraints = @UniqueConstraint(columnNames = "taskId"))
@Data
@NoArgsConstructor
public class ScheduledTask {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;@Column(nullable = false, unique = true)private String taskId; // 业务上的唯一标识@Column(nullable = false)private String taskType; // 任务类型@Column(columnDefinition = "TEXT")private String parameters; // JSON格式的参数@Column(nullable = false)private LocalDateTime scheduledTime; // 任务执行时间@Column(nullable = false, updatable = false)private LocalDateTime createdAt = LocalDateTime.now();private LocalDateTime updatedAt = LocalDateTime.now();private boolean executed = false;private String executionStatus;@Column(columnDefinition = "TEXT")private String executionResult;// 构造函数public ScheduledTask(String taskId, String taskType, String parameters, LocalDateTime scheduledTime) {this.taskId = taskId;this.taskType = taskType;this.parameters = parameters;this.scheduledTime = scheduledTime;}@PreUpdatepublic void preUpdate() {this.updatedAt = LocalDateTime.now();}
}
12.7. ScheduledTaskRepository.java
package com.example.taskscheduler.repository;import com.example.taskscheduler.entity.ScheduledTask;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;import java.util.Optional;@Repository
public interface ScheduledTaskRepository extends JpaRepository<ScheduledTask, Long> {Optional<ScheduledTask> findByTaskId(String taskId);boolean existsByTaskId(String taskId);
}
12.8. ScheduledTaskService.java
package com.example.taskscheduler.service;import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.factory.TaskHandlerFactory;
import com.example.taskscheduler.handler.TaskHandler;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;@Service
public class ScheduledTaskService {

    private final ScheduledTaskRepository scheduledTaskRepository;
    private final ThreadPoolTaskScheduler taskScheduler;
    private final ObjectMapper objectMapper;
    private final TaskHandlerFactory taskHandlerFactory;

    // Tracks ScheduledFuture objects so that pending tasks can be cancelled
    private final Map<Long, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();

    @Autowired
    public ScheduledTaskService(ScheduledTaskRepository scheduledTaskRepository,
                                ThreadPoolTaskScheduler taskScheduler,
                                ObjectMapper objectMapper,
                                TaskHandlerFactory taskHandlerFactory) {
        this.scheduledTaskRepository = scheduledTaskRepository;
        this.taskScheduler = taskScheduler;
        this.objectMapper = objectMapper;
        this.taskHandlerFactory = taskHandlerFactory;
    }

    /**
     * Creates and schedules a new task.
     *
     * @param taskId       unique business identifier of the task
     * @param taskType     task type
     * @param parameters   task parameters (Map<String, Object>)
     * @param delayMinutes delay before execution, in minutes
     * @return the created ScheduledTask
     */
    public ScheduledTask createAndScheduleTask(String taskId, String taskType,
                                               Map<String, Object> parameters, long delayMinutes) {
        // Deduplication: reject the request if a task with the same taskId already exists
        if (scheduledTaskRepository.existsByTaskId(taskId)) {
            throw new IllegalArgumentException("Task with taskId " + taskId + " already exists.");
        }

        // Compute the execution time
        LocalDateTime scheduledTime = LocalDateTime.now().plusMinutes(delayMinutes);

        // Serialize the parameters to a JSON string
        String parametersJson;
        try {
            parametersJson = objectMapper.writeValueAsString(parameters);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize task parameters.", e);
        }

        // Create and persist the ScheduledTask entity
        ScheduledTask scheduledTask = new ScheduledTask(taskId, taskType, parametersJson, scheduledTime);
        scheduledTaskRepository.save(scheduledTask);

        // Schedule the task
        Runnable taskRunnable = () -> executeTask(scheduledTask.getId());
        ScheduledFuture<?> future = taskScheduler.schedule(
                taskRunnable,
                scheduledTime.atZone(java.time.ZoneId.systemDefault()).toInstant());

        // Remember the ScheduledFuture so the task can be cancelled later
        scheduledFutures.put(scheduledTask.getId(), future);
        return scheduledTask;
    }

    /**
     * Executes a task by dispatching to the handler registered for its task type.
     *
     * @param scheduledTaskId database ID of the ScheduledTask
     */
    public void executeTask(Long scheduledTaskId) {
        Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findById(scheduledTaskId);
        if (optionalTask.isEmpty()) {
            // The task record no longer exists; it may have been deleted
            return;
        }
        ScheduledTask task = optionalTask.get();
        if (task.isExecuted()) {
            // Already executed (or cancelled); skip
            return;
        }
        try {
            // Deserialize the parameters
            @SuppressWarnings("unchecked")
            Map<String, Object> params = objectMapper.readValue(task.getParameters(), Map.class);

            // Look up the handler for this task type and run it
            TaskHandler handler = taskHandlerFactory.getHandler(task.getTaskType());
            handler.handle(params);

            // Record success
            task.setExecuted(true);
            task.setExecutionStatus("SUCCESS");
            task.setExecutionResult("Task executed successfully.");
            scheduledTaskRepository.save(task);
        } catch (Exception e) {
            // Record failure
            task.setExecuted(true);
            task.setExecutionStatus("FAILED");
            task.setExecutionResult(e.getMessage());
            scheduledTaskRepository.save(task);
            // Log the error or take other action
            System.err.println("Failed to execute taskId: " + task.getTaskId() + ", Error: " + e.getMessage());
        } finally {
            // The task has run; drop its future so the map does not grow without bound
            scheduledFutures.remove(task.getId());
        }
    }

    /**
     * Cancels the given task.
     *
     * @param taskId unique business identifier of the task
     * @return true if the task was cancelled, false otherwise
     */
    public boolean cancelTask(String taskId) {
        Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findByTaskId(taskId);
        if (optionalTask.isEmpty()) {
            return false; // The task does not exist
        }
        ScheduledTask task = optionalTask.get();
        if (task.isExecuted()) {
            return false; // The task has already been executed and cannot be cancelled
        }

        // Look up the ScheduledFuture for this task
        ScheduledFuture<?> future = scheduledFutures.get(task.getId());
        if (future != null && !future.isDone()) {
            boolean cancelled = future.cancel(false);
            if (cancelled) {
                // Record the cancellation
                task.setExecuted(true);
                task.setExecutionStatus("CANCELLED");
                task.setExecutionResult("Task was cancelled before execution.");
                scheduledTaskRepository.save(task);
                // Remove the future from the map
                scheduledFutures.remove(task.getId());
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up a task.
     *
     * @param taskId unique business identifier of the task
     * @return Optional<ScheduledTask>
     */
    public Optional<ScheduledTask> findByTaskId(String taskId) {
        return scheduledTaskRepository.findByTaskId(taskId);
    }
}
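The schedule-then-cancel bookkeeping used by ScheduledTaskService can be tried in isolation. The following is a minimal sketch that assumes a plain java.util.concurrent.ScheduledExecutorService in place of Spring's ThreadPoolTaskScheduler; the class name FutureRegistry and its methods are hypothetical and not part of the project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the scheduledFutures bookkeeping in ScheduledTaskService.
class FutureRegistry {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    private final Map<Long, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    // Schedules the job after the given delay and remembers its future under the task id.
    void schedule(long id, Runnable job, long delayMillis) {
        ScheduledFuture<?> future = executor.schedule(job, delayMillis, TimeUnit.MILLISECONDS);
        futures.put(id, future);
    }

    // Cancels a tracked task. Returns false if the id is unknown or the task
    // has already completed or been cancelled.
    boolean cancel(long id) {
        ScheduledFuture<?> future = futures.remove(id);
        return future != null && future.cancel(false);
    }

    // Stops the worker threads so the JVM can exit.
    void shutdown() {
        executor.shutdownNow();
    }
}
```

Keeping the futures in a map keyed by the database ID is what lets a later HTTP request find and cancel a task that was scheduled by an earlier one. The map lives in memory only, so it is empty again after a restart.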
9.4. StartupService.java
package com.example.taskscheduler.service;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import jakarta.annotation.PostConstruct; // Spring Boot 3 uses jakarta.*, not javax.*
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.stream.Collectors;

@Service
public class StartupService {

    private final ScheduledTaskRepository scheduledTaskRepository;
    private final ScheduledTaskService scheduledTaskService;
    private final ThreadPoolTaskScheduler taskScheduler;

    @Autowired
    public StartupService(ScheduledTaskRepository scheduledTaskRepository,
                          ScheduledTaskService scheduledTaskService,
                          ThreadPoolTaskScheduler taskScheduler) {
        this.scheduledTaskRepository = scheduledTaskRepository;
        this.scheduledTaskService = scheduledTaskService;
        this.taskScheduler = taskScheduler;
    }

    @PostConstruct
    public void init() {
        List<ScheduledTask> pendingTasks = scheduledTaskRepository.findAll().stream()
                .filter(task -> !task.isExecuted())
                .collect(Collectors.toList());

        LocalDateTime now = LocalDateTime.now();
        for (ScheduledTask task : pendingTasks) {
            if (task.getScheduledTime().isAfter(now)) {
                // Still in the future: schedule the existing record at its original time.
                // createAndScheduleTask is not used here because its taskId duplicate check
                // would reject a task that is already stored in the database.
                // Limitation: futures created here are not registered in ScheduledTaskService,
                // so cancelTask cannot cancel a recovered task.
                Long id = task.getId();
                taskScheduler.schedule(
                        () -> scheduledTaskService.executeTask(id),
                        task.getScheduledTime().atZone(ZoneId.systemDefault()).toInstant());
            } else {
                // The scheduled time has already passed: run the task now
                scheduledTaskService.executeTask(task.getId());
            }
        }
        System.out.println("Recovered " + pendingTasks.size() + " pending tasks.");
    }
}
9.5. CleanupService.java
package com.example.taskscheduler.service;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;

@Service
public class CleanupService {

    private final ScheduledTaskRepository scheduledTaskRepository;

    @Autowired
    public CleanupService(ScheduledTaskRepository scheduledTaskRepository) {
        this.scheduledTaskRepository = scheduledTaskRepository;
    }

    /**
     * Periodically removes executed tasks that are past the retention period.
     * For example, executed tasks are kept for 7 days and then deleted.
     */
    @Scheduled(cron = "0 0 2 * * ?") // Runs every day at 2:00 AM
    public void cleanUpExecutedTasks() {
        LocalDateTime cutoff = LocalDateTime.now().minusDays(7);
        List<ScheduledTask> tasksToDelete = scheduledTaskRepository.findAll().stream()
                .filter(task -> task.isExecuted()
                        && (task.getScheduledTime().isBefore(cutoff)
                            || "CANCELLED".equals(task.getExecutionStatus())
                            || "FAILED".equals(task.getExecutionStatus())))
                .collect(Collectors.toList());
        scheduledTaskRepository.deleteAll(tasksToDelete);
        System.out.println("Cleaned up " + tasksToDelete.size() + " tasks.");
    }
}
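Notes:
- The @Scheduled annotation runs the cleanup job on a schedule.
- Cleanup conditions (the task must already be marked as executed):
  - Its scheduled time is more than 7 days in the past, or
  - Its status is CANCELLED or FAILED.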
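The cleanup conditions can be expressed as a single predicate. The following is a minimal sketch that takes the relevant fields as plain arguments so that it runs without a database; the class name CleanupRule and the method shouldDelete are hypothetical.

```java
import java.time.LocalDateTime;

class CleanupRule {
    // Mirrors the filter in CleanupService: the task must be marked as executed, and
    // either be scheduled before the cutoff or have ended as CANCELLED or FAILED.
    static boolean shouldDelete(boolean executed, String executionStatus,
                                LocalDateTime scheduledTime, LocalDateTime cutoff) {
        if (!executed) {
            return false;
        }
        return scheduledTime.isBefore(cutoff)
                || "CANCELLED".equals(executionStatus)
                || "FAILED".equals(executionStatus);
    }
}
```

Note the consequence of the second and third clauses: a CANCELLED or FAILED task is removed at the next cleanup run regardless of its age, while only SUCCESS tasks are kept for the full 7 days.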
10. Controller Layer
Receives HTTP requests and manages task creation, cancellation, and lookup.
ScheduledTaskController.java
package com.example.taskscheduler.controller;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.service.ScheduledTaskService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.Map;
import java.util.Optional;

@RestController
@RequestMapping("/api/tasks")
public class ScheduledTaskController {

    private final ScheduledTaskService scheduledTaskService;
    private final ObjectMapper objectMapper;

    @Autowired
    public ScheduledTaskController(ScheduledTaskService scheduledTaskService,
                                   ObjectMapper objectMapper) {
        this.scheduledTaskService = scheduledTaskService;
        this.objectMapper = objectMapper;
    }

    /**
     * Creates and schedules a new task.
     *
     * @param taskRequest request body containing taskId, taskType, parameters, and delayMinutes
     * @return the created task
     */
    @PostMapping("/schedule")
    public ResponseEntity<?> scheduleTask(@RequestBody TaskRequest taskRequest) {
        try {
            ScheduledTask task = scheduledTaskService.createAndScheduleTask(
                    taskRequest.getTaskId(),
                    taskRequest.getTaskType(),
                    taskRequest.getParameters(),
                    taskRequest.getDelayMinutes());
            return ResponseEntity.ok(task);
        } catch (IllegalArgumentException ex) {
            return ResponseEntity.badRequest().body(ex.getMessage());
        } catch (Exception ex) {
            return ResponseEntity.status(500).body("Failed to schedule task.");
        }
    }

    /**
     * Cancels the given task.
     *
     * @param taskId unique business identifier of the task
     * @return the result of the cancellation
     */
    @DeleteMapping("/cancel/{taskId}")
    public ResponseEntity<?> cancelTask(@PathVariable String taskId) {
        boolean cancelled = scheduledTaskService.cancelTask(taskId);
        if (cancelled) {
            return ResponseEntity.ok("Task cancelled successfully.");
        } else {
            return ResponseEntity.badRequest()
                    .body("Failed to cancel task. It may have been executed or does not exist.");
        }
    }

    /**
     * Returns the details of a task.
     *
     * @param taskId unique business identifier of the task
     * @return the task details
     */
    @GetMapping("/{taskId}")
    public ResponseEntity<?> getTaskStatus(@PathVariable String taskId) {
        Optional<ScheduledTask> optionalTask = scheduledTaskService.findByTaskId(taskId);
        if (optionalTask.isPresent()) {
            return ResponseEntity.ok(optionalTask.get());
        } else {
            return ResponseEntity.status(404).body("Task not found.");
        }
    }

    // DTO that receives the request body
    public static class TaskRequest {
        private String taskId;
        private String taskType;
        private Map<String, Object> parameters;
        private long delayMinutes;

        public String getTaskId() { return taskId; }
        public void setTaskId(String taskId) { this.taskId = taskId; }

        public String getTaskType() { return taskType; }
        public void setTaskType(String taskType) { this.taskType = taskType; }

        public Map<String, Object> getParameters() { return parameters; }
        public void setParameters(Map<String, Object> parameters) { this.parameters = parameters; }

        public long getDelayMinutes() { return delayMinutes; }
        public void setDelayMinutes(long delayMinutes) { this.delayMinutes = delayMinutes; }
    }
}
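Notes:
- Create a task: submit a task request to the POST /api/tasks/schedule endpoint.
- Cancel a task: call the DELETE /api/tasks/cancel/{taskId} endpoint.
- Query a task: call the GET /api/tasks/{taskId} endpoint.
- DTO class: the nested static class TaskRequest receives and parses the request body.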
11. Exception Handling
Handles the different exception types in one place and returns consistent API responses.
GlobalExceptionHandler.java
package com.example.taskscheduler.exception;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@ControllerAdvice
public class GlobalExceptionHandler {

    @ExceptionHandler(IllegalArgumentException.class)
    public ResponseEntity<String> handleBadRequest(Exception ex) {
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
    }

    @ExceptionHandler(UnsupportedOperationException.class)
    public ResponseEntity<String> handleUnsupportedOperation(Exception ex) {
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
    }

    @ExceptionHandler(Exception.class)
    public ResponseEntity<String> handleInternalError(Exception ex) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("An unexpected error occurred.");
    }
}
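Notes:
- Business exceptions: IllegalArgumentException and UnsupportedOperationException are mapped to 400 Bad Request.
- Generic exceptions: all other exceptions are mapped to 500 Internal Server Error with a generic message, so internal details are not leaked.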
12. Main Application Class
TaskSchedulerApplication.java
package com.example.taskscheduler;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // Enables scheduled task support
public class TaskSchedulerApplication {

    public static void main(String[] args) {
        SpringApplication.run(TaskSchedulerApplication.class, args);
    }
}
Notes:
- @EnableScheduling: turns on Spring's scheduled task support so that methods annotated with @Scheduled are run.
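Testing and Verification
The following steps show how to test and verify the dynamic scheduled task system.
1. Start the Application
Make sure your MySQL database is running and the settings in application.properties are correct, then run the main application class.
With Maven:
mvn spring-boot:run
Alternatively, run the TaskSchedulerApplication class from an IDE such as IntelliJ IDEA or Eclipse.
2. Create Tasks
Use Postman or cURL to send HTTP requests that create new tasks.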
Example 1: Scheduling an EMAIL task
Request:
POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json

{
  "taskId": "TASK_EMAIL_001",
  "taskType": "EMAIL",
  "parameters": {
    "recipient": "user@example.com",
    "subject": "Welcome!",
    "body": "Thank you for registering."
  },
  "delayMinutes": 1
}
Response:
{
  "id": 1,
  "taskId": "TASK_EMAIL_001",
  "taskType": "EMAIL",
  "parameters": "{\"recipient\":\"user@example.com\",\"subject\":\"Welcome!\",\"body\":\"Thank you for registering.\"}",
  "scheduledTime": "2023-10-01T10:01:00",
  "createdAt": "2023-10-01T10:00:00",
  "updatedAt": "2023-10-01T10:00:00",
  "executed": false,
  "executionStatus": null,
  "executionResult": null
}
Console output (after 1 minute):
Sending email to: user@example.com
Subject: Welcome!
Body: Thank you for registering.
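For readers who prefer to drive the API from Java rather than Postman or cURL, the request from Example 1 can be built with the JDK's java.net.http client (Java 11 and later). This is a sketch only: the class name ScheduleRequestFactory is hypothetical, and the URL assumes the application is running locally on port 8080 as in the example.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class ScheduleRequestFactory {
    // Builds (but does not send) the POST request used in Example 1.
    static HttpRequest emailTaskRequest() {
        String json = "{"
                + "\"taskId\": \"TASK_EMAIL_001\","
                + "\"taskType\": \"EMAIL\","
                + "\"parameters\": {"
                + "\"recipient\": \"user@example.com\","
                + "\"subject\": \"Welcome!\","
                + "\"body\": \"Thank you for registering.\"},"
                + "\"delayMinutes\": 1}";
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api/tasks/schedule"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }
}
```

To actually send it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) while the application is running.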
Example 2: Scheduling a DATA_PROCESS task
Request:
POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json

{
  "taskId": "TASK_DATA_001",
  "taskType": "DATA_PROCESS",
  "parameters": {
    "dataSource": "database",
    "operation": "backup"
  },
  "delayMinutes": 2
}
Response:
{
  "id": 2,
  "taskId": "TASK_DATA_001",
  "taskType": "DATA_PROCESS",
  "parameters": "{\"dataSource\":\"database\",\"operation\":\"backup\"}",
  "scheduledTime": "2023-10-01T10:02:00",
  "createdAt": "2023-10-01T10:00:00",
  "updatedAt": "2023-10-01T10:00:00",
  "executed": false,
  "executionStatus": null,
  "executionResult": null
}
Console output (after 2 minutes):
Processing data from: database
Operation: backup
3. Deduplication Test
Try to create a task with a taskId that already exists and verify that the system rejects the duplicate.
Request:
POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json

{
  "taskId": "TASK_EMAIL_001",
  "taskType": "EMAIL",
  "parameters": {
    "recipient": "duplicate@example.com",
    "subject": "Duplicate Task",
    "body": "This should not be scheduled."
  },
  "delayMinutes": 3
}
Response (400 Bad Request):
"Task with taskId TASK_EMAIL_001 already exists."
4. Cancel a Task
Create a task and cancel it before it runs.
Create request:
POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json

{
  "taskId": "TASK_CANCEL_001",
  "taskType": "EMAIL",
  "parameters": {
    "recipient": "cancel@example.com",
    "subject": "Cancel Task",
    "body": "This task will be cancelled."
  },
  "delayMinutes": 5
}
Cancel request:
DELETE http://localhost:8080/api/tasks/cancel/TASK_CANCEL_001
Response:
"Task cancelled successfully."
Confirm the cancellation by querying the task status:
GET http://localhost:8080/api/tasks/TASK_CANCEL_001
Response:
{
  "id": 3,
  "taskId": "TASK_CANCEL_001",
  "taskType": "EMAIL",
  "parameters": "{\"recipient\":\"cancel@example.com\",\"subject\":\"Cancel Task\",\"body\":\"This task will be cancelled.\"}",
  "scheduledTime": "2023-10-01T10:05:00",
  "createdAt": "2023-10-01T10:00:00",
  "updatedAt": "2023-10-01T10:00:00",
  "executed": true,
  "executionStatus": "CANCELLED",
  "executionResult": "Task was cancelled before execution."
}
Notes:
- Task cancelled: once a task is cancelled before its scheduled time, the system does not execute it.
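5. Task Recovery Test
- Create several tasks and make sure some are still pending (for example, by using a long delay).
- Restart the application.
- Confirm that the pending tasks are recovered and run at their scheduled times.
Notes:
- On startup, StartupService scans for unexecuted tasks and reschedules them, so tasks are not lost when the system restarts.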
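The recovery decision comes down to comparing each task's scheduled time with the current time. The following is a minimal sketch of that comparison; the helper class RecoveryPlanner is a hypothetical name and is not part of the project.

```java
import java.time.Duration;
import java.time.LocalDateTime;

class RecoveryPlanner {
    // Returns how long to wait before running a recovered task.
    // Duration.ZERO means the scheduled time has already passed and the task should run now.
    static Duration remainingDelay(LocalDateTime now, LocalDateTime scheduledTime) {
        Duration delay = Duration.between(now, scheduledTime);
        return delay.isNegative() ? Duration.ZERO : delay;
    }
}
```

Working with a Duration rather than whole minutes keeps sub-minute precision: Duration.toMinutes() would truncate a 90-second delay to 1 minute and a 59-second delay to 0.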
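6. Cleanup Test
- Create and execute several tasks.
- Wait until the retention period (for example, 7 days) has passed.
- Confirm that CleanupService has removed the executed tasks.
Notes:
- CleanupService periodically removes executed tasks that are past the retention period, which keeps the database from growing without bound.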
Optimization and Extension
1. Adding a New Task Type
Example: SMS task
Create SmsTaskHandler.java:
package com.example.taskscheduler.handler;

import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class SmsTaskHandler implements TaskHandler {

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        String phoneNumber = (String) parameters.get("phoneNumber");
        String message = (String) parameters.get("message");

        // Send the SMS here, for example through a third-party SMS API.
        // This placeholder only prints to the console to simulate a successful send;
        // a real application should integrate an SMS service.
        System.out.println("Sending SMS to: " + phoneNumber);
        System.out.println("Message: " + message);
    }

    @Override
    public String getTaskType() {
        return "SMS";
    }
}
Testing the SMS task:
Request:
POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json

{
  "taskId": "TASK_SMS_001",
  "taskType": "SMS",
  "parameters": {
    "phoneNumber": "+1234567890",
    "message": "Your verification code is 123456."
  },
  "delayMinutes": 1
}
Expected response:
{
  "id": 4,
  "taskId": "TASK_SMS_001",
  "taskType": "SMS",
  "parameters": "{\"phoneNumber\":\"+1234567890\",\"message\":\"Your verification code is 123456.\"}",
  "scheduledTime": "2023-10-01T10:01:00",
  "createdAt": "2023-10-01T10:00:00",
  "updatedAt": "2023-10-01T10:00:00",
  "executed": false,
  "executionStatus": null,
  "executionResult": null
}
Console output (after 1 minute):
Sending SMS to: +1234567890
Message: Your verification code is 123456.
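A new handler is picked up without changes to the core logic because handlers are looked up by the string returned from getTaskType(). The sketch below shows one way such a lookup can work in plain Java. It is an assumption about the general shape of TaskHandlerFactory, not a copy of it; HandlerRegistry and the simplified Handler interface are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the TaskHandler interface.
interface Handler {
    String getTaskType();
    void handle(Map<String, Object> parameters) throws Exception;
}

// Hypothetical registry that indexes handlers by their task type.
class HandlerRegistry {
    private final Map<String, Handler> handlersByType = new HashMap<>();

    // In a Spring application the list would typically be injected by the container.
    HandlerRegistry(List<Handler> handlers) {
        for (Handler handler : handlers) {
            handlersByType.put(handler.getTaskType(), handler);
        }
    }

    // Returns the handler for the given type, or fails for an unknown type.
    Handler getHandler(String taskType) {
        Handler handler = handlersByType.get(taskType);
        if (handler == null) {
            throw new UnsupportedOperationException("Unsupported task type: " + taskType);
        }
        return handler;
    }
}
```

With this shape, registering SMS support is only a matter of adding one more Handler to the list; the lookup code itself never mentions a concrete task type.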
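2. Integrating Real Email and SMS Services
A real application should integrate actual email and SMS delivery services, such as JavaMailSender or a third-party SMS API. The example below integrates JavaMailSender.
2.1. Add the Dependency
Add the Spring Boot Starter Mail dependency to pom.xml:
<!-- Spring Boot Starter Mail -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
</dependency>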
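2.2. Configure Mail Properties
Add the mail server settings to application.properties:
# Mail server settings (Gmail used as an example)
spring.mail.host=smtp.gmail.com
spring.mail.port=587
spring.mail.username=your_email@gmail.com
spring.mail.password=your_email_password
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
Note: to send mail through Gmail this way you will generally need an App Password, which requires 2-Step Verification to be enabled on the Google account. Google has retired the older "less secure app access" setting, so the regular account password is not expected to work here.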
2.3. Update EmailTaskHandler.java
package com.example.taskscheduler.handler;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class EmailTaskHandler implements TaskHandler {

    private final JavaMailSender mailSender;

    @Autowired
    public EmailTaskHandler(JavaMailSender mailSender) {
        this.mailSender = mailSender;
    }

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        String recipient = (String) parameters.get("recipient");
        String subject = (String) parameters.get("subject");
        String body = (String) parameters.get("body");

        // Build a simple mail message
        SimpleMailMessage message = new SimpleMailMessage();
        message.setTo(recipient);
        message.setSubject(subject);
        message.setText(body);

        // Send the email
        mailSender.send(message);
        System.out.println("Email sent to: " + recipient);
    }

    @Override
    public String getTaskType() {
        return "EMAIL";
    }
}
Notes:
- The handler uses JavaMailSender to send the email.
- Make sure the mail settings in application.properties are correct.
3. Using DTO Classes Instead of Map (for Type Safety)
Map<String, Object> makes parameter passing flexible, but it lacks type safety and readability. You can define a dedicated DTO class for each task type instead.
Example: EmailTaskParameters.java
package com.example.taskscheduler.dto;

public class EmailTaskParameters {

    private String recipient;
    private String subject;
    private String body;

    // Getters and setters
    public String getRecipient() { return recipient; }
    public void setRecipient(String recipient) { this.recipient = recipient; }

    public String getSubject() { return subject; }
    public void setSubject(String subject) { this.subject = subject; }

    public String getBody() { return body; }
    public void setBody(String body) { this.body = body; }
}
Update EmailTaskHandler.java to use the DTO:
package com.example.taskscheduler.handler;

import com.example.taskscheduler.dto.EmailTaskParameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class EmailTaskHandler implements TaskHandler {

    private final JavaMailSender mailSender;
    private final ObjectMapper objectMapper;

    @Autowired
    public EmailTaskHandler(JavaMailSender mailSender, ObjectMapper objectMapper) {
        this.mailSender = mailSender;
        this.objectMapper = objectMapper;
    }

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        // Convert the parameter map into the DTO
        EmailTaskParameters emailParams = objectMapper.convertValue(parameters, EmailTaskParameters.class);

        // Build a simple mail message
        SimpleMailMessage message = new SimpleMailMessage();
        message.setTo(emailParams.getRecipient());
        message.setSubject(emailParams.getSubject());
        message.setText(emailParams.getBody());

        // Send the email
        mailSender.send(message);
        System.out.println("Email sent to: " + emailParams.getRecipient());
    }

    @Override
    public String getTaskType() {
        return "EMAIL";
    }
}
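Advantages:
- Type safety: a dedicated DTO class ensures parameters have the expected types, which reduces runtime errors.
- Readability: the code is easier to read and maintain.
Extension:
- Define a DTO class for each task type and update the corresponding TaskHandler implementation to use it.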
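One benefit of a dedicated parameter class is that missing or mistyped parameters can be rejected before any email is sent. The sketch below does the conversion by hand so that it runs without Jackson; the class EmailParams and its fromMap factory are hypothetical and only illustrate the idea (in the handler above, objectMapper.convertValue performs the mapping).

```java
import java.util.Map;

// Hypothetical immutable counterpart of EmailTaskParameters.
final class EmailParams {
    final String recipient;
    final String subject;
    final String body;

    private EmailParams(String recipient, String subject, String body) {
        this.recipient = recipient;
        this.subject = subject;
        this.body = body;
    }

    // Converts the loosely typed parameter map, failing fast on
    // missing, non-string, or blank values.
    static EmailParams fromMap(Map<String, Object> parameters) {
        return new EmailParams(
                requireString(parameters, "recipient"),
                requireString(parameters, "subject"),
                requireString(parameters, "body"));
    }

    private static String requireString(Map<String, Object> parameters, String key) {
        Object value = parameters.get(key);
        if (!(value instanceof String) || ((String) value).isBlank()) {
            throw new IllegalArgumentException("Missing or invalid parameter: " + key);
        }
        return (String) value;
    }
}
```

Failing with IllegalArgumentException fits the rest of the system: executeTask records the message as the execution result and marks the task as FAILED.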
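Summary
With the design and implementation above, you can build a general-purpose, flexible, and extensible dynamic scheduled task system. It has the following characteristics:
- Generality: supports multiple task types, each implementing its own logic behind the TaskHandler interface.
- Flexible parameter passing: task parameters are stored as JSON, or mapped to dedicated DTO classes for better type safety.
- Deduplication: taskId guarantees that each task is unique, which prevents duplicate scheduling.
- Persistence and recovery: task data is stored in the database, and unexecuted tasks are recovered automatically when the application restarts.
- Extensibility: adding a task type only requires a new TaskHandler implementation, with no changes to the core logic.
- Task management: API endpoints are provided for creating, cancelling, and querying tasks.
- Periodic cleanup: executed, failed, and cancelled tasks are removed on a schedule to keep the database tidy.
- Exception handling: a unified exception handling mechanism keeps API responses consistent and avoids leaking internal details.
You can extend and optimize the system further to suit your business needs, for example:
- Retry mechanism: configure an automatic retry policy for failed tasks.
- Task priority: assign priorities to tasks and adjust the scheduling logic so that high-priority tasks run first.
- Distributed scheduling: if the system must run as multiple instances, consider a distributed scheduling framework such as a Quartz cluster.
- Monitoring and alerting: integrate monitoring tools to track task execution in real time and define alert rules.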
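As an illustration of the retry idea from the list above, the sketch below retries a failing action with exponentially growing delays. It is plain Java with hypothetical names (RetryPolicy, runWithRetry, backoffMillis) and is not wired into the scheduler; a production version would more likely reschedule the task through the task scheduler than block a worker thread with sleep.

```java
import java.util.concurrent.Callable;

class RetryPolicy {
    // Delay before retry number `attempt` (1-based): initialDelayMillis * 2^(attempt - 1).
    static long backoffMillis(long initialDelayMillis, int attempt) {
        return initialDelayMillis << (attempt - 1);
    }

    // Runs the action up to maxAttempts times, sleeping between failed attempts.
    // Rethrows the last exception if every attempt fails.
    static <T> T runWithRetry(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis(initialDelayMillis, attempt));
                }
            }
        }
        throw last;
    }
}
```

In this system the natural place for such a policy would be around the handler.handle(params) call in executeTask, with the FAILED status recorded only after the final attempt.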
We hope this complete implementation helps you build the dynamic scheduled task system you need.