Distributed Task Scheduling System Design: GridJob
1. System Architecture Overview
1.1 System Positioning
GridJob is a high-performance, highly reliable, and scalable distributed task scheduling system designed to solve task scheduling problems in large-scale distributed environments. It supports scheduling, executing, and monitoring massive numbers of tasks, with strong fault tolerance and extensibility.
1.2 Overall Architecture
GridJob uses a layered architecture with the following layers:
- Access layer: provides a unified API, supporting HTTP/HTTPS, RPC, and other access methods
- Scheduling layer: responsible for task scheduling, dispatching, and execution-state management
- Execution layer: responsible for actually running tasks and returning their results
- Storage layer: responsible for persisting task data, execution history, and related information
- Registry: responsible for service discovery and health checks
- Monitoring layer: responsible for monitoring system health and alerting
1.3 System Topology
                            ┌─────────────┐
                            │ Client Apps │
                            └──────┬──────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      Access Layer (API Gateway)                     │
└───────────────────────────────────┬─────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      Scheduler Center Cluster                       │
└───────────────────────────────────┬─────────────────────────────────┘
                                    │
              ┌─────────────────────┼─────────────────────┐
              ▼                     ▼                     ▼
      ┌───────────────┐     ┌───────────────┐     ┌───────────────┐
      │ Executor Node │     │ Executor Node │     │ Executor Node │
      │    Cluster    │     │    Cluster    │     │    Cluster    │
      │   (Group 1)   │     │   (Group 2)   │     │   (Group N)   │
      └───────┬───────┘     └───────┬───────┘     └───────┬───────┘
              └─────────────────────┼─────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────────┐
│                            Storage Layer                            │
└─────────────────────────────────────────────────────────────────────┘
2. Core Component Design
2.1 Scheduler Center
The scheduler center is the core component of the system, responsible for task scheduling, dispatching, and state management.
2.1.1 Responsibilities
- Task intake and parsing
- Task dependency analysis
- Task priority management
- Task dispatch strategy implementation
- Task execution state tracking
- Retry policy for failed tasks
- Task timeout management
- Load balancing
2.1.2 Internal Architecture
The scheduler center uses a modular internal design with the following modules:
- Task Receiver: accepts task requests from clients
- Task Parser: parses task parameters and configuration
- Task Dispatcher: dispatches tasks according to the scheduling strategy
- State Manager: manages task execution state
- Resource Manager: manages executor node resources
- Load Balancer: balances task dispatch across executor nodes
2.1.3 Scheduler Center High Availability
The scheduler center uses a leader/follower architecture to ensure high availability:
- Leader election: a distributed lock built on ZooKeeper/etcd elects the leader node
- State synchronization: leader and followers synchronize state through a message queue or shared storage
- Failover: when the leader fails, a follower takes over automatically so service is not interrupted
- Data consistency: distributed transactions keep data consistent
2.1.4 Scheduler Center Implementation Details
public class SchedulerCenter {
    private TaskReceiver taskReceiver;
    private TaskParser taskParser;
    private TaskDispatcher taskDispatcher;
    private StateManager stateManager;
    private ResourceManager resourceManager;
    private LoadBalancer loadBalancer;

    // Initialize the scheduler center
    public void initialize() {
        // Initialize components
        taskReceiver = new TaskReceiver();
        taskParser = new TaskParser();
        taskDispatcher = new TaskDispatcher();
        stateManager = new StateManager();
        resourceManager = new ResourceManager();
        loadBalancer = new LoadBalancer();
        // Register with the registry
        registerToRegistry();
        // Start leader election
        startLeaderElection();
        // Start the task intake service
        taskReceiver.start();
    }

    // Submit a task
    public String submitTask(Task task) {
        // Parse the task
        TaskDefinition taskDefinition = taskParser.parse(task);
        // Generate a task ID
        String taskId = generateTaskId();
        // Persist the task
        stateManager.saveTask(taskId, taskDefinition);
        // Trigger scheduling
        scheduleTask(taskId, taskDefinition);
        return taskId;
    }

    // Schedule a task
    private void scheduleTask(String taskId, TaskDefinition taskDefinition) {
        // Check task dependencies
        if (!checkDependencies(taskDefinition)) {
            // Dependencies not yet satisfied; park the task in the wait queue
            stateManager.waitForDependencies(taskId);
            return;
        }
        // Get available executor nodes
        List<ExecutorNode> availableNodes =
                resourceManager.getAvailableNodes(taskDefinition.getResourceRequirements());
        // Pick an executor node via load balancing
        ExecutorNode selectedNode = loadBalancer.select(availableNodes, taskDefinition);
        // Dispatch the task to the selected node
        taskDispatcher.dispatch(taskId, taskDefinition, selectedNode);
        // Update task status
        stateManager.updateTaskStatus(taskId, TaskStatus.DISPATCHED);
    }

    // Handle a task-completion event
    public void handleTaskCompletion(String taskId, TaskResult result) {
        // Update task status
        stateManager.updateTaskStatus(taskId,
                result.isSuccess() ? TaskStatus.COMPLETED : TaskStatus.FAILED);
        // Process the task result
        processTaskResult(taskId, result);
        // Trigger dependent tasks
        triggerDependentTasks(taskId);
    }

    // Trigger tasks that depend on the completed task
    private void triggerDependentTasks(String taskId) {
        List<String> dependentTaskIds = stateManager.getDependentTasks(taskId);
        for (String dependentTaskId : dependentTaskIds) {
            TaskDefinition taskDefinition = stateManager.getTaskDefinition(dependentTaskId);
            scheduleTask(dependentTaskId, taskDefinition);
        }
    }
}
2.2 Executor Node
Executor nodes actually run the tasks dispatched by the scheduler center and report execution results back to it.
2.2.1 Responsibilities
- Task reception and execution
- Resource isolation and management
- Execution status reporting
- Health self-checks
- Task log collection
2.2.2 Internal Architecture
An executor node contains the following modules:
- Task Receiver: accepts tasks from the scheduler center
- Task Executor: runs the tasks
- Resource Manager: manages local resources
- Status Reporter: reports execution status to the scheduler center
- Log Collector: collects task execution logs
2.2.3 Executor Node Implementation Details
public class ExecutorNode {
    private String nodeId;
    private TaskReceiver taskReceiver;
    private TaskExecutor taskExecutor;
    private ResourceManager resourceManager;
    private StatusReporter statusReporter;
    private LogCollector logCollector;

    // Initialize the executor node
    public void initialize() {
        // Generate a node ID
        nodeId = generateNodeId();
        // Initialize components
        taskReceiver = new TaskReceiver();
        taskExecutor = new TaskExecutor();
        resourceManager = new ResourceManager();
        statusReporter = new StatusReporter();
        logCollector = new LogCollector();
        // Register with the registry
        registerToRegistry();
        // Start the heartbeat service
        startHeartbeat();
        // Start the task intake service
        taskReceiver.start();
    }

    // Receive a task
    public void receiveTask(String taskId, TaskDefinition taskDefinition) {
        // Check whether local resources satisfy the requirements
        if (!resourceManager.checkResourceAvailability(taskDefinition.getResourceRequirements())) {
            // Insufficient resources; reject the task
            statusReporter.reportTaskRejected(taskId, "insufficient resources");
            return;
        }
        // Allocate resources
        resourceManager.allocateResource(taskId, taskDefinition.getResourceRequirements());
        // Update task status
        statusReporter.reportTaskStatus(taskId, TaskStatus.RECEIVED);
        // Execute the task asynchronously
        executeTask(taskId, taskDefinition);
    }

    // Execute a task
    private void executeTask(String taskId, TaskDefinition taskDefinition) {
        try {
            // Update task status
            statusReporter.reportTaskStatus(taskId, TaskStatus.RUNNING);
            // Start log collection
            logCollector.startCollection(taskId);
            // Run the task
            TaskResult result = taskExecutor.execute(taskDefinition);
            // Stop log collection
            logCollector.stopCollection(taskId);
            // Release resources
            resourceManager.releaseResource(taskId);
            // Report the result
            statusReporter.reportTaskResult(taskId, result);
        } catch (Exception e) {
            // Record the failure
            logCollector.logException(taskId, e);
            // Release resources
            resourceManager.releaseResource(taskId);
            // Report the failure
            statusReporter.reportTaskFailed(taskId, e.getMessage());
        }
    }

    // Heartbeat service
    private void startHeartbeat() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // Collect node status
                NodeStatus status = collectNodeStatus();
                // Send the heartbeat
                statusReporter.sendHeartbeat(nodeId, status);
            } catch (Exception e) {
                // Handle heartbeat failures
                logger.error("failed to send heartbeat", e);
            }
        }, 0, 5, TimeUnit.SECONDS);
    }

    // Collect node status
    private NodeStatus collectNodeStatus() {
        NodeStatus status = new NodeStatus();
        status.setCpuUsage(resourceManager.getCpuUsage());
        status.setMemoryUsage(resourceManager.getMemoryUsage());
        status.setDiskUsage(resourceManager.getDiskUsage());
        status.setTaskCount(taskExecutor.getRunningTaskCount());
        return status;
    }
}
2.3 Task Storage
Task storage persists task data, including task definitions, execution history, and execution results.
2.3.1 Storage Model Design
Task storage uses a tiered design:
- Hot tier: active task data, stored in an in-memory database (e.g. Redis)
- Warm tier: recent task data, stored in a relational database (e.g. MySQL)
- Cold tier: historical task data, stored in a distributed file system or object store (e.g. HDFS, S3)
2.3.2 Data Sharding Strategy
To support storing massive volumes of task data, a sharding strategy is used:
- Horizontal sharding: shard by hash of the task ID
- Time-based sharding: shard by task creation time
- Hybrid sharding: combine horizontal and time-based sharding
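To make the hybrid strategy concrete, the sketch below combines a monthly time partition with a hash shard inside each partition. It is only an illustration: the `ShardRouter` name, the per-month granularity, and the shard-count parameter are assumptions, not part of the design.

```java
import java.time.Instant;
import java.time.YearMonth;
import java.time.ZoneOffset;

class ShardRouter {
    private final int shardsPerPartition;

    ShardRouter(int shardsPerPartition) {
        this.shardsPerPartition = shardsPerPartition;
    }

    // Hybrid sharding: a time partition (one per calendar month) plus a
    // hash shard inside that partition, so queries by task ID and creation
    // time can be routed to a single physical table.
    String shardFor(String taskId, Instant createdTime) {
        // Time-based part: calendar month of the creation time (UTC)
        YearMonth partition = YearMonth.from(createdTime.atZone(ZoneOffset.UTC));
        // Horizontal part: stable hash of the task ID, mapped to a shard index
        int shard = Math.floorMod(taskId.hashCode(), shardsPerPartition);
        return "task_" + partition + "_" + shard;
    }
}
```

A router like this would sit in front of the warm tier, mapping each task to a physical table name; old monthly partitions can then be migrated to the cold tier wholesale.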
2.3.3 Storage Interface Design
public interface TaskStorage {
    // Save a task definition
    void saveTaskDefinition(String taskId, TaskDefinition taskDefinition);
    // Get a task definition
    TaskDefinition getTaskDefinition(String taskId);
    // Update task status
    void updateTaskStatus(String taskId, TaskStatus status);
    // Save a task result
    void saveTaskResult(String taskId, TaskResult result);
    // Get a task result
    TaskResult getTaskResult(String taskId);
    // Get task execution history
    List<TaskExecution> getTaskExecutionHistory(String taskId);
    // Get tasks that depend on the given task
    List<String> getDependentTasks(String taskId);
    // Get the dependencies of a task
    List<String> getTaskDependencies(String taskId);
    // Clean up expired data
    void cleanupExpiredData(Date expirationDate);
}
2.3.4 Storage Implementation Details
public class HybridTaskStorage implements TaskStorage {
    private RedisTaskStorage hotStorage;
    private MySQLTaskStorage warmStorage;
    private HDFSTaskStorage coldStorage;

    public HybridTaskStorage() {
        hotStorage = new RedisTaskStorage();
        warmStorage = new MySQLTaskStorage();
        coldStorage = new HDFSTaskStorage();
    }

    @Override
    public void saveTaskDefinition(String taskId, TaskDefinition taskDefinition) {
        // Save to the hot tier
        hotStorage.saveTaskDefinition(taskId, taskDefinition);
        // Save to the warm tier asynchronously
        CompletableFuture.runAsync(() -> warmStorage.saveTaskDefinition(taskId, taskDefinition));
    }

    @Override
    public TaskDefinition getTaskDefinition(String taskId) {
        // Query the hot tier first
        TaskDefinition taskDefinition = hotStorage.getTaskDefinition(taskId);
        // On a hot-tier miss, fall back to the warm tier
        if (taskDefinition == null) {
            taskDefinition = warmStorage.getTaskDefinition(taskId);
            if (taskDefinition != null) {
                // Found in the warm tier; backfill the hot tier
                hotStorage.saveTaskDefinition(taskId, taskDefinition);
            } else {
                // Not in the warm tier either; fall back to the cold tier
                taskDefinition = coldStorage.getTaskDefinition(taskId);
            }
        }
        return taskDefinition;
    }

    @Override
    public void updateTaskStatus(String taskId, TaskStatus status) {
        // Update the hot tier
        hotStorage.updateTaskStatus(taskId, status);
        // Update the warm tier asynchronously
        CompletableFuture.runAsync(() -> warmStorage.updateTaskStatus(taskId, status));
    }

    @Override
    public void cleanupExpiredData(Date expirationDate) {
        // Clean up the hot tier
        hotStorage.cleanupExpiredData(expirationDate);
        // Migrate expired warm-tier data to the cold tier
        List<TaskData> expiredData = warmStorage.getExpiredData(expirationDate);
        coldStorage.batchSave(expiredData);
        // Clean up the warm tier
        warmStorage.cleanupExpiredData(expirationDate);
    }

    // Other methods omitted...
}
2.4 Registry
The registry handles service discovery, health checks, and configuration management.
2.4.1 Responsibilities
- Service registration and discovery
- Health checks
- Configuration management
- Cluster state management
- Leader-election support
2.4.2 Implementation Options
The registry can be built on any of the following open-source components:
- ZooKeeper: service registration, discovery, and distributed coordination
- etcd: key-value storage and service discovery
- Consul: service discovery, health checks, and configuration management
- Nacos: dynamic service discovery, configuration management, and service management
2.4.3 Registry Interface Design
public interface Registry {
    // Register a service instance
    void registerService(String serviceId, ServiceInstance instance);
    // Deregister a service instance
    void deregisterService(String serviceId, String instanceId);
    // Discover service instances
    List<ServiceInstance> discoverService(String serviceId);
    // Watch for service changes
    void watchService(String serviceId, ServiceChangeListener listener);
    // Get a configuration value
    String getConfig(String key);
    // Set a configuration value
    void setConfig(String key, String value);
    // Watch for configuration changes
    void watchConfig(String key, ConfigChangeListener listener);
    // Create a distributed lock
    Lock createLock(String lockKey);
    // Create a leader election
    LeaderElection createLeaderElection(String electionPath);
}
2.4.4 ZooKeeper-Based Implementation
public class ZooKeeperRegistry implements Registry {
    private CuratorFramework client;
    private String rootPath;

    public ZooKeeperRegistry(String connectString, String rootPath) {
        this.rootPath = rootPath;
        // Create the ZooKeeper client
        client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        // Start the client
        client.start();
    }

    @Override
    public void registerService(String serviceId, ServiceInstance instance) {
        try {
            // Create the service path if needed
            String servicePath = rootPath + "/services/" + serviceId;
            if (client.checkExists().forPath(servicePath) == null) {
                client.create().creatingParentsIfNeeded().forPath(servicePath);
            }
            // Create the instance node
            String instancePath = servicePath + "/" + instance.getInstanceId();
            byte[] instanceData = serialize(instance);
            // Use an ephemeral node so the entry disappears when the session is lost
            client.create().withMode(CreateMode.EPHEMERAL).forPath(instancePath, instanceData);
        } catch (Exception e) {
            throw new RegistryException("failed to register service", e);
        }
    }

    @Override
    public void deregisterService(String serviceId, String instanceId) {
        try {
            // Delete the instance node
            String instancePath = rootPath + "/services/" + serviceId + "/" + instanceId;
            client.delete().forPath(instancePath);
        } catch (Exception e) {
            throw new RegistryException("failed to deregister service", e);
        }
    }

    @Override
    public List<ServiceInstance> discoverService(String serviceId) {
        try {
            // Service path
            String servicePath = rootPath + "/services/" + serviceId;
            // List all instance nodes
            List<String> instanceIds = client.getChildren().forPath(servicePath);
            // Load instance data
            List<ServiceInstance> instances = new ArrayList<>();
            for (String instanceId : instanceIds) {
                String instancePath = servicePath + "/" + instanceId;
                byte[] instanceData = client.getData().forPath(instancePath);
                ServiceInstance instance = deserialize(instanceData);
                instances.add(instance);
            }
            return instances;
        } catch (Exception e) {
            throw new RegistryException("failed to discover service", e);
        }
    }

    @Override
    public Lock createLock(String lockKey) {
        String lockPath = rootPath + "/locks/" + lockKey;
        return new InterProcessMutex(client, lockPath);
    }

    @Override
    public LeaderElection createLeaderElection(String electionPath) {
        String leaderPath = rootPath + "/leader/" + electionPath;
        return new LeaderLatch(client, leaderPath);
    }

    // Other methods omitted...
}
2.5 Communication Module
The communication module handles message passing and data exchange between system components.
2.5.1 Communication Patterns
The system supports multiple communication patterns:
- Synchronous: request/response over HTTP/HTTPS or gRPC
- Asynchronous: publish/subscribe over a message queue
- Broadcast: fan-out over a message bus
2.5.2 Message Format
Messages use a unified format and support both JSON and Protocol Buffers serialization.
Basic message format:
{
  "messageId": "msg-123456",
  "messageType": "TASK_DISPATCH",
  "timestamp": 1632547890123,
  "source": "scheduler-1",
  "destination": "executor-2",
  "payload": {
    // message-specific content
  }
}
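As a minimal illustration of this envelope, the sketch below builds the JSON form by hand. The `MessageEnvelope` class and its hand-rolled emitter are hypothetical; a real implementation would use a JSON library or Protocol Buffers, as the design allows either.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory form of the message envelope shown above.
class MessageEnvelope {
    final String messageId;
    final String messageType;
    final long timestamp;
    final String source;
    final String destination;
    final Map<String, String> payload = new LinkedHashMap<>();

    MessageEnvelope(String id, String type, long ts, String src, String dst) {
        this.messageId = id;
        this.messageType = type;
        this.timestamp = ts;
        this.source = src;
        this.destination = dst;
    }

    // Emit the envelope as JSON (string payload values only, for brevity)
    String toJson() {
        String body = payload.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
                .collect(Collectors.joining(", "));
        return "{\"messageId\": \"" + messageId + "\", "
                + "\"messageType\": \"" + messageType + "\", "
                + "\"timestamp\": " + timestamp + ", "
                + "\"source\": \"" + source + "\", "
                + "\"destination\": \"" + destination + "\", "
                + "\"payload\": {" + body + "}}";
    }
}
```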
2.5.3 Communication Interface Design
public interface CommunicationService {
    // Send a synchronous message
    Message sendSync(String destination, Message message, long timeout);
    // Send an asynchronous message
    CompletableFuture<Message> sendAsync(String destination, Message message);
    // Publish a message
    void publish(String topic, Message message);
    // Subscribe to messages
    void subscribe(String topic, MessageListener listener);
    // Broadcast a message
    void broadcast(String channel, Message message);
    // Register a message handler
    void registerHandler(String messageType, MessageHandler handler);
}
2.5.4 Kafka-Based Implementation
public class KafkaCommunicationService implements CommunicationService {
    private String bootstrapServers;
    private KafkaProducer<String, byte[]> producer;
    private Map<String, KafkaConsumer<String, byte[]>> consumers;
    private Map<String, MessageHandler> handlers;

    public KafkaCommunicationService(String bootstrapServers) {
        // Keep the broker list for consumers created later in subscribe()
        this.bootstrapServers = bootstrapServers;
        // Create the producer
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        producer = new KafkaProducer<>(producerProps);
        // Consumer map
        consumers = new ConcurrentHashMap<>();
        // Handler map
        handlers = new ConcurrentHashMap<>();
    }

    @Override
    public void publish(String topic, Message message) {
        try {
            // Serialize the message
            byte[] messageData = serialize(message);
            // Build the producer record
            ProducerRecord<String, byte[]> record =
                    new ProducerRecord<>(topic, message.getMessageId(), messageData);
            // Send the message
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    // Handle send failures
                    logger.error("failed to send message", exception);
                }
            });
        } catch (Exception e) {
            throw new CommunicationException("failed to publish message", e);
        }
    }

    @Override
    public void subscribe(String topic, MessageListener listener) {
        // Create a consumer
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "gridJob-consumer-" + UUID.randomUUID());
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));
        // Keep a handle to the consumer
        consumers.put(topic, consumer);
        // Start the polling thread
        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, byte[]> record : records) {
                        // Deserialize the message
                        Message message = deserialize(record.value());
                        // Invoke the listener
                        listener.onMessage(message);
                    }
                }
            } catch (Exception e) {
                logger.error("failed to consume messages", e);
            } finally {
                consumer.close();
            }
        });
        consumerThread.setDaemon(true);
        consumerThread.start();
    }

    // Other methods omitted...
}
3. Data Model Design
3.1 Core Data Models
3.1.1 Task
The task is the system's core data model, with the following attributes:
public class Task {
    private String taskId;                            // task ID
    private String name;                              // task name
    private String description;                       // task description
    private String type;                              // task type
    private Map<String, Object> params;               // task parameters
    private List<String> dependencies;                // task dependencies
    private TaskPriority priority;                    // task priority
    private TaskSchedule schedule;                    // schedule plan
    private TaskTimeout timeout;                      // timeout settings
    private TaskRetry retry;                          // retry settings
    private Map<String, Object> resourceRequirements; // resource requirements
    private String routingKey;                        // routing key
    private Map<String, String> tags;                 // tags
    private String createdBy;                         // creator
    private Date createdTime;                         // creation time
    private Date updatedTime;                         // last update time

    // getters and setters omitted...
}
3.1.2 TaskDefinition
The task definition describes how a task is actually executed:
public class TaskDefinition {
    private String taskId;                            // task ID
    private String name;                              // task name
    private String description;                       // task description
    private String type;                              // task type
    private Map<String, Object> params;               // task parameters
    private String executorClass;                     // executor class name
    private String executorMethod;                    // executor method name
    private String codeContent;                       // code content (script tasks)
    private String codeLanguage;                      // code language (script tasks)
    private Map<String, Object> resourceRequirements; // resource requirements
    private TaskTimeout timeout;                      // timeout settings
    private TaskRetry retry;                          // retry settings
    private Map<String, String> env;                  // environment variables

    // getters and setters omitted...
}
3.1.3 TaskInstance
A task instance is one concrete execution of a task:
public class TaskInstance {
    private String instanceId;   // instance ID
    private String taskId;       // task ID
    private String executorId;   // executor node ID
    private TaskStatus status;   // task status
    private Date startTime;      // start time
    private Date endTime;        // end time
    private Long duration;       // execution duration
    private Integer retryCount;  // retry count
    private String result;       // execution result
    private String errorMessage; // error message
    private String logPath;      // log path

    // getters and setters omitted...
}
3.1.4 ExecutorNode
An executor node is the actual runner of tasks:
public class ExecutorNode {
    private String nodeId;                 // node ID
    private String host;                   // host address
    private Integer port;                  // port
    private String group;                  // node group
    private NodeStatus status;             // node status
    private Map<String, Object> resources; // resource information
    private Integer weight;                // weight
    private Date registeredTime;           // registration time
    private Date lastHeartbeatTime;        // last heartbeat time

    // getters and setters omitted...
}
3.2 State Models
3.2.1 TaskStatus
Task states (RECEIVED is included because the executor node reports it in 2.2.3):
public enum TaskStatus {
    CREATED,    // created
    WAITING,    // waiting to be scheduled
    DISPATCHED, // dispatched
    RECEIVED,   // accepted by an executor node
    RUNNING,    // running
    COMPLETED,  // completed
    FAILED,     // failed
    TIMEOUT,    // timed out
    CANCELLED,  // cancelled
    PAUSED      // paused
}
3.2.2 NodeStatus
Node states:
public enum NodeStatus {
    ONLINE,   // online
    OFFLINE,  // offline
    BUSY,     // busy
    IDLE,     // idle
    DISABLED  // disabled
}
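The enums above name the states but not the legal transitions between them. One way to make the lifecycle explicit is a transition table. The `TaskStatusMachine` class below is a sketch, and the exact set of allowed transitions is an assumption inferred from the scheduling, retry, and timeout flows described elsewhere in this document.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Condensed copy of TaskStatus (3.2.1) so this sketch compiles standalone
enum TaskStatus { CREATED, WAITING, DISPATCHED, RUNNING, COMPLETED, FAILED, TIMEOUT, CANCELLED, PAUSED }

class TaskStatusMachine {
    // Assumed legal transitions, derived from the lifecycle in this document
    private static final Map<TaskStatus, Set<TaskStatus>> ALLOWED = new EnumMap<>(TaskStatus.class);
    static {
        ALLOWED.put(TaskStatus.CREATED, EnumSet.of(TaskStatus.WAITING, TaskStatus.DISPATCHED, TaskStatus.CANCELLED));
        ALLOWED.put(TaskStatus.WAITING, EnumSet.of(TaskStatus.DISPATCHED, TaskStatus.PAUSED, TaskStatus.CANCELLED));
        ALLOWED.put(TaskStatus.DISPATCHED, EnumSet.of(TaskStatus.RUNNING, TaskStatus.FAILED, TaskStatus.CANCELLED));
        ALLOWED.put(TaskStatus.RUNNING, EnumSet.of(TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.TIMEOUT, TaskStatus.CANCELLED));
        ALLOWED.put(TaskStatus.FAILED, EnumSet.of(TaskStatus.WAITING));  // a retry re-queues the task
        ALLOWED.put(TaskStatus.TIMEOUT, EnumSet.of(TaskStatus.WAITING)); // the RETRY timeout strategy
        ALLOWED.put(TaskStatus.PAUSED, EnumSet.of(TaskStatus.WAITING, TaskStatus.CANCELLED));
        ALLOWED.put(TaskStatus.COMPLETED, EnumSet.noneOf(TaskStatus.class)); // terminal
        ALLOWED.put(TaskStatus.CANCELLED, EnumSet.noneOf(TaskStatus.class)); // terminal
    }

    static boolean canTransition(TaskStatus from, TaskStatus to) {
        return ALLOWED.getOrDefault(from, EnumSet.noneOf(TaskStatus.class)).contains(to);
    }
}
```

A check like this could guard `StateManager.updateTaskStatus` so that stale or duplicated status reports from executors cannot move a task backwards.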
3.3 Database Table Design
3.3.1 Task Table (task)
CREATE TABLE `task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `task_id` varchar(64) NOT NULL COMMENT 'task ID',
  `name` varchar(255) NOT NULL COMMENT 'task name',
  `description` varchar(1024) DEFAULT NULL COMMENT 'task description',
  `type` varchar(32) NOT NULL COMMENT 'task type',
  `params` text COMMENT 'task parameters (JSON)',
  `dependencies` text COMMENT 'task dependencies (JSON)',
  `priority` tinyint(4) DEFAULT '0' COMMENT 'task priority',
  `schedule` text COMMENT 'schedule plan (JSON)',
  `timeout` int(11) DEFAULT NULL COMMENT 'timeout (seconds)',
  `retry` text COMMENT 'retry settings (JSON)',
  `resource_requirements` text COMMENT 'resource requirements (JSON)',
  `routing_key` varchar(255) DEFAULT NULL COMMENT 'routing key',
  `tags` text COMMENT 'tags (JSON)',
  `created_by` varchar(64) DEFAULT NULL COMMENT 'creator',
  `created_time` datetime NOT NULL COMMENT 'creation time',
  `updated_time` datetime NOT NULL COMMENT 'last update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_task_id` (`task_id`),
  KEY `idx_created_time` (`created_time`),
  KEY `idx_type` (`type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='task table';
3.3.2 Task Definition Table (task_definition)
CREATE TABLE `task_definition` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `task_id` varchar(64) NOT NULL COMMENT 'task ID',
  `name` varchar(255) NOT NULL COMMENT 'task name',
  `description` varchar(1024) DEFAULT NULL COMMENT 'task description',
  `type` varchar(32) NOT NULL COMMENT 'task type',
  `params` text COMMENT 'task parameters (JSON)',
  `executor_class` varchar(255) DEFAULT NULL COMMENT 'executor class name',
  `executor_method` varchar(255) DEFAULT NULL COMMENT 'executor method name',
  `code_content` text COMMENT 'code content',
  `code_language` varchar(32) DEFAULT NULL COMMENT 'code language',
  `resource_requirements` text COMMENT 'resource requirements (JSON)',
  `timeout` int(11) DEFAULT NULL COMMENT 'timeout (seconds)',
  `retry` text COMMENT 'retry settings (JSON)',
  `env` text COMMENT 'environment variables (JSON)',
  `created_time` datetime NOT NULL COMMENT 'creation time',
  `updated_time` datetime NOT NULL COMMENT 'last update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_task_id` (`task_id`),
  KEY `idx_created_time` (`created_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='task definition table';
3.3.3 Task Instance Table (task_instance)
CREATE TABLE `task_instance` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `instance_id` varchar(64) NOT NULL COMMENT 'instance ID',
  `task_id` varchar(64) NOT NULL COMMENT 'task ID',
  `executor_id` varchar(64) DEFAULT NULL COMMENT 'executor node ID',
  `status` varchar(32) NOT NULL COMMENT 'task status',
  `start_time` datetime DEFAULT NULL COMMENT 'start time',
  `end_time` datetime DEFAULT NULL COMMENT 'end time',
  `duration` bigint(20) DEFAULT NULL COMMENT 'execution duration (ms)',
  `retry_count` int(11) DEFAULT '0' COMMENT 'retry count',
  `result` text COMMENT 'execution result',
  `error_message` text COMMENT 'error message',
  `log_path` varchar(255) DEFAULT NULL COMMENT 'log path',
  `created_time` datetime NOT NULL COMMENT 'creation time',
  `updated_time` datetime NOT NULL COMMENT 'last update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_instance_id` (`instance_id`),
  KEY `idx_task_id` (`task_id`),
  KEY `idx_status` (`status`),
  KEY `idx_start_time` (`start_time`),
  KEY `idx_created_time` (`created_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='task instance table';
3.3.4 Executor Node Table (executor_node)
CREATE TABLE `executor_node` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `node_id` varchar(64) NOT NULL COMMENT 'node ID',
  `host` varchar(255) NOT NULL COMMENT 'host address',
  `port` int(11) NOT NULL COMMENT 'port',
  `group` varchar(64) DEFAULT NULL COMMENT 'node group',
  `status` varchar(32) NOT NULL COMMENT 'node status',
  `resources` text COMMENT 'resource information (JSON)',
  `weight` int(11) DEFAULT '100' COMMENT 'weight',
  `registered_time` datetime NOT NULL COMMENT 'registration time',
  `last_heartbeat_time` datetime NOT NULL COMMENT 'last heartbeat time',
  `created_time` datetime NOT NULL COMMENT 'creation time',
  `updated_time` datetime NOT NULL COMMENT 'last update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_node_id` (`node_id`),
  KEY `idx_group` (`group`),
  KEY `idx_status` (`status`),
  KEY `idx_last_heartbeat_time` (`last_heartbeat_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='executor node table';
4. Task Scheduling Algorithms
4.1 Basic Scheduling Strategies
4.1.1 FIFO Scheduling
First-in, first-out: tasks are scheduled in the order they were submitted.
public class FifoTaskScheduler implements TaskScheduler {
    private Queue<Task> taskQueue = new LinkedList<>();

    @Override
    public void addTask(Task task) {
        taskQueue.offer(task);
    }

    @Override
    public Task nextTask() {
        return taskQueue.poll();
    }

    @Override
    public boolean hasNext() {
        return !taskQueue.isEmpty();
    }
}
4.1.2 Priority Scheduling
Tasks are scheduled by priority; higher-priority tasks run first.
public class PriorityTaskScheduler implements TaskScheduler {
    private PriorityQueue<Task> taskQueue = new PriorityQueue<>((t1, t2) -> {
        // Compare priorities; a smaller value means higher priority
        int priorityCompare = t1.getPriority().compareTo(t2.getPriority());
        if (priorityCompare != 0) {
            return priorityCompare;
        }
        // Same priority: order by creation time
        return t1.getCreatedTime().compareTo(t2.getCreatedTime());
    });

    @Override
    public void addTask(Task task) {
        taskQueue.offer(task);
    }

    @Override
    public Task nextTask() {
        return taskQueue.poll();
    }

    @Override
    public boolean hasNext() {
        return !taskQueue.isEmpty();
    }
}
4.1.3 Fair Scheduling
Fair scheduling gives each user or task group an equitable share of resources.
public class FairTaskScheduler implements TaskScheduler {
    private Map<String, Queue<Task>> userTaskQueues = new HashMap<>();
    private Queue<String> userQueue = new LinkedList<>();
    private Set<String> activeUsers = new HashSet<>();

    @Override
    public void addTask(Task task) {
        String user = task.getCreatedBy();
        // Get (or create) the user's task queue
        Queue<Task> userTasks = userTaskQueues.computeIfAbsent(user, k -> new LinkedList<>());
        userTasks.offer(task);
        // If the user is not yet active, add them to the round-robin queue
        if (!activeUsers.contains(user)) {
            userQueue.offer(user);
            activeUsers.add(user);
        }
    }

    @Override
    public Task nextTask() {
        if (userQueue.isEmpty()) {
            return null;
        }
        // Take the next user in round-robin order
        String user = userQueue.poll();
        activeUsers.remove(user);
        // Take one task from that user's queue
        Queue<Task> userTasks = userTaskQueues.get(user);
        Task task = userTasks.poll();
        // If the user still has tasks, re-enqueue the user
        if (!userTasks.isEmpty()) {
            userQueue.offer(user);
            activeUsers.add(user);
        }
        return task;
    }

    @Override
    public boolean hasNext() {
        return !userQueue.isEmpty();
    }
}
4.2 Advanced Scheduling Strategies
4.2.1 Resource-Aware Scheduling
Tasks are matched to executor nodes based on task resource requirements and node resource availability.
public class ResourceAwareTaskScheduler implements TaskScheduler {
    private Queue<Task> taskQueue = new PriorityQueue<>((t1, t2) -> {
        // Compare priorities
        int priorityCompare = t1.getPriority().compareTo(t2.getPriority());
        if (priorityCompare != 0) {
            return priorityCompare;
        }
        // Same priority: order by creation time
        return t1.getCreatedTime().compareTo(t2.getCreatedTime());
    });
    private ResourceManager resourceManager;

    public ResourceAwareTaskScheduler(ResourceManager resourceManager) {
        this.resourceManager = resourceManager;
    }

    @Override
    public void addTask(Task task) {
        taskQueue.offer(task);
    }

    // Node-aware variant; extends the base TaskScheduler interface
    public Task nextTask(ExecutorNode executorNode) {
        // Read the node's available resources
        Map<String, Object> nodeResources = executorNode.getResources();
        // Scan the queue for the first task whose requirements the node satisfies
        Iterator<Task> iterator = taskQueue.iterator();
        while (iterator.hasNext()) {
            Task task = iterator.next();
            if (checkResourceRequirements(task.getResourceRequirements(), nodeResources)) {
                iterator.remove();
                return task;
            }
        }
        return null;
    }

    private boolean checkResourceRequirements(Map<String, Object> requirements,
                                              Map<String, Object> resources) {
        for (Map.Entry<String, Object> entry : requirements.entrySet()) {
            String resourceName = entry.getKey();
            Object requiredValue = entry.getValue();
            Object availableValue = resources.get(resourceName);
            if (availableValue == null) {
                return false;
            }
            if (requiredValue instanceof Number && availableValue instanceof Number) {
                if (((Number) requiredValue).doubleValue() > ((Number) availableValue).doubleValue()) {
                    return false;
                }
            } else if (!requiredValue.equals(availableValue)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean hasNext() {
        return !taskQueue.isEmpty();
    }
}
4.2.2 Dynamic Scheduling
The scheduling strategy is adjusted dynamically based on system load and task execution behavior.
public class DynamicTaskScheduler implements TaskScheduler {
    private List<TaskScheduler> schedulers;
    private int currentSchedulerIndex;
    private LoadMonitor loadMonitor;

    public DynamicTaskScheduler(LoadMonitor loadMonitor) {
        this.loadMonitor = loadMonitor;
        // Build the scheduler list
        schedulers = new ArrayList<>();
        schedulers.add(new FifoTaskScheduler());
        schedulers.add(new PriorityTaskScheduler());
        schedulers.add(new ResourceAwareTaskScheduler(new ResourceManager()));
        // Default to the FIFO scheduler
        currentSchedulerIndex = 0;
    }

    @Override
    public void addTask(Task task) {
        // Add the task to every scheduler
        for (TaskScheduler scheduler : schedulers) {
            scheduler.addTask(task);
        }
    }

    @Override
    public Task nextTask() {
        // Pick a scheduler based on the current system load
        selectScheduler();
        // Fetch the next task from the selected scheduler
        return schedulers.get(currentSchedulerIndex).nextTask();
    }

    private void selectScheduler() {
        // Read the current system load
        SystemLoad load = loadMonitor.getCurrentLoad();
        // Choose a scheduler based on load
        if (load.getCpuUsage() > 0.8 || load.getMemoryUsage() > 0.8) {
            // High load: use the resource-aware scheduler
            currentSchedulerIndex = 2;
        } else if (load.getQueueSize() > 1000) {
            // Long queue: use the priority scheduler
            currentSchedulerIndex = 1;
        } else {
            // Otherwise: FIFO
            currentSchedulerIndex = 0;
        }
    }

    @Override
    public boolean hasNext() {
        return schedulers.get(currentSchedulerIndex).hasNext();
    }
}
4.2.3 Dependency-Aware Scheduling
Scheduling that takes inter-task dependencies into account.
public class DependencyAwareTaskScheduler implements TaskScheduler {
    private Map<String, Task> taskMap = new HashMap<>();
    private Map<String, Set<String>> dependencyGraph = new HashMap<>();
    private Map<String, Set<String>> reverseDependencyGraph = new HashMap<>();
    private Queue<Task> readyTasks = new LinkedList<>();

    @Override
    public void addTask(Task task) {
        String taskId = task.getTaskId();
        // Keep the task
        taskMap.put(taskId, task);
        // Build the dependency graph
        List<String> dependencies = task.getDependencies();
        if (dependencies == null || dependencies.isEmpty()) {
            // No dependencies: ready immediately
            readyTasks.offer(task);
        } else {
            // Record the forward edges
            Set<String> deps = new HashSet<>(dependencies);
            dependencyGraph.put(taskId, deps);
            // And the reverse edges
            for (String depId : dependencies) {
                reverseDependencyGraph.computeIfAbsent(depId, k -> new HashSet<>()).add(taskId);
            }
        }
    }

    @Override
    public Task nextTask() {
        return readyTasks.poll();
    }

    // Called when a task finishes, to release its dependents
    public void completeTask(String taskId) {
        // Tasks that depend on the completed task
        Set<String> dependentTasks =
                reverseDependencyGraph.getOrDefault(taskId, Collections.emptySet());
        // Update the dependency graph
        for (String depTaskId : dependentTasks) {
            Set<String> deps = dependencyGraph.get(depTaskId);
            deps.remove(taskId);
            // All dependencies satisfied: the task is ready
            if (deps.isEmpty()) {
                Task task = taskMap.get(depTaskId);
                readyTasks.offer(task);
                dependencyGraph.remove(depTaskId);
            }
        }
        // Clean up the reverse edges
        reverseDependencyGraph.remove(taskId);
    }

    @Override
    public boolean hasNext() {
        return !readyTasks.isEmpty();
    }
}
4.3 Composite Scheduling Strategy
Multiple scheduling strategies can be combined to cover different scenarios.
public class CompositeTaskScheduler implements TaskScheduler {
    private List<TaskScheduler> schedulers;
    private List<TaskFilter> filters;

    public CompositeTaskScheduler() {
        schedulers = new ArrayList<>();
        filters = new ArrayList<>();
    }

    public void addScheduler(TaskScheduler scheduler) {
        schedulers.add(scheduler);
    }

    public void addFilter(TaskFilter filter) {
        filters.add(filter);
    }

    @Override
    public void addTask(Task task) {
        // Apply the filters
        for (TaskFilter filter : filters) {
            task = filter.filter(task);
            if (task == null) {
                return;
            }
        }
        // Add the task to every scheduler
        for (TaskScheduler scheduler : schedulers) {
            scheduler.addTask(task);
        }
    }

    @Override
    public Task nextTask() {
        // Ask each scheduler in turn for the next task
        for (TaskScheduler scheduler : schedulers) {
            if (scheduler.hasNext()) {
                return scheduler.nextTask();
            }
        }
        return null;
    }

    @Override
    public boolean hasNext() {
        // True if any scheduler has a next task
        for (TaskScheduler scheduler : schedulers) {
            if (scheduler.hasNext()) {
                return true;
            }
        }
        return false;
    }
}
5. High Availability and Fault Tolerance
5.1 Scheduler Center High Availability
5.1.1 Leader/Follower Architecture
The scheduler center uses a leader/follower architecture to ensure high availability:
- Leader node: schedules and dispatches tasks
- Follower nodes: monitor the leader and stand by to take over
- Leader election: a distributed lock built on ZooKeeper/etcd elects the leader
- State synchronization: leader and followers synchronize state through shared storage
public class SchedulerHAManager {
    private ZooKeeperRegistry registry;
    private String schedulerId;
    private LeaderElection leaderElection;
    private boolean isLeader;
    private SchedulerCenter schedulerCenter;

    public SchedulerHAManager(ZooKeeperRegistry registry, String schedulerId,
                              SchedulerCenter schedulerCenter) {
        this.registry = registry;
        this.schedulerId = schedulerId;
        this.schedulerCenter = schedulerCenter;
        // Create the leader election
        leaderElection = registry.createLeaderElection("scheduler");
        // Register the listener
        leaderElection.addListener(new LeaderElectionListener() {
            @Override
            public void onLeadershipAcquired() {
                // Became the leader
                isLeader = true;
                activateLeaderMode();
            }

            @Override
            public void onLeadershipLost() {
                // Lost leadership
                isLeader = false;
                activateFollowerMode();
            }
        });
    }

    public void start() {
        try {
            // Start the leader election
            leaderElection.start();
        } catch (Exception e) {
            throw new RuntimeException("failed to start the HA manager", e);
        }
    }

    public void stop() {
        try {
            // Stop the leader election
            leaderElection.close();
        } catch (Exception e) {
            logger.error("failed to stop the HA manager", e);
        }
    }

    private void activateLeaderMode() {
        logger.info("node {} became the leader", schedulerId);
        // Switch the scheduler center into leader mode
        schedulerCenter.activateLeaderMode();
    }

    private void activateFollowerMode() {
        logger.info("node {} became a follower", schedulerId);
        // Switch the scheduler center into follower mode
        schedulerCenter.activateFollowerMode();
    }

    public boolean isLeader() {
        return isLeader;
    }
}
5.1.2 State Synchronization
State synchronization between leader and follower nodes relies on:
- Shared storage: a distributed database or cache stores task state
- Message notification: state changes are announced over a message queue
- Periodic sync: state is periodically refreshed from shared storage
public class StateSync {
    private TaskStorage taskStorage;
    private CommunicationService communicationService;
    private String nodeId;

    public StateSync(TaskStorage taskStorage, CommunicationService communicationService,
                     String nodeId) {
        this.taskStorage = taskStorage;
        this.communicationService = communicationService;
        this.nodeId = nodeId;
        // Subscribe to state-change messages
        communicationService.subscribe("state-change", this::handleStateChangeMessage);
    }

    public void publishStateChange(String taskId, TaskStatus status) {
        // Build the state-change message
        StateChangeMessage message = new StateChangeMessage();
        message.setTaskId(taskId);
        message.setStatus(status);
        message.setTimestamp(System.currentTimeMillis());
        message.setSource(nodeId);
        // Publish it
        communicationService.publish("state-change", message);
    }

    private void handleStateChangeMessage(Message message) {
        if (message instanceof StateChangeMessage) {
            StateChangeMessage stateChangeMessage = (StateChangeMessage) message;
            // Ignore messages we published ourselves
            if (nodeId.equals(stateChangeMessage.getSource())) {
                return;
            }
            // Apply the change locally
            String taskId = stateChangeMessage.getTaskId();
            TaskStatus status = stateChangeMessage.getStatus();
            taskStorage.updateTaskStatus(taskId, status);
        }
    }

    public void syncFromStorage() {
        // Sync state from shared storage;
        // this can be implemented as an incremental or full sync
    }
}
5.2 执行节点容错
5.2.1 健康检查
执行节点定期向调度中心发送心跳,调度中心通过心跳检测执行节点的健康状态。
public class HealthChecker {
    private Map<String, NodeStatus> nodeStatusMap = new ConcurrentHashMap<>();
    private Map<String, Long> lastHeartbeatMap = new ConcurrentHashMap<>();
    private long heartbeatTimeout = 30000; // 30 seconds

    public void updateHeartbeat(String nodeId, NodeStatus status) {
        nodeStatusMap.put(nodeId, status);
        lastHeartbeatMap.put(nodeId, System.currentTimeMillis());
    }

    public List<String> checkDeadNodes() {
        List<String> deadNodes = new ArrayList<>();
        long now = System.currentTimeMillis();
        for (Map.Entry<String, Long> entry : lastHeartbeatMap.entrySet()) {
            String nodeId = entry.getKey();
            if (now - entry.getValue() > heartbeatTimeout) {
                // Heartbeat timed out: mark the node as dead
                deadNodes.add(nodeId);
                nodeStatusMap.put(nodeId, NodeStatus.OFFLINE);
            }
        }
        return deadNodes;
    }

    public NodeStatus getNodeStatus(String nodeId) {
        return nodeStatusMap.getOrDefault(nodeId, NodeStatus.OFFLINE);
    }

    public void removeNode(String nodeId) {
        nodeStatusMap.remove(nodeId);
        lastHeartbeatMap.remove(nodeId);
    }
}
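The liveness decision in checkDeadNodes reduces to comparing each node's last heartbeat timestamp against the timeout. The node names and the fixed clock value below are illustrative, used only to make the check reproducible:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HeartbeatDemo {
    public static void main(String[] args) {
        Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();
        long timeoutMs = 30_000;
        long now = 1_000_000L;                     // fixed "current time" for the demo
        lastHeartbeat.put("node-1", now - 10_000); // heartbeat 10 s ago -> alive
        lastHeartbeat.put("node-2", now - 45_000); // heartbeat 45 s ago -> dead

        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            // Same predicate as checkDeadNodes: no heartbeat within the timeout window
            if (now - e.getValue() > timeoutMs) {
                dead.add(e.getKey());
            }
        }
        System.out.println(dead); // [node-2]
    }
}
```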
5.2.2 Task Failure Retry
When a task execution fails, it is retried according to the task's retry policy.
public class TaskRetryManager {
    private TaskStorage taskStorage;
    private SchedulerCenter schedulerCenter;

    public TaskRetryManager(TaskStorage taskStorage, SchedulerCenter schedulerCenter) {
        this.taskStorage = taskStorage;
        this.schedulerCenter = schedulerCenter;
    }

    public void handleTaskFailure(String taskId, String errorMessage) {
        TaskDefinition taskDefinition = taskStorage.getTaskDefinition(taskId);
        TaskInstance taskInstance = taskStorage.getTaskInstance(taskId);
        TaskRetry retry = taskDefinition.getRetry();
        // Retry only while the configured limit has not been reached
        if (retry != null && taskInstance.getRetryCount() < retry.getMaxRetries()) {
            long delay = calculateRetryDelay(retry, taskInstance.getRetryCount());
            taskInstance.setRetryCount(taskInstance.getRetryCount() + 1);
            taskStorage.updateTaskInstance(taskInstance);
            // Re-schedule after the computed delay
            schedulerCenter.scheduleRetry(taskId, delay);
        } else {
            // Retry limit reached: mark the task as failed and record the error
            taskStorage.updateTaskStatus(taskId, TaskStatus.FAILED);
            taskStorage.saveTaskError(taskId, errorMessage);
        }
    }

    private long calculateRetryDelay(TaskRetry retry, int retryCount) {
        if (retry.isExponentialBackoff()) {
            // Exponential backoff: initialDelay * backoffFactor^retryCount
            return retry.getInitialDelay() * (long) Math.pow(retry.getBackoffFactor(), retryCount);
        }
        // Fixed delay
        return retry.getInitialDelay();
    }
}
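The exponential backoff in calculateRetryDelay can be illustrated standalone. The initial delay (1 s) and factor (2.0) below are illustrative values, not defaults taken from this design:

```java
public class BackoffDemo {
    // Mirrors calculateRetryDelay: initialDelay * backoffFactor^retryCount
    static long retryDelay(long initialDelayMs, double backoffFactor, int retryCount) {
        return initialDelayMs * (long) Math.pow(backoffFactor, retryCount);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("retry " + attempt + " -> " + retryDelay(1000, 2.0, attempt) + " ms");
        }
    }
}
```

With these values the delays double each attempt: 1000, 2000, 4000, 8000, 16000 ms.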
5.2.3 Task Timeout Handling
When a task runs past its time limit, the configured timeout strategy is applied.
public class TaskTimeoutManager {
    private TaskStorage taskStorage;
    private SchedulerCenter schedulerCenter;
    private ScheduledExecutorService scheduler;

    public TaskTimeoutManager(TaskStorage taskStorage, SchedulerCenter schedulerCenter) {
        this.taskStorage = taskStorage;
        this.schedulerCenter = schedulerCenter;
        this.scheduler = Executors.newScheduledThreadPool(1);
    }

    public void scheduleTimeoutCheck(String taskId, long timeout) {
        scheduler.schedule(() -> {
            TaskInstance taskInstance = taskStorage.getTaskInstance(taskId);
            // Only act if the task is still running when the deadline passes
            if (taskInstance.getStatus() == TaskStatus.RUNNING) {
                handleTimeout(taskId);
            }
        }, timeout, TimeUnit.MILLISECONDS);
    }

    private void handleTimeout(String taskId) {
        TaskDefinition taskDefinition = taskStorage.getTaskDefinition(taskId);
        TaskTimeout timeout = taskDefinition.getTimeout();
        // Apply the configured timeout strategy
        switch (timeout.getStrategy()) {
            case KILL:
                schedulerCenter.killTask(taskId);
                taskStorage.updateTaskStatus(taskId, TaskStatus.TIMEOUT);
                break;
            case ALERT:
                schedulerCenter.sendAlert(taskId, "Task execution timed out");
                break;
            case RETRY:
                schedulerCenter.killTask(taskId);
                schedulerCenter.retryTask(taskId);
                break;
        }
    }
}
5.3 Data Consistency Guarantees
5.3.1 Distributed Transactions
Distributed transactions are used to guarantee data consistency. The template below shows the transaction boundary for a single data source.
public class DistributedTransactionManager {
    private static final Logger logger = LoggerFactory.getLogger(DistributedTransactionManager.class);
    private DataSource dataSource;

    public DistributedTransactionManager(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public <T> T executeInTransaction(TransactionCallback<T> callback) throws Exception {
        Connection connection = null;
        boolean autoCommit = false;
        try {
            connection = dataSource.getConnection();
            // Remember the auto-commit setting so it can be restored later
            autoCommit = connection.getAutoCommit();
            connection.setAutoCommit(false);
            // Run the callback inside the transaction
            T result = callback.execute(connection);
            connection.commit();
            return result;
        } catch (Exception e) {
            // Roll back on any failure
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (SQLException ex) {
                    logger.error("Failed to roll back transaction", ex);
                }
            }
            throw e;
        } finally {
            // Restore auto-commit and release the connection
            if (connection != null) {
                try {
                    connection.setAutoCommit(autoCommit);
                    connection.close();
                } catch (SQLException ex) {
                    logger.error("Failed to close connection", ex);
                }
            }
        }
    }

    public interface TransactionCallback<T> {
        T execute(Connection connection) throws Exception;
    }
}
5.3.2 Idempotency Design
Task execution is made idempotent so that duplicate executions cause no harm.
public class IdempotentExecutor {
    private TaskStorage taskStorage;

    public IdempotentExecutor(TaskStorage taskStorage) {
        this.taskStorage = taskStorage;
    }

    public TaskResult execute(String taskId, TaskDefinition taskDefinition, TaskExecutor executor) {
        TaskInstance taskInstance = taskStorage.getTaskInstance(taskId);
        // If the task already completed, return the stored result instead of re-running it
        if (taskInstance.getStatus() == TaskStatus.COMPLETED) {
            return taskStorage.getTaskResult(taskId);
        }
        String idempotentKey = generateIdempotentKey(taskId, taskDefinition);
        // Only the lock holder actually executes the task
        if (acquireIdempotentLock(idempotentKey)) {
            try {
                TaskResult result = executor.execute(taskDefinition);
                taskStorage.saveTaskResult(taskId, result);
                return result;
            } finally {
                releaseIdempotentLock(idempotentKey);
            }
        } else {
            // Someone else is executing the task; wait for its result
            return waitForResult(taskId);
        }
    }

    private String generateIdempotentKey(String taskId, TaskDefinition taskDefinition) {
        return "idempotent:" + taskId;
    }

    private boolean acquireIdempotentLock(String idempotentKey) {
        // Acquire the idempotency lock, e.g. with Redis SETNX; stubbed out here
        return true;
    }

    private void releaseIdempotentLock(String idempotentKey) {
        // Release the idempotency lock
    }

    private TaskResult waitForResult(String taskId) {
        // Wait for the result by polling or blocking; simplified here
        return taskStorage.getTaskResult(taskId);
    }
}
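acquireIdempotentLock is left as a stub above. A minimal in-memory variant, assuming a single-JVM deployment (a distributed deployment would use Redis SETNX or an equivalent atomic compare-and-set), can be sketched with putIfAbsent, which gives the same acquire-once semantics:

```java
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryIdempotentLock {
    // Maps idempotent key -> owner; putIfAbsent gives atomic acquire semantics
    private final ConcurrentHashMap<String, String> locks = new ConcurrentHashMap<>();

    public boolean acquire(String key, String owner) {
        // Succeeds only for the first caller; later callers see the existing owner
        return locks.putIfAbsent(key, owner) == null;
    }

    public void release(String key, String owner) {
        // Only the current owner may release the lock
        locks.remove(key, owner);
    }

    public static void main(String[] args) {
        InMemoryIdempotentLock lock = new InMemoryIdempotentLock();
        System.out.println(lock.acquire("idempotent:task-1", "node-a")); // true
        System.out.println(lock.acquire("idempotent:task-1", "node-b")); // false
        lock.release("idempotent:task-1", "node-a");
        System.out.println(lock.acquire("idempotent:task-1", "node-b")); // true
    }
}
```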
5.4 Disaster Recovery
5.4.1 Data Backup
Task data is backed up on a regular schedule to keep it safe.
public class DataBackupManager {
    private static final Logger logger = LoggerFactory.getLogger(DataBackupManager.class);
    private TaskStorage taskStorage;
    private String backupPath;
    private ScheduledExecutorService scheduler;

    public DataBackupManager(TaskStorage taskStorage, String backupPath) {
        this.taskStorage = taskStorage;
        this.backupPath = backupPath;
        this.scheduler = Executors.newScheduledThreadPool(1);
    }

    public void start() {
        // Run a backup every day at 02:00
        scheduler.scheduleAtFixedRate(this::backup, getInitialDelay(), 24 * 60 * 60 * 1000, TimeUnit.MILLISECONDS);
    }

    private long getInitialDelay() {
        // Compute the time remaining until the next 02:00
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.HOUR_OF_DAY, 2);
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        long now = System.currentTimeMillis();
        long nextBackupTime = calendar.getTimeInMillis();
        if (nextBackupTime <= now) {
            nextBackupTime += 24 * 60 * 60 * 1000;
        }
        return nextBackupTime - now;
    }

    private void backup() {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
            String timestamp = sdf.format(new Date());
            String backupFile = backupPath + "/backup_" + timestamp + ".zip";
            taskStorage.backup(backupFile);
            logger.info("Backup succeeded: {}", backupFile);
        } catch (Exception e) {
            logger.error("Backup failed", e);
        }
    }

    public void stop() {
        scheduler.shutdown();
    }
}
5.4.2 Multi-Site Active-Active
Multiple data centers are deployed for geo-redundant, active-active operation.
public class MultiDataCenterManager {
    private List<DataCenter> dataCenters;
    private DataCenterSelector selector;
    private CommunicationService communicationService;

    public MultiDataCenterManager(List<DataCenter> dataCenters, DataCenterSelector selector,
                                  CommunicationService communicationService) {
        this.dataCenters = dataCenters;
        this.selector = selector;
        this.communicationService = communicationService;
        // Subscribe to data-center status changes
        communicationService.subscribe("data-center-status", this::handleDataCenterStatusChange);
    }

    public DataCenter selectDataCenter(Task task) {
        return selector.select(dataCenters, task);
    }

    private void handleDataCenterStatusChange(Message message) {
        if (message instanceof DataCenterStatusMessage) {
            DataCenterStatusMessage statusMessage = (DataCenterStatusMessage) message;
            // Update the status of the affected data center
            for (DataCenter dataCenter : dataCenters) {
                if (dataCenter.getId().equals(statusMessage.getDataCenterId())) {
                    dataCenter.setStatus(statusMessage.getStatus());
                    break;
                }
            }
        }
    }

    public void syncTask(Task task) {
        // Replicate the task to every online data center
        for (DataCenter dataCenter : dataCenters) {
            if (dataCenter.getStatus() == DataCenterStatus.ONLINE) {
                TaskSyncMessage syncMessage = new TaskSyncMessage();
                syncMessage.setTaskId(task.getTaskId());
                syncMessage.setTask(task);
                communicationService.sendAsync(dataCenter.getEndpoint(), syncMessage);
            }
        }
    }
}
6. Extensibility Design
6.1 Horizontal Scaling
6.1.1 Scheduler Center Cluster Scaling
The scheduler center is designed to be stateless and scales horizontally:
- Automatic node discovery: new nodes register with the registry on startup
- Load balancing: requests are distributed through a load balancer
- State sharing: state is shared through shared storage
public class SchedulerCluster {
    private List<SchedulerNode> nodes = new ArrayList<>();
    private Registry registry;
    private LoadBalancer loadBalancer;

    public SchedulerCluster(Registry registry, LoadBalancer loadBalancer) {
        this.registry = registry;
        this.loadBalancer = loadBalancer;
        // Watch for scheduler node membership changes
        registry.watchService("scheduler", this::handleNodeChange);
    }

    private void handleNodeChange(List<ServiceInstance> instances) {
        // Rebuild the node list from the registry snapshot
        synchronized (nodes) {
            nodes.clear();
            for (ServiceInstance instance : instances) {
                nodes.add(new SchedulerNode(instance.getInstanceId(), instance.getHost(), instance.getPort()));
            }
        }
        // Propagate the new membership to the load balancer
        loadBalancer.updateNodes(nodes);
    }

    public SchedulerNode selectNode(Task task) {
        return loadBalancer.select(nodes, task);
    }

    public int getNodeCount() {
        return nodes.size();
    }
}
6.1.2 Executor Node Cluster Scaling
Executor nodes support dynamic scaling in and out:
- Automatic registration: executor nodes register with the registry on startup
- Group management: nodes are managed by node group
- Dynamic discovery: the scheduler center discovers executor nodes dynamically
public class ExecutorClusterManager {
    private Map<String, List<ExecutorNode>> groupNodes = new HashMap<>();
    private Registry registry;

    public ExecutorClusterManager(Registry registry) {
        this.registry = registry;
        // Watch for executor node membership changes
        registry.watchService("executor", this::handleNodeChange);
    }

    private void handleNodeChange(List<ServiceInstance> instances) {
        // Group nodes by their "group" metadata
        Map<String, List<ExecutorNode>> newGroupNodes = new HashMap<>();
        for (ServiceInstance instance : instances) {
            String group = instance.getMetadata().getOrDefault("group", "default");
            ExecutorNode node = new ExecutorNode();
            node.setNodeId(instance.getInstanceId());
            node.setHost(instance.getHost());
            node.setPort(instance.getPort());
            node.setGroup(group);
            node.setStatus(NodeStatus.valueOf(instance.getMetadata().getOrDefault("status", "ONLINE")));
            newGroupNodes.computeIfAbsent(group, k -> new ArrayList<>()).add(node);
        }
        // Swap in the new grouping
        synchronized (groupNodes) {
            groupNodes.clear();
            groupNodes.putAll(newGroupNodes);
        }
    }

    public List<ExecutorNode> getNodesByGroup(String group) {
        return groupNodes.getOrDefault(group, Collections.emptyList());
    }

    public List<ExecutorNode> getAllNodes() {
        List<ExecutorNode> allNodes = new ArrayList<>();
        for (List<ExecutorNode> nodes : groupNodes.values()) {
            allNodes.addAll(nodes);
        }
        return allNodes;
    }

    public int getNodeCount() {
        return getAllNodes().size();
    }
}
6.2 Vertical Scaling
6.2.1 Resource Isolation
Vertical scaling is achieved through resource isolation:
- CPU isolation: cap a task's CPU usage
- Memory isolation: cap a task's memory usage
- Disk isolation: cap a task's disk usage
- Network isolation: cap a task's network bandwidth
public class ResourceIsolator {
    private ContainerManager containerManager;

    public ResourceIsolator(ContainerManager containerManager) {
        this.containerManager = containerManager;
    }

    public Container createContainer(String taskId, Map<String, Object> resourceRequirements) {
        ContainerConfig config = new ContainerConfig();
        config.setTaskId(taskId);
        // Apply each per-resource limit when it is specified
        if (resourceRequirements.containsKey("cpu")) {
            config.setCpuLimit((Double) resourceRequirements.get("cpu"));
        }
        if (resourceRequirements.containsKey("memory")) {
            config.setMemoryLimit((Long) resourceRequirements.get("memory"));
        }
        if (resourceRequirements.containsKey("disk")) {
            config.setDiskLimit((Long) resourceRequirements.get("disk"));
        }
        if (resourceRequirements.containsKey("network")) {
            config.setNetworkLimit((Long) resourceRequirements.get("network"));
        }
        return containerManager.createContainer(config);
    }

    public void destroyContainer(String containerId) {
        containerManager.destroyContainer(containerId);
    }
}
6.2.2 Task Priorities
Resources are allocated according to task priority:
- Priority queues: tasks are ordered by priority
- Resource reservation: resources are reserved for high-priority tasks
- Resource preemption: high-priority tasks may preempt resources from low-priority tasks
public class PriorityResourceAllocator {
    private ResourceManager resourceManager;

    public PriorityResourceAllocator(ResourceManager resourceManager) {
        this.resourceManager = resourceManager;
    }

    public boolean allocateResource(Task task) {
        TaskPriority priority = task.getPriority();
        Map<String, Object> resourceRequirements = task.getResourceRequirements();
        if (resourceManager.checkResourceAvailability(resourceRequirements)) {
            // Enough free resources: allocate directly
            resourceManager.allocateResource(task.getTaskId(), resourceRequirements);
            return true;
        } else if (priority == TaskPriority.HIGH) {
            // High-priority task: try to preempt resources from lower-priority tasks
            return tryPreemptResource(task);
        } else {
            // Not enough resources and no preemption right
            return false;
        }
    }

    private boolean tryPreemptResource(Task task) {
        Map<String, Object> resourceRequirements = task.getResourceRequirements();
        List<Task> lowPriorityTasks = resourceManager.getLowPriorityTasks();
        // Accumulate resources that could be reclaimed from low-priority tasks
        Map<String, Object> preemptableResources = new HashMap<>();
        for (Task lowPriorityTask : lowPriorityTasks) {
            Map<String, Object> taskResources = resourceManager.getTaskResources(lowPriorityTask.getTaskId());
            accumulate(preemptableResources, taskResources);
            // Preempt as soon as the accumulated resources cover the requirement
            if (checkResourceSufficient(preemptableResources, resourceRequirements)) {
                preemptResources(task, lowPriorityTasks);
                return true;
            }
        }
        return false;
    }

    private void accumulate(Map<String, Object> total, Map<String, Object> delta) {
        // Sum numeric resource amounts into the running total
        for (Map.Entry<String, Object> entry : delta.entrySet()) {
            if (entry.getValue() instanceof Number) {
                double value = ((Number) entry.getValue()).doubleValue();
                Object current = total.get(entry.getKey());
                double currentValue = current instanceof Number ? ((Number) current).doubleValue() : 0.0;
                total.put(entry.getKey(), currentValue + value);
            }
        }
    }

    private boolean checkResourceSufficient(Map<String, Object> available, Map<String, Object> required) {
        for (Map.Entry<String, Object> entry : required.entrySet()) {
            if (entry.getValue() instanceof Number) {
                // Locals renamed: the original shadowed the 'available'/'required' parameters
                double requiredAmount = ((Number) entry.getValue()).doubleValue();
                Object availableValue = available.get(entry.getKey());
                if (!(availableValue instanceof Number)) {
                    return false;
                }
                if (((Number) availableValue).doubleValue() < requiredAmount) {
                    return false;
                }
            }
        }
        return true;
    }

    private void preemptResources(Task highPriorityTask, List<Task> lowPriorityTasks) {
        Map<String, Object> resourceRequirements = highPriorityTask.getResourceRequirements();
        // Track what has been reclaimed so far
        Map<String, Object> preemptedResources = new HashMap<>();
        for (Task lowPriorityTask : lowPriorityTasks) {
            // Stop once the requirement is covered
            if (checkResourceSufficient(preemptedResources, resourceRequirements)) {
                break;
            }
            // Pause the low-priority task and reclaim its resources
            resourceManager.pauseTask(lowPriorityTask.getTaskId());
            Map<String, Object> taskResources = resourceManager.getTaskResources(lowPriorityTask.getTaskId());
            resourceManager.releaseResource(lowPriorityTask.getTaskId());
            accumulate(preemptedResources, taskResources);
        }
        // Hand the reclaimed resources to the high-priority task
        resourceManager.allocateResource(highPriorityTask.getTaskId(), resourceRequirements);
    }
}
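The sufficiency check used by both preemption paths can be exercised in isolation. The resource names and amounts below are illustrative:

```java
import java.util.Map;

public class ResourceCheckDemo {
    // Same logic as checkResourceSufficient: every numeric requirement must be
    // present in 'available' at an equal or greater amount
    static boolean sufficient(Map<String, Object> available, Map<String, Object> required) {
        for (Map.Entry<String, Object> entry : required.entrySet()) {
            if (!(entry.getValue() instanceof Number)) {
                continue;
            }
            double requiredAmount = ((Number) entry.getValue()).doubleValue();
            Object availableValue = available.get(entry.getKey());
            if (!(availableValue instanceof Number)) {
                return false; // resource not offered at all
            }
            if (((Number) availableValue).doubleValue() < requiredAmount) {
                return false; // offered, but not enough
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> available = Map.of("cpu", 4.0, "memory", 8192L);
        System.out.println(sufficient(available, Map.of("cpu", 2.0)));   // enough cpu
        System.out.println(sufficient(available, Map.of("cpu", 8.0)));   // too little cpu
        System.out.println(sufficient(available, Map.of("disk", 100L))); // disk not offered
    }
}
```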
6.3 Plugin Architecture
6.3.1 Plugin Framework
Features are extended through a plugin framework:
- Plugin interface: a unified interface all plugins implement
- Plugin loader: plugins are loaded dynamically
- Plugin lifecycle: plugin lifecycles are managed by the framework
public class PluginFramework {
    private Map<String, Plugin> plugins = new HashMap<>();
    private PluginLoader pluginLoader;

    public PluginFramework(PluginLoader pluginLoader) {
        this.pluginLoader = pluginLoader;
    }

    public void loadPlugins(String pluginDir) {
        // Load plugins from the plugin directory and register them
        List<Plugin> loadedPlugins = pluginLoader.loadPlugins(pluginDir);
        for (Plugin plugin : loadedPlugins) {
            registerPlugin(plugin);
        }
    }

    public void registerPlugin(Plugin plugin) {
        // Initialize the plugin before exposing it
        plugin.init();
        plugins.put(plugin.getId(), plugin);
    }

    public void unregisterPlugin(String pluginId) {
        Plugin plugin = plugins.get(pluginId);
        if (plugin != null) {
            // Destroy the plugin before removing it
            plugin.destroy();
            plugins.remove(pluginId);
        }
    }

    public Plugin getPlugin(String pluginId) {
        return plugins.get(pluginId);
    }

    public List<Plugin> getPlugins() {
        return new ArrayList<>(plugins.values());
    }
}
6.3.2 Extension Point Design
Behavior is customized through extension points:
- Extension point definition: the system's extension points are declared
- Extension implementation: plugins implement the extension points
- Extension point management: registration and invocation of extensions are managed centrally
public class ExtensionPointManager {
    private Map<String, List<Extension>> extensionPoints = new HashMap<>();

    public void registerExtension(String extensionPoint, Extension extension) {
        extensionPoints.computeIfAbsent(extensionPoint, k -> new ArrayList<>()).add(extension);
    }

    public void unregisterExtension(String extensionPoint, Extension extension) {
        List<Extension> extensions = extensionPoints.get(extensionPoint);
        if (extensions != null) {
            extensions.remove(extension);
        }
    }

    public List<Extension> getExtensions(String extensionPoint) {
        return extensionPoints.getOrDefault(extensionPoint, Collections.emptyList());
    }

    @SuppressWarnings("unchecked")
    public <T> T invokeExtension(String extensionPoint, ExtensionContext context) {
        List<Extension> extensions = getExtensions(extensionPoint);
        if (extensions.isEmpty()) {
            return null;
        }
        // Invoke the first extension that supports this context
        for (Extension extension : extensions) {
            if (extension.supports(context)) {
                return (T) extension.execute(context);
            }
        }
        return null;
    }
}
7. Security Considerations
7.1 Authentication and Authorization
7.1.1 User Authentication
Supported user authentication mechanisms:
- Username/password: basic username and password authentication
- Multi-factor: multi-factor authentication support
- OAuth: OAuth authentication support
- LDAP: LDAP authentication support
public class AuthenticationManager {
    private List<AuthenticationProvider> providers = new ArrayList<>();

    public AuthenticationManager() {
        // Register the built-in authentication providers
        providers.add(new UsernamePasswordAuthenticationProvider());
        providers.add(new OAuthAuthenticationProvider());
        providers.add(new LdapAuthenticationProvider());
    }

    public Authentication authenticate(Authentication authentication) throws AuthenticationException {
        // Try each provider that supports this authentication type
        for (AuthenticationProvider provider : providers) {
            if (provider.supports(authentication.getClass())) {
                try {
                    Authentication result = provider.authenticate(authentication);
                    if (result != null && result.isAuthenticated()) {
                        return result;
                    }
                } catch (AuthenticationException e) {
                    // This provider failed; fall through to the next one
                }
            }
        }
        // Every provider rejected the credentials
        throw new AuthenticationException("Authentication failed");
    }

    public void addAuthenticationProvider(AuthenticationProvider provider) {
        providers.add(provider);
    }
}
7.1.2 Access Control
Supported access-control mechanisms:
- Role-based: permissions granted by role
- Resource-based: permissions scoped to resources
- Operation-based: permissions scoped to operations
- Data-based: permissions scoped to data
public class AuthorizationManager {
    private PermissionRepository permissionRepository;

    public AuthorizationManager(PermissionRepository permissionRepository) {
        this.permissionRepository = permissionRepository;
    }

    public boolean hasPermission(Authentication authentication, Object resource, String operation) {
        Collection<? extends GrantedAuthority> authorities = authentication.getAuthorities();
        List<Permission> requiredPermissions = permissionRepository.getPermissions(resource, operation);
        // Every required permission must be matched by some granted authority
        for (Permission permission : requiredPermissions) {
            boolean hasPermission = false;
            for (GrantedAuthority authority : authorities) {
                if (permission.matches(authority)) {
                    hasPermission = true;
                    break;
                }
            }
            if (!hasPermission) {
                return false;
            }
        }
        return true;
    }

    public void checkPermission(Authentication authentication, Object resource, String operation)
            throws AccessDeniedException {
        if (!hasPermission(authentication, resource, operation)) {
            throw new AccessDeniedException("Access denied");
        }
    }
}
7.2 Data Security
7.2.1 Data Encryption
Data encryption mechanisms:
- Transport encryption: data in transit is encrypted with HTTPS
- Storage encryption: sensitive data is encrypted at rest
- Key management: encryption keys are managed securely
public class EncryptionService {
    private KeyManager keyManager;

    public EncryptionService(KeyManager keyManager) {
        this.keyManager = keyManager;
    }

    public String encrypt(String plainText, String keyId) throws Exception {
        Key key = keyManager.getKey(keyId);
        Cipher cipher = Cipher.getInstance(key.getAlgorithm());
        cipher.init(Cipher.ENCRYPT_MODE, key);
        byte[] encryptedBytes = cipher.doFinal(plainText.getBytes(StandardCharsets.UTF_8));
        // Return the ciphertext as Base64
        return Base64.getEncoder().encodeToString(encryptedBytes);
    }

    public String decrypt(String cipherText, String keyId) throws Exception {
        Key key = keyManager.getKey(keyId);
        Cipher cipher = Cipher.getInstance(key.getAlgorithm());
        cipher.init(Cipher.DECRYPT_MODE, key);
        byte[] decryptedBytes = cipher.doFinal(Base64.getDecoder().decode(cipherText));
        return new String(decryptedBytes, StandardCharsets.UTF_8);
    }
}
7.2.2 Data Masking
Data masking mechanisms:
- Field masking: sensitive fields are masked
- Log masking: sensitive information in logs is masked
- Result masking: query results are masked
public class DataMaskingService {
    private Map<String, DataMasker> maskers = new HashMap<>();

    public DataMaskingService() {
        // Register the built-in maskers
        maskers.put("phone", new PhoneNumberMasker());
        maskers.put("email", new EmailMasker());
        maskers.put("idcard", new IdCardMasker());
        maskers.put("name", new NameMasker());
    }

    public String mask(String data, String type) {
        DataMasker masker = maskers.get(type);
        // Unknown types pass through unchanged
        return masker != null ? masker.mask(data) : data;
    }

    public void registerMasker(String type, DataMasker masker) {
        maskers.put(type, masker);
    }

    public interface DataMasker {
        String mask(String data);
    }

    public static class PhoneNumberMasker implements DataMasker {
        @Override
        public String mask(String data) {
            if (data == null || data.length() < 11) {
                return data;
            }
            // Keep the first 3 and last 4 digits
            return data.substring(0, 3) + "****" + data.substring(7);
        }
    }

    public static class EmailMasker implements DataMasker {
        @Override
        public String mask(String data) {
            if (data == null || !data.contains("@")) {
                return data;
            }
            int atIndex = data.indexOf('@');
            String name = data.substring(0, atIndex);
            String domain = data.substring(atIndex);
            // Keep at most the first 2 characters of the local part
            if (name.length() <= 2) {
                return name + "***" + domain;
            }
            return name.substring(0, 2) + "***" + domain;
        }
    }
}
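The phone and email maskers shown above can be exercised directly. Their logic is reproduced here as static helpers so the demo is self-contained; the sample phone number and address are made up:

```java
public class MaskingDemo {
    // Same logic as PhoneNumberMasker: keep the first 3 and last 4 digits
    static String maskPhone(String data) {
        if (data == null || data.length() < 11) {
            return data;
        }
        return data.substring(0, 3) + "****" + data.substring(7);
    }

    // Same logic as EmailMasker: keep at most 2 leading characters of the local part
    static String maskEmail(String data) {
        if (data == null || !data.contains("@")) {
            return data;
        }
        int atIndex = data.indexOf('@');
        String name = data.substring(0, atIndex);
        String domain = data.substring(atIndex);
        return (name.length() <= 2 ? name : name.substring(0, 2)) + "***" + domain;
    }

    public static void main(String[] args) {
        System.out.println(maskPhone("13812345678"));       // 138****5678
        System.out.println(maskEmail("alice@example.com")); // al***@example.com
    }
}
```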
7.3 Audit Logging
7.3.1 Operation Auditing
User operations are logged with:
- Operation: what the user did
- Result: the outcome of the operation
- Time: when the operation happened
- Source: where the operation came from
public class AuditLogService {
    private AuditLogRepository repository;

    public AuditLogService(AuditLogRepository repository) {
        this.repository = repository;
    }

    public void log(String userId, String operation, String resource, String result, String ip) {
        // Build and persist the audit record
        AuditLog log = new AuditLog();
        log.setUserId(userId);
        log.setOperation(operation);
        log.setResource(resource);
        log.setResult(result);
        log.setIp(ip);
        log.setTimestamp(new Date());
        repository.save(log);
    }

    public List<AuditLog> queryLogs(String userId, String operation, String resource, Date startTime, Date endTime) {
        return repository.query(userId, operation, resource, startTime, endTime);
    }
}
7.3.2 Security Auditing
Security-related events are logged:
- Login logs: user login activity
- Permission changes: changes to user permissions
- Sensitive operations: sensitive actions
- Anomalies: anomalous behavior
public class SecurityAuditService {
    private SecurityAuditRepository repository;

    public SecurityAuditService(SecurityAuditRepository repository) {
        this.repository = repository;
    }

    public void logLogin(String userId, String ip, boolean success, String failReason) {
        SecurityAudit log = new SecurityAudit();
        log.setType("LOGIN");
        log.setUserId(userId);
        log.setIp(ip);
        log.setSuccess(success);
        log.setDetail(success ? "Login succeeded" : "Login failed: " + failReason);
        log.setTimestamp(new Date());
        repository.save(log);
    }

    public void logPermissionChange(String userId, String targetUserId, String permission, String operation) {
        SecurityAudit log = new SecurityAudit();
        log.setType("PERMISSION");
        log.setUserId(userId);
        log.setTargetId(targetUserId);
        log.setDetail(String.format("Permission change: %s %s", operation, permission));
        log.setTimestamp(new Date());
        repository.save(log);
    }

    public void logSensitiveOperation(String userId, String operation, String resource) {
        SecurityAudit log = new SecurityAudit();
        log.setType("SENSITIVE");
        log.setUserId(userId);
        log.setDetail(String.format("Sensitive operation: %s %s", operation, resource));
        log.setTimestamp(new Date());
        repository.save(log);
    }
}
8. Monitoring and Alerting
8.1 Monitoring Metrics
8.1.1 System Metrics
System-level metrics to monitor:
- CPU usage
- Memory usage
- Disk usage
- Network traffic
- System load
public class SystemMetricsCollector {
    private MetricsRegistry registry;

    public SystemMetricsCollector(MetricsRegistry registry) {
        this.registry = registry;
    }

    public void start() {
        // Register gauges for the system-level metrics
        registry.gauge("system.cpu.usage", this::getCpuUsage);
        registry.gauge("system.memory.usage", this::getMemoryUsage);
        registry.gauge("system.disk.usage", this::getDiskUsage);
        registry.gauge("system.network.rx", this::getNetworkRx);
        registry.gauge("system.network.tx", this::getNetworkTx);
        registry.gauge("system.load.1m", this::getLoad1m);
        registry.gauge("system.load.5m", this::getLoad5m);
        registry.gauge("system.load.15m", this::getLoad15m);
    }

    private double getCpuUsage() {
        OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
        return ((com.sun.management.OperatingSystemMXBean) osBean).getSystemCpuLoad();
    }

    private double getMemoryUsage() {
        MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return (double) heapMemoryUsage.getUsed() / heapMemoryUsage.getMax();
    }

    private double getDiskUsage() {
        File file = new File("/");
        return (double) (file.getTotalSpace() - file.getFreeSpace()) / file.getTotalSpace();
    }

    private long getNetworkRx() {
        // Network receive bytes (left unimplemented in this sketch)
        return 0;
    }

    private long getNetworkTx() {
        // Network transmit bytes (left unimplemented in this sketch)
        return 0;
    }

    private double getLoad1m() {
        return ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
    }

    private double getLoad5m() {
        // 5-minute load average (left unimplemented in this sketch)
        return 0;
    }

    private double getLoad15m() {
        // 15-minute load average (left unimplemented in this sketch)
        return 0;
    }
}
8.1.2 Business Metrics
Business-level metrics to monitor:
- Task submissions
- Task executions
- Task success rate
- Task execution time
- Task wait time
public class BusinessMetricsCollector {
    private MetricsRegistry registry;
    private TaskStorage taskStorage;

    public BusinessMetricsCollector(MetricsRegistry registry, TaskStorage taskStorage) {
        this.registry = registry;
        this.taskStorage = taskStorage;
    }

    public void start() {
        // Register the business-level metrics
        registry.counter("task.submit.count");
        registry.counter("task.execute.count");
        registry.gauge("task.success.rate", this::getTaskSuccessRate);
        registry.histogram("task.execute.time");
        registry.histogram("task.wait.time");
    }

    public void recordTaskSubmit() {
        registry.counter("task.submit.count").increment();
    }

    public void recordTaskExecute() {
        registry.counter("task.execute.count").increment();
    }

    public void recordTaskExecuteTime(long time) {
        registry.histogram("task.execute.time").update(time);
    }

    public void recordTaskWaitTime(long time) {
        registry.histogram("task.wait.time").update(time);
    }

    private double getTaskSuccessRate() {
        long totalCount = taskStorage.getTaskCount(null);
        long successCount = taskStorage.getTaskCount(TaskStatus.COMPLETED);
        // Avoid division by zero when no tasks have run yet
        if (totalCount == 0) {
            return 1.0;
        }
        return (double) successCount / totalCount;
    }
}
8.2 Monitoring System
8.2.1 Data Collection
Monitoring data is collected via:
- Agent collection: an agent gathers data on each host
- Push mode: data is pushed proactively
- Pull mode: data is pulled on demand
- Log collection: log data is ingested
public class MetricsCollector {
    private static final Logger logger = LoggerFactory.getLogger(MetricsCollector.class);
    private MetricsRegistry registry;
    private MetricsExporter exporter;
    private ScheduledExecutorService scheduler;

    public MetricsCollector(MetricsRegistry registry, MetricsExporter exporter) {
        this.registry = registry;
        this.exporter = exporter;
        this.scheduler = Executors.newScheduledThreadPool(1);
    }

    public void start() {
        // Collect and export metrics every 10 seconds
        scheduler.scheduleAtFixedRate(this::collectAndExport, 0, 10, TimeUnit.SECONDS);
    }

    private void collectAndExport() {
        try {
            Map<String, Metric> metrics = registry.getMetrics();
            exporter.export(metrics);
        } catch (Exception e) {
            logger.error("Failed to collect metrics", e);
        }
    }

    public void stop() {
        scheduler.shutdown();
    }
}
8.2.2 Data Storage
Monitoring data storage:
- Time-series database: metrics are stored in a TSDB
- Compression: stored data is compressed
- Sharding: data is sharded across nodes
- Expiration: old data expires automatically
public class TimeSeriesStorage {
    private InfluxDB influxDB;
    private String database;

    public TimeSeriesStorage(String url, String username, String password, String database) {
        this.database = database;
        influxDB = InfluxDBFactory.connect(url, username, password);
        // Create the database on first use
        if (!influxDB.databaseExists(database)) {
            influxDB.createDatabase(database);
        }
        // Keep data for 30 days by default
        influxDB.query(new Query("CREATE RETENTION POLICY \"30d\" ON \"" + database
                + "\" DURATION 30d REPLICATION 1 DEFAULT"));
        // Enable batched writes
        influxDB.enableBatch(BatchOptions.DEFAULTS);
        influxDB.setDatabase(database);
    }

    public void write(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        Point.Builder builder = Point.measurement(measurement);
        for (Map.Entry<String, String> entry : tags.entrySet()) {
            builder.tag(entry.getKey(), entry.getValue());
        }
        // Fields keep their numeric/boolean types; everything else is stored as a string
        for (Map.Entry<String, Object> entry : fields.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Number) {
                builder.addField(entry.getKey(), (Number) value);
            } else if (value instanceof Boolean) {
                builder.addField(entry.getKey(), (Boolean) value);
            } else {
                builder.addField(entry.getKey(), value.toString());
            }
        }
        influxDB.write(builder.build());
    }

    public QueryResult query(String query) {
        return influxDB.query(new Query(query, database));
    }

    public void close() {
        influxDB.close();
    }
}
8.2.3 Data Visualization
Monitoring data visualization:
- Dashboards: key metrics at a glance
- Trend charts: metric trends over time
- Heat maps: metric distributions
- Alert panels: active alert information
public class DashboardService {
    private GrafanaClient grafanaClient;

    public DashboardService(String url, String apiKey) {
        grafanaClient = new GrafanaClient(url, apiKey);
    }

    public void createDashboard(String title, List<Panel> panels) {
        Dashboard dashboard = new Dashboard();
        dashboard.setTitle(title);
        for (Panel panel : panels) {
            dashboard.addPanel(panel);
        }
        grafanaClient.saveDashboard(dashboard);
    }

    public void createSystemDashboard() {
        // Build the system-monitoring dashboard
        List<Panel> panels = new ArrayList<>();
        panels.add(createTimeSeriesPanel("CPU usage", "system.cpu.usage", "percent"));
        panels.add(createTimeSeriesPanel("Memory usage", "system.memory.usage", "percent"));
        panels.add(createTimeSeriesPanel("Disk usage", "system.disk.usage", "percent"));
        panels.add(createTimeSeriesPanel("Network traffic", "system.network.rx, system.network.tx", "bytes"));
        panels.add(createTimeSeriesPanel("System load", "system.load.1m, system.load.5m, system.load.15m", "short"));
        createDashboard("System monitoring", panels);
    }

    public void createBusinessDashboard() {
        // Build the business-monitoring dashboard
        List<Panel> panels = new ArrayList<>();
        panels.add(createTimeSeriesPanel("Task submissions", "task.submit.count", "short"));
        panels.add(createTimeSeriesPanel("Task executions", "task.execute.count", "short"));
        panels.add(createGaugePanel("Task success rate", "task.success.rate", "percent"));
        panels.add(createTimeSeriesPanel("Task execution time", "task.execute.time", "ms"));
        panels.add(createTimeSeriesPanel("Task wait time", "task.wait.time", "ms"));
        createDashboard("Business monitoring", panels);
    }

    private Panel createTimeSeriesPanel(String title, String metrics, String unit) {
        Panel panel = new Panel();
        panel.setTitle(title);
        panel.setType("timeseries");
        panel.setUnit(unit);
        panel.setMetrics(metrics);
        return panel;
    }

    private Panel createGaugePanel(String title, String metrics, String unit) {
        Panel panel = new Panel();
        panel.setTitle(title);
        panel.setType("gauge");
        panel.setUnit(unit);
        panel.setMetrics(metrics);
        return panel;
    }
}
8.3 Alerting System
8.3.1 Alert Rules
Supported alert rule types:
- Threshold alerts: triggered when a metric crosses a threshold
- Trend alerts: triggered by metric trends
- Anomaly alerts: triggered by anomaly detection
- Composite alerts: triggered by combinations of conditions
public class AlertRuleManager {
    private static final Logger logger = LoggerFactory.getLogger(AlertRuleManager.class);
    private List<AlertRule> rules = new ArrayList<>();
    private AlertStorage alertStorage;

    public AlertRuleManager(AlertStorage alertStorage) {
        this.alertStorage = alertStorage;
    }

    public void addRule(AlertRule rule) {
        rules.add(rule);
    }

    public void removeRule(String ruleId) {
        rules.removeIf(rule -> rule.getId().equals(ruleId));
    }

    public List<AlertRule> getRules() {
        return new ArrayList<>(rules);
    }

    public void evaluate(Map<String, Metric> metrics) {
        for (AlertRule rule : rules) {
            try {
                boolean triggered = rule.evaluate(metrics);
                AlertStatus status = alertStorage.getAlertStatus(rule.getId());
                if (triggered && status != AlertStatus.FIRING) {
                    // Transition to firing
                    fireAlert(rule);
                } else if (!triggered && status == AlertStatus.FIRING) {
                    // Transition back to resolved
                    resolveAlert(rule);
                }
            } catch (Exception e) {
                logger.error("Failed to evaluate rule: " + rule.getId(), e);
            }
        }
    }

    private void fireAlert(AlertRule rule) {
        Alert alert = new Alert();
        alert.setRuleId(rule.getId());
        alert.setName(rule.getName());
        alert.setSeverity(rule.getSeverity());
        alert.setMessage(rule.getMessage());
        alert.setStatus(AlertStatus.FIRING);
        alert.setStartTime(new Date());
        alertStorage.saveAlert(alert);
        sendNotification(alert);
    }

    private void resolveAlert(AlertRule rule) {
        Alert alert = alertStorage.getAlert(rule.getId());
        if (alert != null) {
            alert.setStatus(AlertStatus.RESOLVED);
            alert.setEndTime(new Date());
            alertStorage.saveAlert(alert);
            sendRecoveryNotification(alert);
        }
    }

    private void sendNotification(Alert alert) {
        // Send the firing notification
    }

    private void sendRecoveryNotification(Alert alert) {
        // Send the recovery notification
    }
}
8.3.2 Alert Notification
Supported notification channels:
- Email
- SMS
- DingTalk
- Webhook
public class NotificationService {
    private static final Logger logger = LoggerFactory.getLogger(NotificationService.class);
    private List<NotificationChannel> channels = new ArrayList<>();

    public NotificationService() {
        // Register the built-in notification channels
        channels.add(new EmailNotificationChannel());
        channels.add(new SmsNotificationChannel());
        channels.add(new DingTalkNotificationChannel());
        channels.add(new WebhookNotificationChannel());
    }

    public void sendNotification(Alert alert, List<String> receivers) {
        Notification notification = new Notification();
        notification.setTitle("Alert: " + alert.getName());
        notification.setContent(alert.getMessage());
        notification.setSeverity(alert.getSeverity());
        notification.setTime(new Date());
        notification.setReceivers(receivers);
        // A failure on one channel must not block the others
        for (NotificationChannel channel : channels) {
            try {
                channel.send(notification);
            } catch (Exception e) {
                logger.error("Failed to send notification: " + channel.getName(), e);
            }
        }
    }

    public void sendRecoveryNotification(Alert alert, List<String> receivers) {
        Notification notification = new Notification();
        notification.setTitle("Resolved: " + alert.getName());
        notification.setContent("Alert resolved: " + alert.getMessage());
        notification.setSeverity(alert.getSeverity());
        notification.setTime(new Date());
        notification.setReceivers(receivers);
        for (NotificationChannel channel : channels) {
            try {
                channel.send(notification);
            } catch (Exception e) {
                logger.error("Failed to send recovery notification: " + channel.getName(), e);
            }
        }
    }

    public void addChannel(NotificationChannel channel) {
        channels.add(channel);
    }
}
8.3.3 Alert Handling
Alerts support the standard lifecycle operations:
- Acknowledge: an operator takes ownership of an alert
- Escalate: raise the severity and notify a wider audience
- Suppress: hide an individual alert until a given time
- Silence: mute all alerts from a rule, e.g. during a maintenance window
public class AlertManager {

    private final AlertStorage alertStorage;
    private final NotificationService notificationService;

    public AlertManager(AlertStorage alertStorage, NotificationService notificationService) {
        this.alertStorage = alertStorage;
        this.notificationService = notificationService;
    }

    // Acknowledge: an operator takes ownership of the alert
    public void acknowledgeAlert(String alertId, String userId, String comment) {
        Alert alert = alertStorage.getAlert(alertId);
        if (alert != null) {
            alert.setStatus(AlertStatus.ACKNOWLEDGED);
            alert.setAcknowledgedBy(userId);
            alert.setAcknowledgedTime(new Date());
            alert.setComment(comment);
            alertStorage.saveAlert(alert);
        }
    }

    // Escalate: raise the severity and notify the next tier of receivers
    public void escalateAlert(String alertId, String userId, String reason) {
        Alert alert = alertStorage.getAlert(alertId);
        if (alert != null) {
            AlertSeverity newSeverity = escalateSeverity(alert.getSeverity());
            alert.setSeverity(newSeverity);
            alert.setEscalatedBy(userId);
            alert.setEscalatedTime(new Date());
            alert.setEscalationReason(reason);
            alertStorage.saveAlert(alert);

            List<String> receivers = getEscalationReceivers(newSeverity);
            notificationService.sendNotification(alert, receivers);
        }
    }

    // Suppress: hide a single alert until the given time
    public void suppressAlert(String alertId, String userId, Date until, String reason) {
        Alert alert = alertStorage.getAlert(alertId);
        if (alert != null) {
            alert.setStatus(AlertStatus.SUPPRESSED);
            alert.setSuppressedBy(userId);
            alert.setSuppressedTime(new Date());
            alert.setSuppressedUntil(until);
            alert.setSuppressReason(reason);
            alertStorage.saveAlert(alert);
        }
    }

    // Silence: mute every alert from a rule for a maintenance window
    public void silenceAlert(String ruleId, String userId, Date until, String reason) {
        Silence silence = new Silence();
        silence.setRuleId(ruleId);
        silence.setCreatedBy(userId);
        silence.setCreatedTime(new Date());
        silence.setUntil(until);
        silence.setReason(reason);
        alertStorage.saveSilence(silence);
    }

    private AlertSeverity escalateSeverity(AlertSeverity severity) {
        switch (severity) {
            case INFO:
                return AlertSeverity.WARNING;
            case WARNING:
                return AlertSeverity.ERROR;
            case ERROR:
                return AlertSeverity.CRITICAL;
            default:
                return severity; // CRITICAL cannot be escalated further
        }
    }

    private List<String> getEscalationReceivers(AlertSeverity severity) {
        switch (severity) {
            case WARNING:
                return Arrays.asList("team-leader");
            case ERROR:
                return Arrays.asList("department-manager");
            case CRITICAL:
                return Arrays.asList("cto", "ceo");
            default:
                return Arrays.asList("developer");
        }
    }
}
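The silenceAlert method above records a Silence, but nothing shown so far consumes it. A minimal sketch of the missing check, under the assumption that AlertRuleManager.evaluate() would consult active silences before evaluating a rule (the Silence stand-in and isSilenced helper are illustrative, not part of the design above):

```java
import java.util.Date;
import java.util.List;

public class SilenceCheck {

    // Minimal stand-in for the Silence entity saved by AlertManager.silenceAlert()
    public static class Silence {
        final String ruleId;
        final Date until;
        public Silence(String ruleId, Date until) {
            this.ruleId = ruleId;
            this.until = until;
        }
    }

    // A rule is silenced if any silence entry for it has not yet expired;
    // the evaluator would skip the rule (and suppress notifications) when true.
    public static boolean isSilenced(List<Silence> silences, String ruleId, Date now) {
        return silences.stream()
                .anyMatch(s -> s.ruleId.equals(ruleId) && s.until.after(now));
    }

    public static void main(String[] args) {
        Date now = new Date(1_000_000L);
        List<Silence> silences = List.of(
                new Silence("cpu-high", new Date(2_000_000L)),  // still active
                new Silence("disk-full", new Date(500_000L)));  // expired
        System.out.println(isSilenced(silences, "cpu-high", now));  // true
        System.out.println(isSilenced(silences, "disk-full", now)); // false
    }
}
```

Expired silence entries can be garbage-collected on the same pass, so the list stays small.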
9. Deployment Architecture
9.1 Physical Deployment
9.1.1 Standalone Deployment
For small-scale scenarios, everything runs on a single machine:
- Standalone mode: all components deployed on one machine
- Pseudo-distributed mode: all components on one machine, but running and communicating as if they were distributed
+----------------------------------+
|      Standalone Deployment       |
+----------------------------------+
|                                  |
|  +-----------+  +-----------+    |
|  | Scheduler |  | Executor  |    |
|  +-----------+  +-----------+    |
|                                  |
|  +-----------+  +-----------+    |
|  | Registry  |  | Storage   |    |
|  +-----------+  +-----------+    |
|                                  |
|  +-----------+  +-----------+    |
|  | Monitoring|  | Alerting  |    |
|  +-----------+  +-----------+    |
|                                  |
+----------------------------------+
9.1.2 Cluster Deployment
For large-scale scenarios, each tier is deployed as a cluster:
- Scheduler cluster: multiple scheduler nodes
- Executor cluster: multiple executor nodes
- Registry cluster: multiple registry nodes
- Storage cluster: multiple storage service nodes
+------------------------------------------------------------------+
|                        Cluster Deployment                        |
+------------------------------------------------------------------+
|                                                                  |
|  +------------+  +------------+  +------------+                  |
|  | Scheduler 1|  | Scheduler 2|  | Scheduler 3|                  |
|  +------------+  +------------+  +------------+                  |
|                                                                  |
|  +------------+  +------------+  +------------+  +------------+  |
|  | Executor 1 |  | Executor 2 |  | Executor 3 |  | Executor 4 |  |
|  +------------+  +------------+  +------------+  +------------+  |
|                                                                  |
|  +------------+  +------------+  +------------+                  |
|  | Registry 1 |  | Registry 2 |  | Registry 3 |                  |
|  +------------+  +------------+  +------------+                  |
|                                                                  |
|  +------------+  +------------+  +------------+                  |
|  | Storage 1  |  | Storage 2  |  | Storage 3  |                  |
|  +------------+  +------------+  +------------+                  |
|                                                                  |
|  +------------+  +------------+                                  |
|  | Monitoring |  | Alerting   |                                  |
|  +------------+  +------------+                                  |
|                                                                  |
+------------------------------------------------------------------+
9.1.3 Multi-Datacenter Deployment
For geo-redundant, multi-active scenarios:
- Primary datacenter: runs a complete deployment of the system
- Standby datacenter: runs a complete deployment of the system
- Data replication: data is synchronized between the datacenters
- Traffic routing: a global load balancer routes traffic between datacenters according to policy
+------------------------------------------------------------------+
|                   Multi-Datacenter Deployment                    |
+------------------------------------------------------------------+
|                                                                  |
|  +-------------------------+      +-------------------------+    |
|  |      Datacenter A       |      |      Datacenter B       |    |
|  |                         |      |                         |    |
|  |  +------------------+   |      |  +------------------+   |    |
|  |  | Scheduler cluster|   |      |  | Scheduler cluster|   |    |
|  |  +------------------+   |      |  +------------------+   |    |
|  |                         |      |                         |    |
|  |  +------------------+   |      |  +------------------+   |    |
|  |  | Executor cluster |   |      |  | Executor cluster |   |    |
|  |  +------------------+   |      |  +------------------+   |    |
|  |                         |      |                         |    |
|  |  +------------------+   |      |  +------------------+   |    |
|  |  | Storage cluster  |   |<---->|  | Storage cluster  |   |    |
|  |  +------------------+   |      |  +------------------+   |    |
|  |                         |      |                         |    |
|  +-------------------------+      +-------------------------+    |
|                                                                  |
|          +--------------------------------------------+          |
|          |            Global Load Balancer            |          |
|          +--------------------------------------------+          |
|                                                                  |
+------------------------------------------------------------------+
9.2 Containerized Deployment
9.2.1 Docker Deployment
Containerize the system with Docker:
- Component images: package each component as a Docker image
- Orchestration: compose the containers with Docker Compose
- Networking: connect the containers on a shared container network
- Storage: persist state in named volumes
version: '3'

services:
  # Registry (ZooKeeper)
  zookeeper:
    image: zookeeper:3.6
    ports:
      - "2181:2181"
    volumes:
      - zookeeper_data:/data
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zookeeper:2888:3888;2181

  # Message queue
  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "task-events:3:1,task-results:3:1"
    volumes:
      - kafka_data:/var/lib/kafka/data
    depends_on:
      - zookeeper

  # Database
  mysql:
    image: mysql:8.0
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: gridjob
    volumes:
      - mysql_data:/var/lib/mysql
      - ./sql:/docker-entrypoint-initdb.d

  # Redis
  redis:
    image: redis:6.2
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  # Scheduler
  scheduler:
    image: gridjob/scheduler:1.0
    ports:
      - "8080:8080"
    environment:
      SPRING_PROFILES_ACTIVE: docker
      ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      MYSQL_URL: jdbc:mysql://mysql:3306/gridjob
      MYSQL_USERNAME: root
      MYSQL_PASSWORD: root
      REDIS_HOST: redis
      REDIS_PORT: 6379
    depends_on:
      - zookeeper
      - kafka
      - mysql
      - redis

  # Executor
  executor:
    image: gridjob/executor:1.0
    environment:
      SPRING_PROFILES_ACTIVE: docker
      ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      SCHEDULER_URL: http://scheduler:8080
    depends_on:
      - scheduler

  # Monitoring (Prometheus)
  prometheus:
    image: prom/prometheus:v2.30.0
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    depends_on:
      - scheduler
      - executor

  # Dashboards (Grafana)
  grafana:
    image: grafana/grafana:8.2.0
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana_data:/var/lib/grafana
    depends_on:
      - prometheus

volumes:
  zookeeper_data:
  kafka_data:
  mysql_data:
  redis_data:
  prometheus_data:
  grafana_data:
9.2.2 Kubernetes Deployment
Deploy on Kubernetes:
- Pods: one Pod per component instance
- Services: stable in-cluster endpoints for each component
- Deployments: stateless components such as the scheduler and executors
- StatefulSets: stateful components such as ZooKeeper
- ConfigMaps: non-sensitive configuration
- Secrets: sensitive configuration such as passwords
- PersistentVolumes: durable storage for stateful components
# Namespace
apiVersion: v1
kind: Namespace
metadata:
  name: gridjob
---
# Configuration
apiVersion: v1
kind: ConfigMap
metadata:
  name: gridjob-config
  namespace: gridjob
data:
  application.yml: |
    spring:
      profiles:
        active: k8s
    gridjob:
      zookeeper:
        connect: zookeeper:2181
      kafka:
        bootstrap-servers: kafka:9092
      mysql:
        url: jdbc:mysql://mysql:3306/gridjob
        username: root
      redis:
        host: redis
        port: 6379
---
# Sensitive configuration
apiVersion: v1
kind: Secret
metadata:
  name: gridjob-secret
  namespace: gridjob
type: Opaque
data:
  mysql-password: cm9vdA==  # "root", base64-encoded
---
# Scheduler Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: scheduler
  namespace: gridjob
spec:
  replicas: 3
  selector:
    matchLabels:
      app: scheduler
  template:
    metadata:
      labels:
        app: scheduler
    spec:
      containers:
        - name: scheduler
          image: gridjob/scheduler:1.0
          ports:
            - containerPort: 8080
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: k8s
            - name: MYSQL_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: gridjob-secret
                  key: mysql-password
          volumeMounts:
            - name: config-volume
              mountPath: /app/config
          readinessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 60
            periodSeconds: 20
      volumes:
        - name: config-volume
          configMap:
            name: gridjob-config
---
# Scheduler Service
apiVersion: v1
kind: Service
metadata:
  name: scheduler
  namespace: gridjob
spec:
  selector:
    app: scheduler
  ports:
    - port: 8080
      targetPort: 8080
  type: ClusterIP
---
# Executor Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: executor
  namespace: gridjob
spec:
  replicas: 5
  selector:
    matchLabels:
      app: executor
  template:
    metadata:
      labels:
        app: executor
    spec:
      containers:
        - name: executor
          image: gridjob/executor:1.0
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: k8s
            - name: SCHEDULER_URL
              value: http://scheduler:8080
          volumeMounts:
            - name: config-volume
              mountPath: /app/config
          readinessProbe:
            httpGet:
              path: /actuator/health
              port: 8081
            initialDelaySeconds: 30
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /actuator/health
              port: 8081
            initialDelaySeconds: 60
            periodSeconds: 20
      volumes:
        - name: config-volume
          configMap:
            name: gridjob-config
---
# ZooKeeper StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
  namespace: gridjob
spec:
  serviceName: zookeeper
  replicas: 3
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - name: zookeeper
          image: zookeeper:3.6
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
          env:
            # Note: ZOO_MY_ID must be numeric; in practice an init script
            # extracts the ordinal from the pod name (zookeeper-0 -> 0)
            - name: ZOO_MY_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: ZOO_SERVERS
              value: "server.0=zookeeper-0.zookeeper:2888:3888 server.1=zookeeper-1.zookeeper:2888:3888 server.2=zookeeper-2.zookeeper:2888:3888"
          volumeMounts:
            - name: data
              mountPath: /data
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 10Gi
---
# ZooKeeper headless Service
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  namespace: gridjob
spec:
  selector:
    app: zookeeper
  ports:
    - port: 2181
      name: client
    - port: 2888
      name: server
    - port: 3888
      name: leader-election
  clusterIP: None
9.3 Hybrid Cloud Deployment
9.3.1 Public Cloud Deployment
Deploy the system on a public cloud:
- Cloud servers host the compute components
- A managed cloud database stores the metadata
- A managed message queue carries task events
- Cloud object storage holds task artifacts and logs
9.3.2 Private Cloud Deployment
Deploy the system on a private cloud:
- A private cloud platform hosts the system
- Private cloud compute, storage, and network resources run the workloads
- Private cloud tooling manages the deployment
9.3.3 Hybrid Deployment
Combine public and private clouds:
- Core components are deployed in the private cloud
- Elastic components, such as burst executor pools, run in the public cloud
- Core data stays in the private cloud; non-core data may live in the public cloud
- A hybrid cloud management layer operates across both environments
10. Performance Optimization Strategies
10.1 Scheduling Performance
10.1.1 Scheduling Algorithm Optimization
Optimize the scheduling algorithm itself:
- Batch scheduling: schedule tasks in batches instead of one at a time
- Pre-scheduling: make scheduling decisions ahead of the trigger time
- Caching: cache scheduling decisions for similar tasks
- Indexing: index the task queue for fast lookups
public class BatchTaskScheduler implements TaskScheduler {

    private static final int BATCH_SIZE = 100;

    // Order by priority first, then FIFO within the same priority.
    // Note: PriorityQueue is not thread-safe; synchronize externally or use
    // PriorityBlockingQueue when multiple threads submit tasks.
    private final Queue<Task> taskQueue = new PriorityQueue<>((t1, t2) -> {
        int priorityCompare = t1.getPriority().compareTo(t2.getPriority());
        if (priorityCompare != 0) {
            return priorityCompare;
        }
        // Same priority: earlier creation time first
        return t1.getCreatedTime().compareTo(t2.getCreatedTime());
    });

    @Override
    public void addTask(Task task) {
        taskQueue.offer(task);
    }

    @Override
    public List<Task> nextBatch() {
        List<Task> batch = new ArrayList<>();
        // Drain up to BATCH_SIZE tasks in scheduling order
        for (int i = 0; i < BATCH_SIZE && !taskQueue.isEmpty(); i++) {
            batch.add(taskQueue.poll());
        }
        return batch;
    }

    @Override
    public boolean hasNext() {
        return !taskQueue.isEmpty();
    }
}
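The two-level ordering used by the comparator above (priority first, creation time as the tie-breaker) can be verified in isolation. The TaskItem class below is a stand-in for the document's Task, with the assumption that a lower priority value means more urgent:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

public class BatchOrderDemo {

    // Stand-in for the document's Task: lower priority value = more urgent
    public static class TaskItem {
        final String id;
        final int priority;
        final long createdTime;
        public TaskItem(String id, int priority, long createdTime) {
            this.id = id;
            this.priority = priority;
            this.createdTime = createdTime;
        }
    }

    // Drain all tasks in scheduling order: priority first, FIFO within a priority
    public static List<String> drain(List<TaskItem> tasks) {
        Queue<TaskItem> queue = new PriorityQueue<>((t1, t2) -> {
            int byPriority = Integer.compare(t1.priority, t2.priority);
            if (byPriority != 0) {
                return byPriority;
            }
            return Long.compare(t1.createdTime, t2.createdTime);
        });
        queue.addAll(tasks);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().id);
        }
        return order;
    }

    public static void main(String[] args) {
        List<TaskItem> tasks = List.of(
                new TaskItem("low-early", 2, 100),
                new TaskItem("high-late", 0, 300),
                new TaskItem("high-early", 0, 200));
        // High-priority tasks first, FIFO among equals
        System.out.println(drain(tasks)); // [high-early, high-late, low-early]
    }
}
```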
10.1.2 Scheduling Concurrency Optimization
Increase scheduling throughput with concurrency:
- Multi-threaded scheduling: schedule with a pool of worker threads
- Thread pool tuning: size the pool to match the workload
- Asynchronous scheduling: decouple task submission from dispatch
- Parallel scheduling: process independent scheduling requests in parallel
public class ConcurrentTaskScheduler implements TaskScheduler {

    private static final Logger logger = LoggerFactory.getLogger(ConcurrentTaskScheduler.class);

    private final Queue<Task> taskQueue = new ConcurrentLinkedQueue<>();
    private final ExecutorService executorService;

    public ConcurrentTaskScheduler(int threadCount) {
        executorService = Executors.newFixedThreadPool(threadCount);
    }

    @Override
    public void addTask(Task task) {
        taskQueue.offer(task);
    }

    @Override
    public void scheduleTasks() {
        // Drain the queue
        List<Task> tasks = new ArrayList<>();
        Task task;
        while ((task = taskQueue.poll()) != null) {
            tasks.add(task);
        }

        // Schedule the tasks in parallel on the worker pool
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Task t : tasks) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                try {
                    scheduleTask(t);
                } catch (Exception e) {
                    logger.error("Failed to schedule task: " + t.getTaskId(), e);
                }
            }, executorService);
            futures.add(future);
        }

        // Wait for the whole batch to finish
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }

    private void scheduleTask(Task task) {
        // Dispatch a single task (omitted)
    }

    @Override
    public boolean hasNext() {
        return !taskQueue.isEmpty();
    }

    public void shutdown() {
        executorService.shutdown();
    }
}
10.2 Execution Performance
10.2.1 Task Execution Optimization
Optimize how tasks are executed:
- Resource isolation: isolate the resources of concurrently running tasks
- Task grouping: group tasks by type
- Task merging: merge similar small tasks into one
- Task splitting: split oversized tasks into parts
public class TaskOptimizer {

    public List<Task> optimizeTasks(List<Task> tasks) {
        // Group tasks by type
        Map<String, List<Task>> taskGroups = tasks.stream()
                .collect(Collectors.groupingBy(Task::getType));

        List<Task> optimizedTasks = new ArrayList<>();

        // Pick an optimization strategy per task type
        for (Map.Entry<String, List<Task>> entry : taskGroups.entrySet()) {
            String type = entry.getKey();
            List<Task> groupTasks = entry.getValue();
            switch (type) {
                case "BATCH":
                    optimizedTasks.addAll(mergeBatchTasks(groupTasks)); // merge small batch tasks
                    break;
                case "LARGE":
                    optimizedTasks.addAll(splitLargeTasks(groupTasks)); // split oversized tasks
                    break;
                default:
                    optimizedTasks.addAll(groupTasks); // leave other types unchanged
                    break;
            }
        }
        return optimizedTasks;
    }

    private List<Task> mergeBatchTasks(List<Task> tasks) {
        if (tasks.size() <= 1) {
            return tasks;
        }

        // Group by parameter similarity
        Map<String, List<Task>> paramGroups = new HashMap<>();
        for (Task task : tasks) {
            String paramSignature = calculateParamSignature(task.getParams());
            paramGroups.computeIfAbsent(paramSignature, k -> new ArrayList<>()).add(task);
        }

        // Merge each parameter group into a single task
        List<Task> mergedTasks = new ArrayList<>();
        for (List<Task> groupTasks : paramGroups.values()) {
            if (groupTasks.size() == 1) {
                mergedTasks.add(groupTasks.get(0));
            } else {
                mergedTasks.add(mergeTasks(groupTasks));
            }
        }
        return mergedTasks;
    }

    private String calculateParamSignature(Map<String, Object> params) {
        // Simplified: a real implementation should derive a signature from the
        // parameter values, not just the sorted key set
        return params.keySet().stream().sorted().collect(Collectors.joining(","));
    }

    private Task mergeTasks(List<Task> tasks) {
        Task firstTask = tasks.get(0);

        Task mergedTask = new Task();
        mergedTask.setTaskId(generateMergedTaskId(tasks));
        mergedTask.setName("Merged: " + firstTask.getName());
        mergedTask.setType(firstTask.getType());

        // Merge parameters and remember the constituent task ids
        Map<String, Object> mergedParams = new HashMap<>(firstTask.getParams());
        mergedParams.put("mergedTaskIds",
                tasks.stream().map(Task::getTaskId).collect(Collectors.toList()));
        mergedTask.setParams(mergedParams);

        // Highest priority wins (lowest enum ordinal)
        TaskPriority highestPriority = tasks.stream()
                .map(Task::getPriority)
                .min(Comparator.naturalOrder())
                .orElse(TaskPriority.NORMAL);
        mergedTask.setPriority(highestPriority);

        // Earliest creation time wins
        Date earliestTime = tasks.stream()
                .map(Task::getCreatedTime)
                .min(Comparator.naturalOrder())
                .orElse(new Date());
        mergedTask.setCreatedTime(earliestTime);

        return mergedTask;
    }

    private String generateMergedTaskId(List<Task> tasks) {
        return "merged-" + UUID.randomUUID().toString();
    }

    private List<Task> splitLargeTasks(List<Task> tasks) {
        List<Task> splitTasks = new ArrayList<>();
        for (Task task : tasks) {
            if (isLargeTask(task)) {
                splitTasks.addAll(splitTask(task));
            } else {
                splitTasks.add(task);
            }
        }
        return splitTasks;
    }

    private boolean isLargeTask(Task task) {
        // Simplified: a real implementation should also consider resource requirements
        Map<String, Object> params = task.getParams();
        if (params.containsKey("dataSize")) {
            Object dataSize = params.get("dataSize");
            if (dataSize instanceof Number) {
                return ((Number) dataSize).longValue() > 1000000; // more than ~1 MB of data
            }
        }
        return false;
    }

    private List<Task> splitTask(Task task) {
        List<Task> splitTasks = new ArrayList<>();

        long dataSize = ((Number) task.getParams().get("dataSize")).longValue();
        int splitCount = (int) Math.ceil(dataSize / 1000000.0); // one part per ~1 MB

        for (int i = 0; i < splitCount; i++) {
            Task splitTask = new Task();
            splitTask.setTaskId(task.getTaskId() + "-" + i);
            splitTask.setName(task.getName() + " (Part " + (i + 1) + "/" + splitCount + ")");
            splitTask.setType(task.getType());

            // Each part knows its index and its parent task
            Map<String, Object> params = new HashMap<>(task.getParams());
            params.put("partIndex", i);
            params.put("partCount", splitCount);
            params.put("originalTaskId", task.getTaskId());
            splitTask.setParams(params);

            splitTask.setPriority(task.getPriority());
            splitTask.setCreatedTime(task.getCreatedTime());

            splitTasks.add(splitTask);
        }
        return splitTasks;
    }
}
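The splitting threshold in splitTask() is plain ceiling division; a small self-contained check of that arithmetic (the ~1 MB chunk size mirrors the constant assumed in the code above):

```java
public class SplitMathDemo {

    static final long CHUNK_SIZE = 1_000_000L; // ~1 MB per part, as in splitTask()

    // Number of parts needed to cover dataSize bytes (ceiling division)
    public static int splitCount(long dataSize) {
        return (int) Math.ceil(dataSize / (double) CHUNK_SIZE);
    }

    public static void main(String[] args) {
        System.out.println(splitCount(500_000L));   // 1 (below the threshold)
        System.out.println(splitCount(1_000_000L)); // 1 (exactly one chunk)
        System.out.println(splitCount(2_500_000L)); // 3
    }
}
```

Note that a task of exactly 1,000,000 bytes is not considered "large" by isLargeTask() (the check is strictly greater-than), which is consistent with splitCount returning 1 for it.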
10.2.2 Resource Utilization Optimization
Improve resource utilization:
- Pre-allocation: allocate resources before tasks need them
- Pooling: manage resources through pools
- Elasticity: grow and shrink resources with the load
- Reclamation: reclaim idle resources promptly
public class ResourcePoolManager {

    private final Map<String, ResourcePool> resourcePools = new HashMap<>();

    public ResourcePoolManager() {
        // Initialize the resource pools.
        // Note the long literals: 10 * 1024 * 1024 * 1024 would overflow int arithmetic.
        resourcePools.put("cpu", new ResourcePool("cpu", 100));
        resourcePools.put("memory", new ResourcePool("memory", 1024L * 1024 * 1024));   // 1 GB
        resourcePools.put("disk", new ResourcePool("disk", 10L * 1024 * 1024 * 1024)); // 10 GB
        resourcePools.put("network", new ResourcePool("network", 100L * 1024 * 1024)); // 100 MB
    }

    // Check-then-allocate is done in two passes; callers must synchronize
    // externally if multiple threads allocate concurrently.
    public boolean allocateResource(String taskId, Map<String, Object> resourceRequirements) {
        // First pass: verify that every requirement can be satisfied
        for (Map.Entry<String, Object> entry : resourceRequirements.entrySet()) {
            ResourcePool pool = resourcePools.get(entry.getKey());
            if (pool == null) {
                return false;
            }
            if (entry.getValue() instanceof Number) {
                long required = ((Number) entry.getValue()).longValue();
                if (!pool.canAllocate(required)) {
                    return false;
                }
            }
        }

        // Second pass: actually allocate
        for (Map.Entry<String, Object> entry : resourceRequirements.entrySet()) {
            ResourcePool pool = resourcePools.get(entry.getKey());
            if (entry.getValue() instanceof Number) {
                pool.allocate(taskId, ((Number) entry.getValue()).longValue());
            }
        }
        return true;
    }

    public void releaseResource(String taskId) {
        // Release whatever the task holds in every pool
        for (ResourcePool pool : resourcePools.values()) {
            pool.release(taskId);
        }
    }

    public void expandResourcePool(String resourceName, long amount) {
        ResourcePool pool = resourcePools.get(resourceName);
        if (pool != null) {
            pool.expand(amount);
        }
    }

    public void shrinkResourcePool(String resourceName, long amount) {
        ResourcePool pool = resourcePools.get(resourceName);
        if (pool != null) {
            pool.shrink(amount);
        }
    }

    public Map<String, Long> getResourceUsage() {
        Map<String, Long> usage = new HashMap<>();
        for (Map.Entry<String, ResourcePool> entry : resourcePools.entrySet()) {
            usage.put(entry.getKey(), entry.getValue().getUsed());
        }
        return usage;
    }

    public static class ResourcePool {
        private final String name;
        private long total;
        private long used;
        private final Map<String, Long> allocations = new HashMap<>();

        public ResourcePool(String name, long total) {
            this.name = name;
            this.total = total;
            this.used = 0;
        }

        public boolean canAllocate(long amount) {
            return used + amount <= total;
        }

        public void allocate(String taskId, long amount) {
            if (!canAllocate(amount)) {
                throw new IllegalStateException("Insufficient resources: " + name);
            }
            allocations.put(taskId, amount);
            used += amount;
        }

        public void release(String taskId) {
            Long amount = allocations.remove(taskId);
            if (amount != null) {
                used -= amount;
            }
        }

        public void expand(long amount) {
            total += amount;
        }

        public void shrink(long amount) {
            if (total - amount < used) {
                throw new IllegalStateException("Resources in use; cannot shrink pool " + name);
            }
            total -= amount;
        }

        public long getTotal() { return total; }
        public long getUsed()  { return used; }
        public long getFree()  { return total - used; }
    }
}
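One pitfall worth calling out in the pool sizes above: in Java, 10 * 1024 * 1024 * 1024 is evaluated in 32-bit int arithmetic and overflows before the result is widened to long, which is why the 10 GB pool needs a long literal. A quick demonstration:

```java
public class LiteralOverflowDemo {

    // Wrong: all factors are ints, so the multiplication overflows
    // before the result is widened to long
    public static long wrongTenGb() {
        return 10 * 1024 * 1024 * 1024;
    }

    // Right: the long literal promotes the whole expression to 64-bit
    public static long rightTenGb() {
        return 10L * 1024 * 1024 * 1024;
    }

    public static void main(String[] args) {
        System.out.println(wrongTenGb()); // -2147483648
        System.out.println(rightTenGb()); // 10737418240
    }
}
```

The 1 GB and 100 MB sizes happen to fit in an int, so the original code only mis-sized the disk pool; using long literals everywhere avoids the trap uniformly.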
10.3 Storage Performance
10.3.1 Database Optimization
Optimize the database:
- Index optimization: add and tune indexes
- Query optimization: tune slow SQL queries
- Sharding: split data across databases and tables
- Read/write splitting: route reads to replicas and writes to the primary
public class DatabaseOptimizer {

    private static final Logger logger = LoggerFactory.getLogger(DatabaseOptimizer.class);

    private final DataSource dataSource;

    public DatabaseOptimizer(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void optimizeIndexes() {
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement()) {
            // Refresh table statistics for the query optimizer
            stmt.executeUpdate("ANALYZE TABLE task");
            stmt.executeUpdate("ANALYZE TABLE task_instance");
            stmt.executeUpdate("ANALYZE TABLE executor_node");

            // Inspect existing indexes
            try (ResultSet rs = stmt.executeQuery("SHOW INDEX FROM task")) {
                // process index information
            }

            // Add missing indexes.
            // Note: MySQL does not support CREATE INDEX IF NOT EXISTS (MariaDB does);
            // on MySQL, check SHOW INDEX first and create the index only when absent.
            stmt.executeUpdate("CREATE INDEX IF NOT EXISTS idx_task_status ON task(status)");
            stmt.executeUpdate("CREATE INDEX IF NOT EXISTS idx_task_priority ON task(priority)");
            stmt.executeUpdate("CREATE INDEX IF NOT EXISTS idx_task_instance_status ON task_instance(status)");
            stmt.executeUpdate("CREATE INDEX IF NOT EXISTS idx_task_instance_executor_id ON task_instance(executor_id)");
        } catch (SQLException e) {
            logger.error("Failed to optimize indexes", e);
        }
    }

    public void optimizeQueries() {
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement()) {
            // Configure the query cache.
            // Note: the query cache was removed in MySQL 8.0; these settings
            // only apply to MySQL 5.7 and earlier.
            stmt.executeUpdate("SET GLOBAL query_cache_size = 67108864"); // 64 MB
            stmt.executeUpdate("SET GLOBAL query_cache_type = 1");

            // Inspect recent slow queries
            try (ResultSet rs = stmt.executeQuery("SELECT * FROM mysql.slow_log LIMIT 10")) {
                // process the slow query log
            }
        } catch (SQLException e) {
            logger.error("Failed to optimize queries", e);
        }
    }

    public void setupSharding() {
        // Simplified: in practice use a sharding middleware
    }

    public void setupReadWriteSplitting() {
        // Simplified: in practice use a read/write splitting middleware or proxy
    }
}
10.3.2 Cache Optimization
Optimize caching:
- Multi-level caching: a local cache in front of a distributed cache
- Cache warming: preload hot data
- Cache updates: keep the cache consistent with the source of truth
- Cache eviction: evict entries by size and TTL
public class CacheManager {

    private final Cache<String, Object> localCache;
    private final RedisTemplate<String, Object> redisTemplate;

    public CacheManager(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
        // Local (L1) cache: bounded size with a short TTL.
        // Note: entries can be stale for up to the TTL after another node updates Redis.
        localCache = CacheBuilder.newBuilder()
                .maximumSize(10000)
                .expireAfterWrite(5, TimeUnit.MINUTES)
                .build();
    }

    public Object get(String key) {
        // L1: local cache
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }
        // L2: Redis, backfilling L1 on a hit
        value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            localCache.put(key, value);
        }
        return value;
    }

    public void put(String key, Object value) {
        localCache.put(key, value);
        redisTemplate.opsForValue().set(key, value);
    }

    public void put(String key, Object value, long timeout, TimeUnit unit) {
        localCache.put(key, value);
        redisTemplate.opsForValue().set(key, value, timeout, unit);
    }

    public void remove(String key) {
        localCache.invalidate(key);
        redisTemplate.delete(key);
    }

    public void preloadCache(List<String> keys) {
        // Warm the local cache from Redis
        for (String key : keys) {
            Object value = redisTemplate.opsForValue().get(key);
            if (value != null) {
                localCache.put(key, value);
            }
        }
    }

    public void clearLocalCache() {
        localCache.invalidateAll();
    }
}
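The read path above (check the local tier, fall back to Redis, backfill the local tier) can be sketched without Guava or Redis; plain maps stand in for the two tiers here, so this illustrates the lookup flow rather than the production implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class TwoLevelCacheDemo {

    private final Map<String, Object> localCache = new HashMap<>();  // L1 (per-process)
    private final Map<String, Object> remoteCache = new HashMap<>(); // L2 (stand-in for Redis)
    private int remoteHits = 0;

    public Object get(String key) {
        // L1 first: no network round-trip
        Object value = localCache.get(key);
        if (value != null) {
            return value;
        }
        // L2 fallback, then backfill L1 so the next read is local
        value = remoteCache.get(key);
        if (value != null) {
            remoteHits++;
            localCache.put(key, value);
        }
        return value;
    }

    public void put(String key, Object value) {
        // Write-through to both tiers
        localCache.put(key, value);
        remoteCache.put(key, value);
    }

    public int remoteHits() {
        return remoteHits;
    }

    public static void main(String[] args) {
        TwoLevelCacheDemo cache = new TwoLevelCacheDemo();
        cache.remoteCache.put("task:1", "RUNNING"); // present only in L2 initially
        System.out.println(cache.get("task:1"));    // RUNNING (L2 hit, backfilled)
        System.out.println(cache.get("task:1"));    // RUNNING (now an L1 hit)
        System.out.println(cache.remoteHits());     // 1
    }
}
```

The backfill is what makes the second read cheap; the trade-off, as noted in the CacheManager comment, is that L1 can serve stale data until its TTL expires.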
10.4 Communication Performance
10.4.1 Network Optimization
Optimize network communication:
- Connection pooling: reuse connections from a pool
- Keep-alive: long-lived connections avoid repeated handshakes
- Batching: transfer data in batches
- Compression: compress payloads in transit
public class NetworkOptimizer {

    private static final Logger logger = LoggerFactory.getLogger(NetworkOptimizer.class);

    private final PoolingHttpClientConnectionManager connectionManager;
    private final CloseableHttpClient httpClient;

    public NetworkOptimizer() {
        // Connection pool: reuse connections instead of reconnecting per request
        connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(200);
        connectionManager.setDefaultMaxPerRoute(20);

        // Keep connections alive for 30 seconds
        httpClient = HttpClients.custom()
                .setConnectionManager(connectionManager)
                .setKeepAliveStrategy((response, context) -> 30 * 1000)
                .build();
    }

    public HttpResponse sendRequest(HttpRequest request) throws IOException {
        if (request instanceof HttpEntityEnclosingRequest) {
            // Compress request bodies larger than 1 KB
            HttpEntityEnclosingRequest entityRequest = (HttpEntityEnclosingRequest) request;
            HttpEntity entity = entityRequest.getEntity();
            if (entity != null && entity.getContentLength() > 1024) {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos);
                entity.writeTo(gzipOutputStream);
                gzipOutputStream.close();

                ByteArrayEntity compressedEntity = new ByteArrayEntity(baos.toByteArray());
                compressedEntity.setContentEncoding("gzip");
                entityRequest.setEntity(compressedEntity);
                request.addHeader("Content-Encoding", "gzip");
            }
        }

        // Accept compressed responses as well
        request.addHeader("Accept-Encoding", "gzip, deflate");

        return httpClient.execute((HttpUriRequest) request);
    }

    public void sendBatchRequests(List<HttpRequest> requests) throws IOException {
        // Sequential for simplicity; could be parallelized over the pool
        for (HttpRequest request : requests) {
            sendRequest(request);
        }
    }

    public void close() {
        try {
            httpClient.close();
        } catch (IOException e) {
            logger.error("Failed to close HTTP client", e);
        }
    }
}
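The compression branch above relies on gzip from java.util.zip; a self-contained round-trip shows the mechanics, including why the output stream must be closed before reading the compressed bytes (the repetitive JSON payload is illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipDemo {

    public static byte[] compress(byte[] data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
            gzip.write(data); // closing the stream flushes the gzip trailer
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    public static byte[] decompress(byte[] data) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return gzip.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Task payloads (JSON, logs) are repetitive and compress very well
        byte[] original = "{\"status\":\"RUNNING\"}".repeat(100)
                .getBytes(StandardCharsets.UTF_8);
        byte[] compressed = compress(original);
        System.out.println(compressed.length < original.length);          // true
        System.out.println(Arrays.equals(decompress(compressed), original)); // true
    }
}
```

The 1 KB threshold in NetworkOptimizer exists because gzip adds a fixed header and trailer: for tiny payloads the "compressed" form can be larger than the original.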
10.4.2 Serialization Optimization
Optimize serialization:
- Efficient formats: prefer compact binary formats (Kryo, Protobuf) over JSON where possible
- Partial serialization: serialize only the fields that are needed
- Lazy serialization: defer serializing large objects until required
- Object reuse: reuse serializer instances and buffers
public class SerializationOptimizer {

    private final ObjectMapper jsonMapper;
    private final Kryo kryo;

    public SerializationOptimizer() {
        // JSON serializer: tolerant of unknown fields, skips nulls
        jsonMapper = new ObjectMapper();
        jsonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        jsonMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);

        // Kryo serializer: pre-register classes for compact output.
        // Note: Kryo instances are not thread-safe; hold one per thread
        // (e.g. via ThreadLocal) when used concurrently.
        kryo = new Kryo();
        kryo.register(Task.class);
        kryo.register(TaskDefinition.class);
        kryo.register(TaskInstance.class);
        kryo.register(ExecutorNode.class);
    }

    public byte[] serializeToJson(Object obj) throws JsonProcessingException {
        return jsonMapper.writeValueAsBytes(obj);
    }

    public <T> T deserializeFromJson(byte[] data, Class<T> clazz) throws IOException {
        return jsonMapper.readValue(data, clazz);
    }

    public byte[] serializeToKryo(Object obj) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Output output = new Output(baos);
        kryo.writeObject(output, obj);
        output.close();
        return baos.toByteArray();
    }

    public <T> T deserializeFromKryo(byte[] data, Class<T> clazz) {
        Input input = new Input(data);
        T obj = kryo.readObject(input, clazz);
        input.close();
        return obj;
    }

    public byte[] serializeToProtobuf(MessageLite message) {
        return message.toByteArray();
    }

    public <T extends MessageLite> T deserializeFromProtobuf(byte[] data, Parser<T> parser)
            throws InvalidProtocolBufferException {
        return parser.parseFrom(data);
    }
}
Summary
This document has presented the complete design of GridJob, a distributed task scheduling system, covering the system architecture, core components, data model, scheduling algorithms, high availability and fault tolerance, extensibility, security, monitoring and alerting, deployment architecture, and performance optimization strategies.
GridJob has the following characteristics:
- High availability: a master/standby architecture, state synchronization, health checks, and failed-task retries keep the service running through node failures.
- High reliability: data consistency guarantees, idempotent task design, and disaster recovery protect against data loss.
- High extensibility: horizontal scaling, vertical scaling, and a plugin architecture let the system grow with the workload.
- High performance: scheduling, execution, storage, and communication optimizations sustain large task volumes.
- Security: authentication and authorization, data protection, and audit logging safeguard the system.
- Observability: metrics, monitoring, and alerting make the system's behavior visible.
GridJob fits a wide range of distributed scheduling scenarios, such as batch jobs, cron-style scheduled tasks, streaming tasks, and real-time tasks. With a sound architecture and implementation, it can meet large-scale, highly concurrent, highly reliable scheduling requirements.
In an actual implementation, the technology stack can be chosen to match the requirements and environment, e.g. Java, Spring Boot, ZooKeeper, Kafka, MySQL, and Redis, and the system can be customized for the business with additional task types, scheduling policies, or execution modes.
Looking ahead, GridJob can be extended and improved in the following directions:
- Intelligent scheduling: apply machine learning to scheduling decisions and resource allocation.
- Multi-language support: execute tasks written in multiple programming languages.
- Visual management: provide a friendlier management UI.
- Cloud-native support: integrate deeply with cloud-native technology such as containers and service meshes.
- Edge computing: extend scheduling to edge nodes.
In short, GridJob is a complete, performant, and extensible distributed task scheduling system that can satisfy the scheduling requirements of complex, large-scale scenarios.