【任务调度】DolphinScheduler钉钉告警消息格式修改
目录
一 、概述
二、钉钉告警消息修改
一 、概述
Apache DolphinScheduler 是一个分布式易扩展的可视化 DAG 工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。
Apache DolphinScheduler 旨在解决复杂的大数据任务依赖关系,并为应用程序提供数据和各种 OPS 编排中的关系。解决数据研发 ETL 依赖错综复杂,无法监控任务健康状态的问题。DolphinScheduler 以 DAG(Directed Acyclic Graph,DAG)流式方式组装任务,可以及时监控任务的执行状态,支持重试、指定节点恢复失败、暂停、恢复、终止任务等操作。
二、钉钉告警消息修改
使用的DS版本:dolphinscheduler-3.1.8
修改的类
org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkSender.java,修改的主要方法:generateMarkdownMsg
完整类代码如下:
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.dolphinscheduler.plugin.alert.dingtalk;import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.utils.JSONUtils;import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.fasterxml.jackson.databind.JsonNode;/*** <p>* <a href="https://open.dingtalk.com/document/robots/custom-robot-access">...</a>* <a href="https://open.dingtalk.com/document/robots/customize-robot-security-settings">...</a>* </p>*/
public final class DingTalkSender {

    private static final Logger logger = LoggerFactory.getLogger(DingTalkSender.class);

    /** Placeholder rendered when a field cannot be read from the alert content. */
    private static final String UNKNOWN = "未知";

    /** Formatter for the "alert time" line; DateTimeFormatter instances are immutable and safe to share. */
    private static final DateTimeFormatter ALERT_TIME_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final String url;

    private final String keyword;

    private final String secret;

    private String msgType;

    private final String atMobiles;

    private final String atUserIds;

    private final Boolean atAll;

    private final Boolean enableProxy;

    private String proxy;

    private Integer port;

    private String user;

    private String password;

    /**
     * Reads the DingTalk plugin settings from the given configuration map.
     * Proxy settings are only read when the proxy switch parses to {@code true}.
     *
     * @param config plugin parameters keyed by the {@code DingTalkParamsConstants} names
     */
    DingTalkSender(Map<String, String> config) {
        url = config.get(DingTalkParamsConstants.NAME_DING_TALK_WEB_HOOK);
        keyword = config.get(DingTalkParamsConstants.NAME_DING_TALK_KEYWORD);
        secret = config.get(DingTalkParamsConstants.NAME_DING_TALK_SECRET);
        msgType = config.get(DingTalkParamsConstants.NAME_DING_TALK_MSG_TYPE);

        atMobiles = config.get(DingTalkParamsConstants.NAME_DING_TALK_AT_MOBILES);
        atUserIds = config.get(DingTalkParamsConstants.NAME_DING_TALK_AT_USERIDS);
        atAll = Boolean.valueOf(config.get(DingTalkParamsConstants.NAME_DING_TALK_AT_ALL));

        enableProxy = Boolean.valueOf(config.get(DingTalkParamsConstants.NAME_DING_TALK_PROXY_ENABLE));
        if (Boolean.TRUE.equals(enableProxy)) {
            port = Integer.parseInt(config.get(DingTalkParamsConstants.NAME_DING_TALK_PORT));
            proxy = config.get(DingTalkParamsConstants.NAME_DING_TALK_PROXY);
            user = config.get(DingTalkParamsConstants.NAME_DING_TALK_USER);
            password = config.get(DingTalkParamsConstants.NAME_DING_TALK_PASSWORD);
        }
    }

    /**
     * Builds a JSON POST request for the given URL.
     *
     * @param url target URL
     * @param msg JSON request body
     * @return the prepared request
     */
    private static HttpPost constructHttpPost(String url, String msg) {
        HttpPost post = new HttpPost(url);
        StringEntity entity = new StringEntity(msg, StandardCharsets.UTF_8);
        post.setEntity(entity);
        post.addHeader("Content-Type", "application/json; charset=utf-8");
        return post;
    }

    /**
     * Creates an HTTP client carrying username/password credentials scoped to the proxy host.
     */
    private static CloseableHttpClient getProxyClient(String proxy, int port, String user, String password) {
        HttpHost httpProxy = new HttpHost(proxy, port);
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(new AuthScope(httpProxy), new UsernamePasswordCredentials(user, password));
        return HttpClients.custom().setDefaultCredentialsProvider(provider).build();
    }

    private static CloseableHttpClient getDefaultClient() {
        return HttpClients.createDefault();
    }

    /**
     * Creates a request configuration that routes the request through the given proxy.
     */
    private static RequestConfig getProxyConfig(String proxy, int port) {
        HttpHost httpProxy = new HttpHost(proxy, port);
        return RequestConfig.custom().setProxy(httpProxy).build();
    }

    /**
     * Converts the raw DingTalk response body into an {@link AlertResult}.
     * The result status is "true" only when the response parses and its errcode equals 0.
     *
     * @param result raw response body, may be {@code null}
     * @return alert result describing success or the failure reason
     */
    private AlertResult checkSendDingTalkSendMsgResult(String result) {
        AlertResult alertResult = new AlertResult();
        alertResult.setStatus("false");

        if (null == result) {
            alertResult.setMessage("send ding talk msg error");
            logger.info("send ding talk msg error,ding talk server resp is null");
            return alertResult;
        }

        DingTalkSendMsgResponse sendMsgResponse = JSONUtils.parseObject(result, DingTalkSendMsgResponse.class);
        if (null == sendMsgResponse) {
            alertResult.setMessage("send ding talk msg fail");
            logger.info("send ding talk msg error,resp error");
            return alertResult;
        }

        // Null-safe comparison: a response without errcode must not trigger an unboxing NPE.
        if (Integer.valueOf(0).equals(sendMsgResponse.getErrcode())) {
            alertResult.setStatus("true");
            alertResult.setMessage("send ding talk msg success");
            return alertResult;
        }

        alertResult.setMessage(String.format("alert send ding talk msg error : %s", sendMsgResponse.getErrmsg()));
        logger.info("alert send ding talk msg error : {}", sendMsgResponse.getErrmsg());
        return alertResult;
    }

    /**
     * Sends an alert to DingTalk and never throws: any exception is converted into a failed result.
     *
     * @param title alert title
     * @param content alert content
     * @return the outcome of the send attempt
     */
    public AlertResult sendDingTalkMsg(String title, String content) {
        try {
            String resp = sendMsg(title, content);
            return checkSendDingTalkSendMsgResult(resp);
        } catch (Exception e) {
            // Trailing throwable argument keeps the stack trace in the log.
            logger.info("send ding talk alert msg exception : {}", e.getMessage(), e);
            AlertResult alertResult = new AlertResult();
            alertResult.setStatus("false");
            alertResult.setMessage("send ding talk alert fail.");
            return alertResult;
        }
    }

    /**
     * Posts the generated message to the webhook (signed when a secret is configured)
     * and returns the response body.
     *
     * @param title alert title
     * @param content alert content
     * @return response body as a string
     * @throws IOException if the HTTP exchange fails
     */
    private String sendMsg(String title, String content) throws IOException {
        String msg = generateMsgJson(title, content);
        String targetUrl = org.apache.commons.lang3.StringUtils.isBlank(secret) ? url : generateSignedUrl();
        HttpPost httpPost = constructHttpPost(targetUrl, msg);

        CloseableHttpClient httpClient;
        if (Boolean.TRUE.equals(enableProxy)) {
            httpClient = getProxyClient(proxy, port, user, password);
            httpPost.setConfig(getProxyConfig(proxy, port));
        } else {
            httpClient = getDefaultClient();
        }

        // Resources are closed in reverse order: response first, then the client.
        try (CloseableHttpClient client = httpClient;
                CloseableHttpResponse response = client.execute(httpPost)) {
            HttpEntity entity = response.getEntity();
            String resp = EntityUtils.toString(entity, StandardCharsets.UTF_8);
            EntityUtils.consume(entity);
            logger.info("Ding Talk send msg :{}, resp: {}", msg, resp);
            return resp;
        }
    }

    /**
     * Generates the JSON request body. Falls back to the text message type when none is configured.
     *
     * @param title alert title
     * @param content alert content
     * @return JSON string to post to the webhook
     */
    private String generateMsgJson(String title, String content) {
        if (org.apache.commons.lang3.StringUtils.isBlank(msgType)) {
            msgType = DingTalkParamsConstants.DING_TALK_MSG_TYPE_TEXT;
        }

        Map<String, Object> items = new HashMap<>();
        items.put("msgtype", msgType);

        Map<String, Object> text = new HashMap<>();
        items.put(msgType, text);

        if (DingTalkParamsConstants.DING_TALK_MSG_TYPE_MARKDOWN.equals(msgType)) {
            generateMarkdownMsg(title, content, text);
        } else {
            generateTextMsg(title, content, text);
        }

        setMsgAt(items);
        return JSONUtils.toJsonString(items);
    }

    /**
     * Fills the payload of a plain-text message: title, newline, content and the optional keyword.
     *
     * @param title alert title
     * @param content alert content
     * @param text payload map that receives the "content" entry
     */
    private void generateTextMsg(String title, String content, Map<String, Object> text) {
        StringBuilder builder = new StringBuilder(title);
        builder.append("\n");
        builder.append(content);
        if (org.apache.commons.lang3.StringUtils.isNotBlank(keyword)) {
            builder.append(" ");
            builder.append(keyword);
        }
        byte[] byt = StringUtils.getBytesUtf8(builder.toString());
        String txt = StringUtils.newStringUtf8(byt);
        text.put("content", txt);
    }

    /**
     * Fills the payload of a markdown message with a fixed failure-alert layout whose fields
     * are read from the first element of the alert content JSON array.
     *
     * @param title alert title, stored as the markdown title
     * @param content alert content; expected to be a JSON array string
     * @param text payload map that receives the "title" and "text" entries
     */
    private void generateMarkdownMsg(String title, String content, Map<String, Object> text) {
        logger.info("DingTalk alert content: {}", content);

        // Parse once and reuse the node for every field instead of re-parsing per field.
        JsonNode firstNode = parseFirstAlertNode(content);

        StringBuilder builder = new StringBuilder();
        builder.append("## 🚨 ").append("DolphinScheduler任务执行失败告警通知").append(" ").append("\n\n");
        builder.append("以下任务执行失败,请及时处理:").append("\n\n");
        builder.append("---\n\n");
        builder.append("- **工作流名称**: ").append(extractInfo(firstNode, "工作流名称")).append("\n\n");
        builder.append("- **项目名称**: ").append(extractInfo(firstNode, "项目名称")).append("\n\n");
        builder.append("- **任务名称**: ").append(extractInfo(firstNode, "任务名称")).append("\n\n");
        builder.append("- **开始时间**: ").append(extractInfo(firstNode, "开始时间")).append("\n\n");
        builder.append("- **结束时间**: ").append(extractInfo(firstNode, "结束时间")).append("\n\n");
        builder.append("- **运行时长**: ").append(extractInfo(firstNode, "运行时长")).append("\n\n");
        builder.append("- **执行主机**: ").append(extractInfo(firstNode, "执行主机")).append("\n\n");
        builder.append("- **告警时间**: ").append(LocalDateTime.now().format(ALERT_TIME_FORMATTER)).append("\n\n");
        builder.append("---\n\n");

        handleAtMention(builder);

        // Mirror generateTextMsg: make sure the configured keyword is present in the body,
        // otherwise a robot using keyword-based security may reject the message.
        if (org.apache.commons.lang3.StringUtils.isNotBlank(keyword) && builder.indexOf(keyword) < 0) {
            builder.append("\n\n").append(keyword);
        }

        byte[] byt = StringUtils.getBytesUtf8(builder.toString());
        String txt = StringUtils.newStringUtf8(byt);
        text.put("title", title);
        text.put("text", txt);
    }

    /**
     * Appends the mention section to the message when mobiles, user ids or "at all" are configured.
     *
     * @param builder message text being assembled
     */
    private void handleAtMention(StringBuilder builder) {
        boolean hasMobiles = org.apache.commons.lang3.StringUtils.isNotBlank(atMobiles);
        boolean hasUserIds = org.apache.commons.lang3.StringUtils.isNotBlank(atUserIds);

        if (hasMobiles || hasUserIds) {
            builder.append("### 通知人员\n");
        }
        if (hasMobiles) {
            for (String mobile : atMobiles.split(",")) {
                builder.append("@").append(mobile).append(" ");
            }
        }
        if (hasUserIds) {
            for (String userId : atUserIds.split(",")) {
                builder.append("@").append(userId).append(" ");
            }
        }
        if (Boolean.TRUE.equals(atAll)) {
            builder.append("\n> **通知全体成员**");
        }
    }

    /**
     * Parses the alert content and returns the first element when it is a non-empty JSON array.
     *
     * @param content alert content string
     * @return first array element, or {@code null} when the content is not a non-empty JSON array
     */
    private JsonNode parseFirstAlertNode(String content) {
        try {
            JsonNode rootNode = JSONUtils.parseObject(content, JsonNode.class);
            if (rootNode != null && rootNode.isArray() && !rootNode.isEmpty()) {
                return rootNode.get(0);
            }
        } catch (Exception e) {
            logger.error("解析 content JSON 失败", e);
        }
        return null;
    }

    /**
     * Resolves a display label to the corresponding value of the alert node.
     *
     * @param firstNode first element of the alert content array, may be {@code null}
     * @param key display label of the wanted field
     * @return the field value, or the "unknown" placeholder when it is unavailable
     */
    private String extractInfo(JsonNode firstNode, String key) {
        if (firstNode == null) {
            return UNKNOWN;
        }
        switch (key) {
            case "工作流名称":
                return fieldText(firstNode, "processName");
            case "任务名称":
                return fieldText(firstNode, "taskName");
            case "项目名称":
                return fieldText(firstNode, "projectName");
            case "状态":
                return fieldText(firstNode, "taskState");
            case "开始时间":
                return fieldText(firstNode, "taskStartTime");
            case "结束时间":
                return fieldText(firstNode, "taskEndTime");
            case "执行主机":
                return fieldText(firstNode, "taskHost");
            case "运行时长":
                return formatDuration(fieldText(firstNode, "taskStartTime"), fieldText(firstNode, "taskEndTime"));
            default:
                return UNKNOWN;
        }
    }

    /**
     * Reads a field as text, mapping a missing field or JSON null to the "unknown" placeholder
     * instead of failing or rendering the literal string "null".
     */
    private static String fieldText(JsonNode node, String fieldName) {
        JsonNode value = node.get(fieldName);
        if (value == null || value.isNull()) {
            return UNKNOWN;
        }
        return value.asText();
    }

    /**
     * Computes the run time between two timestamps in {@code yyyy-MM-dd HH:mm:ss} format.
     *
     * @param startTime task start time
     * @param endTime task end time
     * @return seconds with an "s" suffix below one minute, otherwise minutes and seconds;
     *         the "unknown" placeholder when either timestamp cannot be parsed
     */
    private String formatDuration(String startTime, String endTime) {
        try {
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long startMs = sdf.parse(startTime).getTime();
            long endMs = sdf.parse(endTime).getTime();
            long seconds = (endMs - startMs) / 1000;
            if (seconds < 60) {
                return seconds + "s";
            }
            long minutes = seconds / 60;
            long secs = seconds % 60;
            return String.format("%d分%02d秒", minutes, secs);
        } catch (Exception e) {
            return UNKNOWN;
        }
    }

    /**
     * Adds the "at" section (mobiles, user ids, at-all flag) to the request body.
     *
     * @param items request body map
     */
    private void setMsgAt(Map<String, Object> items) {
        String[] atMobileArray = org.apache.commons.lang3.StringUtils.isNotBlank(atMobiles)
                ? atMobiles.split(",")
                : new String[0];
        String[] atUserArray = org.apache.commons.lang3.StringUtils.isNotBlank(atUserIds)
                ? atUserIds.split(",")
                : new String[0];
        boolean isAtAll = Boolean.TRUE.equals(atAll);

        Map<String, Object> at = new HashMap<>();
        at.put("atMobiles", atMobileArray);
        at.put("atUserIds", atUserArray);
        at.put("isAtAll", isAtAll);
        items.put("at", at);
    }

    /**
     * Appends the {@code timestamp} and {@code sign} query parameters to the webhook URL.
     * The signature is the URL-encoded Base64 of HmacSHA256 over "timestamp\nsecret", keyed by the secret.
     *
     * @return the signed URL; the sign parameter is empty when signing fails
     */
    private String generateSignedUrl() {
        long timestamp = System.currentTimeMillis();
        String stringToSign = timestamp + "\n" + secret;
        String sign = org.apache.commons.lang3.StringUtils.EMPTY;
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
            sign = URLEncoder.encode(new String(Base64.encodeBase64(signData), StandardCharsets.UTF_8), "UTF-8");
        } catch (Exception e) {
            logger.error("generate sign error, message:{}", e.getMessage(), e);
        }
        // The parameter name must be "timestamp"; the pasted source had it corrupted to "×tamp".
        return url + "&timestamp=" + timestamp + "&sign=" + sign;
    }

    /**
     * Response body returned by the DingTalk webhook.
     */
    static final class DingTalkSendMsgResponse {

        private Integer errcode;

        private String errmsg;

        public DingTalkSendMsgResponse() {
        }

        public Integer getErrcode() {
            return this.errcode;
        }

        public void setErrcode(Integer errcode) {
            this.errcode = errcode;
        }

        public String getErrmsg() {
            return this.errmsg;
        }

        public void setErrmsg(String errmsg) {
            this.errmsg = errmsg;
        }

        @Override
        public boolean equals(final Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof DingTalkSendMsgResponse)) {
                return false;
            }
            final DingTalkSendMsgResponse other = (DingTalkSendMsgResponse) o;
            return Objects.equals(this.getErrcode(), other.getErrcode())
                    && Objects.equals(this.getErrmsg(), other.getErrmsg());
        }

        @Override
        public int hashCode() {
            // Same formula as before so existing hash values do not change.
            final int PRIME = 59;
            int result = 1;
            final Object $errcode = this.getErrcode();
            result = result * PRIME + ($errcode == null ? 43 : $errcode.hashCode());
            final Object $errmsg = this.getErrmsg();
            result = result * PRIME + ($errmsg == null ? 43 : $errmsg.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "DingTalkSender.DingTalkSendMsgResponse(errcode=" + this.getErrcode() + ", errmsg="
                    + this.getErrmsg() + ")";
        }
    }
}
海豚中配置钉钉告警如下文:
【调度器】DolphinScheduler任务钉钉告警_dolphinscheduler告警实例-CSDN博客
代码修改完成后,重新打包并更新部署。
修改前的钉钉告警消息格式:
修改后的钉钉告警消息