当前位置: 首页 > news >正文

dag实现案例 02、实现简易版dag调度系统(基于01之上升级)

文章目录

  • 前言
  • 源码
  • 前言回顾
  • 实现逻辑
  • 实现思路
    • 引入pom.xml依赖 & logback配置文件
    • DAG.java:有向无环图数据结构
    • TaskNode.java:任务节点抽象类
    • TaskExecutor、ShellTaskExecutor.java:任务执行器封装
    • DAGScheduler.java:DAG任务调度器
    • CycleDetector.java:DAG成环检测工具
  • 单测验证
    • Main.java:单测案例
    • 测试结果
  • 资料获取

前言

博主介绍:✌目前全网粉丝4W+,csdn博客专家、Java领域优质创作者,博客之星、阿里云平台优质作者、专注于Java后端技术领域。

涵盖技术内容:Java后端、大数据、算法、分布式微服务、中间件、前端、运维等。

博主所有博客文件目录索引:博客目录索引(持续更新)

CSDN搜索:长路

视频平台:b站-Coder长路

[toc]

dag实现案例 02、实现简易版dag调度系统(基于01之上升级)

源码

gitee:https://gitee.com/changluJava/demo-exer/tree/master/java-dagscheduler/java-dag

github:https://github.com/changluya/Java-Demos/tree/master/java-dagscheduler/java-dag

前言回顾

在01当中,实现了一个非常简易版的有向无环图+check成环问题,有一部分局限性。

1、数组+链表显示,任务数量需要提前初始化数组大小。

2、添加任务目前仅仅只能够传入int类型作为一个唯一任务。

3、在指定头结点的情况下,进行成环依赖检测,成环依赖检测能力不太足。

实现逻辑

img

在02中,我们基于01进行进一步实现,实现内容如下:

1、map+链表实现,支持使用字符串作为任务唯一标识,无任务添加限制。

2、抽象任务为TaskNode类,DAG节点、边集合都采用map形式。

3、封装任务执行器,支持与任务类型绑定相应的执行器。

4、实现拓扑排序,可在拓扑排序中提前预判确认是否有环情况

5、单独实现成环依赖检测机制,支持分析有环情况原因,是否分析参数可配。

  • 实现思路与01版本基本保持一致,可视化打印成环路径情况,用于分析排查原因。

**说明:**当前实现部分参考Dolphinscheduler的DAG核心实现逻辑,

实现思路

引入pom.xml依赖 & logback配置文件

img

pom.xml:

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><slf4j.version>1.7.36</slf4j.version>
</properties><dependencies><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.11</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency>
</dependencies>

logback.xml:方便后面分析打印日志

<?xml version="1.0" encoding="UTF-8"?>
<configuration><property name="LOG_HOME" value="./logs"/><property name="LOG_PATTERN"value="[%d{'yy-MM-dd HH:mm:ss,SSS',GMT+8:00}] %-5p [%.10t][%X{CU}] %logger{36}[%L] - %m%n"/><!-- 彩色日志格式 --><property name="CONSOLE_COLOR_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %boldYellow([%thread])  %cyan(%logger{15}) %msg%n"/><property name="LOG_CHARSET" value="UTF-8"/><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><!--   日志格式配置   --><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><!--   引用上面的键值对及输出的格式   --><pattern>${CONSOLE_COLOR_PATTERN}</pattern></encoder></appender><!-- Configure so that it outputs to both console and log file --><root level="info"><appender-ref ref="STDOUT" /></root></configuration>

DAG.java:有向无环图数据结构

img

DAG.java:该数据结构中存储了所有的点以及所有的边,实际上就是01demo中数组+链表转为了map+链表而已。

package com.changlu.demo2;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** dag:有向无环图* 模拟数组+链表  => map+集合*/
public class DAG {private Map<String, TaskNode> nodes = new HashMap<>();// 节点集合 模拟的节点数组private Map<String, List<String>> edges = new HashMap<>(); // 边集合(任务依赖关系) 模拟的数组+链表/*** 添加节点* @param taskId 任务id* @param taskNode 任务节点*/public void addNode(String taskId, TaskNode taskNode) {nodes.put(taskId, taskNode);edges.put(taskId, new ArrayList<>());}/*** 添加边* @param fromTaskId 来源任务id* @param toTaskId 目标任务id*/public void addEdge(String fromTaskId, String toTaskId) {edges.get(fromTaskId).add(toTaskId);}// 获取所有节点public Map<String, TaskNode> getNodes() {return nodes;}// 获取所有边public Map<String, List<String>> getEdges() {return edges;}}

TaskNode.java:任务节点抽象类

**说明:**每个任务实际上都是有自己的属性,可以为单独的一个实体,我们这里将其进行封装起来,不同的任务TaskId不同,在任务节点中可以封装相应任务属性,支持后续任务执行时参数使用。

package com.changlu.demo2;import java.util.Map;/*** 任务node节点*/
public class TaskNode {private String taskId; // 任务唯一标识private String taskName; // 任务名称private String taskType; // 任务类型(Shell、SQL 等)private Map<String, Object> params; // 任务参数public TaskNode() {}public TaskNode(String taskId, String taskName, String taskType) {this.taskId = taskId;this.taskName = taskName;this.taskType = taskType;}public String getTaskId() {return taskId;}public void setTaskId(String taskId) {this.taskId = taskId;}public String getTaskName() {return taskName;}public void setTaskName(String taskName) {this.taskName = taskName;}public String getTaskType() {return taskType;}public void setTaskType(String taskType) {this.taskType = taskType;}public Map<String, Object> getParams() {return params;}public void setParams(Map<String, Object> params) {this.params = params;}
}

TaskExecutor、ShellTaskExecutor.java:任务执行器封装

img

**TaskExecutor.java:**任务执行器接口封装,每一个任务最终都需要被调度执行,这里去封装出来一个接口execute执行方法,执行时,将相应的任务实体传入执行。

package com.changlu.demo2.task;import com.changlu.demo2.TaskNode;public interface TaskExecutor {void execute(TaskNode taskNode);
}

**ShellTaskExecutor.java:**该实现类为demo案例里shell任务对应执行器,很简单只需要实现下接口即可。

package com.changlu.demo2.task;import com.changlu.demo2.TaskNode;// 示例:Shell 任务执行器
public class ShellTaskExecutor implements TaskExecutor {@Overridepublic void execute(TaskNode taskNode) {System.out.println("Executing Shell Task: " + taskNode.getTaskName());// 这里可以调用真实的 Shell 脚本}
}

DAGScheduler.java:DAG任务调度器

DAGScheduler.java:该类为DAG任务调度器,在调度器中,我们传入DAG图数据结构实例,在调度器中我们来实现拓扑排序以及任务的调度执行

说明:对于成环检测,是在进行拓扑排序过程里能够检测到的,但是具体某个节点无法预测,需要单独使用CycleDetector类来完成具体原因分析

package com.changlu.demo2;import com.changlu.demo2.task.ShellTaskExecutor;
import com.changlu.demo2.task.TaskExecutor;import java.util.*;public class DAGScheduler {private DAG dag;// 存储任务类型 -> 任务执行器private Map<String, TaskExecutor> executors = new HashMap<>();public DAGScheduler(DAG dag) {this.dag = dag;// 注册任务执行器executors.put("Shell", new ShellTaskExecutor());}// 拓扑排序(检测依赖并生成执行顺序)public List<String> topologicalSort() {// dag图的顶点集合Map<String, List<String>> edges = dag.getEdges();// 入度map,目的为找到入度为0的顶点Map<String, Integer> inDegree = new HashMap<>();// 初始化入度for (String node : dag.getNodes().keySet()) {inDegree.put(node, 0);}// 入度累加for (List<String> deps : edges.values()) {for (String dep : deps) {inDegree.put(dep, inDegree.get(dep) + 1);}}// 底层实现原理:队列实现拓扑排序// 拓扑排序的起点Queue<String> queue = new LinkedList<>();// 入度为0的表示顶点,收集顶点到队列中for (String taskId : inDegree.keySet()) {if (inDegree.get(taskId) == 0) {queue.offer(taskId);}}// 成环校验:入度为0的情况 表示出现成环问题if (queue.isEmpty()) {throw new RuntimeException("出现成环问题,请及时排查解决!");}// 拓扑排序结果集List<String> result = new ArrayList<>();while (!queue.isEmpty()) {String taskId = queue.poll();result.add(taskId);// 处理当前节点所关联的依赖边for (String depTaskId : edges.get(taskId)) {// 依赖去重逻辑inDegree.put(depTaskId, inDegree.get(depTaskId) - 1);if (inDegree.get(depTaskId) == 0) {queue.offer(depTaskId);}}}return result;}// 执行dag任务public void execute() {// 计算出dag图的拓扑排序执行路径List<String> orderTaskIds = this.topologicalSort();// 按照拓扑排序顺序的节点执行任务System.out.println("规划的拓扑排序顺序为:" + orderTaskIds);for (String taskId : orderTaskIds) {TaskNode taskNode = dag.getNodes().get(taskId);// 匹配对应的任务执行器TaskExecutor taskExecutor = executors.get(taskNode.getTaskType());if (taskExecutor != null) {taskExecutor.execute(taskNode);}else {System.out.println("No executor found for task type: " + taskNode.getTaskType());throw new RuntimeException(String.format("No executor found for task type: %s, taskId: %s", taskNode.getTaskType(), taskNode.getTaskId()));}}}}

CycleDetector.java:DAG成环检测工具

CycleDetector.java:单独实现针对dag来进行递归判断是否出现环问题的逻辑,同时支持分析出现环路的路径情况,用于快速定位分析。

package com.changlu.demo2;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;/*** DAG成环检测器*/
public class CycleDetector {private static final Logger log = LoggerFactory.getLogger(CycleDetector.class);private DAG dag;private boolean isAnalysis = false;private String analysisMsg = "";public CycleDetector(DAG dag) {this.dag = dag;}public CycleDetector(DAG dag, boolean isAnalysis) {this.dag = dag;this.isAnalysis = isAnalysis;}/*** 是否存在出现成环情况*/public boolean isCycle() {// 入度map,目的为找到入度为0的顶点Map<String, Integer> inDegree = getInDegreeMap();// 校验是否出现成环问题。// 成环情况:无入度为0的节点数int inDegreeZeroNodeNums = 0;for (Integer value : inDegree.values()) {if (value == 0) inDegreeZeroNodeNums ++;}boolean isCycle = inDegreeZeroNodeNums == 0;// 出现成环情况 & 开启分析if (isCycle && isAnalysis) {doProcessAnalysis();}return isCycle;}/*** 分析成环原因* 时间复杂度:O(n * n)、空间复杂度O(n)*/private void doProcessAnalysis() {log.info("分析参数已开启,已检测到成环情况,开始分析成环原因...");// 得到dag图的顶点List<String> headTaskIds = new ArrayList<>();// 方案1:筛选得到入度为0的顶点(成环情况,无法筛选)// 入度map,目的为找到入度为0的顶点
//        Map<String, Integer> inDegreeMap = this.getInDegreeMap();
//        for (String taskId : inDegreeMap.keySet()) {
//            if (inDegreeMap.get(taskId) == 0) {
//                headTaskIds.add(taskId);
//            }
//        }// 方案2:筛选全部的顶点 这里原因导致后续时间复杂度:O(n * n),如果能够找到顶点,时间复杂度就是O(n)headTaskIds.addAll(this.dag.getNodes().keySet());// 初始化访问map、递归mapMap<String, Boolean> visited = new HashMap<>();Map<String, Map.Entry<Boolean, Long>> recursionStack = new HashMap<>();for (String taskId : this.dag.getNodes().keySet()) {visited.put(taskId, false);recursionStack.put(taskId, this.getEntry(false));}// 开始从顶点(顶点可能有点多个场景)开始进行递归处理for (String headTaskId : headTaskIds) {boolean isCycle = processAnalysisUtil(headTaskId, visited, recursionStack);// 如果检测过程中已经确认有环了,核心原因已分析:this.analysisMsg// 此时可以提前结束分析if (isCycle) {return;}}log.info("分析成环原因已结束");}/*** 递归成环路径分析工具* @param taskId 任务id* @param visited 访问过节点* @param recursionStack 递归访问*/private boolean processAnalysisUtil(String taskId, Map<String, Boolean> visited, Map<String, Map.Entry<Boolean, Long>> recursionStack) {visited.put(taskId, true);// 单独设计一个recursionStack原因是因为,如果某个节点同时被上游两个节点依赖,不能单独只依靠visited来判断出现成环情况recursionStack.put(taskId, this.getEntry(true));List<String> depTaskIds = this.dag.getEdges().get(taskId);for (String depTaskId : depTaskIds) {// 判断是否访问过if (!visited.get(depTaskId)) { // 未访问过情况if (processAnalysisUtil(depTaskId, visited, recursionStack)) {return true;}}else if (recursionStack.get(depTaskId).getKey()){ // 递归路径中访问过情况// 注:无法直接分析得到某个点,因为一旦不是从顶点往下遍历就会出现错误预测,this.analysisMsg = String.format("出现成环情况!成环点为:%s, 其上游依赖点为:%s", depTaskId, taskId);// 这里确认成环路径 根据添加的时间戳顺序来进行排序Map<String, Map.Entry<Boolean, Long>> sortedMap = recursionStack.entrySet().stream()// 过滤掉 key 为 false 的情况.filter(entry -> entry.getValue().getKey()).sorted(Map.Entry.comparingByValue((entry1, entry2) -> {Long timeStamp1 = entry1.getValue();Long timeStamp2 = entry2.getValue();return timeStamp1.compareTo(timeStamp2);})).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));// 将排序后的键拼接成字符串String analysisCyclePath = sortedMap.keySet().stream().map(key -> "[" + key + "]").collect(Collectors.joining(" -> "));analysisCyclePath = analysisCyclePath + " -> " + "[" + depTaskId + "]";this.analysisMsg = String.format("成环路径为 => %s, 起始扫描起点 => %s", analysisCyclePath, "[" + depTaskId + "]");log.info(this.analysisMsg);return true;}}// 回退某个节点往下找依赖过程中的标识字段recursionStack.put(taskId, this.getEntry(false));return false;}// -------------------辅助封装方法-------------------private Map.Entry<Boolean, Long> getEntry(Boolean key) {if (key) {// 自定义TimestampGenerator.generateUniqueTimestamp原因为:可能会出现设置时间戳一致的情况,导致依赖路径无法确认return getMapEntry(key, TimestampGenerator.generateUniqueTimestamp());}else {return getMapEntry(key, -1L);}}/*** 创建一个 Map.Entry 对象。* @param key 键* @param value 值* @param <T> 键的类型* @param <R> 值的类型* @return 返回一个 Map.Entry 对象*/private  <T, R> Map.Entry<T, R> getMapEntry(T key, R value) {return new AbstractMap.SimpleEntry<>(key, value);}/*** 获取点入度map* @return 入度结果集*/private Map<String, Integer> getInDegreeMap() {Map<String, Integer> inDegree = new HashMap<>();// 初始化入度for (String node : dag.getNodes().keySet()) {inDegree.put(node, 0);}// 入度累加for (List<String> deps : dag.getEdges().values()) {for (String dep : deps) {inDegree.put(dep, inDegree.get(dep) + 1);}}return inDegree;}/*** 获取成环分析结果* @return 成环分析结果*/public String getAnalysisMsg() {return analysisMsg;}}class TimestampGenerator {private static final AtomicLong counter = new AtomicLong(0);/*** 生成一个唯一的长整型时间戳* 简易版单机* @return 唯一的时间戳*/public static long generateUniqueTimestamp() {return System.currentTimeMillis() * 1000 + counter.getAndIncrement();}
}

单测验证

Main.java:单测案例

package com.changlu.demo2;public class Main {public static void main(String[] args) {// 创建 DAGDAG dag = new DAG();// 添加任务节点TaskNode task1 = new TaskNode("a-1", "Shell Task a-1", "Shell");TaskNode task2 = new TaskNode("b-2", "Shell Task b-2", "Shell");TaskNode task3 = new TaskNode("c-3", "Shell Task c-3", "Shell");TaskNode task4 = new TaskNode("d-4", "Shell Task d-4", "Shell");TaskNode task5 = new TaskNode("e-5", "Shell Task e-5", "Shell");TaskNode task6 = new TaskNode("f-6", "Shell Task f-6", "Shell");TaskNode task7 = new TaskNode("g-7", "Shell Task g-7", "Shell");TaskNode task8 = new TaskNode("h-8", "Shell Task h-8", "Shell");TaskNode task9 = new TaskNode("i-9", "Shell Task i-9", "Shell");// 初始化任务节点dag.addNode(task1.getTaskId(), task1);dag.addNode(task2.getTaskId(), task2);dag.addNode(task3.getTaskId(), task3);dag.addNode(task4.getTaskId(), task4);dag.addNode(task5.getTaskId(), task5);dag.addNode(task6.getTaskId(), task6);dag.addNode(task7.getTaskId(), task7);dag.addNode(task8.getTaskId(), task8);dag.addNode(task9.getTaskId(), task9);// a-1  -> b-2、c-3dag.addEdge(task1.getTaskId(), task2.getTaskId());dag.addEdge(task1.getTaskId(), task3.getTaskId());// b-2  -> d-4、e-5dag.addEdge(task2.getTaskId(), task4.getTaskId());dag.addEdge(task2.getTaskId(), task5.getTaskId());// c-3  -> e-5、f-6dag.addEdge(task3.getTaskId(), task5.getTaskId());dag.addEdge(task3.getTaskId(), task6.getTaskId());// d-4  -> g-7dag.addEdge(task4.getTaskId(), task7.getTaskId());// e-5  -> g-7dag.addEdge(task5.getTaskId(), task7.getTaskId());// f-6  -> h-8dag.addEdge(task6.getTaskId(), task8.getTaskId());// g-7  -> i-9dag.addEdge(task7.getTaskId(), task9.getTaskId());// 成环依赖// dag.addEdge(task9.getTaskId(), task1.getTaskId());// 成环工具检测器,开启成环分析CycleDetector cycleDetector = new CycleDetector(dag, true);if (cycleDetector.isCycle()) {throw new RuntimeException("当前dag出现成环情况!");}// 调度执行DAGScheduler scheduler = new DAGScheduler(dag);scheduler.execute();}}

测试结果

默认的dag图如下所示:

img

补充依赖导致出现成环情况:

img

情况1:无成环情况

img

说明:任务正常无异常情况。

情况2:有成环情况

补充成环依赖代码:

//成环依赖
dag.addEdge(task9.getTaskId(), task1.getTaskId());

img

如果开启了成环依赖分析,我们可以查看到上面打印的成环检测日志信息。注意之类打印的起始扫描起点,仅仅只是我们程序从该点开始扫描扫出来出现成环情况。

我们应该依照成环路径去查看下是否我们的dag图出现了成环问题,从而确认原因问题。

真正原因如下成环案例,是i-9错误的依赖到了a-1情况。

img

资料获取

大家点赞、收藏、关注、评论啦~

精彩专栏推荐订阅:在下方专栏👇🏻

  • 长路-文章目录汇总(算法、后端Java、前端、运维技术导航):博主所有博客导航索引汇总
  • 开源项目Studio-Vue—校园工作室管理系统(含前后台,SpringBoot+Vue):博主个人独立项目,包含详细部署上线视频,已开源
  • 学习与生活-专栏:可以了解博主的学习历程
  • 算法专栏:算法收录

更多博客与资料可查看👇🏻获取联系方式👇🏻,🍅文末获取开发资源及更多资源博客获取🍅

http://www.dtcms.com/a/327469.html

相关文章:

  • GeoScene 空间大数据产品使用入门(6)进阶模型
  • 软考备考(三)
  • jupyter notebook中查看kernel对应环境的解决方案
  • RK3568 Linux驱动学习——Linux LED驱动开发
  • 安全合规5--终端安全检测和防御技术
  • 【1】Transformers快速入门:自然语言处理(NLP)是啥?
  • 肖臻《区块链技术与应用》第九讲:比特币交易的“智能”核心:深入解析脚本语言Script
  • 常见的设计模式(2)单例模式
  • TDengine 初体验
  • Flink Python API 提交 Socket 数据源的 WordCount 作业
  • TDengine 可观测性最佳实践
  • 荣耀手机无法连接win11电脑,错误消息:“无法在此设备上加载驱动程序 (hn_usbccgpfilter.sys)。”解决方案
  • Flink运行时的实现细节
  • 嵌入式Linux进程管理面试题大全(含详细解析)
  • 基于热成像摄像头检测蚊子的可行性研究
  • iOS 签名证书全生命周期实战,从开发到上架的多阶段应用
  • 《Qwen2.5-VL 》论文精读笔记
  • 网络协议之TCP和UDP
  • 【iOS】Block基础知识和底层探索
  • Model Context Protocol (MCP)标准化应用程序向大型语言模型 (LLM) 提供上下文协议
  • 如何通过 5 种方法轻松格式化 USB 驱动器
  • Kubernetes 资源管理全解析:从基础到企业级实践
  • MyBatis-Plus——SQL注入器
  • 华清远见25072班C语言学习day7
  • 《算法导论》第 21 章-用于不相交集合的数据结构
  • 01-Ansible 自动化介绍与使用
  • 【数据结构】二叉树结构与相关实现
  • .NET MAUI框架编译Android应用流程
  • 服务降级方式
  • Python实现Amazon Redshift数据库元数据提取类