dag实现案例 02、实现简易版dag调度系统(基于01之上升级)
文章目录
- 前言
- 源码
- 前言回顾
- 实现逻辑
- 实现思路
- 引入pom.xml依赖 & logback配置文件
- DAG.java:有向无环图数据结构
- TaskNode.java:任务节点抽象类
- TaskExecutor、ShellTaskExecutor.java:任务执行器封装
- DAGScheduler.java:DAG任务调度器
- CycleDetector.java:DAG成环检测工具
- 单测验证
- Main.java:单测案例
- 测试结果
- 资料获取
前言
博主介绍:✌目前全网粉丝4W+,csdn博客专家、Java领域优质创作者,博客之星、阿里云平台优质作者、专注于Java后端技术领域。
涵盖技术内容:Java后端、大数据、算法、分布式微服务、中间件、前端、运维等。
博主所有博客文件目录索引:博客目录索引(持续更新)
CSDN搜索:长路
视频平台:b站-Coder长路
[toc]
源码
gitee:https://gitee.com/changluJava/demo-exer/tree/master/java-dagscheduler/java-dag
github:https://github.com/changluya/Java-Demos/tree/master/java-dagscheduler/java-dag
前言回顾
在01当中,实现了一个非常简易版的有向无环图+check成环问题,有一部分局限性。
1、数组+链表显示,任务数量需要提前初始化数组大小。
2、添加任务目前仅仅只能够传入int类型作为一个唯一任务。
3、在指定头结点的情况下,进行成环依赖检测,成环依赖检测能力不太足。
实现逻辑
在02中,我们基于01进行进一步实现,实现内容如下:
1、map+链表实现,支持使用字符串作为任务唯一标识,无任务添加限制。
2、抽象任务为TaskNode类,DAG节点、边集合都采用map形式。
3、封装任务执行器,支持与任务类型绑定相应的执行器。
4、实现拓扑排序,可在拓扑排序中提前预判确认是否有环情况。
5、单独实现成环依赖检测机制,支持分析有环情况原因,是否分析参数可配。
- 实现思路与01版本基本保持一致,可视化打印成环路径情况,用于分析排查原因。
**说明:**当前实现部分参考Dolphinscheduler的DAG核心实现逻辑,
实现思路
引入pom.xml依赖 & logback配置文件
pom.xml:
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><slf4j.version>1.7.36</slf4j.version>
</properties><dependencies><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.11</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency>
</dependencies>
logback.xml:方便后面分析打印日志
<?xml version="1.0" encoding="UTF-8"?>
<configuration><property name="LOG_HOME" value="./logs"/><property name="LOG_PATTERN"value="[%d{'yy-MM-dd HH:mm:ss,SSS',GMT+8:00}] %-5p [%.10t][%X{CU}] %logger{36}[%L] - %m%n"/><!-- 彩色日志格式 --><property name="CONSOLE_COLOR_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %boldYellow([%thread]) %cyan(%logger{15}) %msg%n"/><property name="LOG_CHARSET" value="UTF-8"/><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><!-- 日志格式配置 --><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><!-- 引用上面的键值对及输出的格式 --><pattern>${CONSOLE_COLOR_PATTERN}</pattern></encoder></appender><!-- Configure so that it outputs to both console and log file --><root level="info"><appender-ref ref="STDOUT" /></root></configuration>
DAG.java:有向无环图数据结构
DAG.java:该数据结构中存储了所有的点以及所有的边,实际上就是01demo中数组+链表转为了map+链表而已。
package com.changlu.demo2;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** dag:有向无环图* 模拟数组+链表 => map+集合*/
public class DAG {private Map<String, TaskNode> nodes = new HashMap<>();// 节点集合 模拟的节点数组private Map<String, List<String>> edges = new HashMap<>(); // 边集合(任务依赖关系) 模拟的数组+链表/*** 添加节点* @param taskId 任务id* @param taskNode 任务节点*/public void addNode(String taskId, TaskNode taskNode) {nodes.put(taskId, taskNode);edges.put(taskId, new ArrayList<>());}/*** 添加边* @param fromTaskId 来源任务id* @param toTaskId 目标任务id*/public void addEdge(String fromTaskId, String toTaskId) {edges.get(fromTaskId).add(toTaskId);}// 获取所有节点public Map<String, TaskNode> getNodes() {return nodes;}// 获取所有边public Map<String, List<String>> getEdges() {return edges;}}
TaskNode.java:任务节点抽象类
**说明:**每个任务实际上都是有自己的属性,可以为单独的一个实体,我们这里将其进行封装起来,不同的任务TaskId不同,在任务节点中可以封装相应任务属性,支持后续任务执行时参数使用。
package com.changlu.demo2;import java.util.Map;/*** 任务node节点*/
public class TaskNode {private String taskId; // 任务唯一标识private String taskName; // 任务名称private String taskType; // 任务类型(Shell、SQL 等)private Map<String, Object> params; // 任务参数public TaskNode() {}public TaskNode(String taskId, String taskName, String taskType) {this.taskId = taskId;this.taskName = taskName;this.taskType = taskType;}public String getTaskId() {return taskId;}public void setTaskId(String taskId) {this.taskId = taskId;}public String getTaskName() {return taskName;}public void setTaskName(String taskName) {this.taskName = taskName;}public String getTaskType() {return taskType;}public void setTaskType(String taskType) {this.taskType = taskType;}public Map<String, Object> getParams() {return params;}public void setParams(Map<String, Object> params) {this.params = params;}
}
TaskExecutor、ShellTaskExecutor.java:任务执行器封装
**TaskExecutor.java:**任务执行器接口封装,每一个任务最终都需要被调度执行,这里去封装出来一个接口execute执行方法,执行时,将相应的任务实体传入执行。
package com.changlu.demo2.task;import com.changlu.demo2.TaskNode;public interface TaskExecutor {void execute(TaskNode taskNode);
}
**ShellTaskExecutor.java:**该实现类为demo案例里shell任务对应执行器,很简单只需要实现下接口即可。
package com.changlu.demo2.task;import com.changlu.demo2.TaskNode;// 示例:Shell 任务执行器
public class ShellTaskExecutor implements TaskExecutor {@Overridepublic void execute(TaskNode taskNode) {System.out.println("Executing Shell Task: " + taskNode.getTaskName());// 这里可以调用真实的 Shell 脚本}
}
DAGScheduler.java:DAG任务调度器
DAGScheduler.java:该类为DAG任务调度器,在调度器中,我们传入DAG图数据结构实例,在调度器中我们来实现拓扑排序以及任务的调度执行。
说明:对于成环检测,是在进行拓扑排序过程里能够检测到的,但是具体某个节点无法预测,需要单独使用CycleDetector类来完成具体原因分析。
package com.changlu.demo2;import com.changlu.demo2.task.ShellTaskExecutor;
import com.changlu.demo2.task.TaskExecutor;import java.util.*;public class DAGScheduler {private DAG dag;// 存储任务类型 -> 任务执行器private Map<String, TaskExecutor> executors = new HashMap<>();public DAGScheduler(DAG dag) {this.dag = dag;// 注册任务执行器executors.put("Shell", new ShellTaskExecutor());}// 拓扑排序(检测依赖并生成执行顺序)public List<String> topologicalSort() {// dag图的顶点集合Map<String, List<String>> edges = dag.getEdges();// 入度map,目的为找到入度为0的顶点Map<String, Integer> inDegree = new HashMap<>();// 初始化入度for (String node : dag.getNodes().keySet()) {inDegree.put(node, 0);}// 入度累加for (List<String> deps : edges.values()) {for (String dep : deps) {inDegree.put(dep, inDegree.get(dep) + 1);}}// 底层实现原理:队列实现拓扑排序// 拓扑排序的起点Queue<String> queue = new LinkedList<>();// 入度为0的表示顶点,收集顶点到队列中for (String taskId : inDegree.keySet()) {if (inDegree.get(taskId) == 0) {queue.offer(taskId);}}// 成环校验:入度为0的情况 表示出现成环问题if (queue.isEmpty()) {throw new RuntimeException("出现成环问题,请及时排查解决!");}// 拓扑排序结果集List<String> result = new ArrayList<>();while (!queue.isEmpty()) {String taskId = queue.poll();result.add(taskId);// 处理当前节点所关联的依赖边for (String depTaskId : edges.get(taskId)) {// 依赖去重逻辑inDegree.put(depTaskId, inDegree.get(depTaskId) - 1);if (inDegree.get(depTaskId) == 0) {queue.offer(depTaskId);}}}return result;}// 执行dag任务public void execute() {// 计算出dag图的拓扑排序执行路径List<String> orderTaskIds = this.topologicalSort();// 按照拓扑排序顺序的节点执行任务System.out.println("规划的拓扑排序顺序为:" + orderTaskIds);for (String taskId : orderTaskIds) {TaskNode taskNode = dag.getNodes().get(taskId);// 匹配对应的任务执行器TaskExecutor taskExecutor = executors.get(taskNode.getTaskType());if (taskExecutor != null) {taskExecutor.execute(taskNode);}else {System.out.println("No executor found for task type: " + taskNode.getTaskType());throw new RuntimeException(String.format("No executor found for task type: %s, taskId: %s", taskNode.getTaskType(), taskNode.getTaskId()));}}}}
CycleDetector.java:DAG成环检测工具
CycleDetector.java:单独实现针对dag来进行递归判断是否出现环问题的逻辑,同时支持分析出现环路的路径情况,用于快速定位分析。
package com.changlu.demo2;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;/*** DAG成环检测器*/
public class CycleDetector {private static final Logger log = LoggerFactory.getLogger(CycleDetector.class);private DAG dag;private boolean isAnalysis = false;private String analysisMsg = "";public CycleDetector(DAG dag) {this.dag = dag;}public CycleDetector(DAG dag, boolean isAnalysis) {this.dag = dag;this.isAnalysis = isAnalysis;}/*** 是否存在出现成环情况*/public boolean isCycle() {// 入度map,目的为找到入度为0的顶点Map<String, Integer> inDegree = getInDegreeMap();// 校验是否出现成环问题。// 成环情况:无入度为0的节点数int inDegreeZeroNodeNums = 0;for (Integer value : inDegree.values()) {if (value == 0) inDegreeZeroNodeNums ++;}boolean isCycle = inDegreeZeroNodeNums == 0;// 出现成环情况 & 开启分析if (isCycle && isAnalysis) {doProcessAnalysis();}return isCycle;}/*** 分析成环原因* 时间复杂度:O(n * n)、空间复杂度O(n)*/private void doProcessAnalysis() {log.info("分析参数已开启,已检测到成环情况,开始分析成环原因...");// 得到dag图的顶点List<String> headTaskIds = new ArrayList<>();// 方案1:筛选得到入度为0的顶点(成环情况,无法筛选)// 入度map,目的为找到入度为0的顶点
// Map<String, Integer> inDegreeMap = this.getInDegreeMap();
// for (String taskId : inDegreeMap.keySet()) {
// if (inDegreeMap.get(taskId) == 0) {
// headTaskIds.add(taskId);
// }
// }// 方案2:筛选全部的顶点 这里原因导致后续时间复杂度:O(n * n),如果能够找到顶点,时间复杂度就是O(n)headTaskIds.addAll(this.dag.getNodes().keySet());// 初始化访问map、递归mapMap<String, Boolean> visited = new HashMap<>();Map<String, Map.Entry<Boolean, Long>> recursionStack = new HashMap<>();for (String taskId : this.dag.getNodes().keySet()) {visited.put(taskId, false);recursionStack.put(taskId, this.getEntry(false));}// 开始从顶点(顶点可能有点多个场景)开始进行递归处理for (String headTaskId : headTaskIds) {boolean isCycle = processAnalysisUtil(headTaskId, visited, recursionStack);// 如果检测过程中已经确认有环了,核心原因已分析:this.analysisMsg// 此时可以提前结束分析if (isCycle) {return;}}log.info("分析成环原因已结束");}/*** 递归成环路径分析工具* @param taskId 任务id* @param visited 访问过节点* @param recursionStack 递归访问*/private boolean processAnalysisUtil(String taskId, Map<String, Boolean> visited, Map<String, Map.Entry<Boolean, Long>> recursionStack) {visited.put(taskId, true);// 单独设计一个recursionStack原因是因为,如果某个节点同时被上游两个节点依赖,不能单独只依靠visited来判断出现成环情况recursionStack.put(taskId, this.getEntry(true));List<String> depTaskIds = this.dag.getEdges().get(taskId);for (String depTaskId : depTaskIds) {// 判断是否访问过if (!visited.get(depTaskId)) { // 未访问过情况if (processAnalysisUtil(depTaskId, visited, recursionStack)) {return true;}}else if (recursionStack.get(depTaskId).getKey()){ // 递归路径中访问过情况// 注:无法直接分析得到某个点,因为一旦不是从顶点往下遍历就会出现错误预测,this.analysisMsg = String.format("出现成环情况!成环点为:%s, 其上游依赖点为:%s", depTaskId, taskId);// 这里确认成环路径 根据添加的时间戳顺序来进行排序Map<String, Map.Entry<Boolean, Long>> sortedMap = recursionStack.entrySet().stream()// 过滤掉 key 为 false 的情况.filter(entry -> entry.getValue().getKey()).sorted(Map.Entry.comparingByValue((entry1, entry2) -> {Long timeStamp1 = entry1.getValue();Long timeStamp2 = entry2.getValue();return timeStamp1.compareTo(timeStamp2);})).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));// 将排序后的键拼接成字符串String analysisCyclePath = sortedMap.keySet().stream().map(key -> "[" + key + "]").collect(Collectors.joining(" -> "));analysisCyclePath = analysisCyclePath + " -> " + "[" + depTaskId + "]";this.analysisMsg = String.format("成环路径为 => %s, 起始扫描起点 => %s", analysisCyclePath, "[" + depTaskId + "]");log.info(this.analysisMsg);return true;}}// 回退某个节点往下找依赖过程中的标识字段recursionStack.put(taskId, this.getEntry(false));return false;}// -------------------辅助封装方法-------------------private Map.Entry<Boolean, Long> getEntry(Boolean key) {if (key) {// 自定义TimestampGenerator.generateUniqueTimestamp原因为:可能会出现设置时间戳一致的情况,导致依赖路径无法确认return getMapEntry(key, TimestampGenerator.generateUniqueTimestamp());}else {return getMapEntry(key, -1L);}}/*** 创建一个 Map.Entry 对象。* @param key 键* @param value 值* @param <T> 键的类型* @param <R> 值的类型* @return 返回一个 Map.Entry 对象*/private <T, R> Map.Entry<T, R> getMapEntry(T key, R value) {return new AbstractMap.SimpleEntry<>(key, value);}/*** 获取点入度map* @return 入度结果集*/private Map<String, Integer> getInDegreeMap() {Map<String, Integer> inDegree = new HashMap<>();// 初始化入度for (String node : dag.getNodes().keySet()) {inDegree.put(node, 0);}// 入度累加for (List<String> deps : dag.getEdges().values()) {for (String dep : deps) {inDegree.put(dep, inDegree.get(dep) + 1);}}return inDegree;}/*** 获取成环分析结果* @return 成环分析结果*/public String getAnalysisMsg() {return analysisMsg;}}class TimestampGenerator {private static final AtomicLong counter = new AtomicLong(0);/*** 生成一个唯一的长整型时间戳* 简易版单机* @return 唯一的时间戳*/public static long generateUniqueTimestamp() {return System.currentTimeMillis() * 1000 + counter.getAndIncrement();}
}
单测验证
Main.java:单测案例
package com.changlu.demo2;public class Main {public static void main(String[] args) {// 创建 DAGDAG dag = new DAG();// 添加任务节点TaskNode task1 = new TaskNode("a-1", "Shell Task a-1", "Shell");TaskNode task2 = new TaskNode("b-2", "Shell Task b-2", "Shell");TaskNode task3 = new TaskNode("c-3", "Shell Task c-3", "Shell");TaskNode task4 = new TaskNode("d-4", "Shell Task d-4", "Shell");TaskNode task5 = new TaskNode("e-5", "Shell Task e-5", "Shell");TaskNode task6 = new TaskNode("f-6", "Shell Task f-6", "Shell");TaskNode task7 = new TaskNode("g-7", "Shell Task g-7", "Shell");TaskNode task8 = new TaskNode("h-8", "Shell Task h-8", "Shell");TaskNode task9 = new TaskNode("i-9", "Shell Task i-9", "Shell");// 初始化任务节点dag.addNode(task1.getTaskId(), task1);dag.addNode(task2.getTaskId(), task2);dag.addNode(task3.getTaskId(), task3);dag.addNode(task4.getTaskId(), task4);dag.addNode(task5.getTaskId(), task5);dag.addNode(task6.getTaskId(), task6);dag.addNode(task7.getTaskId(), task7);dag.addNode(task8.getTaskId(), task8);dag.addNode(task9.getTaskId(), task9);// a-1 -> b-2、c-3dag.addEdge(task1.getTaskId(), task2.getTaskId());dag.addEdge(task1.getTaskId(), task3.getTaskId());// b-2 -> d-4、e-5dag.addEdge(task2.getTaskId(), task4.getTaskId());dag.addEdge(task2.getTaskId(), task5.getTaskId());// c-3 -> e-5、f-6dag.addEdge(task3.getTaskId(), task5.getTaskId());dag.addEdge(task3.getTaskId(), task6.getTaskId());// d-4 -> g-7dag.addEdge(task4.getTaskId(), task7.getTaskId());// e-5 -> g-7dag.addEdge(task5.getTaskId(), task7.getTaskId());// f-6 -> h-8dag.addEdge(task6.getTaskId(), task8.getTaskId());// g-7 -> i-9dag.addEdge(task7.getTaskId(), task9.getTaskId());// 成环依赖// dag.addEdge(task9.getTaskId(), task1.getTaskId());// 成环工具检测器,开启成环分析CycleDetector cycleDetector = new CycleDetector(dag, true);if (cycleDetector.isCycle()) {throw new RuntimeException("当前dag出现成环情况!");}// 调度执行DAGScheduler scheduler = new DAGScheduler(dag);scheduler.execute();}}
测试结果
默认的dag图如下所示:
补充依赖导致出现成环情况:
情况1:无成环情况
说明:任务正常无异常情况。
情况2:有成环情况
补充成环依赖代码:
//成环依赖
dag.addEdge(task9.getTaskId(), task1.getTaskId());
如果开启了成环依赖分析,我们可以查看到上面打印的成环检测日志信息。注意之类打印的起始扫描起点,仅仅只是我们程序从该点开始扫描扫出来出现成环情况。
我们应该依照成环路径去查看下是否我们的dag图出现了成环问题,从而确认原因问题。
真正原因如下成环案例,是i-9错误的依赖到了a-1情况。
资料获取
大家点赞、收藏、关注、评论啦~
精彩专栏推荐订阅:在下方专栏👇🏻
- 长路-文章目录汇总(算法、后端Java、前端、运维技术导航):博主所有博客导航索引汇总
- 开源项目Studio-Vue—校园工作室管理系统(含前后台,SpringBoot+Vue):博主个人独立项目,包含详细部署上线视频,已开源
- 学习与生活-专栏:可以了解博主的学习历程
- 算法专栏:算法收录
更多博客与资料可查看👇🏻获取联系方式👇🏻,🍅文末获取开发资源及更多资源博客获取🍅