当前位置: 首页 > news >正文

通用的动态定时任务系统

下面是一个通用的动态定时任务系统的完整实现。这套系统采用策略模式Spring的依赖注入,使得添加新的任务类型变得更加便捷和模块化。以下内容包括项目结构、必要的依赖配置、实体类、仓库接口、任务处理器接口与实现、任务处理器工厂、服务层、控制器层、异常处理、定时任务调度配置以及应用启动与任务恢复等关键组件的完整代码。

目录

  1. 项目结构
  2. 依赖配置 (pom.xml)
  3. 应用配置 (application.properties)
  4. 实体类
    • ScheduledTask.java
  5. 仓库接口
    • ScheduledTaskRepository.java
  6. 任务处理器
    • TaskHandler.java
    • EmailTaskHandler.java
    • DataProcessTaskHandler.java
  7. 任务处理器工厂
    • TaskHandlerFactory.java
  8. 定时任务调度配置
    • SchedulerConfig.java
  9. 服务层
    • ScheduledTaskService.java
    • StartupService.java
    • CleanupService.java
  10. 控制器层
    • ScheduledTaskController.java
  11. 异常处理
    • GlobalExceptionHandler.java
  12. 主应用类
    • TaskSchedulerApplication.java

1. 项目结构

以下是项目的包结构示意:

src
├── main
│   ├── java
│   │   └── com
│   │       └── example
│   │           └── taskscheduler
│   │               ├── TaskSchedulerApplication.java
│   │               ├── config
│   │               │   └── SchedulerConfig.java
│   │               ├── controller
│   │               │   └── ScheduledTaskController.java
│   │               ├── entity
│   │               │   └── ScheduledTask.java
│   │               ├── exception
│   │               │   └── GlobalExceptionHandler.java
│   │               ├── factory
│   │               │   └── TaskHandlerFactory.java
│   │               ├── handler
│   │               │   ├── DataProcessTaskHandler.java
│   │               │   ├── EmailTaskHandler.java
│   │               │   └── TaskHandler.java
│   │               ├── repository
│   │               │   └── ScheduledTaskRepository.java
│   │               └── service
│   │                   ├── CleanupService.java
│   │                   ├── ScheduledTaskService.java
│   │                   └── StartupService.java
│   └── resources
│       └── application.properties
└── test
    └── java
        └── com
            └── example
                └── taskscheduler
                    └── TaskSchedulerApplicationTests.java

2. 依赖配置 (pom.xml)

首先,确保在项目的 pom.xml 文件中添加必要的依赖。以下是一个示例配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"   
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0   
         http://maven.apache.org/xsd/maven-4.0.0.xsd">   
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>taskscheduler</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>TaskScheduler</name>
    <description>通用的动态定时任务系统</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.1</version>
        <relativePath/>
    </parent>

    <properties>
        <java.version>17</java.version> <!-- 确保使用支持的Java版本 -->
    </properties>

    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Boot Starter Data JPA -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <!-- JSON处理 -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- 数据库驱动(以MySQL为例) -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        
        <!-- Lombok(可选,用于简化代码) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>

        <!-- Spring Boot Starter Test(用于测试) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.10.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <annotationProcessorPaths>
                        <!-- Lombok 注解处理器 -->
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                            <version>1.18.26</version>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

说明

  • Spring Boot Starter Web:用于构建Web应用,包括REST API。
  • Spring Boot Starter Data JPA:用于与数据库交互。
  • Jackson Databind:用于JSON处理。
  • MySQL Connector/J:如果使用MySQL数据库,需添加对应驱动。
  • Lombok:用于简化Java代码(如生成getter/setter),可选但推荐。
  • Spring Boot Starter Test:用于测试。

3. 应用配置 (application.properties)

src/main/resources/ 目录下创建 application.properties 文件,配置数据库连接和JPA设置。

# 数据库配置(以MySQL为例)
spring.datasource.url=jdbc:mysql://localhost:3306/taskscheduler?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=yourpassword

# JPA配置
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.format_sql=true

# 定时任务初始延迟和间隔(可选,根据需要调整)
# 我们将在服务层自定义这些,通常不需要在此配置

# Logging配置(可选)
logging.level.org.springframework=INFO
logging.level.com.example.taskscheduler=DEBUG

注意

  • 数据库URL:确保替换为您实际的数据库URL、用户名和密码。
  • spring.jpa.hibernate.ddl-auto=update:让JPA自动创建或更新数据库表结构。对于生产环境,建议使用更严格的设置。
  • 日志级别:根据需要调整,以获取更多或更少的日志信息。

4. 实体类

ScheduledTask.java

package com.example.taskscheduler.entity;

import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;
import java.time.LocalDateTime;

@Entity
@Table(name = "scheduled_tasks", uniqueConstraints = @UniqueConstraint(columnNames = "taskId"))
@Data
@NoArgsConstructor
public class ScheduledTask {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String taskId; // 业务上的唯一标识

    @Column(nullable = false)
    private String taskType; // 任务类型

    @Column(columnDefinition = "TEXT")
    private String parameters; // JSON格式的参数

    @Column(nullable = false)
    private LocalDateTime scheduledTime; // 任务执行时间

    @Column(nullable = false, updatable = false)
    private LocalDateTime createdAt = LocalDateTime.now();

    private LocalDateTime updatedAt = LocalDateTime.now();

    private boolean executed = false;

    private String executionStatus;

    @Column(columnDefinition = "TEXT")
    private String executionResult;

    // 构造函数
    public ScheduledTask(String taskId, String taskType, String parameters, LocalDateTime scheduledTime) {
        this.taskId = taskId;
        this.taskType = taskType;
        this.parameters = parameters;
        this.scheduledTime = scheduledTime;
    }

    @PreUpdate
    public void preUpdate() {
        this.updatedAt = LocalDateTime.now();
    }
}

说明

  • Lombok注解@Data 自动生成getter、setter、toString等方法;@NoArgsConstructor 生成无参构造函数。
  • 字段解释
    • taskId:业务上的唯一标识,用于任务去重。
    • taskType:任务类型标识,如 EMAILDATA_PROCESS
    • parameters:任务执行所需的参数,以JSON字符串形式存储。
    • scheduledTime:任务的执行时间。
    • executed:任务是否已执行。
    • executionStatusexecutionResult:记录任务执行状态和结果。

5. 仓库接口

ScheduledTaskRepository.java

package com.example.taskscheduler.repository;

import com.example.taskscheduler.entity.ScheduledTask;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.util.Optional;

@Repository
public interface ScheduledTaskRepository extends JpaRepository<ScheduledTask, Long> {
    Optional<ScheduledTask> findByTaskId(String taskId);
    boolean existsByTaskId(String taskId);
}

说明

  • 提供根据 taskId 查询和验证任务是否存在的方法,以实现任务去重。

6. 任务处理器

采用策略模式,为每种任务类型创建对应的处理器,实现灵活和可扩展的任务执行逻辑。

6.1 TaskHandler.java

package com.example.taskscheduler.handler;

import java.util.Map;

public interface TaskHandler {
    /**
     * 执行任务的逻辑。
     *
     * @param parameters 任务参数,以键值对形式传递
     * @throws Exception 执行过程中可能抛出的异常
     */
    void handle(Map<String, Object> parameters) throws Exception;

    /**
     * 返回该处理器支持的任务类型。
     *
     * @return 任务类型的字符串标识
     */
    String getTaskType();
}

6.2 EmailTaskHandler.java

package com.example.taskscheduler.handler;

import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class EmailTaskHandler implements TaskHandler {

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        String recipient = (String) parameters.get("recipient");
        String subject = (String) parameters.get("subject");
        String body = (String) parameters.get("body");

        // 实现发送邮件的逻辑,例如使用JavaMailSender
        System.out.println("Sending email to: " + recipient);
        System.out.println("Subject: " + subject);
        System.out.println("Body: " + body);

        // 模拟邮件发送成功
        // 在实际应用中,应集成邮件发送服务
    }

    @Override
    public String getTaskType() {
        return "EMAIL";
    }
}

6.3 DataProcessTaskHandler.java

package com.example.taskscheduler.handler;

import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class DataProcessTaskHandler implements TaskHandler {

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        String dataSource = (String) parameters.get("dataSource");
        String operation = (String) parameters.get("operation");

        // 实现数据处理的逻辑
        System.out.println("Processing data from: " + dataSource);
        System.out.println("Operation: " + operation);

        // 模拟数据处理成功
        // 在实际应用中,应执行具体的数据处理任务
    }

    @Override
    public String getTaskType() {
        return "DATA_PROCESS";
    }
}

扩展性

  • 添加新的任务类型时,只需创建新的 TaskHandler 实现类,并通过 @Component 注解注册,即可自动被工厂识别和调用,无需修改现有代码。

7. 任务处理器工厂

负责根据任务类型查找并返回对应的 TaskHandler 实例。

TaskHandlerFactory.java

package com.example.taskscheduler.factory;

import com.example.taskscheduler.handler.TaskHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
public class TaskHandlerFactory {

    private final List<TaskHandler> handlers;
    private final Map<String, TaskHandler> handlerMap = new HashMap<>();

    @Autowired
    public TaskHandlerFactory(List<TaskHandler> handlers) {
        this.handlers = handlers;
    }

    @PostConstruct
    public void init() {
        for (TaskHandler handler : handlers) {
            handlerMap.put(handler.getTaskType(), handler);
        }
    }

    /**
     * 根据任务类型获取对应的处理器。
     *
     * @param taskType 任务类型的字符串标识
     * @return 对应的 TaskHandler 实例
     * @throws IllegalArgumentException 如果没有找到对应的处理器
     */
    public TaskHandler getHandler(String taskType) {
        TaskHandler handler = handlerMap.get(taskType);
        if (handler == null) {
            throw new IllegalArgumentException("No handler found for task type: " + taskType);
        }
        return handler;
    }
}

说明

  • 自动注入所有实现了 TaskHandler 接口的Bean,并在 @PostConstruct 阶段初始化 handlerMap,映射任务类型到处理器。
  • getHandler 方法根据任务类型获取对应的处理器实例。

8. 定时任务调度配置

配置 ThreadPoolTaskScheduler Bean,用于动态调度任务。

SchedulerConfig.java

package com.example.taskscheduler.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class SchedulerConfig {

    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10); // 根据需求调整
        scheduler.setThreadNamePrefix("DynamicTaskScheduler-");
        scheduler.setAwaitTerminationSeconds(60);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        scheduler.initialize();
        return scheduler;
    }
}

说明

  • 线程池大小:根据预期的并发任务量调整 poolSize
  • 线程名称前缀:便于在日志中追踪任务执行。
  • 关闭时等待任务完成:确保在应用关闭时,正在执行的任务能够完成。

9. 服务层

负责核心的任务管理逻辑,包括任务创建、调度、执行和取消。

9.1 ScheduledTaskService.java

package com.example.taskscheduler.service;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.factory.TaskHandlerFactory;
import com.example.taskscheduler.handler.TaskHandler;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

@Service
public class ScheduledTaskService {

    private final ScheduledTaskRepository scheduledTaskRepository;
    private final ThreadPoolTaskScheduler taskScheduler;
    private final ObjectMapper objectMapper;
    private final TaskHandlerFactory taskHandlerFactory;

    // 用于管理 ScheduledFuture 对象,以支持任务取消等操作
    private final Map<Long, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();

    @Autowired
    public ScheduledTaskService(ScheduledTaskRepository scheduledTaskRepository,
                                ThreadPoolTaskScheduler taskScheduler,
                                ObjectMapper objectMapper,
                                TaskHandlerFactory taskHandlerFactory) {
        this.scheduledTaskRepository = scheduledTaskRepository;
        this.taskScheduler = taskScheduler;
        this.objectMapper = objectMapper;
        this.taskHandlerFactory = taskHandlerFactory;
    }

    /**
     * 创建并调度一个新任务。
     *
     * @param taskId        业务上的唯一任务标识
     * @param taskType      任务类型
     * @param parameters    任务参数(Map<String, Object>)
     * @param delayMinutes 延迟执行的分钟数
     * @return 创建的 ScheduledTask 对象
     */
    public ScheduledTask createAndScheduleTask(String taskId, String taskType, Map<String, Object> parameters, long delayMinutes) {
        // 任务去重:检查是否已存在相同 taskId 的未执行任务
        if (scheduledTaskRepository.existsByTaskId(taskId)) {
            throw new IllegalArgumentException("Task with taskId " + taskId + " already exists.");
        }

        // 计算执行时间
        LocalDateTime scheduledTime = LocalDateTime.now().plusMinutes(delayMinutes);

        // 序列化参数为 JSON 字符串
        String parametersJson;
        try {
            parametersJson = objectMapper.writeValueAsString(parameters);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize task parameters.", e);
        }

        // 创建 ScheduledTask 实体
        ScheduledTask scheduledTask = new ScheduledTask(taskId, taskType, parametersJson, scheduledTime);
        scheduledTaskRepository.save(scheduledTask);

        // 创建并调度任务
        Runnable taskRunnable = () -> executeTask(scheduledTask.getId());

        ScheduledFuture<?> future = taskScheduler.schedule(taskRunnable, scheduledTime.atZone(java.time.ZoneId.systemDefault()).toInstant());

        // 保存 ScheduledFuture 对象
        scheduledFutures.put(scheduledTask.getId(), future);

        return scheduledTask;
    }

    /**
     * 执行任务逻辑,根据任务类型调用不同的处理方法。
     *
     * @param scheduledTaskId 数据库中的 ScheduledTask 的 ID
     */
    public void executeTask(Long scheduledTaskId) {
        Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findById(scheduledTaskId);
        if (!optionalTask.isPresent()) {
            // 任务记录不存在,可能已经被删除
            return;
        }

        ScheduledTask task = optionalTask.get();

        if (task.isExecuted()) {
            // 任务已执行,跳过
            return;
        }

        try {
            // 反序列化参数
            Map<String, Object> params = objectMapper.readValue(task.getParameters(), Map.class);

            // 获取任务类型对应的处理器
            TaskHandler handler = taskHandlerFactory.getHandler(task.getTaskType());

            // 执行任务
            handler.handle(params);

            // 更新任务状态
            task.setExecuted(true);
            task.setExecutionStatus("SUCCESS");
            task.setExecutionResult("Task executed successfully.");
            scheduledTaskRepository.save(task);
        } catch (Exception e) {
            // 更新任务状态为失败
            task.setExecuted(true);
            task.setExecutionStatus("FAILED");
            task.setExecutionResult(e.getMessage());
            scheduledTaskRepository.save(task);

            // 记录日志或采取其他措施
            System.err.println("Failed to execute taskId: " + task.getTaskId() + ", Error: " + e.getMessage());
        }
    }

    /**
     * 取消指定任务。
     *
     * @param taskId 业务上的唯一任务标识
     * @return true 如果任务成功取消,false 否则
     */
    public boolean cancelTask(String taskId) {
        Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findByTaskId(taskId);
        if (!optionalTask.isPresent()) {
            return false; // 任务不存在
        }

        ScheduledTask task = optionalTask.get();

        if (task.isExecuted()) {
            return false; // 任务已执行,无法取消
        }

        // 获取 ScheduledFuture 对象
        ScheduledFuture<?> future = scheduledFutures.get(task.getId());
        if (future != null && !future.isDone()) {
            boolean cancelled = future.cancel(false);
            if (cancelled) {
                // 更新任务状态
                task.setExecuted(true);
                task.setExecutionStatus("CANCELLED");
                task.setExecutionResult("Task was cancelled before execution.");
                scheduledTaskRepository.save(task);

                // 从 map 中移除
                scheduledFutures.remove(task.getId());

                return true;
            }
        }

        return false;
    }

    /**
     * 查找任务详情。
     *
     * @param taskId 业务上的唯一任务标识
     * @return Optional<ScheduledTask>
     */
    public Optional<ScheduledTask> findByTaskId(String taskId) {
        return scheduledTaskRepository.findByTaskId(taskId);
    }
}

9.2 StartupService.java

负责在应用启动时恢复未执行的任务,确保系统重启后任务不丢失。

package com.example.taskscheduler.service;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Service
public class StartupService {

    private final ScheduledTaskRepository scheduledTaskRepository;
    private final ScheduledTaskService scheduledTaskService;
    private final ObjectMapper objectMapper;

    @Autowired
    public StartupService(ScheduledTaskRepository scheduledTaskRepository,
                          ScheduledTaskService scheduledTaskService,
                          ObjectMapper objectMapper) {
        this.scheduledTaskRepository = scheduledTaskRepository;
        this.scheduledTaskService = scheduledTaskService;
        this.objectMapper = objectMapper;
    }

    @PostConstruct
    public void init() {
        List<ScheduledTask> pendingTasks = scheduledTaskRepository.findAll()
                .stream()
                .filter(task -> !task.isExecuted())
                .collect(Collectors.toList());

        for (ScheduledTask task : pendingTasks) {
            // 计算剩余延迟时间
            Duration delay = Duration.between(LocalDateTime.now(), task.getScheduledTime());
            long delayInMinutes = delay.toMinutes();

            if (delayInMinutes > 0) {
                // 重新调度任务
                try {
                    Map<String, Object> params = objectMapper.readValue(task.getParameters(), Map.class);
                    scheduledTaskService.createAndScheduleTask(task.getTaskId(),
                            task.getTaskType(),
                            params,
                            delayInMinutes);
                } catch (Exception e) {
                    System.err.println("Failed to reschedule taskId: " + task.getTaskId() + ", Error: " + e.getMessage());
                }
            } else {
                // 执行延迟已过的任务
                scheduledTaskService.executeTask(task.getId());
            }
        }

        System.out.println("Recovered and rescheduled " + pendingTasks.size() + " tasks.");
    }
}

9.3 CleanupService.java

负责定期清理已执行或取消的任务,防止数据库膨胀。

package com.example.taskscheduler.service;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;

@Service
public class CleanupService {

    private final ScheduledTaskRepository scheduledTaskRepository;
    private final ObjectMapper objectMapper;

    @Autowired
    public CleanupService(ScheduledTaskRepository scheduledTaskRepository,
                          ObjectMapper objectMapper) {
        this.scheduledTaskRepository = scheduledTaskRepository;
        this.objectMapper = objectMapper;
    }

    /**
     * 定期清理已执行且超过保留期限的任务。
     * 例如,保留已执行任务7天后删除。
     */
    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
    public void cleanUpExecutedTasks() {
        LocalDateTime cutoff = LocalDateTime.now().minusDays(7);
        List<ScheduledTask> tasksToDelete = scheduledTaskRepository.findAll()
                .stream()
                .filter(task -> task.isExecuted() &&
                        (task.getScheduledTime().isBefore(cutoff) ||
                         "CANCELLED".equals(task.getExecutionStatus()) ||
                         "FAILED".equals(task.getExecutionStatus())))
                .collect(Collectors.toList());

        scheduledTaskRepository.deleteAll(tasksToDelete);
        System.out.println("Cleaned up " + tasksToDelete.size() + " tasks.");
    }
}

说明

  • @Scheduled(cron = "0 0 2 * * ?"):设置任务每天凌晨2点执行。
  • 清理条件:
    • 任务已执行且执行时间早于7天前。
    • 任务状态为 CANCELLEDFAILED

10. 控制器层

提供REST API端点,用于创建、取消和查询任务。

ScheduledTaskController.java

package com.example.taskscheduler.controller;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.service.ScheduledTaskService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.Optional;

@RestController
@RequestMapping("/api/tasks")
public class ScheduledTaskController {

    private final ScheduledTaskService scheduledTaskService;
    private final ObjectMapper objectMapper;

    @Autowired
    public ScheduledTaskController(ScheduledTaskService scheduledTaskService,
                                   ObjectMapper objectMapper) {
        this.scheduledTaskService = scheduledTaskService;
        this.objectMapper = objectMapper;
    }

    /**
     * 创建并调度新任务。
     *
     * @param taskRequest 包含 taskId, taskType, parameters, delayMinutes 的请求体
     * @return 创建的任务信息
     */
    @PostMapping("/schedule")
    public ResponseEntity<?> scheduleTask(@RequestBody TaskRequest taskRequest) {
        try {
            ScheduledTask task = scheduledTaskService.createAndScheduleTask(
                    taskRequest.getTaskId(),
                    taskRequest.getTaskType(),
                    taskRequest.getParameters(),
                    taskRequest.getDelayMinutes()
            );
            return ResponseEntity.ok(task);
        } catch (IllegalArgumentException ex) {
            return ResponseEntity.badRequest().body(ex.getMessage());
        } catch (Exception ex) {
            return ResponseEntity.status(500).body("Failed to schedule task.");
        }
    }

    /**
     * 取消指定任务。
     *
     * @param taskId 业务上的唯一任务标识
     * @return 取消结果
     */
    @DeleteMapping("/cancel/{taskId}")
    public ResponseEntity<?> cancelTask(@PathVariable String taskId) {
        boolean cancelled = scheduledTaskService.cancelTask(taskId);
        if (cancelled) {
            return ResponseEntity.ok("Task cancelled successfully.");
        } else {
            return ResponseEntity.badRequest().body("Failed to cancel task. It may have been executed or does not exist.");
        }
    }

    /**
     * 查询任务详情。
     *
     * @param taskId 业务上的唯一任务标识
     * @return 任务详情
     */
    @GetMapping("/{taskId}")
    public ResponseEntity<?> getTaskStatus(@PathVariable String taskId) {
        Optional<ScheduledTask> optionalTask = scheduledTaskService.findByTaskId(taskId);
        if (optionalTask.isPresent()) {
            return ResponseEntity.ok(optionalTask.get());
        } else {
            return ResponseEntity.status(404).body("Task not found.");
        }
    }

    // DTO 类用于接收请求体
    public static class TaskRequest {
        private String taskId;
        private String taskType;
        private Map<String, Object> parameters;
        private long delayMinutes;

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        public String getTaskType() {
            return taskType;
        }

        public void setTaskType(String taskType) {
            this.taskType = taskType;
        }

        public Map<String, Object> getParameters() {
            return parameters;
        }

        public void setParameters(Map<String, Object> parameters) {
            this.parameters = parameters;
        }

        public long getDelayMinutes() {
            return delayMinutes;
        }

        public void setDelayMinutes(long delayMinutes) {
            this.delayMinutes = delayMinutes;
        }
    }
}

说明

  • 创建任务:通过 POST /api/tasks/schedule 端点接收任务请求,包含 taskIdtaskTypeparametersdelayMinutes
  • 取消任务:通过 DELETE /api/tasks/cancel/{taskId} 端点取消特定任务。
  • 查询任务:通过 GET /api/tasks/{taskId} 端点获取任务详情。

11. 异常处理

统一的全局异常处理,确保API响应的一致性和信息的安全性。

GlobalExceptionHandler.java

package com.example.taskscheduler.exception;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@ControllerAdvice
public class GlobalExceptionHandler {

    @ExceptionHandler(IllegalArgumentException.class)
    public ResponseEntity<String> handleBadRequest(Exception ex) {
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
    }

    @ExceptionHandler(UnsupportedOperationException.class)
    public ResponseEntity<String> handleUnsupportedOperation(Exception ex) {
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
    }

    @ExceptionHandler(Exception.class)
    public ResponseEntity<String> handleInternalError(Exception ex) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("An unexpected error occurred.");
    }
}

说明

  • @ControllerAdvice:允许定义全局的异常处理规则。
  • 特定异常处理:为常见的业务异常(如 IllegalArgumentExceptionUnsupportedOperationException)提供定制化的响应。
  • 通用异常处理:捕获所有未处理的异常,避免泄露敏感信息。

12. 主应用类

定义Spring Boot的主应用入口。

TaskSchedulerApplication.java

package com.example.taskscheduler;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // 启用定时任务调度
public class TaskSchedulerApplication {

    public static void main(String[] args) {
        SpringApplication.run(TaskSchedulerApplication.class, args);
    }
}

说明

  • @SpringBootApplication:标识这是一个Spring Boot应用。
  • @EnableScheduling:启用Spring的定时任务调度功能,允许使用 @Scheduled 注解的方法。

完整代码汇总

为了便于理解,以下是各个关键类的完整代码。

12.1. TaskHandler.java

package com.example.taskscheduler.handler;

import java.util.Map;

public interface TaskHandler {
    void handle(Map<String, Object> parameters) throws Exception;
    String getTaskType();
}

12.2. EmailTaskHandler.java

package com.example.taskscheduler.handler;

import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class EmailTaskHandler implements TaskHandler {

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        String recipient = (String) parameters.get("recipient");
        String subject = (String) parameters.get("subject");
        String body = (String) parameters.get("body");

        // 实现发送邮件的逻辑,例如使用JavaMailSender
        System.out.println("Sending email to: " + recipient);
        System.out.println("Subject: " + subject);
        System.out.println("Body: " + body);

        // 模拟邮件发送成功
        // 在实际应用中,应集成邮件发送服务
    }

    @Override
    public String getTaskType() {
        return "EMAIL";
    }
}

12.3. DataProcessTaskHandler.java

package com.example.taskscheduler.handler;

import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class DataProcessTaskHandler implements TaskHandler {

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        String dataSource = (String) parameters.get("dataSource");
        String operation = (String) parameters.get("operation");

        // 实现数据处理的逻辑
        System.out.println("Processing data from: " + dataSource);
        System.out.println("Operation: " + operation);

        // 模拟数据处理成功
        // 在实际应用中,应执行具体的数据处理任务
    }

    @Override
    public String getTaskType() {
        return "DATA_PROCESS";
    }
}

12.4. TaskHandlerFactory.java

package com.example.taskscheduler.factory;

import com.example.taskscheduler.handler.TaskHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
public class TaskHandlerFactory {

    private final List<TaskHandler> handlers;
    private final Map<String, TaskHandler> handlerMap = new HashMap<>();

    @Autowired
    public TaskHandlerFactory(List<TaskHandler> handlers) {
        this.handlers = handlers;
    }

    @PostConstruct
    public void init() {
        for (TaskHandler handler : handlers) {
            handlerMap.put(handler.getTaskType(), handler);
        }
    }

    public TaskHandler getHandler(String taskType) {
        TaskHandler handler = handlerMap.get(taskType);
        if (handler == null) {
            throw new IllegalArgumentException("No handler found for task type: " + taskType);
        }
        return handler;
    }
}

12.5. SchedulerConfig.java

package com.example.taskscheduler.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class SchedulerConfig {

    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10); // 根据需求调整
        scheduler.setThreadNamePrefix("DynamicTaskScheduler-");
        scheduler.setAwaitTerminationSeconds(60);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        scheduler.initialize();
        return scheduler;
    }
}

12.6. ScheduledTask.java

package com.example.taskscheduler.entity;

import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;
import java.time.LocalDateTime;

@Entity
@Table(name = "scheduled_tasks", uniqueConstraints = @UniqueConstraint(columnNames = "taskId"))
@Data
@NoArgsConstructor
public class ScheduledTask {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String taskId; // 业务上的唯一标识

    @Column(nullable = false)
    private String taskType; // 任务类型

    @Column(columnDefinition = "TEXT")
    private String parameters; // JSON格式的参数

    @Column(nullable = false)
    private LocalDateTime scheduledTime; // 任务执行时间

    @Column(nullable = false, updatable = false)
    private LocalDateTime createdAt = LocalDateTime.now();

    private LocalDateTime updatedAt = LocalDateTime.now();

    private boolean executed = false;

    private String executionStatus;

    @Column(columnDefinition = "TEXT")
    private String executionResult;

    // 构造函数
    public ScheduledTask(String taskId, String taskType, String parameters, LocalDateTime scheduledTime) {
        this.taskId = taskId;
        this.taskType = taskType;
        this.parameters = parameters;
        this.scheduledTime = scheduledTime;
    }

    @PreUpdate
    public void preUpdate() {
        this.updatedAt = LocalDateTime.now();
    }
}

12.7. ScheduledTaskRepository.java

package com.example.taskscheduler.repository;

import com.example.taskscheduler.entity.ScheduledTask;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.util.Optional;

@Repository
public interface ScheduledTaskRepository extends JpaRepository<ScheduledTask, Long> {
    Optional<ScheduledTask> findByTaskId(String taskId);
    boolean existsByTaskId(String taskId);
}

12.8. ScheduledTaskService.java

package com.example.taskscheduler.service;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.factory.TaskHandlerFactory;
import com.example.taskscheduler.handler.TaskHandler;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

@Service
public class ScheduledTaskService {

    private final ScheduledTaskRepository scheduledTaskRepository;
    private final ThreadPoolTaskScheduler taskScheduler;
    private final ObjectMapper objectMapper;
    private final TaskHandlerFactory taskHandlerFactory;

    // 用于管理 ScheduledFuture 对象,以支持任务取消等操作
    private final Map<Long, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();

    @Autowired
    public ScheduledTaskService(ScheduledTaskRepository scheduledTaskRepository,
                                ThreadPoolTaskScheduler taskScheduler,
                                ObjectMapper objectMapper,
                                TaskHandlerFactory taskHandlerFactory) {
        this.scheduledTaskRepository = scheduledTaskRepository;
        this.taskScheduler = taskScheduler;
        this.objectMapper = objectMapper;
        this.taskHandlerFactory = taskHandlerFactory;
    }

    /**
     * 创建并调度一个新任务。
     *
     * @param taskId        业务上的唯一任务标识
     * @param taskType      任务类型
     * @param parameters    任务参数(Map<String, Object>)
     * @param delayMinutes 延迟执行的分钟数
     * @return 创建的 ScheduledTask 对象
     */
    public ScheduledTask createAndScheduleTask(String taskId, String taskType, Map<String, Object> parameters, long delayMinutes) {
        // 任务去重:检查是否已存在相同 taskId 的未执行任务
        if (scheduledTaskRepository.existsByTaskId(taskId)) {
            throw new IllegalArgumentException("Task with taskId " + taskId + " already exists.");
        }

        // 计算执行时间
        LocalDateTime scheduledTime = LocalDateTime.now().plusMinutes(delayMinutes);

        // 序列化参数为 JSON 字符串
        String parametersJson;
        try {
            parametersJson = objectMapper.writeValueAsString(parameters);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize task parameters.", e);
        }

        // 创建 ScheduledTask 实体
        ScheduledTask scheduledTask = new ScheduledTask(taskId, taskType, parametersJson, scheduledTime);
        scheduledTaskRepository.save(scheduledTask);

        // 创建并调度任务
        Runnable taskRunnable = () -> executeTask(scheduledTask.getId());

        ScheduledFuture<?> future = taskScheduler.schedule(taskRunnable, scheduledTime.atZone(java.time.ZoneId.systemDefault()).toInstant());

        // 保存 ScheduledFuture 对象
        scheduledFutures.put(scheduledTask.getId(), future);

        return scheduledTask;
    }

    /**
     * 执行任务逻辑,根据任务类型调用不同的处理方法。
     *
     * @param scheduledTaskId 数据库中的 ScheduledTask 的 ID
     */
    public void executeTask(Long scheduledTaskId) {
        Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findById(scheduledTaskId);
        if (!optionalTask.isPresent()) {
            // 任务记录不存在,可能已经被删除
            return;
        }

        ScheduledTask task = optionalTask.get();

        if (task.isExecuted()) {
            // 任务已执行,跳过
            return;
        }

        try {
            // 反序列化参数
            Map<String, Object> params = objectMapper.readValue(task.getParameters(), Map.class);

            // 获取任务类型对应的处理器
            TaskHandler handler = taskHandlerFactory.getHandler(task.getTaskType());

            // 执行任务
            handler.handle(params);

            // 更新任务状态
            task.setExecuted(true);
            task.setExecutionStatus("SUCCESS");
            task.setExecutionResult("Task executed successfully.");
            scheduledTaskRepository.save(task);
        } catch (Exception e) {
            // 更新任务状态为失败
            task.setExecuted(true);
            task.setExecutionStatus("FAILED");
            task.setExecutionResult(e.getMessage());
            scheduledTaskRepository.save(task);

            // 记录日志或采取其他措施
            System.err.println("Failed to execute taskId: " + task.getTaskId() + ", Error: " + e.getMessage());
        }
    }

    /**
     * 取消指定任务。
     *
     * @param taskId 业务上的唯一任务标识
     * @return true 如果任务成功取消,false 否则
     */
    public boolean cancelTask(String taskId) {
        Optional<ScheduledTask> optionalTask = scheduledTaskRepository.findByTaskId(taskId);
        if (!optionalTask.isPresent()) {
            return false; // 任务不存在
        }

        ScheduledTask task = optionalTask.get();

        if (task.isExecuted()) {
            return false; // 任务已执行,无法取消
        }

        // 获取 ScheduledFuture 对象
        ScheduledFuture<?> future = scheduledFutures.get(task.getId());
        if (future != null && !future.isDone()) {
            boolean cancelled = future.cancel(false);
            if (cancelled) {
                // 更新任务状态
                task.setExecuted(true);
                task.setExecutionStatus("CANCELLED");
                task.setExecutionResult("Task was cancelled before execution.");
                scheduledTaskRepository.save(task);

                // 从 map 中移除
                scheduledFutures.remove(task.getId());

                return true;
            }
        }

        return false;
    }

    /**
     * 查找任务详情。
     *
     * @param taskId 业务上的唯一任务标识
     * @return Optional<ScheduledTask>
     */
    public Optional<ScheduledTask> findByTaskId(String taskId) {
        return scheduledTaskRepository.findByTaskId(taskId);
    }
}

9.4. StartupService.java

package com.example.taskscheduler.service;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Service
public class StartupService {

    private final ScheduledTaskRepository scheduledTaskRepository;
    private final ScheduledTaskService scheduledTaskService;
    private final ObjectMapper objectMapper;

    @Autowired
    public StartupService(ScheduledTaskRepository scheduledTaskRepository,
                          ScheduledTaskService scheduledTaskService,
                          ObjectMapper objectMapper) {
        this.scheduledTaskRepository = scheduledTaskRepository;
        this.scheduledTaskService = scheduledTaskService;
        this.objectMapper = objectMapper;
    }

    @PostConstruct
    public void init() {
        List<ScheduledTask> pendingTasks = scheduledTaskRepository.findAll()
                .stream()
                .filter(task -> !task.isExecuted())
                .collect(Collectors.toList());

        for (ScheduledTask task : pendingTasks) {
            // 计算剩余延迟时间
            Duration delay = Duration.between(LocalDateTime.now(), task.getScheduledTime());
            long delayInMinutes = delay.toMinutes();

            if (delayInMinutes > 0) {
                // 重新调度任务
                try {
                    Map<String, Object> params = objectMapper.readValue(task.getParameters(), Map.class);
                    scheduledTaskService.createAndScheduleTask(task.getTaskId(),
                            task.getTaskType(),
                            params,
                            delayInMinutes);
                } catch (Exception e) {
                    System.err.println("Failed to reschedule taskId: " + task.getTaskId() + ", Error: " + e.getMessage());
                }
            } else {
                // 执行延迟已过的任务
                scheduledTaskService.executeTask(task.getId());
            }
        }

        System.out.println("Recovered and rescheduled " + pendingTasks.size() + " tasks.");
    }
}

9.5. CleanupService.java

package com.example.taskscheduler.service;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.repository.ScheduledTaskRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;

@Service
public class CleanupService {

    private final ScheduledTaskRepository scheduledTaskRepository;

    @Autowired
    public CleanupService(ScheduledTaskRepository scheduledTaskRepository) {
        this.scheduledTaskRepository = scheduledTaskRepository;
    }

    /**
     * 定期清理已执行且超过保留期限的任务。
     * 例如,保留已执行任务7天后删除。
     */
    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
    public void cleanUpExecutedTasks() {
        LocalDateTime cutoff = LocalDateTime.now().minusDays(7);
        List<ScheduledTask> tasksToDelete = scheduledTaskRepository.findAll()
                .stream()
                .filter(task -> task.isExecuted() &&
                        (task.getScheduledTime().isBefore(cutoff) ||
                         "CANCELLED".equals(task.getExecutionStatus()) ||
                         "FAILED".equals(task.getExecutionStatus())))
                .collect(Collectors.toList());

        scheduledTaskRepository.deleteAll(tasksToDelete);
        System.out.println("Cleaned up " + tasksToDelete.size() + " tasks.");
    }
}

说明

  • @Scheduled 注解用于定时执行清理任务。
  • 清理条件:
    • 任务已执行且执行时间早于7天前。
    • 任务状态为 CANCELLEDFAILED

10. 控制器层

负责接收HTTP请求,管理任务的创建、取消和查询。

ScheduledTaskController.java

package com.example.taskscheduler.controller;

import com.example.taskscheduler.entity.ScheduledTask;
import com.example.taskscheduler.service.ScheduledTaskService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.Optional;

@RestController
@RequestMapping("/api/tasks")
public class ScheduledTaskController {

    private final ScheduledTaskService scheduledTaskService;
    private final ObjectMapper objectMapper;

    @Autowired
    public ScheduledTaskController(ScheduledTaskService scheduledTaskService,
                                   ObjectMapper objectMapper) {
        this.scheduledTaskService = scheduledTaskService;
        this.objectMapper = objectMapper;
    }

    /**
     * 创建并调度新任务。
     *
     * @param taskRequest 包含 taskId, taskType, parameters, delayMinutes 的请求体
     * @return 创建的任务信息
     */
    @PostMapping("/schedule")
    public ResponseEntity<?> scheduleTask(@RequestBody TaskRequest taskRequest) {
        try {
            ScheduledTask task = scheduledTaskService.createAndScheduleTask(
                    taskRequest.getTaskId(),
                    taskRequest.getTaskType(),
                    taskRequest.getParameters(),
                    taskRequest.getDelayMinutes()
            );
            return ResponseEntity.ok(task);
        } catch (IllegalArgumentException ex) {
            return ResponseEntity.badRequest().body(ex.getMessage());
        } catch (Exception ex) {
            return ResponseEntity.status(500).body("Failed to schedule task.");
        }
    }

    /**
     * 取消指定任务。
     *
     * @param taskId 业务上的唯一任务标识
     * @return 取消结果
     */
    @DeleteMapping("/cancel/{taskId}")
    public ResponseEntity<?> cancelTask(@PathVariable String taskId) {
        boolean cancelled = scheduledTaskService.cancelTask(taskId);
        if (cancelled) {
            return ResponseEntity.ok("Task cancelled successfully.");
        } else {
            return ResponseEntity.badRequest().body("Failed to cancel task. It may have been executed or does not exist.");
        }
    }

    /**
     * 查询任务详情。
     *
     * @param taskId 业务上的唯一任务标识
     * @return 任务详情
     */
    @GetMapping("/{taskId}")
    public ResponseEntity<?> getTaskStatus(@PathVariable String taskId) {
        Optional<ScheduledTask> optionalTask = scheduledTaskService.findByTaskId(taskId);
        if (optionalTask.isPresent()) {
            return ResponseEntity.ok(optionalTask.get());
        } else {
            return ResponseEntity.status(404).body("Task not found.");
        }
    }

    // DTO 类用于接收请求体
    public static class TaskRequest {
        private String taskId;
        private String taskType;
        private Map<String, Object> parameters;
        private long delayMinutes;

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        public String getTaskType() {
            return taskType;
        }

        public void setTaskType(String taskType) {
            this.taskType = taskType;
        }

        public Map<String, Object> getParameters() {
            return parameters;
        }

        public void setParameters(Map<String, Object> parameters) {
            this.parameters = parameters;
        }

        public long getDelayMinutes() {
            return delayMinutes;
        }

        public void setDelayMinutes(long delayMinutes) {
            this.delayMinutes = delayMinutes;
        }
    }
}

说明

  • 创建任务:通过 POST /api/tasks/schedule 端点提交任务请求。
  • 取消任务:通过 DELETE /api/tasks/cancel/{taskId} 端点取消指定任务。
  • 查询任务:通过 GET /api/tasks/{taskId} 端点查询任务详情。
  • DTO类:内部静态类 TaskRequest 用于接收和解析请求体。

13. 异常处理

统一处理不同类型的异常,返回一致的 API 响应。

GlobalExceptionHandler.java

package com.example.taskscheduler.exception;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@ControllerAdvice
public class GlobalExceptionHandler {

    @ExceptionHandler(IllegalArgumentException.class)
    public ResponseEntity<String> handleBadRequest(Exception ex) {
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
    }

    @ExceptionHandler(UnsupportedOperationException.class)
    public ResponseEntity<String> handleUnsupportedOperation(Exception ex) {
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
    }

    @ExceptionHandler(Exception.class)
    public ResponseEntity<String> handleInternalError(Exception ex) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("An unexpected error occurred.");
    }
}

说明

  • 业务异常处理:处理如 IllegalArgumentExceptionUnsupportedOperationException,返回 400 Bad Request
  • 通用异常处理:捕获所有其他异常,返回 500 Internal Server Error,避免泄露敏感信息。

14. 主应用类

TaskSchedulerApplication.java

package com.example.taskscheduler;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // 启用定时任务调度
public class TaskSchedulerApplication {

    public static void main(String[] args) {
        SpringApplication.run(TaskSchedulerApplication.class, args);
    }
}

说明

  • @EnableScheduling:启用Spring的定时任务调度功能,允许使用 @Scheduled 注解的方法。

测试与验证

以下是如何测试和验证该动态定时任务系统的方法。

1. 启动应用程序

确保您的MySQL数据库已启动,并且application.properties中的配置正确。然后运行主应用类。

使用Maven命令:

mvn spring-boot:run

或者在IDE(如 IntelliJ IDEA 或 Eclipse)中运行 TaskSchedulerApplication 类。

2. 创建任务

使用PostmancURL发送HTTP请求来创建新任务。

示例1:发送EMAIL任务

请求

POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json

{
    "taskId": "TASK_EMAIL_001",
    "taskType": "EMAIL",
    "parameters": {
        "recipient": "user@example.com",
        "subject": "Welcome!",
        "body": "Thank you for registering."
    },
    "delayMinutes": 1
}

响应

{
    "id": 1,
    "taskId": "TASK_EMAIL_001",
    "taskType": "EMAIL",
    "parameters": "{\"recipient\":\"user@example.com\",\"subject\":\"Welcome!\",\"body\":\"Thank you for registering.\"}",
    "scheduledTime": "2023-10-01T10:01:00",
    "createdAt": "2023-10-01T10:00:00",
    "updatedAt": "2023-10-01T10:00:00",
    "executed": false,
    "executionStatus": null,
    "executionResult": null
}

控制台输出(在1分钟后)

Sending email to: user@example.com
Subject: Welcome!
Body: Thank you for registering.
示例2:发送DATA_PROCESS任务

请求

POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json

{
    "taskId": "TASK_DATA_001",
    "taskType": "DATA_PROCESS",
    "parameters": {
        "dataSource": "database",
        "operation": "backup"
    },
    "delayMinutes": 2
}

响应

{
    "id": 2,
    "taskId": "TASK_DATA_001",
    "taskType": "DATA_PROCESS",
    "parameters": "{\"dataSource\":\"database\",\"operation\":\"backup\"}",
    "scheduledTime": "2023-10-01T10:02:00",
    "createdAt": "2023-10-01T10:00:00",
    "updatedAt": "2023-10-01T10:00:00",
    "executed": false,
    "executionStatus": null,
    "executionResult": null
}

控制台输出(在2分钟后)

Processing data from: database
Operation: backup

3. 任务去重测试

尝试创建具有相同 taskId 的任务,验证系统是否阻止重复创建。

请求

POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json

{
    "taskId": "TASK_EMAIL_001",
    "taskType": "EMAIL",
    "parameters": {
        "recipient": "duplicate@example.com",
        "subject": "Duplicate Task",
        "body": "This should not be scheduled."
    },
    "delayMinutes": 3
}

响应

"Task with taskId TASK_EMAIL_001 already exists."

4. 取消任务

创建一个任务并在其执行前取消。

创建任务请求

POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json

{
    "taskId": "TASK_CANCEL_001",
    "taskType": "EMAIL",
    "parameters": {
        "recipient": "cancel@example.com",
        "subject": "Cancel Task",
        "body": "This task will be cancelled."
    },
    "delayMinutes": 5
}

取消任务请求

DELETE http://localhost:8080/api/tasks/cancel/TASK_CANCEL_001

响应

"Task cancelled successfully."

确认取消(查询任务状态)

GET http://localhost:8080/api/tasks/TASK_CANCEL_001

响应

{
    "id": 3,
    "taskId": "TASK_CANCEL_001",
    "taskType": "EMAIL",
    "parameters": "{\"recipient\":\"cancel@example.com\",\"subject\":\"Cancel Task\",\"body\":\"This task will be cancelled.\"}",
    "scheduledTime": "2023-10-01T10:05:00",
    "createdAt": "2023-10-01T10:00:00",
    "updatedAt": "2023-10-01T10:00:00",
    "executed": true,
    "executionStatus": "CANCELLED",
    "executionResult": "Task was cancelled before execution."
}

说明

  • 任务已取消:在任务执行前取消后,系统不会执行该任务。

5. 任务恢复测试

  1. 创建多个任务,确保有未执行的任务(如延迟执行)。
  2. 重启应用程序。
  3. 确认未执行的任务是否已恢复并按预定时间执行。

说明

  • StartupService 在应用启动时会扫描未执行的任务并重新调度,确保系统重启后任务不丢失。

6. 清理任务测试

  1. 创建并执行多个任务。
  2. 等待超过保留期限(如7天)。
  3. 确认已执行的任务是否已被 CleanupService 清理。

说明

  • CleanupService 会定期清理已执行且超过保留期限的任务,防止数据库膨胀。

优化与扩展

1. 添加新的任务类型

示例:SMS任务

创建 SmsTaskHandler.java

package com.example.taskscheduler.handler;

import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class SmsTaskHandler implements TaskHandler {

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        String phoneNumber = (String) parameters.get("phoneNumber");
        String message = (String) parameters.get("message");

        // 实现发送短信的逻辑,例如使用第三方短信服务API
        System.out.println("Sending SMS to: " + phoneNumber);
        System.out.println("Message: " + message);

        // 模拟短信发送成功
        // 在实际应用中,应集成短信发送服务
    }

    @Override
    public String getTaskType() {
        return "SMS";
    }
}

测试SMS任务

请求

POST http://localhost:8080/api/tasks/schedule
Content-Type: application/json

{
    "taskId": "TASK_SMS_001",
    "taskType": "SMS",
    "parameters": {
        "phoneNumber": "+1234567890",
        "message": "Your verification code is 123456."
    },
    "delayMinutes": 1
}

预期响应

{
    "id": 4,
    "taskId": "TASK_SMS_001",
    "taskType": "SMS",
    "parameters": "{\"phoneNumber\":\"+1234567890\",\"message\":\"Your verification code is 123456.\"}",
    "scheduledTime": "2023-10-01T10:06:00",
    "createdAt": "2023-10-01T10:00:00",
    "updatedAt": "2023-10-01T10:00:00",
    "executed": false,
    "executionStatus": null,
    "executionResult": null
}

控制台输出(在1分钟后)

Sending SMS to: +1234567890
Message: Your verification code is 123456.

2. 集成邮件和短信发送服务

实际应用中,应集成真实的邮件和短信发送服务,如JavaMailSender或第三方短信API。以下为集成 JavaMailSender 的示例。

2.1. 添加依赖

pom.xml 中添加Spring Boot Starter Mail依赖:

<!-- Spring Boot Starter Mail -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
</dependency>
2.2. 配置邮件属性

application.properties 中添加邮件服务器配置:

# 邮件服务器配置(示例使用Gmail)
spring.mail.host=smtp.gmail.com
spring.mail.port=587
spring.mail.username=your_email@gmail.com
spring.mail.password=your_email_password
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true

注意:为了使用Gmail发送邮件,可能需要调整Google账户的安全设置,允许低安全性应用访问或使用App Passwords。

2.3. 修改 EmailTaskHandler.java
package com.example.taskscheduler.handler;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import java.util.Map;

@Component
public class EmailTaskHandler implements TaskHandler {

    private final JavaMailSender mailSender;

    @Autowired
    public EmailTaskHandler(JavaMailSender mailSender) {
        this.mailSender = mailSender;
    }

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        String recipient = (String) parameters.get("recipient");
        String subject = (String) parameters.get("subject");
        String body = (String) parameters.get("body");

        // 创建简单邮件消息
        SimpleMailMessage message = new SimpleMailMessage();
        message.setTo(recipient);
        message.setSubject(subject);
        message.setText(body);

        // 发送邮件
        mailSender.send(message);

        System.out.println("Email sent to: " + recipient);
    }

    @Override
    public String getTaskType() {
        return "EMAIL";
    }
}

说明

  • 使用JavaMailSender发送邮件。
  • 确保application.properties中的邮件配置正确。

3. 使用DTO类替代Map(提高类型安全)

虽然使用Map<String, Object>提供了参数传递的灵活性,但缺乏类型安全和可读性。可以为不同的任务类型定义专用的DTO类。

示例:EmailTaskParameters.java
package com.example.taskscheduler.dto;

public class EmailTaskParameters {
    private String recipient;
    private String subject;
    private String body;

    // Getters and Setters

    public String getRecipient() {
        return recipient;
    }

    public void setRecipient(String recipient) {
        this.recipient = recipient;
    }
    
    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }
    
    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }
}
修改 EmailTaskHandler.java 以使用DTO
package com.example.taskscheduler.handler;

import com.example.taskscheduler.dto.EmailTaskParameters;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

@Component
public class EmailTaskHandler implements TaskHandler {

    private final JavaMailSender mailSender;
    private final ObjectMapper objectMapper;

    @Autowired
    public EmailTaskHandler(JavaMailSender mailSender, ObjectMapper objectMapper) {
        this.mailSender = mailSender;
        this.objectMapper = objectMapper;
    }

    @Override
    public void handle(Map<String, Object> parameters) throws Exception {
        // 反序列化参数到DTO类
        EmailTaskParameters emailParams = objectMapper.convertValue(parameters, EmailTaskParameters.class);

        // 创建简单邮件消息
        SimpleMailMessage message = new SimpleMailMessage();
        message.setTo(emailParams.getRecipient());
        message.setSubject(emailParams.getSubject());
        message.setText(emailParams.getBody());

        // 发送邮件
        mailSender.send(message);

        System.out.println("Email sent to: " + emailParams.getRecipient());
    }

    @Override
    public String getTaskType() {
        return "EMAIL";
    }
}

优势

  • 类型安全:通过专用的DTO类,确保参数的类型正确,减少运行时错误。
  • 可读性:代码更具可读性和可维护性。

扩展

  • 为每种任务类型定义对应的DTO类,修改相应的TaskHandler实现以使用这些DTO。

总结

通过以上设计和实现,您可以构建一个通用、灵活且可扩展的动态定时任务系统。该系统具备以下特点:

  1. 通用性:支持多种类型的任务,通过TaskHandler接口实现不同的任务逻辑。
  2. 灵活的参数传递:使用JSON格式存储任务参数,或通过专用的DTO类提高类型安全。
  3. 任务去重:通过taskId确保任务的唯一性,避免重复调度。
  4. 任务持久化与恢复:任务信息存储在数据库中,应用重启时自动恢复未执行的任务。
  5. 可扩展性:新增任务类型仅需创建新的TaskHandler实现类,无需修改核心逻辑。
  6. 任务管理:提供API端点用于创建、取消和查询任务。
  7. 定期清理:定时清理已执行或取消的任务,保持数据库整洁。
  8. 异常处理:统一的异常处理机制,确保API的一致性和安全性。

您可以根据具体业务需求进一步扩展和优化该系统,如:

  • 任务重试机制:为失败的任务设置自动重试策略。
  • 任务优先级:为任务设置优先级,调整调度逻辑以优先处理高优先级任务。
  • 分布式调度:如果系统需要在多实例下运行,可考虑集成分布式调度框架(如Quartz集群)。
  • 监控与报警:集成监控工具,实时监控任务执行状态,设置报警规则。

希望这个完整的实现能帮助您顺利构建所需的动态定时任务系统。

http://www.dtcms.com/a/108882.html

相关文章:

  • 【动态规划】二分优化最长上升子序列
  • 34、web前端开发之JavaScript(三)
  • 将图表和表格导出为PDF的功能
  • ThreadLocalMap的作用和特点
  • cobbler自动最小化安装centos,并配置地址
  • springboot+easyexcel实现下载excels模板下拉选择
  • Spring Boot 的配置文件
  • 网络空间安全(50)JavaScript基础语法
  • C#:重构(refactoring)
  • 【Spring Cloud Alibaba】:Nacos 使用全详解
  • CExercise04_1位运算符_1 用位运算符判断某个整数是否为奇数
  • 购物车(V2装饰器)
  • 算法:优选(1)
  • RK3568驱动 SPI主/从 配置
  • 基于微信小程序的医院挂号预约系统设计与实现
  • Apache Doris 2025 Roadmap:构建 GenAI 时代实时高效统一的数据底座
  • WRF-Chem 中出现real.exe错误(psfc 计算问题)- MOZART
  • Apache BookKeeper Ledger 的底层存储机制解析
  • 配置单区域OSPF
  • ARM—LED,看门狗关闭,按钮,时钟,PWM定时器,蜂鸣器
  • 【前端扫盲】postman介绍及使用
  • 走向多模态AI之路(三):多模态 AI 的挑战与未来
  • 【家政平台开发(12)】家政平台数据库设计:从MySQL到MyBatis-Plus实战
  • 多个参考文献插入、如何同时插入多个参考文献:如[1,2]、[1-3]格式
  • 搬砖--贪心+排序的背包
  • 请谈谈分治算法,如何应用分治算法解决大规模问题?
  • Pico4 Pro VR 和HTC Vivi 哪个好些
  • ngx_getpid() ngx_parent = ngx_getppid()
  • [C语言笔记]09、指针
  • 代码随想录Day31