流式响应 Demo:前端以 Markdown 格式渲染,并带打字机效果展示
效果:
aiStream
重要:后端响应的数据必须符合标准的 SSE(Server-Sent Events)流式响应格式,
格式大致如下:
data:{"id":"","object":"","created":0,"model":"","choices":[{"delta":{"role":"assistant","content":"#"},"index":0,"finish_reason":null}]}
event:enterprise-risk-report
event 是前端监听的事件名称,可以按实际需求自定义,但必须与前端 addEventListener 注册的名称保持一致
项目环境:
Vue 3 + Vite
node 版本20+
效果:
AiStream.vue
<template>
  <div class="ai-stream-demo">
    <div class="input-section">
      <textarea
        v-model="question"
        placeholder="请输入您的问题,例如:生成风险评估报告"
        rows="4"
        cols="50"
      ></textarea>
      <br />
      <button @click="askQuestion" :disabled="loading">提问</button>
      <button @click="generateEnterpriseRiskReport" :disabled="loading">生成风险评估报告</button>
      <button @click="generatePolicyMatchingReport" :disabled="loading">生成匹配报告</button>
      <button @click="clearContent">清空内容</button>
    </div>
    <div class="response-section">
      <div v-if="loading">⏳ 正在接收数据...</div>
      <!--
        v-html already re-renders whenever renderedContent changes. No :key is bound here:
        keying on the full text would destroy and re-create this node for every typed character.
      -->
      <div class="markdown-content" v-html="renderedContent"></div>
    </div>
  </div>
</template>

<script setup>
import { ref, computed } from 'vue'
import { marked } from 'marked'
import DOMPurify from 'dompurify'// --- 配置 marked ---
// Global marked configuration, applied once when the component module is evaluated.
// NOTE(review): sanitize / smartLists / smartypants / headerIds / mangle look like legacy marked
// options — confirm against the installed marked version. HTML sanitising is done by DOMPurify.
const markedOptions = {
  breaks: true,
  gfm: true,
  sanitize: false,
  smartLists: true,
  smartypants: false,
  headerIds: false,
  mangle: false,
}
marked.setOptions(markedOptions)
// --- State ---
// Text typed by the user into the question box (bound with v-model).
const question = ref('')
// Raw content received so far; handleNewContent appends every fragment to it.
const lastFullContent = ref('') // accumulated raw text of the current response
const displayContent = ref('') // text currently shown by the typewriter effect
// True while a request is in flight; disables the submit buttons.
const loading = ref(false)
const typewriterQueue = ref([]) // fragments waiting for the typewriter to become free
const isTyping = ref(false) // whether a typewriter loop is currently running // --- Computed: render Markdown to HTML ---
/**
 * Sanitised HTML for the text currently on screen.
 * Parses displayContent as Markdown and passes the result through DOMPurify.
 * If parsing throws, falls back to the raw text with newlines turned into <br>, also sanitised.
 */
const renderedContent = computed(() => {
  const markdownSource = displayContent.value
  if (!markdownSource) {
    return ''
  }
  try {
    return DOMPurify.sanitize(marked.parse(markdownSource))
  } catch (error) {
    console.error('Markdown 解析错误:', error)
    const withLineBreaks = markdownSource.replace(/\n/g, '<br>')
    return DOMPurify.sanitize(withLineBreaks)
  }
})
// --- Handle new content ---
/**
 * Records a newly received fragment and hands it to the typewriter.
 * Falsy fragments (empty string, null, undefined) are ignored.
 *
 * @param {string} content - text fragment extracted from an SSE event
 */
const handleNewContent = (content) => {
  if (!content) {
    return
  }
  lastFullContent.value = lastFullContent.value + content
  typewriterEffect(content)
}
// --- Typewriter effect ---
/**
 * Appends text to displayContent one character at a time.
 *
 * Only one typing loop runs at a time: while it is active, further calls just enqueue their
 * text, and the active loop drains the queue in order (iteratively, so a long stream does not
 * build an ever-deeper chain of awaited recursive calls).
 *
 * @param {string} newText - text to type out
 * @returns {Promise<void>} resolves when this call's loop has finished, or immediately if queued
 */
const typewriterEffect = async (newText) => {
  if (isTyping.value) {
    typewriterQueue.value.push(newText)
    return
  }
  isTyping.value = true

  let pendingText = newText
  while (pendingText !== undefined) {
    // Iterate by code point so surrogate pairs (e.g. emoji) are never rendered half-way.
    for (const char of pendingText ?? '') {
      const expected = displayContent.value + char
      displayContent.value = expected
      const delay = char === '\n' ? 30 : 15
      await new Promise(resolve => setTimeout(resolve, delay))
      if (displayContent.value !== expected) {
        // The display was reset elsewhere (clearContent / a new connection) while we slept.
        // That code owns isTyping and the queue now, so stop without touching shared state
        // instead of writing stale text back.
        return
      }
    }
    pendingText = typewriterQueue.value.shift()
  }

  isTyping.value = false
}
// --- Clear content ---
/**
 * Resets the view: clears the accumulated text, the displayed text and the question box,
 * empties the typewriter queue and marks the typewriter as idle.
 */
const clearContent = () => {
  for (const textRef of [lastFullContent, displayContent, question]) {
    textRef.value = ''
  }
  typewriterQueue.value = []
  isTyping.value = false
}
// --- Generic SSE connection (fetch-based) ---
/**
 * Minimal fetch-based stand-in for EventSource that supports POST requests with a body.
 *
 * Events are parsed the way the SSE format defines them: field lines are buffered and the
 * event is dispatched on the blank line that terminates it. This makes the order of the
 * `event:` and `data:` lines irrelevant (the backend writes `data:` first).
 *
 * Payload handling: if `data` is JSON with a `choices` array, the event's `data` becomes
 * `choices[0].delta.content` (or '' when absent); anything else is passed through as text.
 * 'stream-complete' is dispatched exactly once, whether it comes from a server event,
 * a `[DONE]` marker, or the end of the response body.
 */
class EventSourcePolyfill {
  constructor(url, options = {}) {
    this.url = url
    this.method = options.method || 'GET'
    this.headers = options.headers || {}
    this.payload = options.payload || null
    this.onmessage = null
    this.onerror = null
    this.listeners = {}
    this.abortController = new AbortController()
    this.closed = false
    this.completed = false
    // fetch resolves asynchronously, so callers can still attach listeners after construction.
    this.connect()
  }

  /** Registers a callback for events of the given type. */
  addEventListener(type, callback) {
    if (!this.listeners[type]) {
      this.listeners[type] = []
    }
    this.listeners[type].push(callback)
  }

  /** Aborts the underlying request; no further events or errors are delivered afterwards. */
  close() {
    this.closed = true
    this.abortController.abort()
  }

  /** Opens the request and reads the stream; failures are reported through onerror. */
  async connect() {
    try {
      const response = await fetch(this.url, {
        method: this.method,
        headers: this.headers,
        body: this.payload,
        signal: this.abortController.signal
      })
      if (!response.ok) {
        throw new Error('SSE 请求失败,HTTP 状态码: ' + response.status)
      }
      const contentType = response.headers.get('content-type')
      if (!contentType || !contentType.includes('text/event-stream')) {
        throw new Error('响应不是SSE格式')
      }
      await this.readStream(response.body.getReader())
    } catch (error) {
      // An abort triggered by close() is expected and must not surface as an error.
      if (this.closed) return
      if (this.onerror) {
        this.onerror(error)
      }
    }
  }

  /** Reads the body chunk by chunk, splits it into lines and assembles SSE events. */
  async readStream(reader) {
    const decoder = new TextDecoder('utf-8')
    let buffer = ''
    let eventType = ''
    let dataLines = []

    const flushEvent = () => {
      // An event without any data line is discarded.
      if (dataLines.length > 0) {
        this.handleEvent(eventType || 'message', dataLines.join('\n'))
      }
      eventType = ''
      dataLines = []
    }

    const handleLine = (line) => {
      if (line === '') {
        flushEvent()
        return
      }
      if (line.startsWith(':')) return // comment line, e.g. the server heartbeat
      const colonIndex = line.indexOf(':')
      const field = colonIndex === -1 ? line : line.slice(0, colonIndex)
      let value = colonIndex === -1 ? '' : line.slice(colonIndex + 1)
      if (value.startsWith(' ')) {
        value = value.slice(1) // only one leading space belongs to the field syntax
      }
      if (field === 'event') {
        eventType = value.trim()
      } else if (field === 'data') {
        dataLines.push(value)
      }
      // Other fields (id, retry, unknown) are not used by this client.
    }

    while (!this.closed) {
      const { done, value } = await reader.read()
      if (done) break
      buffer += decoder.decode(value, { stream: true })
      // Hold back a trailing CR: its LF may arrive with the next chunk, and splitting now
      // would produce a spurious blank line that terminates the event too early.
      const endsWithCR = buffer.endsWith('\r')
      const lines = (endsWithCR ? buffer.slice(0, -1) : buffer).split(/\r\n|\n|\r/)
      buffer = lines.pop() + (endsWithCR ? '\r' : '')
      for (const line of lines) {
        if (this.closed) return
        handleLine(line)
      }
    }
    if (this.closed) return

    // End of stream: be lenient and deliver an event that was not terminated by a blank line.
    buffer += decoder.decode()
    if (buffer !== '') {
      handleLine(buffer.replace(/\r$/, ''))
    }
    flushEvent()
    this.complete()
  }

  /** Converts one assembled SSE event into a client event and dispatches it. */
  handleEvent(type, rawData) {
    if (rawData.trim() === '[DONE]') {
      this.complete()
      return
    }
    if (type === 'stream-complete') {
      this.complete(rawData)
      return
    }

    let data = rawData
    try {
      const parsed = JSON.parse(rawData)
      if (parsed !== null && typeof parsed === 'object' && Array.isArray(parsed.choices)) {
        data = parsed.choices[0]?.delta?.content ?? ''
      }
    } catch {
      // Not JSON: the backend also sends plain-text fragments (e.g. error messages),
      // which are delivered unchanged.
    }

    const event = { data, type }
    if (this.onmessage) {
      this.onmessage(event)
    }
    this.dispatch(type, event)
  }

  /** Dispatches 'stream-complete' once; later calls are ignored. */
  complete(data = '[DONE]') {
    if (this.completed) return
    this.completed = true
    this.dispatch('stream-complete', { data, type: 'stream-complete' })
  }

  /** Invokes every listener registered for `type`, unless the connection was closed. */
  dispatch(type, event) {
    if (this.closed) return
    const callbacks = this.listeners[type]
    if (callbacks) {
      callbacks.forEach(callback => callback(event))
    }
  }
}

// Connection opened by the most recent connectSSE call, if any.
let activeEventSource = null

/**
 * Resets the output state, POSTs `data` as JSON to `url` and streams the SSE response into
 * the typewriter. Any connection opened by a previous call is closed first.
 *
 * @param {string} url - endpoint that answers with text/event-stream
 * @param {object} data - request payload, serialised with JSON.stringify
 * @returns {EventSourcePolyfill} the connection (supports addEventListener / close)
 */
const connectSSE = (url, data) => {
  activeEventSource?.close()

  // Reset state
  lastFullContent.value = ''
  displayContent.value = ''
  typewriterQueue.value = []
  isTyping.value = false
  loading.value = true

  const eventSource = new EventSourcePolyfill(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    payload: JSON.stringify(data)
  })
  activeEventSource = eventSource

  // onmessage is deliberately left unset: the backend names every event, so content is
  // handled by the listeners below and would otherwise be processed twice.
  const contentEvents = ['ai-answer', 'enterprise-risk-report', 'policy-matching-report']
  for (const eventName of contentEvents) {
    // event.data is already the text extracted from choices[0].delta.content
    eventSource.addEventListener(eventName, (event) => {
      handleNewContent(event.data)
    })
  }

  eventSource.addEventListener('stream-complete', () => {
    typewriterEffect('\n\n✅ 报告生成完成')
    eventSource.close()
    loading.value = false
  })

  eventSource.addEventListener('stream-error', (event) => {
    typewriterEffect('\n\n❌ ' + (event.data || '发生未知错误'))
    eventSource.close()
    loading.value = false
  })

  // --- Error handling ---
  eventSource.onerror = (error) => {
    console.error('SSE连接错误:', error)
    typewriterEffect('\n\n❌ 连接发生错误: ' + error.message)
    eventSource.close()
    loading.value = false
  }

  return eventSource
}
// --- Ask a question ---
/**
 * Sends the typed question to the Q&A endpoint.
 * Shows an alert and does nothing else when the question is blank.
 */
const askQuestion = () => {
  const isBlank = question.value.trim() === ''
  if (isBlank) {
    alert('请输入问题')
    return
  }
  connectSSE('/api/ai/questions', {
    question: question.value,
    snCode: 'default' // sample SN
  })
}
// --- Generate the risk assessment report ---
/**
 * Requests the enterprise risk assessment report.
 * Uses the typed question, or a default prompt when the box is empty.
 */
const generateEnterpriseRiskReport = () => {
  const prompt = question.value ? question.value : '请生成风险评估报告'
  const payload = { question: prompt, snCode: 'default' }
  connectSSE('/api/ai/enterprise-risk-report', payload)
}
// --- Generate the policy matching report ---
/**
 * Requests the policy matching report.
 * Uses the typed question, or a default prompt when the box is empty.
 */
const generatePolicyMatchingReport = () => {
  const prompt = question.value ? question.value : '请生成政策匹配报告'
  const payload = { question: prompt, snCode: 'default' }
  connectSSE('/api/ai/policy-matching-report', payload)
}
</script>

<style scoped>
/* Page container */
.ai-stream-demo {
  max-width: 1200px;
  margin: 0 auto;
  padding: 20px;
  font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif;
  text-align: left;
}

/* Question input and action buttons */
.input-section {
  margin-bottom: 30px;
}

.input-section textarea {
  width: 100%;
  padding: 10px;
  border: 1px solid #ddd;
  border-radius: 6px;
  resize: vertical;
  font-size: 16px;
}

.input-section button {
  margin-top: 15px;
  margin-right: 10px;
  padding: 10px 20px;
  background-color: #007bff;
  color: white;
  border: none;
  border-radius: 6px;
  cursor: pointer;
  font-size: 16px;
  transition: background-color 0.2s;
}

.input-section button:disabled {
  background-color: #cccccc;
  cursor: not-allowed;
}

.input-section button:hover:not(:disabled) {
  background-color: #0056b3;
}

/* Response panel */
.response-section {
  border: 1px solid #e1e4e8;
  border-radius: 6px;
  padding: 20px;
  background-color: #fafbfc;
  min-height: 300px;
  box-shadow: inset 0 1px 2px rgba(27, 31, 35, 0.075);
  overflow-y: auto; /* allow scrolling when the content is long */
}

/* Typography for the rendered Markdown */
.markdown-content {
  line-height: 1.6;
  color: #24292e;
  word-wrap: break-word; /* wrap long words */
}

.markdown-content h1,
.markdown-content h2,
.markdown-content h3 {
  margin-top: 1.5em;
  margin-bottom: 1em;
  font-weight: 600;
  line-height: 1.25;
}

.markdown-content h1 {
  font-size: 2em;
  padding-bottom: 0.3em;
  border-bottom: 1px solid #eaecef;
}

.markdown-content h2 {
  font-size: 1.5em;
  padding-bottom: 0.3em;
  border-bottom: 1px solid #eaecef;
}

.markdown-content p {
  margin-top: 0;
  margin-bottom: 1em;
}

.markdown-content ul,
.markdown-content ol {
  padding-left: 2em;
  margin-top: 0;
  margin-bottom: 1em;
}

.markdown-content li {
  margin-bottom: 0.25em;
}

.markdown-content pre {
  padding: 16px;
  overflow: auto;
  font-size: 85%;
  line-height: 1.45;
  background-color: #f6f8fa;
  border-radius: 6px;
  margin-top: 0;
  margin-bottom: 16px;
}

.markdown-content code {
  padding: 0.2em 0.4em;
  margin: 0;
  font-size: 85%;
  background-color: rgba(27, 31, 35, 0.05);
  border-radius: 6px;
  font-family: ui-monospace, SFMono-Regular, "SF Mono", Consolas, "Liberation Mono", Menlo, monospace;
}
</style>
后端代码 controller:
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;/***ai 智能问答*/
@Api("ai 智能问答")
@RestController
@RequestMapping("/api/ai")
public class AIQuestionsController {private static final Logger logger = Logger.getLogger(AIQuestionsController.class.getName());private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);@Resourceprivate AIQuestionService aiQuestionService;/*** 为SSE连接设置心跳保活机制(定时发送心跳消息)** @param emitter SSE发射器* @param isCompleted 完成状态标识* @param intervalSec 心跳间隔(秒)*/private void setupHeartbeat(SseEmitter emitter, AtomicBoolean isCompleted, int intervalSec) {scheduler.scheduleAtFixedRate(() -> {if (isCompleted.get()) {return; // 已完成,不再发心跳}try {synchronized (emitter) {if (!isCompleted.get()) {emitter.send(SseEmitter.event().comment("heartbeat"));}}logger.fine("已发送心跳,保持SSE连接活跃");} catch (Exception e) {logger.log(Level.WARNING, "发送心跳失败,连接可能已断开", e);}}, intervalSec, intervalSec, TimeUnit.SECONDS);}/***智能问答请求接口(流式响应)@param requestVO 问答请求参数(含问题、snCode 等)@return SseEmitter 流式响应发射器*/@ApiOperation(value = "智能问答请求接口(流式响应)")@PostMapping("/questions")public SseEmitter questions(@RequestBody ChatRequestVO requestVO) {
// 1. 创建 SSE 发射器,设置初始超时时间(120秒)SseEmitter emitter = new SseEmitter(120 * 1000L);AtomicBoolean isCompleted = new AtomicBoolean(false);// 2. 设置自动续期机制:每60秒检查一次,每次延长120秒setupHeartbeat(emitter, isCompleted, 10);
// 3. 调用流式服务,实时推送响应片段aiQuestionService.questions(requestVO,
// 接收流式内容片段,推送给前端(修复:删除多余的 MediaType 参数)fragment -> {if (isCompleted.get()) return; // 防止在连接关闭后继续发送数据try {synchronized (emitter) {if (!isCompleted.get()) {emitter.send(SseEmitter.event().data(fragment, MediaType.TEXT_MARKDOWN).name("ai-answer"));}}} catch (Exception e) {if (isCompleted.compareAndSet(false, true)) {logger.log(Level.SEVERE, "SSE 推送回答片段失败", e);try {emitter.completeWithError(e);} catch (Exception ex) {logger.log(Level.SEVERE, "完成SSE连接时发生异常", ex);}}}},
// 流式响应结束,通知前端并关闭连接() -> {if (isCompleted.compareAndSet(false, true)) {try {synchronized (emitter) {emitter.send(SseEmitter.event().data("流式响应已完成", MediaType.TEXT_MARKDOWN).name("stream-complete"));}} catch (Exception e) {logger.log(Level.WARNING, "SSE 发送结束标记失败", e);} finally {try {emitter.complete();logger.info("AI 流式响应已结束");} catch (Exception e) {logger.log(Level.SEVERE, "完成SSE连接时发生异常", e);}}}});
// 4. 全局 SSE 连接异常处理emitter.onError(e -> {if (isCompleted.compareAndSet(false, true)) {logger.log(Level.SEVERE, "SSE 连接异常", e);try {emitter.completeWithError(e);} catch (Exception ex) {logger.log(Level.SEVERE, "完成SSE连接时发生异常", ex);}}});// 5. SSE 连接超时处理emitter.onTimeout(() -> {if (isCompleted.compareAndSet(false, true)) {logger.log(Level.WARNING, "SSE 连接超时");try {emitter.complete();} catch (Exception e) {logger.log(Level.SEVERE, "完成SSE连接时发生异常", e);}}});// 6. SSE 连接完成处理emitter.onCompletion(() -> {logger.info("SSE 连接已完成");});return emitter;}/***风险评估报告接口(流式响应)@param requestVO 报告请求参数(含问题、snCode 等)@return SseEmitter 流式响应发射器*/@ApiOperation(value = "风险评估报告接口(流式响应)")@PostMapping("/enterprise-risk-report")public SseEmitter enterpriseRiskReport(@RequestBody ChatRequestVO requestVO) {SseEmitter emitter = new SseEmitter(0L);AtomicBoolean isCompleted = new AtomicBoolean(false);// 设置自动续期机制:每60秒发送一次心跳保持连接活跃setupHeartbeat(emitter, isCompleted, 10);aiQuestionService.enterpriseRiskReport(requestVO,fragment -> {if (isCompleted.get()) return; // 防止在连接关闭后继续发送数据try {synchronized (emitter) {if (!isCompleted.get()) {emitter.send(SseEmitter.event().data(fragment, MediaType.TEXT_MARKDOWN).name("enterprise-risk-report"));}}} catch (Exception e) {if (isCompleted.compareAndSet(false, true)) {logger.log(Level.SEVERE, "SSE 推送报告片段失败", e);try {emitter.completeWithError(e);} catch (Exception ex) {logger.log(Level.SEVERE, "完成SSE连接时发生异常", ex);}}}},() -> {if (isCompleted.compareAndSet(false, true)) {try {synchronized (emitter) {emitter.send(SseEmitter.event().data("报告生成已完成", MediaType.TEXT_MARKDOWN).name("stream-complete"));}} catch (Exception e) {logger.log(Level.WARNING, "SSE 发送结束标记失败", e);} finally {try {emitter.complete();logger.info("AI 风险评估报告已结束");} catch (Exception e) {logger.log(Level.SEVERE, "完成SSE连接时发生异常", e);}}}});emitter.onError(e -> {if (isCompleted.compareAndSet(false, true)) {logger.log(Level.SEVERE, "SSE 连接异常", e);try {emitter.completeWithError(e);} catch (Exception ex) 
{logger.log(Level.SEVERE, "完成SSE连接时发生异常", ex);}}});// SSE 连接超时处理emitter.onTimeout(() -> {if (isCompleted.compareAndSet(false, true)) {logger.log(Level.WARNING, "SSE 连接超时");try {emitter.complete();} catch (Exception e) {logger.log(Level.SEVERE, "完成SSE连接时发生异常", e);}}});// SSE 连接完成处理emitter.onCompletion(() -> {logger.info("SSE 连接已完成");});return emitter;}/***政策匹配政府端报告接口(流式响应)@param requestVO 报告请求参数(含问题、snCode 等)@return SseEmitter 流式响应发射器*/@ApiOperation(value = "政策匹配政府端报告接口(流式响应)")@PostMapping("/policy-matching-report")public SseEmitter policyMatchingReport(@RequestBody ChatRequestVO requestVO) {SseEmitter emitter = new SseEmitter(0L);AtomicBoolean isCompleted = new AtomicBoolean(false);// 设置自动续期机制:每60秒检查一次,每次延长120秒setupHeartbeat(emitter, isCompleted, 10);aiQuestionService.policyMatchingReport(requestVO,fragment -> {if (isCompleted.get()) return; // 防止在连接关闭后继续发送数据try {synchronized (emitter) {if (!isCompleted.get()) {emitter.send(SseEmitter.event().data(fragment, MediaType.TEXT_MARKDOWN).name("policy-matching-report"));}}} catch (Exception e) {if (isCompleted.compareAndSet(false, true)) {logger.log(Level.SEVERE, "SSE 推送报告片段失败", e);try {emitter.completeWithError(e);} catch (Exception ex) {logger.log(Level.SEVERE, "完成SSE连接时发生异常", ex);}}}},() -> {if (isCompleted.compareAndSet(false, true)) {try {synchronized (emitter) {emitter.send(SseEmitter.event().data("报告生成已完成", MediaType.TEXT_MARKDOWN).name("stream-complete"));}} catch (Exception e) {logger.log(Level.WARNING, "SSE 发送结束标记失败", e);} finally {try {emitter.complete();logger.info("AI 政策匹配报告已结束");} catch (Exception e) {logger.log(Level.SEVERE, "完成SSE连接时发生异常", e);}}}});emitter.onError(e -> {if (isCompleted.compareAndSet(false, true)) {logger.log(Level.SEVERE, "SSE 连接异常", e);try {emitter.completeWithError(e);} catch (Exception ex) {logger.log(Level.SEVERE, "完成SSE连接时发生异常", ex);}}});// SSE 连接超时处理emitter.onTimeout(() -> {if (isCompleted.compareAndSet(false, true)) {logger.log(Level.WARNING, "SSE 连接超时");try {emitter.complete();} 
catch (Exception e) {logger.log(Level.SEVERE, "完成SSE连接时发生异常", e);}}});// SSE 连接完成处理emitter.onCompletion(() -> {logger.info("SSE 连接已完成");});return emitter;}
}
接口:
/**
 * Streaming AI operations. Every method hands result text to {@code contentConsumer} as it
 * becomes available and runs {@code finishRunnable} when the stream has ended.
 */
public interface AIQuestionService {

    /**
     * Streams the answer to a user question.
     *
     * @param requestVO       request parameters
     * @param contentConsumer receives each content fragment
     * @param finishRunnable  run once the stream has ended
     */
    void questions(ChatRequestVO requestVO, Consumer<String> contentConsumer, Runnable finishRunnable);

    /**
     * Streams the enterprise risk assessment report.
     *
     * @param requestVO       request parameters
     * @param contentConsumer receives each content fragment
     * @param finishRunnable  run once the stream has ended
     */
    void enterpriseRiskReport(ChatRequestVO requestVO, Consumer<String> contentConsumer, Runnable finishRunnable);

    /**
     * Streams the policy matching (government side) report.
     *
     * @param requestVO       request parameters
     * @param contentConsumer receives each content fragment
     * @param finishRunnable  run once the stream has ended
     */
    void policyMatchingReport(ChatRequestVO requestVO, Consumer<String> contentConsumer, Runnable finishRunnable);
}
实现类
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.log4j.Log4j2;
import okhttp3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;@Service
@Log4j2
public class AIQuestionServiceImpl implements AIQuestionService {// 配置参数@Value ("${config.aiUrl}")private String apiUrl;// 聊天@Value("${config.aiKey}")private String apiKey;// 风险评估报告@Value("${config.enterpriseRiskAssessmentKey}")private String enterpriseRiskAssessmentKey;// 匹配报告@Value("${config.policyMatchingKey}")private String policyMatchingKey;@Value("${config.aiConnect-timeout:30}")private int connectTimeout;@Value("${config.aiRead-timeout:120}")private int readTimeout;@Value("${config.aiWrite-timeout:30}")private int writeTimeout;private OkHttpClient okHttpClient;// 初始化 HTTP 客户端@PostConstructpublic void initHttpClient () {this.okHttpClient = new OkHttpClient.Builder ().connectTimeout (connectTimeout, TimeUnit.SECONDS).readTimeout (readTimeout, TimeUnit.SECONDS).writeTimeout (writeTimeout, TimeUnit.SECONDS).retryOnConnectionFailure (true).connectionPool (new ConnectionPool (1, 5, TimeUnit.MINUTES)).build ();log.info("AI 流式接口客户端初始化完成,API 地址:{}", apiUrl);}// 核心流式方法@Overridepublic void questions (ChatRequestVO requestVO,Consumer<String> contentConsumer,Runnable finishRunnable) {if (requestVO == null || requestVO.getQuestion () == null || requestVO.getQuestion ().trim ().isEmpty ()) {String errorMsg = "AI 流式请求:提问内容不能为空";log.warn (errorMsg);contentConsumer.accept (errorMsg);finishRunnable.run ();return;}if (contentConsumer == null || finishRunnable == null) {throw new IllegalArgumentException ("AI 流式请求:内容消费者和结束回调不能为空");}String chatId = "chat-" + UUID.randomUUID().toString().replace("-", "");log.info("AI 流式会话开始,chatId:{},用户问题:{}", chatId, requestVO.getQuestion ());JSONObject requestBody = buildStreamRequestBody (requestVO, chatId);if (requestBody == null) {String errorMsg = "AI 流式请求:请求体构建失败";log.error (errorMsg);contentConsumer.accept (errorMsg);finishRunnable.run ();return;}Request httpRequest = buildStreamHttpRequest (requestBody, apiKey); // 使用聊天API Keyif (httpRequest == null) {String errorMsg = "AI 流式请求:HTTP 请求构建失败";log.error (errorMsg);contentConsumer.accept 
(errorMsg);finishRunnable.run ();return;}sendStreamRequest(httpRequest, chatId, contentConsumer, finishRunnable);}@Overridepublic void enterpriseRiskReport(ChatRequestVO requestVO,Consumer<String> contentConsumer,Runnable finishRunnable) {
// if (requestVO == null || requestVO.getQuestion () == null || requestVO.getQuestion ().trim ().isEmpty ()) {
// String errorMsg = "AI 流式请求:提问内容不能为空";
// log.warn (errorMsg);
// contentConsumer.accept (errorMsg);
// finishRunnable.run ();
// return;
// }if (contentConsumer == null || finishRunnable == null) {throw new IllegalArgumentException ("AI 流式请求:内容消费者和结束回调不能为空");}String chatId = "enterprise-" + UUID.randomUUID().toString().replace("-", "");log.info("AI 企业风险评估报告开始,chatId:{},用户问题:{}", chatId, requestVO.getQuestion ());JSONObject requestBody = buildStreamRequestBody (requestVO, chatId);if (requestBody == null) {String errorMsg = "AI 流式请求:请求体构建失败";log.error (errorMsg);contentConsumer.accept (errorMsg);finishRunnable.run ();return;}Request httpRequest = buildStreamHttpRequest (requestBody, enterpriseRiskAssessmentKey); // 使用企业风险评估API Keyif (httpRequest == null) {String errorMsg = "AI 流式请求:HTTP 请求构建失败";log.error (errorMsg);contentConsumer.accept (errorMsg);finishRunnable.run ();return;}sendStreamRequest(httpRequest, chatId, contentConsumer, finishRunnable);}@Overridepublic void policyMatchingReport(ChatRequestVO requestVO,Consumer<String> contentConsumer,Runnable finishRunnable) {
// if (requestVO == null || requestVO.getQuestion () == null || requestVO.getQuestion ().trim ().isEmpty ()) {
// String errorMsg = "AI 流式请求:提问内容不能为空";
// log.warn (errorMsg);
// contentConsumer.accept (errorMsg);
// finishRunnable.run ();
// return;
// }if (contentConsumer == null || finishRunnable == null) {throw new IllegalArgumentException ("AI 流式请求:内容消费者和结束回调不能为空");}String chatId = "policy-" + UUID.randomUUID().toString().replace("-", "");log.info("AI 政策匹配报告开始,chatId:{},用户问题:{}", chatId, requestVO.getQuestion ());JSONObject requestBody = buildStreamRequestBody (requestVO, chatId);if (requestBody == null) {String errorMsg = "AI 流式请求:请求体构建失败";log.error (errorMsg);contentConsumer.accept (errorMsg);finishRunnable.run ();return;}Request httpRequest = buildStreamHttpRequest (requestBody, policyMatchingKey); // 使用政策匹配API Keyif (httpRequest == null) {String errorMsg = "AI 流式请求:HTTP 请求构建失败";log.error (errorMsg);contentConsumer.accept (errorMsg);finishRunnable.run ();return;}sendStreamRequest(httpRequest, chatId, contentConsumer, finishRunnable);}// 构建请求体private JSONObject buildStreamRequestBody (ChatRequestVO requestVO, String chatId) {try {JSONObject requestBody = new JSONObject ();JSONArray messages = new JSONArray ();JSONObject userMessage = new JSONObject ();userMessage.put ("role", "user");JSONArray content = new JSONArray();JSONObject textContent = new JSONObject();textContent.put("type", "text");textContent.put("text", requestVO.getQuestion());content.add(textContent);userMessage.put("content", content);messages.add(userMessage);JSONObject variables = new JSONObject();if (requestVO.getSnCode() != null && !requestVO.getSnCode().trim().isEmpty()) {variables.put("sn_code", requestVO.getSnCode().trim());}requestBody.put("messages", messages);requestBody.put("variables", variables);requestBody.put("controller", new JSONObject());requestBody.put("chatId", chatId);requestBody.put("stream", true);log.debug ("AI 流式请求体(chatId:{}):{}", chatId, requestBody.toJSONString ());return requestBody;} catch (Exception e) {log.error ("构建 AI 流式请求体失败", e);return null;}}// 构建 HTTP 请求private Request buildStreamHttpRequest (JSONObject requestBody, String authKey) {try {MediaType jsonMediaType = MediaType.parse ("application/json; 
charset=utf-8");RequestBody httpRequestBody = RequestBody.create (requestBody.toString (), jsonMediaType);return new Request.Builder().url(apiUrl).addHeader("Authorization", "Bearer " + authKey).addHeader("Content-Type", jsonMediaType.toString()).addHeader("Accept", "text/event-stream").addHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36").addHeader("Referer", "http://192.168.201.39:3088").post (httpRequestBody).build ();} catch (Exception e) {log.error ("构建 AI 流式 HTTP 请求失败", e);return null;}}// 发送流式请求private void sendStreamRequest (Request httpRequest, String chatId,Consumer<String> contentConsumer, Runnable finishRunnable) {okHttpClient.newCall (httpRequest).enqueue (new Callback () {@Overridepublic void onFailure (Call call, IOException e) {String errorMsg = String.format ("AI 流式请求发送失败(chatId:% s):% s", chatId, e.getMessage ());log.error (errorMsg, e);contentConsumer.accept (errorMsg);finishRunnable.run ();}@Overridepublic void onResponse (Call call, Response response) throws IOException {try (ResponseBody responseBody = response.body ()) {if (!response.isSuccessful ()) {String errorResp = Objects.requireNonNull (responseBody).string ();String errorMsg = String.format ("AI 流式请求响应失败(chatId:% s):状态码 =% d,响应内容 =% s",chatId, response.code (), errorResp);log.error (errorMsg);contentConsumer.accept (errorMsg);finishRunnable.run ();return;}if (responseBody == null) {String errorMsg = String.format ("AI 流式请求响应体为空(chatId:% s)", chatId);log.error (errorMsg);contentConsumer.accept (errorMsg);finishRunnable.run ();return;}// 调用修复后的解析方法parseStreamResponse (responseBody, chatId, contentConsumer);} catch (Exception e) {String errorMsg = String.format ("AI 流式响应处理异常(chatId:% s):% s", chatId, e.getMessage ());log.error (errorMsg, e);contentConsumer.accept (errorMsg);} finally {finishRunnable.run ();log.info("AI 流式会话结束(chatId:{})", chatId);}}});}/**修复后的流式响应解析方法:过滤 event: 开头的事件标记行优化 JSON 解析异常日志,打印完整原始行*/private void parseStreamResponse 
(ResponseBody responseBody, String chatId,Consumer<String> contentConsumer) throws IOException {log.info("开始接收 AI 流式响应(chatId:{})", chatId);try (okio.BufferedSource source = responseBody.source ()) {while (!source.exhausted ()) {String line = source.readUtf8Line ();if (line == null) break;// ########## 核心修复 1:过滤非数据行 ##########// 1. 过滤空行、注释行(: 开头)// 2. 过滤 event: 开头的事件标记行(如 event: error)if (line.isEmpty ()|| line.startsWith (":")|| line.trim ().startsWith ("event:")) {log.debug ("AI 流式响应跳过非数据行(chatId:{}):{}", chatId, line);continue;}// 提取 JSON 字符串(处理 data: 前缀)String jsonStr = line.startsWith ("data:") ? line.substring (6).trim () : line.trim ();// 过滤空的 JSON 字符串if (jsonStr.isEmpty ()) {log.debug ("AI 流式响应跳过空 JSON 行(chatId:{})", chatId);continue;}// 检查结束标记if ("[DONE]".equals (jsonStr)) {log.info("AI 流式响应接收完成(chatId:{})", chatId);break;}contentConsumer.accept (jsonStr);// 解析 JSON 提取内容片段try {
// JSONObject responseJson = JSON.parseObject (jsonStr);
// JSONArray choices = responseJson.getJSONArray ("choices");
// if (choices != null && !choices.isEmpty ()) {
// JSONObject choice = choices.getJSONObject (0);
// JSONObject delta = choice.getJSONObject ("delta");
// if (delta != null && delta.containsKey ("content")) {
// String contentFragment = delta.getString ("content");
// if (contentFragment != null && !contentFragment.isEmpty ()) {
// log.debug ("AI 流式响应片段(chatId:{}):{}", chatId, contentFragment);
// contentConsumer.accept (contentFragment);
// }
// }
// }} catch (Exception e) {// 打印完整原始行(含 data: 前缀),便于定位服务端返回格式问题log.warn ("AI 流式响应片段解析失败(chatId:{}),原始行:{},异常原因:{}",chatId, line, e.getMessage ());}}}}
}