当前位置: 首页 > news >正文

基于 SpringBoot+Logicflow 的轻流程任务执行

此方案是我工作中遇到的一个标准运维的一个需求
可以在web页面编辑流程节点 ,节点类型,支持拖拽等等,节点可以选择脚本,插件等等功能
然后通过建任务记录(record表),关联某个模板 (task_template表)
只做简单实现记录,代码是执行不了的,方便以后面试的时候问起来,省得记不起来

LogicFlow是什么
官网:http://logicflow.cn/tutorial/about
LogicFlow是一款流程图编辑框架,提供了一系列流程图交互、编辑所必需的功能和灵活的节点自定义、插件等拓展机制。 LogicFlow支持前端自定义开发各种逻辑编排场景,如流程图、ER图、BPMN流程等。在工作审批流配置、机器人逻辑编排、无代码平台流程配置都有较好的应用。
特性
可视化模型:通过 LogicFlow 提供的直观可视化界面,用户可以轻松创建、编辑和管理复杂的逻辑流程图。
高可定制性:用户可以根据自己的需要定制节点、连接器和样式,创建符合特定用例的定制逻辑流程图。
自执行引擎:执行引擎支持浏览器端执行流程图逻辑,为无代码执行提供新思路。

//任务模板json,此json通过前端logicflow可以图形化

{"nodes": [{"id": "start-node","type": "start","x": 150,"y": 100,"text": "开始","properties": {}},{"id": "task-node-01","type": "task","x": 150,"y": 250,"text": "数据库备份","properties": {"type": "script","scriptId": "9cf678b4-9108-454e-8b21-67b53a7b6dc0","scriptName": "ssh测试ping","scriptOrigin": 2,"scriptVersion": "1.0.0","scriptType": 2,  "scriptTypeDesc": "用于管理开源应用的脚本",  "inputParameters": [{"parameterName": "env","parameterValue": "SIT","isRequired": true}],"timeout": 1800,"status": "fail"}},{"id": "task-node-02","type": "task","x": 400,"y": 250,"text": "发送失败通知","properties": {"type": "email","pluginCode": "1001","pluginName": "通知","pluginVersion": "v1.0","pluginDescription": "用于发送流程节点执行结果通知","nodeName": "发送通知","status": "success","noticeMethod": "email,meixin","noticeUsers": "ex_xuyy15,zhangly97","noticeSubject": "通知主题【告警】数据库备份任务执行失败","noticeContent": "您好!失败了啊"}},{"id": "end-node","type": "end","x": 275,"y": 400,"text": "流程结束","properties": {"description": "接收备份成功分支或通知节点流转,流程最终终止"}}],"edges": [{"id": "edge-start-to-backup","sourceNodeId": "start-node","targetNodeId": "task-node-01","text": "","properties": {"description": "流程开始后,执行数据库备份任务"}},{"id": "edge-backup-to-end-success","sourceNodeId": "task-node-01","targetNodeId": "end-node","text": "备份成功","properties": {"condition": "${taskResult_task-node-01 === 'true'}","description": "备份成功时,直接流向唯一结束节点"}},{"id": "edge-backup-to-notify","sourceNodeId": "task-node-01","targetNodeId": "task-node-02","text": "备份失败","properties": {"condition": "${taskResult_task-node-01 === 'false'}","description": "备份失败时,先触发发送失败通知"}},{"id": "edge-notify-to-end","sourceNodeId": "task-node-02","targetNodeId": "end-node","text": "","properties": {"description": "失败通知发送完成后,流向唯一结束节点"}}],"properties": {"flowId": "db-backup-flow-001","flowName": "数据库自动备份流程(单结束节点版)","version": "1.0.1","description": "每日凌晨2点执行数据库备份:成功则直接结束,失败则先发通知再结束,所有分支最终汇聚到同一个结束节点"}
}
/*** 流程边模型* 定义节点之间的连接关系,支持条件判断*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Edge {private String id;                 // 边IDprivate String sourceNodeId;       // 源节点ID(从哪个节点出发)private String targetNodeId;       // 目标节点ID(到哪个节点)private String text;               // 边上显示的文本private Map<String, Object> properties;  // 边的属性,可包含condition条件表达式
}
/*** 流程节点模型* 支持所有节点类型:开始、任务、结束、并行网关、条件并行网关、分支网关、汇聚网关*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Node {private String id;         // 节点IDprivate String type;       // 节点类型:start/task/end/parallelGateway/conditionalParallelGateway/branchGateway/convergeGatewayprivate Integer x;         // x坐标private Integer y;         // y坐标private String text;       // 节点显示文本private Map<String, Object> properties;  // 节点自定义属性// 节点类型常量定义public static final String TYPE_START = "start";  // 开始节点public static final String TYPE_TASK = "task";    // 任务节点public static final String TYPE_END = "end";      // 结束节点// 网关类型public static final String TYPE_PARALLEL_GATEWAY = "parallelGateway";  // 并行网关public static final String TYPE_CONDITIONAL_PARALLEL_GATEWAY = "conditionalParallelGateway";  // 条件并行网关public static final String TYPE_BRANCH_GATEWAY = "branchGateway";      // 分支网关public static final String TYPE_CONVERGE_GATEWAY = "convergeGateway";  // 汇聚网关
}
/*** 流程上下文* 用于在节点之间传递数据、存储中间结果和跟踪流程状态*/
@Data
public class WorkflowContext {// 存储流程变量(节点间共享的数据)private Map<String, Object> variables = new HashMap<>();// 标记流程是否已结束private boolean processEnded = false;// 存储等待中的网关(用于汇聚网关)private Set<String> pendingGateways = new HashSet<>();// 脚本要执行的目标ipprivate List<String> executeIpList = new ArrayList<>();// 用于关联记录表和执行情况和ip详细执行情况private Long opsTaskExecutionInfoId;// 任务idprivate String taskId;// 任务记录private OpsTaskRecordPO opsTaskRecordPO;// 新增:并行分支完成状态跟踪(key=分支标识,value=是否完成)// 用ConcurrentHashMap保证多线程并行分支操作安全private Map<String, Boolean> branchCompletionStatus = new ConcurrentHashMap<>();// 线程安全的分支完成状态存储(汇聚网关依赖此判断)private final Map<String, Boolean> branchCompletedMap = new ConcurrentHashMap<>();/*** 设置变量*/public void setVariable(String key, Object value) {variables.put(key, value);}/*** 获取变量*/public Object getVariable(String key) {return variables.get(key);}/*** 添加等待中的网关*/public void addPendingGateway(String gatewayId) {pendingGateways.add(gatewayId);}/*** 创建上下文副本(用于并行网关的分支)*/public WorkflowContext copy() {WorkflowContext copy = new WorkflowContext();copy.setVariables(new HashMap<>(this.variables));  // 浅拷贝变量copy.setProcessEnded(this.processEnded);return copy;}/*** 标记某个分支已完成(并行分支执行完后调用)* @param branchKey 分支标识(用并行网关出口边的sourceNodeId,确保唯一)*/public void markBranchCompleted(String branchKey) {branchCompletedMap.put(branchKey, Boolean.TRUE);}/*** 判断某个分支是否完成*/public boolean isBranchCompleted(String branchKey) {return Boolean.TRUE.equals(branchCompletedMap.get(branchKey));}}
/*** 流程定义模型* 包含整个流程的所有节点、边和全局属性*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class WorkflowDefinition {private List<Node> nodes;          // 所有节点private List<Edge> edges;          // 所有边private Map<String, Object> properties;  // 流程全局属性(如flowId、flowName等)
}
// 任务表
@Data
@TableName(value = "ops_task_record")
public class OpsTaskRecordPO {/*** 周期任务名称*/@TableField(value = "task_name")private String taskName;/*** 任务模板主键id*/@TableField(value = "ops_task_template_id")private Long opsTaskTemplateId;/*** 任务模板名称*/@TableField(value = "ops_task_template_name")private String opsTaskTemplateName;/*** 任务模板配置(JSON格式)*/@TableField(value = "ops_task_template_flow_json")private String opsTaskTemplateFlowJson;/*** 任务类型 周期任务,定时任务*/@TableField(value = "ops_task_type")private String opsTaskType;/*** 执行类型:审批后立即执行/周期执行/定时执行*/@TableField(value = "execute_type")private String executeType;/*** 执行开始时间(yyyy-MM-dd HH:mm:ss)*/@TableField(value = "execute_start_time")@JsonFormat(shape = JsonFormat.Shape.STRING, pattern="yyyy-MM-dd HH:mm:ss" ,timezone = "GMT+8" )@JsonDeserialize(using = DateDeserializers.DateDeserializer.class)private Date executeStartTime;/*** 执行结束时间(yyyy-MM-dd HH:mm:ss)*/@TableField(value = "execute_end_time")@JsonFormat(shape = JsonFormat.Shape.STRING, pattern="yyyy-MM-dd HH:mm:ss" ,timezone = "GMT+8" )@JsonDeserialize(using = DateDeserializers.DateDeserializer.class)private Date executeEndTime;/*** 任务状态(0-未启用 1-运行中 2-已暂停 3-已结束)*/@TableField(value = "execute_status")private OpsTaskStatusEnum executeStatus;/*** 管理员*/@TableField(value = "administrator")private String administrator;/*** 首次开始执行时间(yyyy-MM-dd HH:mm:ss)*/@TableField(value = "first_execute_time")@JsonFormat(shape = JsonFormat.Shape.STRING, pattern="yyyy-MM-dd HH:mm:ss" ,timezone = "GMT+8" )@JsonDeserialize(using = DateDeserializers.DateDeserializer.class)private Date firstExecuteTime;/*** 最后一次执行开始时间(yyyy-MM-dd HH:mm:ss)*/@TableField(value = "last_execute_time")@JsonFormat(shape = JsonFormat.Shape.STRING, pattern="yyyy-MM-dd HH:mm:ss" ,timezone = "GMT+8" )@JsonDeserialize(using = DateDeserializers.DateDeserializer.class)private Date lastExecuteTime;/*** 任务周期(单位:天/周/月,如:1D/2W/1M)*/@TableField(value = "task_cycle")private String taskCycle;/*** 任务周期 cron*/@TableField(value = "task_cycle_cron")private String taskCycleCron;/*** 已执行周期数*/@TableField(value = "executed_cycles")private Integer executedCycles;
}
/*** 任务处理器接口* 不同类型的任务节点实现此接口*/
public interface TaskProcessor {/*** 执行任务* @param taskNode 任务节点* @param context 流程上下文* @return 任务执行结果*/TaskResult execute(Node taskNode, WorkflowContext context) throws Exception;
}
/*** 通知任务处理器* 处理"发送通知"类型的任务节点*/
@Component
public class EmailProcessor implements TaskProcessor {@Overridepublic TaskResult execute(Node taskNode, WorkflowContext context) throws Exception {}}
/*** 脚本任务处理器* */
@Component
public class ScriptTaskProcessor implements TaskProcessor {@Overridepublic TaskResult execute(Node taskNode, WorkflowContext context) throws Exception {}}
/*** 流程引擎核心类* 处理所有节点类型:开始、任务、结束、各种网关*/
@Component
public class WorkflowEngine {private static final Logger log = LoggerFactory.getLogger(WorkflowEngine.class);private static final long PARALLEL_BRANCH_TIMEOUT = 30; // 并行分支超时时间(分钟)private static final TimeUnit PARALLEL_TIMEOUT_UNIT = TimeUnit.MINUTES;@Autowiredprivate TaskProcessorFactory taskProcessorFactory;@Resource(name = "opsTaskExecutor")ThreadPoolExecutor opsTaskExecutor;/*** 执行流程*/public void execute(String workflowJson, WorkflowContext context) {// 校验入参if (workflowJson == null || context == null || context.getOpsTaskRecordPO() == null) {throw new IllegalArgumentException("流程JSON、上下文或任务记录不能为空");}OpsTaskRecordPO opsTaskRecordPO = context.getOpsTaskRecordPO();OpsTaskExecutionInfoPO opsTaskExecutionInfoPO = null;try {// 1. 解析流程定义WorkflowDefinition workflow = JSON.parseObject(workflowJson, WorkflowDefinition.class);if (workflow.getNodes() == null || workflow.getNodes().isEmpty()) {throw new RuntimeException("流程定义中无节点配置");}// 2. 查找开始节点Node startNode = workflow.getNodes().stream().filter(node -> Node.TYPE_START.equals(node.getType())).findFirst().orElseThrow(() -> new RuntimeException("流程定义中未找到开始节点"));// 3. 创建执行记录(首次执行)// 先有任务,再有执行记录,所以要根据recordId去查执行明细表Integer maxExecutedCycles = opsTaskExecutionInfoMapper.getMaxExecutedCycles(opsTaskRecordPO.getOpsTaskRecordId()); opsTaskExecutionInfoPO = new OpsTaskExecutionInfoPO();// set执行明细表的字段值 省略。使用mybatisplus 的注解主键字段,保存成功后会填充主键idopsTaskExecutionInfoMapper.save(opsTaskExecutionInfoPO);//将执行明细的id传递到流程上下文context.setOpsTaskExecutionInfoId(opsTaskExecutionInfoPO.getId());// 4. 执行流程(核心:此处会等待所有并行分支完成)processNode(startNode.getId(), workflow, context);// 5. 流程正常完成:统计最终状态updateFinalExecutionStatus(opsTaskExecutionInfoPO, context);} catch (Exception e) {LogUtil.info(log, "WorkflowEngine-execute", "WorkflowEngine-execute流程执行失败:{}", e);} finally {if (opsTaskExecutionInfoPO != null && opsTaskExecutionInfoPO.getExecuteEndTime() == null) {opsTaskExecutionInfoPO.setExecuteEndTime(new Date());updateExecutionInfo(opsTaskExecutionInfoPO);}}}/*** 处理节点(递归核心,保持原有逻辑结构,优化状态传递)*/private void processNode(String nodeId, WorkflowDefinition workflow, WorkflowContext context) {if (context.isProcessEnded()) {LogUtil.info(log, "processNode", "流程已结束,跳过节点:{}", nodeId);return;}// 获取当前节点Node currentNode = workflow.getNodes().stream().filter(node -> node.getId().equals(nodeId)).findFirst().orElseThrow(() -> new RuntimeException("未找到节点:" + nodeId));LogUtil.info(log, "processNode", "开始处理节点:{}({}),节点类型:{}",currentNode.getId(), currentNode.getText(), currentNode.getType());// 按节点类型处理switch (currentNode.getType()) {case Node.TYPE_START:processStartNode(currentNode, workflow, context);break;case Node.TYPE_TASK:processTaskNode(currentNode, workflow, context);break;case Node.TYPE_END:processEndNode(currentNode, context);break;case Node.TYPE_PARALLEL_GATEWAY://处理并行网关processParallelGateway(currentNode, workflow, context);break;case Node.TYPE_CONDITIONAL_PARALLEL_GATEWAY://处理条件并行网关processConditionalParallelGateway(currentNode, workflow, context);break;case Node.TYPE_BRANCH_GATEWAY://处理分支网关processBranchGateway(currentNode, workflow, context);break;case Node.TYPE_CONVERGE_GATEWAY://处理汇聚网关processConvergeGateway(currentNode, workflow, context);break;default:throw new RuntimeException("不支持的节点类型:" + currentNode.getType());}}/*** 处理开始节点(无逻辑变更,保持原有)*/private void processStartNode(Node startNode, WorkflowDefinition workflow, WorkflowContext context) {List<Edge> outgoingEdges = getOutgoingEdges(startNode.getId(), workflow);if (outgoingEdges.size() != 1) {throw new RuntimeException("开始节点必须有且仅有一条出口边,当前:" + outgoingEdges.size());}processNode(outgoingEdges.get(0).getTargetNodeId(), workflow, context);}/*** 处理任务节点(优化:变量存储线程安全,异常信息更详细)*/private void processTaskNode(Node taskNode, WorkflowDefinition workflow, WorkflowContext context) {String taskNodeId = taskNode.getId();try {// 获取任务处理器(策略模式,保持原有)TaskProcessor processor = taskProcessorFactory.getProcessor(taskNode);if (processor == null) {throw new RuntimeException("未找到任务节点[" + taskNodeId + "]的处理器");}// 执行任务LogUtil.info(log, "processTaskNode", "开始执行任务节点:{}", taskNodeId);TaskResult result = processor.execute(taskNode, context);LogUtil.info(log, "processTaskNode", "任务节点{}执行结果:{}", taskNodeId, result.isSuccess());// 线程安全存储任务结果(主上下文,无副本)context.setVariable("taskResult_" + taskNodeId, result.isSuccess());if (!result.isSuccess()) {context.setVariable("taskError_" + taskNodeId, result.getMessage());}// 继续下一个节点processNextNodes(taskNodeId, workflow, context);} catch (Exception e) {String errorMsg = "任务节点[" + taskNodeId + "]执行失败:" + e.getMessage();context.setVariable("taskError_" + taskNodeId, errorMsg);throw new RuntimeException(errorMsg, e);}}
/*** 处理结束节点(无逻辑变更,保持原有)*/private void processEndNode(Node endNode, WorkflowContext context) {LogUtil.info(log, "processEndNode", "流程结束于节点:{}({})", endNode.getId(), endNode.getText());context.setProcessEnded(true);}/*** 处理并行网关(优化核心:用ExecutorService+Future等待分支,无上下文副本)*/private void processParallelGateway(Node gatewayNode, WorkflowDefinition workflow, WorkflowContext context) {List<Edge> outgoingEdges = getOutgoingEdges(gatewayNode.getId(), workflow);if (outgoingEdges.isEmpty()) {throw new RuntimeException("并行网关[" + gatewayNode.getId() + "]必须至少有一条出口边");}String gatewayId = gatewayNode.getId();LogUtil.info(log, "processParallelGateway", "并行网关{}开始执行{}个分支", gatewayId, outgoingEdges.size());// 存储并行分支的Future(用于等待所有分支完成)List<Future<?>> branchFutures = new ArrayList<>(outgoingEdges.size());try {// 提交所有分支任务到线程池for (Edge edge : outgoingEdges) {String nextNodeId = edge.getTargetNodeId();String branchKey = edge.getSourceNodeId() + "_" + nextNodeId; // 唯一分支标识(网关ID+目标节点ID)// 提交任务(lambda捕获final变量,确保线程安全)Future<?> future = opsTaskExecutor.submit(() -> {try {LogUtil.info(log, "processParallelGateway", "并行网关{}分支{}开始执行", gatewayId, branchKey);processNode(nextNodeId, workflow, context); // 直接用主上下文,状态同步context.markBranchCompleted(branchKey); // 标记分支完成(主上下文)LogUtil.info(log, "processParallelGateway", "并行网关{}分支{}执行完成", gatewayId, branchKey);} catch (Exception e) {String errorMsg = "并行网关[" + gatewayId + "]分支[" + branchKey + "]执行失败";LogUtil.error(log, "processParallelGateway", errorMsg, e);context.setVariable("parallelBranchError_" + branchKey, errorMsg);throw new RuntimeException(errorMsg, e);}});branchFutures.add(future);}// 等待所有分支完成(加超时,避免永久阻塞)for (Future<?> future : branchFutures) {future.get(PARALLEL_BRANCH_TIMEOUT, PARALLEL_TIMEOUT_UNIT); // 超时抛出TimeoutException}LogUtil.info(log, "processParallelGateway", "并行网关{}所有分支执行完成", gatewayId);processNextNodes(gatewayId, workflow, context); // 继续下一个节点(通常是汇聚网关)} catch (TimeoutException e) {// 并行分支超时:标记所有未完成分支为失败String errorMsg = "并行网关[" + gatewayId + "]分支执行超时(超过" + PARALLEL_BRANCH_TIMEOUT + "分钟)";LogUtil.error(log, "processParallelGateway", errorMsg, e);context.setVariable("parallelGatewayTimeout_" + gatewayId, errorMsg);throw new RuntimeException(errorMsg, e);} catch (InterruptedException | ExecutionException e) {// 分支执行异常:包装异常并抛出String errorMsg = "并行网关[" + gatewayId + "]分支执行异常";LogUtil.error(log, "processParallelGateway", errorMsg, e);throw new RuntimeException(errorMsg, e.getCause());}}/*** 处理条件并行网关(复用并行网关逻辑,先筛选条件分支)*/private void processConditionalParallelGateway(Node gatewayNode, WorkflowDefinition workflow, WorkflowContext context) {List<Edge> outgoingEdges = getOutgoingEdges(gatewayNode.getId(), workflow);// 筛选满足条件的分支(用SpEL解析)List<Edge> qualifiedEdges = outgoingEdges.stream().filter(edge -> evaluateCondition(edge.getProperties().getOrDefault("condition", "true").toString(), context)).collect(Collectors.toList());if (qualifiedEdges.isEmpty()) {throw new RuntimeException("条件并行网关[" + gatewayNode.getId() + "]无满足条件的分支");}String gatewayId = gatewayNode.getId();LogUtil.info(log, "processConditionalParallelGateway", "条件并行网关{}筛选出{}个满足条件的分支", gatewayId, qualifiedEdges.size());// 替换网关的出口边为筛选后的分支,复用并行网关逻辑workflow.setEdges(workflow.getEdges().stream().filter(edge -> !edge.getSourceNodeId().equals(gatewayId) || qualifiedEdges.contains(edge)).collect(Collectors.toList()));// 调用并行网关处理逻辑(复用代码,减少冗余)processParallelGateway(gatewayNode, workflow, context);}/*** 处理分支网关(优化:用SpEL解析条件,逻辑更健壮)*/private void processBranchGateway(Node gatewayNode, WorkflowDefinition workflow, WorkflowContext context) {List<Edge> outgoingEdges = getOutgoingEdges(gatewayNode.getId(), workflow);if (outgoingEdges.isEmpty()) {throw new RuntimeException("分支网关[" + gatewayNode.getId() + "]无出口边");}// 查找第一个满足条件的分支Edge qualifiedEdge = null;for (Edge edge : outgoingEdges) {String condition = edge.getProperties().getOrDefault("condition", "true").toString();if (evaluateCondition(condition, context)) {qualifiedEdge = edge;break;}}// 无满足条件分支时,查找默认分支(无condition或condition=true)if (qualifiedEdge == null) {qualifiedEdge = outgoingEdges.stream().filter(edge -> !edge.getProperties().containsKey("condition")|| "true".equals(edge.getProperties().get("condition").toString())).findFirst().orElseThrow(() -> new RuntimeException("分支网关[" + gatewayNode.getId() + "]无满足条件分支,也无默认分支"));}LogUtil.info(log, "processBranchGateway", "分支网关{}选择分支:{}(目标节点:{})",gatewayNode.getId(), qualifiedEdge.getId(), qualifiedEdge.getTargetNodeId());processNode(qualifiedEdge.getTargetNodeId(), workflow, context);}/*** 处理汇聚网关(优化:依赖主上下文分支状态,判断逻辑正确)*/private void processConvergeGateway(Node gatewayNode, WorkflowDefinition workflow, WorkflowContext context) {List<Edge> incomingEdges = getIncomingEdges(gatewayNode.getId(), workflow);if (incomingEdges.isEmpty()) {LogUtil.warn(log, "processConvergeGateway", "汇聚网关[" + gatewayNode.getId() + "]无入口边,默认继续执行");processNextNodes(gatewayNode.getId(), workflow, context);return;}// 检查所有入口边对应的分支是否完成boolean allBranchesCompleted = checkAllBranchesCompleted(incomingEdges, context);if (!allBranchesCompleted) {// 分支未完成:标记pending(实际项目可结合定时任务重试,此处简化)String pendingMsg = "汇聚网关[" + gatewayNode.getId() + "]等待所有分支完成,暂存状态";LogUtil.info(log, "processConvergeGateway", pendingMsg);context.setVariable("convergeGatewayPending_" + gatewayNode.getId(), pendingMsg);return;}// 所有分支完成:继续执行LogUtil.info(log, "processConvergeGateway", "汇聚网关{}所有分支已完成,继续执行", gatewayNode.getId());processNextNodes(gatewayNode.getId(), workflow, context);}// ==================== 工具方法(优化后)====================/*** 获取节点出口边(无变更)*/private List<Edge> getOutgoingEdges(String nodeId, WorkflowDefinition workflow) {return workflow.getEdges().stream().filter(edge -> edge.getSourceNodeId().equals(nodeId)).collect(Collectors.toList());}/*** 获取节点入口边(无变更)*/private List<Edge> getIncomingEdges(String nodeId, WorkflowDefinition workflow) {return workflow.getEdges().stream().filter(edge -> edge.getTargetNodeId().equals(nodeId)).collect(Collectors.toList());}/*** 处理下一个节点(优化:增加多出口边日志提醒)*/private void processNextNodes(String currentNodeId, WorkflowDefinition workflow, WorkflowContext context) {if (context.isProcessEnded()) {return;}List<Edge> outgoingEdges = getOutgoingEdges(currentNodeId, workflow);if (outgoingEdges.size() > 1) {LogUtil.info(log, "processNextNodes", "节点{}存在{}条出口边,可能导致并行执行(非网关节点建议单出口)",currentNodeId, outgoingEdges.size());}for (Edge edge : outgoingEdges) {// 解析边的条件(SpEL)if (edge.getProperties().containsKey("condition") && !evaluateCondition(edge.getProperties().get("condition").toString(), context)) {LogUtil.info(log, "processNextNodes", "节点{}出口边{}条件不满足,跳过", currentNodeId, edge.getId());continue;}processNode(edge.getTargetNodeId(), workflow, context);}}/*** 评估条件表达式(简化实现,实际可集成SpEL等表达式引擎)*/private boolean evaluateCondition(String condition, WorkflowContext context) {// 示例:处理 ${backupResult === 'success'} 这样的表达式if (condition.startsWith("${") && condition.endsWith("}")) {String expr = condition.substring(2, condition.length() - 1);// 简单处理 backupResult === 'success' 这种形式if (expr.contains("===")) {String[] parts = expr.split("===");String varName = parts[0].trim();String expectedValue = parts[1].trim().replace("'", "");Object actualValue = context.getVariable(varName);return expectedValue.equals(actualValue != null ? actualValue.toString() : null);}}// 默认返回true(无条件或无法解析的条件)return true;}/*** 检查所有分支是否完成(优化:依赖主上下文的分支状态)*/private boolean checkAllBranchesCompleted(List<Edge> incomingEdges, WorkflowContext context) {for (Edge edge : incomingEdges) {// 分支标识:入口边的「源节点ID+目标节点ID」(与并行分支标记的一致)String branchKey = edge.getSourceNodeId() + "_" + edge.getTargetNodeId();if (!context.isBranchCompleted(branchKey)) {LogUtil.info(log, "checkAllBranchesCompleted", "汇聚网关入口边{}对应分支{}未完成", edge.getId(), branchKey);return false;}}return true;}/*** 更新执行记录(抽取通用方法,减少冗余)*/private void updateExecutionInfo(OpsTaskExecutionInfoPO executionInfoPO) {//更新执行状态,执行时间等字段,根据主键idexecutionInfoPO.setUpdateTime(LocalDateTime.now());//opsTaskExecutionInfoMapper.updateById();}/*** 统计并更新最终执行状态(抽取通用方法)*/private void updateFinalExecutionStatus(OpsTaskExecutionInfoPO executionInfoPO, WorkflowContext context) {Long executionInfoId = context.getOpsTaskExecutionInfoId();// executionInfoPO.setExecutionStatus(TicketExecutionStatusEnum.EXECUTED);
}
/*** 任务执行情况表 PO 类(与数据库表 ops_task_execution_info 映射)* @author: * @since: 2025/9/8 15:16*/
@Data
@TableName("ops_task_execution_info")
public class OpsTaskExecutionInfoPO {/*** 关联任务列表主键ID(对应表字段 ops_task_record_id)*/@TableField(value = "ops_task_record_id")private Long opsTaskRecordId;/*** 执行次数(第几次执行)(对应表字段 execute_count)*/@TableField(value = "execute_count")private Integer executeCount;/*** 执行开始时间(对应表字段 execute_start_time)* 备注:表中字段允许为 NULL,实体类用 Date 类型映射 datetime 字段*/@TableField(value = "execute_start_time")private Date executeStartTime;/*** 执行结束时间(对应表字段 execute_end_time)*/@TableField(value = "execute_end_time")private Date executeEndTime;/*** 执行状态(对应表字段 execution_status)*/@TableField(value = "execution_status")private TicketExecutionStatusEnum executionStatus;
}
/*** @author: * 任务模板* @since: 2025/9/1 13:41*/
@Data
@TableName("ops_task_template")
public class OpsTaskTemplatePO extends BasePO {/** 任务模板名称(脚本名称) */@TableField("task_template_name")private String taskTemplateName;/** 任务模板描述(脚本描述) */@TableField("task_template_desc")private String taskTemplateDesc;/** 流程配置JSON(脚本的核心配置) */@TableField("flow_json")private String flowJson;/** 流程唯一标识Key */@TableField("diagram_key")private String diagramKey;@TableField("market_diagram_key")private String marketDiagramKey;
}
  /*** 测试调用LTS 提交任务, 并接收任务执行反馈结果* https://www.cnblogs.com/MrYuChen-Blog/p/14803475.html* @author * @date 2025/9/15 11:07* @return Map<Object>*/@GetMapping("test01")public Map<String, Object> test01() {Job job = new Job();job.setTaskId("task-AAAAAAAAAAAAAAA");job.setCronExpression("0/3 * * * * ?");//设置任务类型 区分不同的任务 执行不同的业务逻辑//job.setParam("type", "aType");job.setNeedFeedback(true);//任务触发时间 如果设置了 cron 则该设置无效//job.setTriggerTime(DateUtils.addDay(new Date(), 1).getTime());// job.setMaxRetryTimes(5);// 这个是 cron expression 和 quartz 一样,可选// job.setCronExpression(cronExpression);// 这个是指定执行时间,可选// job.setTriggerTime(new Date());// 当 cronExpression 和 triggerTime 都不设置的时候,默认是立即执行任务//任务执行节点组job.setTaskTrackerNodeGroup("test_TaskTracker");//当任务队列中存在这个任务的时候,是否替换更新job.setReplaceOnExist(true);Map<String, Object> submitResult = new HashMap<String, Object>(4);try {//任务提交返回值 responseResponse response = jobClient.submitJob(job);submitResult.put("success", response.isSuccess());submitResult.put("msg", response.getMsg());submitResult.put("code", response.getCode());} catch (Exception e) {log.error("提交任务失败", e);throw new RuntimeException("提交任务失败");}return submitResult;}
/*** * 任务具体的业务逻辑,此类可以被Quartz,LTS,xxl-job 调用,我使用的是LTS定时任务框架* @since: 2025/9/2 9:24*/
@Component
@JobRunner4TaskTracker
public class JobRunnerA implements JobRunner {private static final Logger log = LoggerFactory.getLogger(JobRunnerA.class);@ResourceOpsTaskRecordMapper opsTaskRecordMapper;@ResourceOpsTaskExecutionInfoMapper opsTaskExecutionInfoMapper;@ResourceWorkflowEngine workflowEngine;@ResourceJobClient jobClient;@Value("${lts.tasktracker.node-group}")private String taskTrackerNodeGroup;@Overridepublic Result run(JobContext jobContext) throws Throwable {Job job = jobContext.getJob();//扩展参数Map<String, String> extParams = job.getExtParams();//任务id,也是record表的主键idString taskId = job.getTaskId();LogUtil.info(log, "JobRunnerA", "JobRunnerA-{},开始执行", taskId);Date date = new Date();// 周期任务的有效时间String executeStartTime = extParams.get("executeStartTime");String executeEndTime = extParams.get("executeEndTime");//周期性任务应该判断当前时间是否在周期任务的有效时间范围内,如果在则执行任务OpsTaskRecordPO recordPO = opsTaskRecordMapper.getById(Long.parseLong(taskId));if (date.after(DateUtil.parseDate(executeStartTime)) && date.before(DateUtil.parseDate(executeEndTime))) {try {WorkflowContext workflowContext = new WorkflowContext();//将任务id传入上下文中workflowContext.setTaskId(taskId);//将任务对象放入上下文workflowContext.setOpsTaskRecordPO(recordPO);// 执行任务完成后,如果任务执行成功,则更新任务记录表的状态,记录执行情况表,ip表workflowEngine.execute(recordPO.getOpsTaskTemplateFlowJson(), workflowContext);// ops_task_execution_info 查询当前已执行次数(max(executedCycles),首次执行为0)Integer maxExecutedCycles = opsTaskExecutionInfoService.lambdaQuery().eq(OpsTaskExecutionInfoPO::getOpsTaskRecordId, recordPO.getId()).select(OpsTaskExecutionInfoPO::getExecuteCount).orderByDesc(OpsTaskExecutionInfoPO::getExecuteCount).last("LIMIT 1") // 取最大的一条.oneOpt() // 用oneOpt避免空指针,返回Optional.map(OpsTaskExecutionInfoPO::getExecuteCount).orElse(0); // 首次执行时默认0//  ops_task_record 计算新的执行次数(+1)并更新任务记录int newExecutedCycles = maxExecutedCycles + 1;opsTaskRecordService.lambdaUpdate().eq(OpsTaskRecordPO::getId, recordPO.getId()) // 补全更新条件:根据ID定位.set(OpsTaskRecordPO::getExecutedCycles, newExecutedCycles).set(OpsTaskRecordPO::getExecuteStatus, OpsTaskStatusEnum.PERIODIC_RUNNING) // 更新为执行中.update();} catch (Exception e) {LogUtil.info(log, "JobRunnerA", "JobRunnerA-{},更新任务记录为失败状态:{}", e);}} else {// 如果不在范围内,则不执行任务,并且任务取消掉,更新记录表的状态jobClient.cancelJob(taskId, taskTrackerNodeGroup);recordPO.setExecuteStatus(OpsTaskStatusEnum.COMPLETED);opsTaskRecordService.updateById(recordPO);}LogUtil.info(log, "JobRunnerA", "JobRunnerA-{},执行完毕", taskId);return new Result(Action.EXECUTE_SUCCESS);}}
  @ResourceWorkflowEngine workflowEngine;@PostMapping("workflowEngineTest")public Object workflowEngineTest(@RequestBody WorkflowDefinition flowMapJson){workflowEngine.execute(JSONUtil.toJsonStr(flowMapJson), new WorkflowContext());return "成功过孤寡孤寡孤寡嘎嘎嘎嘎嘎嘎";}
http://www.dtcms.com/a/410398.html

相关文章:

  • 基础组合计数(三道例题)
  • ShardingSphere 与分库分表:分布式数据库中间件实战指南
  • 《三重AI协作交易系统:从技术债泥潭到毫秒级响应的实战笔记》
  • AI 赋能楼宇自控 DDC 系统:重构智慧建筑的核心引擎
  • 更改wordpress密码上海关键词优化排名哪家好
  • 最好的设计师网站wordpress 实例
  • IDEA 实现SpringBoot热部署(HotSwap和DevTools混用)
  • 《IDEA 2025 长效使用指南:2099 年有效期配置实战之JetBrains全家桶有效》​
  • IntelliJ IDEA / Android Studio 里直接跑 Cursor(不用来回切窗口)
  • HarmonyOS应用前后台状态切换
  • 网站建设app销售好做吗哪里长沙网站开发
  • pdf文件根据页数解析成图片 js vue3
  • Http与WebSocket
  • AI 赋能 EMS 微电网能效管理平台:构建分布式能源的智能调控中枢
  • 内网信息收集与命令详解
  • 电茶炉方案开发,茶炉板MCU控制方案分析
  • React Zustand 学习笔记(对照Vue3)
  • PyTorch实现CIFAR-10图像分类:从数据加载到模型训练全流程
  • 鸿蒙应用内存优化全攻略:从泄漏排查到对象池实战
  • ReactUse 与ahook对比
  • 网站建设与维护属于什么岗位wordpress免费企业站主题
  • 长安网站设计仿照别的网站做
  • 如何快速定位bug,编写测试用例?
  • 【LeetCode 142】环形链表 II:寻找环的入口
  • 卷轴 缓冲绘制 超级玛丽demo5
  • 1.9 IP地址和Mac地址
  • C# WinForms的入门级画板实现
  • 云南网站建设方案简述营销型网站开发流程
  • 随时随地学算法:Hello-Algo与cpolar的远程学习方案
  • App 上架全流程指南,iOS 应用发布步骤、ipa 文件上传工具、TestFlight 分发与 App Store 审核经验分享