当前位置: 首页 > news >正文

springboot(40) : 数据断流告警

 检测逻辑

package com.alibaba.gts.flm.push.data.client.service;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.gts.flm.push.data.client.common.util.DateUtil;
import com.alibaba.gts.flm.push.data.client.service.model.FcoWarningKeyDTO;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Service
@Slf4j
public class FlowCutOffWarningService {


    @Autowired
    private RestTemplateService restTemplateService;

    @Value("${sendDingDingUrl:-}")
    private String sendDingDingApiUrl;

    @Value("${fcowTaskLimit:5}")
    private Integer fcowTaskLimit;

    private ScheduledExecutorService executorService;

    private Map<String, Long> keyTime = new ConcurrentHashMap<>();

    private Map<String, JSONObject> taskStatus = new ConcurrentHashMap<>();

    // uuid
    // 每秒许可数 = 12次 / 60秒
    private Map<String, RateLimiter> taskRateLimiters = new ConcurrentHashMap<>();

    private AtomicInteger threadCount;

    // 启动定时任务
    // limitVal : 每秒许可数 = 12次 / 60秒
    public void startTask(List<FcoWarningKeyDTO> fcoWarningKeyDTOS, Long second, Double limitVal) {
        if (fcoWarningKeyDTOS == null || fcoWarningKeyDTOS.size() == 0 || second == null || limitVal == 0) {
            throw new RuntimeException("参数为空");
        }
        if (executorService == null) {
            executorService = new ScheduledThreadPoolExecutor(fcowTaskLimit);
            threadCount = new AtomicInteger(0);
        }

        if (threadCount.get() >= fcowTaskLimit) {
            throw new RuntimeException("断流检测任务上限5个,需要启动更多任务请修改配置[fcowTaskLimit]的值");
        }
        String uuid = UUID.randomUUID().toString();
        taskRateLimiters.put(uuid, RateLimiter.create(limitVal));
        log.info("启动[{}秒]断流监测任务...uuid:{}", second, uuid);
        executorService.scheduleWithFixedDelay(() -> {
            try {
                //log.info("执行[{}秒]断流监测任务...", second);
                JSONObject status = new JSONObject();
                status.put("fcoWarningKeyDTOS", fcoWarningKeyDTOS);
                status.put("second", second);
                status.put("lastRunTime", DateUtil.now());
                taskStatus.put(uuid, status);
                StringBuilder sb = compare(fcoWarningKeyDTOS, second, keyTime);
                if (sb == null || sb.length() == 0) {
                    return;
                }
                if (!taskRateLimiters.get(uuid).tryAcquire()) {
                    log.warn("发送钉钉过于频繁,本次忽略,uuid:{},limitVal:{}", uuid, limitVal);
                    return;
                }
                sendDingDing(sb);
            } catch (Exception e) {
                log.error("断流监测任务执行出错,fcoWarningKeyDTOS:{},second:{}秒,error:{}", JSONObject.toJSONString(fcoWarningKeyDTOS), second, ExceptionUtils.getStackTrace(e));
            }
        }, second, second, TimeUnit.SECONDS);
        log.info("[{}秒]断流监测任务启动成功,uuid:{}", second, uuid);
        threadCount.set(threadCount.get() + 1);
    }

    public Map<String, JSONObject> getTaskStatus() {
        return taskStatus;
    }

    public Map<String, Long> getKeyTime() {
        return keyTime;
    }

    public Map<String, String> getKeyTimeFormat() {
        Map<String, String> map = new HashMap<>();
        keyTime.forEach((k, v) -> {
            map.put(k, DateUtil.format(v));
        });
        return map;
    }

    public void flow(String k) {
        keyTime.put(k, System.currentTimeMillis());
    }

    public StringBuilder compare(List<FcoWarningKeyDTO> fcoWarningKeyDTOS, Long second, Map<String, Long> keyTime) {
        StringBuilder sb = null;
        for (FcoWarningKeyDTO fcoWarningKeyDTO : fcoWarningKeyDTOS) {
            if (keyTime.containsKey(fcoWarningKeyDTO.getCode()) && (System.currentTimeMillis() - keyTime.get(fcoWarningKeyDTO.getCode())) / 1000 > second) {
                String lastTime = keyTime.containsKey(fcoWarningKeyDTO.getCode()) ? DateUtil.format(keyTime.get(fcoWarningKeyDTO.getCode())) : "-";
                sb = append(sb, fcoWarningKeyDTO, lastTime, second);
            }
        }
        return sb;
    }

    private StringBuilder append(StringBuilder sb, FcoWarningKeyDTO fcoWarningKeyDTO, String lastTime, Long second) {
        if (sb == null) {
            sb = new StringBuilder();
        }
        sb.append("#### " + "[" + fcoWarningKeyDTO.getName() + "]数据断流").append("\n\n");
        sb.append(" > 时间: ").append(DateUtil.now()).append("\n\n");
        sb.append(" > 描述: ").append(second).append("秒内无数据").append("\n\n");
        sb.append(" > 上次数据时间: " + lastTime).append("\n\n");
        return sb;
    }

    private void sendDingDing(StringBuilder sb) {
        JSONObject req = new JSONObject();
        JSONObject at = new JSONObject();
        at.put("isAtAll", "false");
        req.put("title", "数据断流警告");
        req.put("text", sb);
        req.put("at", at);
        restTemplateService.post(sendDingDingApiUrl, req);
    }
}

使用



    @Autowired
    private FlowCutOffWarningService flowCutOffWarningService;
List<FcoWarningKeyDTO> fcoWarningKeyDTOS3Second = new LinkedList<FcoWarningKeyDTO>() {{
            add(new FcoWarningKeyDTO("test", "测试"));
        }};
        flowCutOffWarningService.startTask(fcoWarningKeyDTOS3Second, 3L, 0.003);

RestTemplateService

springboot(39) : RestTemplate完全体_Lxinccode的博客-CSDN博客

http://www.dtcms.com/a/1301.html

相关文章:

  • redis--windows配置--redis基础
  • 网络安全(黑客)自学
  • 网络基础-应用层协议-HTTP/HTTPS
  • 算法训练 第二周
  • SpringMvc 之crud增删改查应用
  • [论文笔记]RE2
  • 电脑死机的时候,CPU到底在做什么?
  • CSS3技巧36:backdrop-filter 背景滤镜
  • (图论) ——【Leetcode每日一题】
  • 外包干了2个月,技术退步明显了...
  • 为什么选择C/C++内存检测工具AddressSanitizer?如何使用AddressSanitizer?
  • 第5篇 vue的通信框架axios和ui框架-element-ui以及node.js
  • 关于老项目从JDK8升级到JDK17所需要注意的细节
  • 推荐书目:Python从入门到精通(文末送书)
  • 第25节-PhotoShop基础课程-文本工具组
  • 14:00面试,14:06就出来了,问的问题有点变态。。。
  • 盲打键盘的正确指法指南
  • Bash常见快捷键
  • 2023年会展行业研究报告
  • thinkPhp5返回某些指定字段
  • Postman应用——接口请求(Get和Post请求)
  • C++中的auto是一个关键字,用于在编译时自动推导变量的类型
  • 动态规划问题
  • Python的pandas库来实现将Excel文件转换为JSON格式的操作
  • 联发科3纳米芯片预计2024年量产,此前称仍未获批给华为供货
  • 怎么把两首歌曲拼接在一起?
  • Ubuntu20.4搭建基于iRedMail的邮件服务器
  • HTML显示中文空格字符,emsp;一个中文字符,ensp;半个中文字符
  • [.NET学习笔记] - Thread.Sleep与Task.Delay在生产中应用的性能测试
  • Linux static_key原理与应用