当前位置: 首页 > news >正文

用spring-webmvc包实现AI(Deepseek)事件流(SSE)推送

 前后端: Spring Boot + AngularJS(1.x)

spring-webmvc-5.2.2包

代码片段如下:

控制层:

@GetMapping(value = "/realtime/page/ai/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ApiOperation(value = "获取告警记录进行AI分析")
    public SseEmitter getRealTimeAlarmAi(AlarmRecordQueryParam queryParam) {
        final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
        IPage<AlarmRecord> page = alarmRecordService.findRealTimeByParam(queryParam);
        StringBuilder alarmInfo = new StringBuilder();
        try {
            // 根据状态设置前缀
            String prefix = queryParam.getStatus() == 1 ?
                    "最近十条历史告警记录:" : "当前十条实时告警信息:";
            String emptyMessage = queryParam.getStatus() == 1 ?
                    "暂无历史告警" : "当前无实时告警";
            if (page.getRecords() != null && !page.getRecords().isEmpty()) {
                alarmInfo.append(prefix);
                sseService.buildAlarmContent(page, alarmInfo, timeFormatter);
            } else {
                alarmInfo.append(emptyMessage);
            }
            sseService.validatePromptLength(alarmInfo, maxPromptLength);
        } catch (Exception e) {
            log.error("告警信息处理异常", e);
        }
        return sseService.createStreamConnection(alarmInfo.toString(), "告警");
    }
    @ApiOperation("查询图表数据用AI分析数据详情")
    @GetMapping("/chart/ai/sse")
    @OpLog(
            inputExpression = "开始时间:{#queryParam.startTime},结束时间:{#queryParam.endTime},图表ID:{#queryParam.chartId}",
            outputExpression = "{#code}"
    )
    public SseEmitter chartAiSSEData(@Validated ChartDataQueryParam queryParam) throws Exception {
        String ChartAi = "报表";
        ChartInstance chart = Optional.ofNullable(chartService.getById(queryParam.getChartId()))
                .orElseThrow(() -> new Exception("找不到:" + queryParam.getChartId() + "的图表定义"));
        List<ChartDeviceSensor> deviceSensors = ChartInstance.toChartDeviceSensorList(chart);
        String endTime = DataQueryParam.endTime(queryParam.getEndTime());
        DataQueryParam dataQueryParam = new DataQueryParam(queryParam.getStartTime(), endTime, deviceSensors);
        IChartDataService chartDataService = chartDataServiceManager.getInstance(chart.getChartTypeId());
        List dataList = chartDataService.getChartData(dataQueryParam);
        List<String> times = dataQueryParam.getDateType().getTimes(dataQueryParam.getStartTime(), dataQueryParam.getEndTime());
        ChartData chartData = new ChartData<>(chart.getId(), chart.getName(), chart.getChartFormat(), chart.getChartTypeId(), chart.getShowType(), chart.getCategoryId(), times, dataList);
        // 将 ChartData 转换为压缩字符串
        String csvData = ChartDataFormatter.formatChartData(chartData);
        log.info("当前请求字符长度:" + csvData.length());
        try {
            if (csvData.length() > maxPromptLength) {
                OpLogAspect.setCode(400); // 设置错误码
                throw new IllegalArgumentException("数据长度超过限制,最大允许长度:" + maxPromptLength);
            }
            OpLogAspect.setCode(200);
            return sseService.createStreamConnection(csvData,ChartAi);
        } catch (IllegalArgumentException e) {
            OpLogAspect.setCode(400); // 参数错误
            throw e;
        } catch (Exception e) {
            OpLogAspect.setCode(500); // 系统错误
            throw new RuntimeException("处理请求失败", e);
        }
    }

业务层代码:

package com.keydak.project.core.chart.ai.service.impl;

import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.keydak.project.core.alarm.data.vo.AlarmRecord;
import com.keydak.project.core.chart.ai.dto.KeydakAiConfigDTO;
import com.keydak.project.core.chart.ai.exception.BalanceException;
import com.keydak.project.core.chart.ai.service.SSEService;
import com.keydak.repository.core.enums.SystemGlobalConfigEnum;
import com.keydak.repository.core.service.ISystemGlobalConfigService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.fasterxml.jackson.core.type.TypeReference;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * AI服务实现
 *
 * @author xyt
 */
@Service
@Slf4j
public class SSEServiceImpl implements SSEService {

    @Autowired
    private ISystemGlobalConfigService systemGlobalConfigService;


    private final ObjectMapper objectMapper = new ObjectMapper();
    private RateLimiter rateLimiter;

    @PostConstruct
    public void init() {
        try {
            // 初始化限流器时动态获取配置
            KeydakAiConfigDTO initialConfig = getConfig();
            rateLimiter = new RateLimiter(initialConfig.getRateLimit());
        } catch (Exception e) {
            throw new RuntimeException("初始化失败,无法获取Keydak AI配置", e);
        }
    }

    // 线程池配置
    private static final int CORE_POOL_SIZE = 5; // 核心线程数
    private static final int MAX_POOL_SIZE = 8; // 最大线程数
    private static final long KEEP_ALIVE_TIME = 30;  // 线程空闲时间
    private static final int QUEUE_CAPACITY = 30; //队列
    private final ExecutorService executor = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(QUEUE_CAPACITY),
            new ThreadPoolExecutor.AbortPolicy() // 使用 AbortPolicy 直接拒绝任务而不执行
    );

    /**
     * 刷新限流器配置
     */
    @Override
    public synchronized void refreshRateLimiter(Integer rate) {
        try {
            if (rateLimiter == null) {
                rateLimiter = new RateLimiter(rate);
            } else {
                rateLimiter.updateRate(rate);
            }
            log.info("限流器已更新,新速率限制: {}", rate);
        } catch (Exception e) {
            log.error("刷新限流器配置失败", e);
        }
    }

    @PreDestroy
    public void destroy() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 获取Keydak AI配置信息。
     *
     * @return 返回Keydak AI配置信息的DTO对象
     */
    private KeydakAiConfigDTO getConfig() throws Exception {
        KeydakAiConfigDTO config = systemGlobalConfigService.getTag(
                SystemGlobalConfigEnum.KEYDAK_AI_CONFIG,
                KeydakAiConfigDTO.class
        );

        if (config == null) {
            throw new Exception("Keydak AI配置不存在");
        }

        return config;
    }


    @Override
    public SseEmitter createStreamConnection(String message, String aiType) {
        SseEmitter emitter = new SseEmitter(120_000L); // 2分钟超时

        try {
            KeydakAiConfigDTO config = getConfig();
            double balance = getBalance();
            log.info("当前余额: {} 元", balance);
            log.warn("当前可用令牌数: {}", rateLimiter.tokens.get());
            if (!rateLimiter.tryAcquire()) {
                log.warn("请求被限流 | 当前允许的QPS:{}", config.getRateLimit());
                handleRateLimitError(emitter);
                return emitter;
            }
        } catch (BalanceException e) {
            handleBalanceError(emitter, e.getMessage());
            return emitter;
        } catch (Exception e) {
            handleBalanceError(emitter, "系统错误: " + e.getMessage());
            return emitter;
        }

        // 保持原有事件监听
        emitter.onCompletion(() -> log.info("SSE连接完成"));
        emitter.onTimeout(() -> {
            log.warn("SSE连接超时");
            rateLimiter.refill(); // 超时请求返还令牌
        });
        emitter.onError(e -> log.error("SSE连接错误", e));

        // 保持原有线程池处理
        executor.execute(() -> {
            try {
                processSSEStream(message, aiType, emitter);
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }

    /**
     * 新增限流错误处理方法
     *
     * @param emitter 事件发射器
     * @throws IOException 如果发送失败
     */
    private void handleRateLimitError(SseEmitter emitter) {
        try {
            Map<String, Object> error = new LinkedHashMap<>();
            error.put("error", "rate_limit_exceeded");
            error.put("message", "请求过于频繁,请稍后再试");
            error.put("timestamp", System.currentTimeMillis());

            emitter.send(SseEmitter.event()
                    .data(objectMapper.writeValueAsString(error))
                    .name("rate-limit-error")
                    .reconnectTime(5000L));

            emitter.complete();
        } catch (IOException e) {
            log.error("发送限流错误失败", e);
        }
    }

    private void handleBalanceError(SseEmitter emitter, String errorMsg) {
        try {
            JSONObject error = new JSONObject();
            error.put("error", "balance_insufficient");
            error.put("message", errorMsg);
            emitter.send(SseEmitter.event()
                    .data(error.toJSONString())
                    .name("balance-error"));
            emitter.complete();
        } catch (Exception e) {
            log.error("发送余额错误信息失败", e);
        }
    }


    private void processSSEStream(String message, String aiType, SseEmitter emitter) throws Exception {
        HttpURLConnection connection = null;
        try {
            connection = createConnection();
            String jsonBody = buildRequestBody(message, aiType);
            log.info("发送AI请求数据: {}", jsonBody); // 记录请求体
            sendRequestData(connection, jsonBody);
            validateResponse(connection);
            try (InputStream inputStream = connection.getInputStream();
                 BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (Thread.currentThread().isInterrupted()) {
                        throw new InterruptedException("处理被中断");
                    }
                    if (line.startsWith("data: ")) {
                        String jsonData = line.substring(6).trim();
                        log.debug("AI响应数据: {}", jsonData);
                        if ("[DONE]".equals(jsonData)) {
                            log.info("收到流结束标记");
                            sendCompletionEvent(emitter);  // 发送完成事件
                            break;  // 结束循环
                        }
                        try {
                            processStreamData(emitter, jsonData);
                        } catch (Exception e) {
                            log.error("数据处理失败,终止连接", e);
                            emitter.completeWithError(e);
                            break;
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("SSE处理发生异常", e);
            throw e;
        } finally {
            if (connection != null) connection.disconnect();
        }
    }

    private void processStreamData(SseEmitter emitter, String jsonData) throws Exception {
        try {
            Map<String, Object> apiResponse = objectMapper.readValue(
                    jsonData,
                    new TypeReference<Map<String, Object>>() {
                    }
            );

            List<Map<String, Object>> choices = (List<Map<String, Object>>) apiResponse.get("choices");
            if (choices == null || choices.isEmpty()) return;

            Map<String, Object> choice = choices.get(0);
            Map<String, Object> delta = (Map<String, Object>) choice.get("delta");

            Map<String, Object> chunk = new LinkedHashMap<>();
            chunk.put("timestamp", System.currentTimeMillis());
            chunk.put("messageId", UUID.randomUUID().toString());

            // 处理思考过程
            if (delta.containsKey("reasoning_content")) {
                String reasoning = (String) delta.get("reasoning_content");
                if (reasoning != null && !reasoning.trim().isEmpty()) {
                    chunk.put("type", "reasoning");
                    chunk.put("content", reasoning);
                    sendChunk(emitter, chunk);
                }
            }

            // 处理正式回答
            if (delta.containsKey("content")) {
                String content = (String) delta.get("content");
                if (content != null) {
                    chunk.put("type", "answer");
                    chunk.put("content", content);
                    sendChunk(emitter, chunk);
                }
            }

        } catch (JsonProcessingException e) {
            log.error("JSON解析失败 | 原始数据: {} | 错误: {}", jsonData, e.getMessage());
            throw new IOException("Failed to process stream data", e);
        } catch (ClassCastException e) {
            log.error("数据结构类型错误 | 原始数据: {} | 错误: {}", jsonData, e.getMessage());
            throw new IllegalStateException("Invalid data structure", e);
        } catch (Exception e) {
            log.error("处理数据块时发生未知错误 | 原始数据: {}", jsonData, e);
            throw e;
        }
    }

    private void sendChunk(SseEmitter emitter, Map<String, Object> chunk) throws IOException {
        String chunkJson = objectMapper.writeValueAsString(chunk);
        log.debug("发送数据块: {}", chunkJson);

        SseEmitter.SseEventBuilder event = SseEmitter.event()
                .data(chunkJson)
                .id(UUID.randomUUID().toString())
                .name("ai-message")
                .reconnectTime(5000L);

        emitter.send(event);
    }


    private void sendCompletionEvent(SseEmitter emitter) {
        try {
            Map<String, Object> completionEvent = new LinkedHashMap<>();
            completionEvent.put("event", "done");
            completionEvent.put("timestamp", System.currentTimeMillis());
            completionEvent.put("messageId", UUID.randomUUID().toString());

            String eventJson = objectMapper.writeValueAsString(completionEvent);

            emitter.send(SseEmitter.event()
                    .data(eventJson)
                    .id("COMPLETION_EVENT")
                    .name("stream-end")
                    .reconnectTime(0L));  // 停止重连

            log.info("已发送流结束事件");
        } catch (IOException e) {
            log.error("发送完成事件失败", e);
        } finally {
            emitter.complete();
            log.info("SSE连接已关闭");
        }
    }

    private HttpURLConnection createConnection() throws Exception {
        KeydakAiConfigDTO config = getConfig();
        HttpURLConnection connection = (HttpURLConnection) new URL(config.getUrl()).openConnection();
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setRequestProperty("Authorization", "Bearer " + config.getKey());
        connection.setRequestProperty("Accept", "text/event-stream");
        connection.setConnectTimeout(30_000);
        connection.setReadTimeout(120_000);
        return connection;
    }

    private void sendRequestData(HttpURLConnection connection, String jsonBody) throws Exception {
        try (OutputStream os = connection.getOutputStream()) {
            os.write(jsonBody.getBytes(StandardCharsets.UTF_8));
            os.flush();
        }
    }

    private void validateResponse(HttpURLConnection connection) throws Exception {
        if (connection.getResponseCode() != 200) {
            String errorMsg = readErrorStream(connection);
            throw new RuntimeException("API请求失败: " + connection.getResponseCode() + " - " + errorMsg);
        }
    }

    private String readErrorStream(HttpURLConnection connection) throws IOException {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getErrorStream(), StandardCharsets.UTF_8))) {
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                response.append(line);
            }
            return response.toString();
        }
    }

    private String buildRequestBody(String userMessage, String aiType) throws IOException {
        KeydakAiConfigDTO config = null;
        try {
            config = getConfig();
        } catch (Exception e) {
            e.printStackTrace();
        }

        Map<String, Object> request = new HashMap<>();
        request.put("model", config.getModelType());
        request.put("stream", true);

        List<Map<String, String>> messages = new ArrayList<>();
        Map<String, String> message = new HashMap<>();
        message.put("role", "user");
        if ("报表".equals(aiType)) {
            //报表提问词
            message.put("content", buildPrompt(config.getPrompt(), userMessage));
        } else {
            //告警提问词
            message.put("content", buildPrompt(config.getPromptAlarm(), userMessage));
        }

        messages.add(message);

        request.put("messages", messages);

        return objectMapper.writeValueAsString(request);
    }

    private String buildPrompt(String basePrompt, String userMessage) {
        return String.format("%s\n%s\n", basePrompt, userMessage);
    }


    /**
     * 查询当前余额
     *
     * @return 当前余额
     * @throws IOException 如果请求失败
     */
    @Override
    @SneakyThrows
    public double getBalance() {
        HttpURLConnection connection = null;
        try {
            KeydakAiConfigDTO config = getConfig();
            URL url = new URL(config.getBalanceUrl());
            connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("GET");
            connection.setRequestProperty("Authorization", "Bearer " + config.getKey());
            connection.setConnectTimeout(5000);
            connection.setReadTimeout(5000);

            int responseCode = connection.getResponseCode();
            if (responseCode != 200) {
                String errorBody = readErrorStream(connection); // 复用已有的错误流读取方法
                throw new IOException("HTTP Error: " + responseCode + " - " + errorBody);
            }

            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                StringBuilder response = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    response.append(line);
                }

                JSONObject jsonObject = JSON.parseObject(response.toString());
                // 以下解析逻辑保持原样
                if (!jsonObject.containsKey("is_available") || !jsonObject.containsKey("balance_infos")) {
                    throw new IOException("Invalid balance response format");
                }
                JSONArray balanceInfos = jsonObject.getJSONArray("balance_infos");
                if (jsonObject.getBoolean("is_available") && balanceInfos != null && !balanceInfos.isEmpty()) {
                    JSONObject balanceInfo = balanceInfos.getJSONObject(0);
                    if (!balanceInfo.containsKey("total_balance")) {
                        throw new IOException("Missing total_balance field");
                    }
                    return balanceInfo.getDouble("total_balance");
                } else {
                    throw new IOException("Balance information is not available");
                }
            }
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /**
     * 限流器实现
     **/
    private static class RateLimiter {
        private volatile int capacity;
        private final AtomicInteger tokens;
        private volatile long lastRefillTime;
        private final Object lock = new Object();

        RateLimiter(int rate) {
            this.capacity = rate;
            this.tokens = new AtomicInteger(rate);
            this.lastRefillTime = System.currentTimeMillis();
        }

        public void refill() {
            synchronized (lock) {
                long now = System.currentTimeMillis();
                long elapsed = now - lastRefillTime;
                if (elapsed >= 1000) {
                    tokens.set(capacity); // 直接重置为最大容量
                    lastRefillTime = now;
                }
            }
        }

        public boolean tryAcquire() {
            synchronized (lock) {
                refill();
                if (tokens.get() > 0) {
                    tokens.decrementAndGet();
                    return true;
                }
                return false;
            }
        }

        public void updateRate(int newRate) {
            synchronized (lock) {
                this.capacity = newRate;
                tokens.set(Math.min(tokens.get(), newRate));
                lastRefillTime = System.currentTimeMillis();
            }
        }
    }


    /**
     * 告警内容构建方法
     **/
    @Override
    public void buildAlarmContent(IPage<AlarmRecord> page,
                                  StringBuilder alarmInfo,
                                  DateTimeFormatter formatter) {
        page.getRecords().forEach(record -> {
            // 时间格式化(使用首次告警时间)
            String time = Optional.ofNullable(record.getFirstTime())
                    .map(t -> t.format(formatter))
                    .orElse("时间未知");
            // 设备名称空值处理
            String device = StringUtils.defaultString(record.getDeviceName(), "未知设备");
            // 状态/数值处理逻辑
            String state = resolveStateValue(record);
            // 告警描述处理
            String desc = StringUtils.defaultString(record.getContent(), "未知告警类型");
            // 按规范格式拼接
            alarmInfo.append(
                    String.format("%s %s %s %s;", time, device, state, desc)
            );
        });
    }

    /**
     * 状态值解析方法
     */
    private String resolveStateValue(AlarmRecord record) {
        if (record.getValue() != null) {
            return record.getValue().stripTrailingZeros().toPlainString();
        }
        return record.getStatus() != null ?
                (record.getStatus() ? "1" : "0") : "状态未知";
    }

    /**
     * 长度校验方法
     **/
    @Override
    public void validatePromptLength(StringBuilder content, int maxLength) {
        if (content.length() > maxLength) {
            throw new IllegalArgumentException("告警数据过长,请缩小查询范围");
        }
    }

}

前端代码:

<div class="modal-area">
    <form name="formNg" novalidate>
        <div class="modal-header">
            <h3 class="modal-title" style="color: #FFFFFF">
                AI分析
            </h3>
        </div>
        <div class="modal-body">
            <div class="form">
                <!-- 加载状态 - 修改为动态效果 -->
                <div ng-if="connectionStatus === 'connecting'" class="loading">
                    <div class="ai-thinking-container">
                        <span>AI思考中</span>
                        <div class="ai-typing-indicator">
                            <div class="typing-dot"></div>
                            <div class="typing-dot"></div>
                            <div class="typing-dot"></div>
                        </div>
                    </div>
                </div>

                <!-- 思考过程 -->
                <div class="thinking-panel" ng-if="thinkingContent">
                    <div class="thinking-header">
                        <i class="fa fa-brain"></i> 思考过程
                        <!-- 总用时显示(完成后保留) -->
                        <span ng-if="thinkingTime">({{thinkingTime}}秒)</span>
                    </div>
                    <div class="thinking-content"
                         ng-bind-html="thinkingContent"
                         scroll-to-bottom="thinkingContent">
                    </div>
                </div>

                <!-- 正式回答 -->
                <div class="answer-panel" ng-if="answerContent">
                    <div class="answer-header">
                        <i class="fa fa-comment"></i> 以下是AI的分析
                    </div>
                    <div class="answer-content"
                         ng-bind-html="answerContent"
                         scroll-to-bottom="answerContent">
                    </div>
                </div>

                <!-- 错误提示 -->
                <div ng-if="connectionStatus === 'error'" class="alert alert-danger">
                    <i class="fa fa-exclamation-triangle"></i> 连接异常,请尝试重新分析
                </div>
            </div>
        </div>
        <div class="modal-footer">
            <button ng-click="retry()"
                    class="btn btn-warning"
                    ng-disabled="connectionStatus === 'connecting'">
                <i class="fa fa-redo"></i> 重新分析
            </button>
            <button ng-click="cancel()" class="btn btn-danger">
                <i class="fa fa-times"></i> 关闭
            </button>
        </div>
    </form>
</div>

<style>

    .thinking-header span {
        margin-left: 5px;
        font-size: 0.9em;
        opacity: 0.8;
        color: #a0c4ff;
    }

    .modal-body {
        height: 6rem; /* 设置固定高度 */
        overflow-y: auto; /* 内容超出时显示滚动条 */
        padding: 15px;
        background: #1B448A; /* 背景改为蓝色 */
        border-radius: 4px;
        font-family: 'Consolas', monospace;
        color: #FFFFFF;
    }

    /* 思考过程样式 */
    .thinking-panel {
        margin-bottom: 20px;
        border-left: 3px solid #4a90e2;
        padding-left: 15px;
    }

    .thinking-header {
        color: #4a90e2;
        font-size: 16px;
        margin-bottom: 10px;
    }

    .thinking-content {
        background: rgba(255, 255, 255, 0.05);
        padding: 12px;
        border-radius: 4px;
        color: #e0e0e0;
        line-height: 1.6;
    }

    /* 正式回答样式 */
    .answer-panel {
        margin-top: 25px;
        border-top: 1px solid #00c85333;
        padding-top: 15px;
    }

    .answer-header {
        color: #00c853;
        font-size: 16px;
        margin-bottom: 10px;
    }

    .answer-content {
        background: rgba(255, 255, 255, 0.05);
        padding: 12px;
        border-radius: 4px;
        color: #ffffff;
        line-height: 1.6;
    }

    /* 图标样式 */
    .fa-brain {
        color: #4a90e2;
        margin-right: 8px;
    }

    .fa-comment {
        color: #00c853;
        margin-right: 8px;
    }

    /* 新的加载动画样式 */
    .loading {
        color: #FFF;
        text-align: left;
        padding: 15px;
        font-size: 16px;
    }

    .ai-thinking-container {
        display: flex;
        align-items: center;
        gap: 8px;
    }

    .ai-typing-indicator {
        display: flex;
        align-items: center;
        gap: 4px;
        height: 20px;
    }

    .typing-dot {
        width: 8px;
        height: 8px;
        background-color: #FFFFFF;
        border-radius: 50%;
        opacity: 0.4;
        animation: typing-animation 1.4s infinite ease-in-out;
    }

    .typing-dot:nth-child(1) {
        animation-delay: 0s;
    }

    .typing-dot:nth-child(2) {
        animation-delay: 0.2s;
    }

    .typing-dot:nth-child(3) {
        animation-delay: 0.4s;
    }

    @keyframes typing-animation {
        0%, 60%, 100% {
            transform: translateY(0);
            opacity: 0.4;
        }
        30% {
            transform: translateY(-5px);
            opacity: 1;
        }
    }
</style>
// 报表分析
UI.Controllers.controller("AiTipsCtrl", [
    "$scope", "$sce", "$uibModalInstance", "parent", "SSEService", "$timeout",
    function($scope, $sce, $uibModalInstance, parent, SSEService, $timeout) {
        // 状态管理
        $scope.connectionStatus = 'connecting'; // connecting | connected | error | completed
        $scope.thinkingContent = null;
        $scope.answerContent = null;
        $scope.thinkingTime = null; // 新增:思考时间变量
        $scope.startTime = null; // 新增:开始时间戳

        let eventSource = null;
        let thinkingBuffer = "";
        let answerBuffer = "";

        // 自动滚动指令
        $scope.scrollToBottom = function() {
            $timeout(() => {
                const container = document.querySelector('.modal-body');
                if (container) {
                    container.scrollTop = container.scrollHeight + 120;
                }
            }, 50);
        };

        // 内容更新方法
        function processChunkData(data) {
            if (data.type === 'reasoning') {
                // 如果是第一条思考内容,记录开始时间
                if (!thinkingBuffer && !$scope.startTime) {
                    $scope.startTime = new Date().getTime();
                }
                thinkingBuffer += data.content;
                $scope.thinkingContent = $sce.trustAsHtml(
                    thinkingBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;')
                );
                // 更新思考时间
                updateThinkingTime();
            }
            else if (data.type === 'answer') {
                answerBuffer += data.content;
                $scope.answerContent = $sce.trustAsHtml(
                    answerBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;')
                );
            }
            $scope.scrollToBottom();
        }

        function updateThinkingTime() {
            if ($scope.startTime) {
                const currentTime = new Date().getTime();
                $scope.thinkingTime = ((currentTime - $scope.startTime) / 1000).toFixed(1);
            }
        }


        // 初始化SSE连接
        function initSSE() {
            const url = '/data/chart/ai/sse?' + $.param(parent.queryParam);
            eventSource = new EventSource(url);

            eventSource.onopen = () => {
                $scope.$apply(() => {
                    $scope.connectionStatus = 'connected';
                });
            };

            // 处理消息事件
            eventSource.addEventListener('ai-message', e => {
                $scope.$apply(() => {
                    try {
                        const data = JSON.parse(e.data);
                        processChunkData(data);
                    } catch (err) {
                        console.error('消息解析错误:', err);
                        $scope.answerContent = $sce.trustAsHtml(
                            '<div class="text-danger">数据格式错误</div>'
                        );
                    }
                });
            });

            // 处理结束事件
            eventSource.addEventListener('stream-end', () => {
                $scope.$apply(() => {
                    $scope.connectionStatus = 'completed';
                    //最终更新一次思考时间
                    updateThinkingTime();
                    safeClose();
                });
            });

            // 错误处理
            eventSource.onerror = (err) => {
                $scope.$apply(() => {
                    console.error('SSE连接错误:', err);
                    $scope.connectionStatus = 'error';
                    safeClose();
                });
            };
        }

        // 安全关闭连接
        function safeClose() {
            if (eventSource) {
                eventSource.close();
                eventSource = null;
            }
        }

        // 重新尝试
        $scope.retry = () => {
            safeClose();
            thinkingBuffer = "";
            answerBuffer = "";
            $scope.thinkingContent = null;
            $scope.answerContent = null;
            $scope.thinkingTime = null; //重置思考时间
            $scope.startTime = null; //重置开始时间
            $scope.connectionStatus = 'connecting';
            initSSE();
        };

        // 关闭模态框
        $scope.cancel = () => {
            safeClose();
            $uibModalInstance.dismiss();
        };

        // 初始化
        initSSE();

        // 清理
        $scope.$on('$destroy', () => {
            safeClose();
        });
    }
]);
// Alarm analysis dialog: consumes the alarm AI SSE endpoint and renders the
// streamed "reasoning" and "answer" text as it arrives.
UI.Controllers.controller("AiAlarmTipsCtrl", [
    "$scope", "$sce", "$uibModalInstance", "parent", "SSEService", "$timeout",
    function($scope, $sce, $uibModalInstance, parent, SSEService, $timeout) {
        // View state
        $scope.connectionStatus = 'connecting'; // connecting | connected | error | completed
        $scope.thinkingContent = null;
        $scope.answerContent = null;
        $scope.thinkingTime = null; // elapsed seconds since the first reasoning chunk
        $scope.startTime = null; // timestamp (ms) of the first reasoning chunk

        let eventSource = null;
        let thinkingBuffer = "";
        let answerBuffer = "";

        // Characters that must be neutralised before text is marked as trusted HTML.
        const HTML_ESCAPES = {
            '&': '&amp;',
            '<': '&lt;',
            '>': '&gt;',
            '"': '&quot;',
            "'": '&#39;'
        };

        /**
         * Convert streamed plain text into trusted HTML for display.
         *
         * The text is HTML-escaped first, because it is produced by the model
         * from alarm data and must not be able to inject markup; only then are
         * the line breaks and double spaces turned into <br/> and &nbsp;.
         *
         * @param {string} text accumulated plain text
         * @returns {*} value wrapped by $sce.trustAsHtml
         */
        function toTrustedHtml(text) {
            const escaped = text.replace(/[&<>"']/g, ch => HTML_ESCAPES[ch]);
            return $sce.trustAsHtml(
                escaped.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;')
            );
        }

        // Scroll the modal body to the bottom shortly after the content changes.
        $scope.scrollToBottom = function() {
            $timeout(() => {
                const container = document.querySelector('.modal-body');
                if (container) {
                    container.scrollTop = container.scrollHeight + 120;
                }
            }, 50);
        };

        /**
         * Append one parsed chunk to the matching buffer and refresh the view.
         *
         * @param {{type: string, content: *}} data parsed 'ai-message' payload
         */
        function processChunkData(data) {
            // A missing content field must not append the literal "undefined"/"null".
            const chunk = data.content == null ? '' : String(data.content);

            if (data.type === 'reasoning') {
                // The first reasoning chunk starts the thinking timer.
                if (!thinkingBuffer && !$scope.startTime) {
                    $scope.startTime = new Date().getTime();
                }

                thinkingBuffer += chunk;
                $scope.thinkingContent = toTrustedHtml(thinkingBuffer);
                updateThinkingTime();
            }
            else if (data.type === 'answer') {
                answerBuffer += chunk;
                $scope.answerContent = toTrustedHtml(answerBuffer);
            }
            $scope.scrollToBottom();
        }

        // Recompute the elapsed time (seconds, one decimal) since startTime.
        function updateThinkingTime() {
            if ($scope.startTime) {
                const currentTime = new Date().getTime();
                $scope.thinkingTime = ((currentTime - $scope.startTime) / 1000).toFixed(1);
            }
        }

        // Open the SSE connection and wire up its event handlers.
        function initSSE() {
            const url = '/alarm/record/realtime/page/ai/sse?' + $.param(parent.queryParam);
            eventSource = new EventSource(url);

            eventSource.onopen = () => {
                $scope.$apply(() => {
                    $scope.connectionStatus = 'connected';
                });
            };

            // Content chunks
            eventSource.addEventListener('ai-message', e => {
                $scope.$apply(() => {
                    try {
                        const data = JSON.parse(e.data);
                        processChunkData(data);
                    } catch (err) {
                        console.error('消息解析错误:', err);
                        // Static markup written by this controller, so it is safe to trust.
                        $scope.answerContent = $sce.trustAsHtml(
                            '<div class="text-danger">数据格式错误</div>'
                        );
                    }
                });
            });

            // End-of-stream marker
            eventSource.addEventListener('stream-end', () => {
                $scope.$apply(() => {
                    $scope.connectionStatus = 'completed';
                    // Refresh the timer one last time.
                    updateThinkingTime();
                    safeClose();
                });
            });

            // Connection errors: closing here also stops EventSource from reconnecting.
            eventSource.onerror = (err) => {
                $scope.$apply(() => {
                    console.error('SSE连接错误:', err);
                    $scope.connectionStatus = 'error';
                    safeClose();
                });
            };
        }

        // Close the connection if one is open; safe to call more than once.
        function safeClose() {
            if (eventSource) {
                eventSource.close();
                eventSource = null;
            }
        }

        // Retry: drop the old connection, reset all state, reconnect.
        $scope.retry = () => {
            safeClose();
            thinkingBuffer = "";
            answerBuffer = "";
            $scope.thinkingContent = null;
            $scope.answerContent = null;
            $scope.thinkingTime = null; // reset the thinking timer
            $scope.startTime = null; // reset the start timestamp
            $scope.connectionStatus = 'connecting';
            initSSE();
        };

        // Close the modal dialog.
        $scope.cancel = () => {
            safeClose();
            $uibModalInstance.dismiss();
        };

        // Start streaming immediately.
        initSSE();

        // Release the connection when the scope is destroyed.
        $scope.$on('$destroy', () => {
            safeClose();
        });
    }
]);

            // Opens the dialog backed by Template/AiTips.html and AiTipsCtrl.
            // `resolve` is forwarded to showDialog unchanged.
            // NOTE(review): 600 is presumably the dialog width; confirm against showDialog.
            showAiTips: function (resolve) {
                this.showDialog("Template/AiTips.html", "AiTipsCtrl", resolve, 600);
            },
            // Opens the dialog backed by Template/AiAlarmTips.html and AiAlarmTipsCtrl.
            // `resolve` is forwarded to showDialog unchanged.
            // NOTE(review): 600 is presumably the dialog width; confirm against showDialog.
            showAiAlarmTips: function (resolve) {
                this.showDialog("Template/AiAlarmTips.html", "AiAlarmTipsCtrl", resolve, 600);
            }


数据库配置数据(system_global_config 表的初始化语句):

-- Seed rows for the 'keydak_ai_config' tag; the keys match the fields of KeydakAiConfigDTO.
-- 'key' (the API key) and both prompt rows are left empty and must be filled in per deployment.
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'balanceUrl', 'https://api.deepseek.com/user/balance', '余额查询');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'enable', 'true', '启用AI报表助手');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'key', '', 'API密钥');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'modelType', 'deepseek-reasoner', 'deepseek模型类型');
-- The two prompt rows previously shared the same description; they are told apart here
-- the same way the DTO documents them (report vs. alarm).
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'prompt', '', 'AI提问词(报表)');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'promptAlarm', '', 'AI提问词(告警)');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'rateLimit', '3', '限制每秒多少次请求');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'url', 'https://api.deepseek.com/v1/chat/completions', 'API接口');

实体类(使用 AES 加密 API 密钥):

package com.keydak.project.core.chart.ai.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * AI configuration values, plus helpers to validate them and to
 * encrypt/decrypt the API key with AES.
 *
 * @author xyt
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class KeydakAiConfigDTO {

    /** Enable flag for the AI feature. */
    private Boolean enable;

    /** API URL. */
    private String url;

    /** Balance query URL. */
    private String balanceUrl;

    /** API key. */
    private String key;

    /** Rate limit count. */
    private Integer rateLimit;

    /** AI prompt used for reports. */
    private String prompt;

    /** AI prompt used for alarms. */
    private String promptAlarm;

    /** Model type. */
    private String modelType;

    // Raw AES key material (its UTF-8 bytes); the original note reads "16 bytes for AES-128".
    // NOTE(review): the value is empty in this listing, presumably redacted; confirm the real one.
    private static final String SALT = ""; // 16 bytes for AES-128

    // NOTE(review): ECB mode encrypts identical blocks identically; changing it
    // would presumably invalidate already stored values, so it is left as is.
    private static final String ALGORITHM = "AES/ECB/PKCS5Padding";

    /**
     * Checks that the required settings are present.
     *
     * @throws IllegalStateException if any of url, balanceUrl, key, rateLimit
     *                               or prompt is {@code null}; the message lists
     *                               every missing one
     */
    public void validate() {
        // value / label pairs, in the order they are reported
        Object[][] required = {
                {url, "API_URL地址"},
                {balanceUrl, "查询余额地址"},
                {key, "API密钥"},
                {rateLimit, "限流次数"},
                {prompt, "AI提问词"},
        };

        List<String> absent = new ArrayList<>();
        for (Object[] pair : required) {
            if (pair[0] == null) {
                absent.add((String) pair[1]);
            }
        }

        if (absent.isEmpty()) {
            return;
        }
        throw new IllegalStateException("参数不能为空: " + String.join(", ", absent));
    }

    /**
     * Tells whether the given value looks like an already encrypted key.
     *
     * @param key candidate value
     * @return {@code true} if {@link #decryptKey(String)} succeeds on it,
     *         {@code false} if it throws any exception
     */
    public boolean isEncryptedKey(String key) {
        boolean decryptable;
        try {
            // A successful decryption is taken as proof of prior encryption.
            decryptKey(key);
            decryptable = true;
        } catch (Exception ignored) {
            // Any failure means the value is treated as not encrypted.
            decryptable = false;
        }
        return decryptable;
    }

    /**
     * Encrypts a key.
     *
     * @param key plain text to encrypt
     * @return Base64 encoding of the AES ciphertext of the UTF-8 bytes of {@code key}
     * @throws Exception if the cipher cannot be set up or the encryption fails
     */
    public String encryptKey(String key) throws Exception {
        Cipher aes = newCipher(Cipher.ENCRYPT_MODE);
        byte[] plainBytes = key.getBytes(StandardCharsets.UTF_8);
        byte[] cipherBytes = aes.doFinal(plainBytes);
        return Base64.getEncoder().encodeToString(cipherBytes);
    }

    /**
     * Decrypts a key.
     *
     * @param encryptedKey Base64 text as produced by {@link #encryptKey(String)}
     * @return the decrypted bytes decoded as UTF-8
     * @throws Exception if the cipher cannot be set up, the input is not valid
     *                   Base64, or the decryption fails
     */
    public String decryptKey(String encryptedKey) throws Exception {
        Cipher aes = newCipher(Cipher.DECRYPT_MODE);
        byte[] cipherBytes = Base64.getDecoder().decode(encryptedKey);
        byte[] plainBytes = aes.doFinal(cipherBytes);
        return new String(plainBytes, StandardCharsets.UTF_8);
    }

    /** Builds a cipher for {@link #ALGORITHM}, initialised in the given mode with the {@link #SALT} bytes as key. */
    private static Cipher newCipher(int mode) throws Exception {
        SecretKeySpec keySpec = new SecretKeySpec(SALT.getBytes(StandardCharsets.UTF_8), "AES");
        Cipher cipher = Cipher.getInstance(ALGORITHM);
        cipher.init(mode, keySpec);
        return cipher;
    }
}

相关文章:

  • esp32 idf中的外部组件
  • OpenAI最近放出大新闻,准备在接下来的几个月内推出一款“开放”的语言模型
  • 基于HUTOOL实现RSA工具类
  • Vue3+Vite+TypeScript+Element Plus开发-02.Element Plus安装与配置
  • deepseek使用记录26——思维混乱背后的理论泡沫与骗局
  • LeetCode 热题 100_打家劫舍(83_198_中等_C++)(动态规划)
  • (C语言)虚数运算(结构体教程)(指针解法)(C语言教程)
  • 关于点扩散函数小记
  • 《向量数据库指南》——深度解析Kubernetes Operator在Mlivus Cloud中的实现原理
  • FPGA实现数码管显示分秒时间
  • 【编译原理】第三章 词法分析
  • Kubernetes高级应用(NFS存储)
  • 机器人基础知识-1
  • java学习+题解
  • 面试遇到的几个问题小记20250401
  • 【学习记录】pytorch载入模型的部分参数
  • 蓝桥杯省模赛 台阶方案
  • 微信登录、商品浏览前瞻
  • 简单版CentOS7配置haproxy
  • PyTorch 深度学习实战(31):可解释性AI与特征可视化
  • 福建、广西等地有大暴雨,国家防总启动防汛四级应急响应
  • 公示资费套餐、规范营销行为,今年信息通信行业将办好这十件实事
  • 101岁陕西省军区原司令员冀廷璧逝世,曾参加百团大战
  • 金融月评|尽早增强政策力度、调整施策点
  • 全国省市县国土空间总体规划已基本批复完成,进入全面实施阶段
  • 中国恒大披露清盘进展:要求债权人提交债权证明表