Spring Boot SSE 流式输出,智能体的实时响应
前言
在现代Web应用中,实时数据推送已经成为提升用户体验的关键技术。特别是在AI对话、实时监控、长任务执行等场景中,传统的请求-响应模式往往无法满足实时性需求。本文将详细介绍如何使用Spring Boot的SSE(Server-Sent Events)技术实现流式输出,并以AI智能体为例展示具体实现。
一、什么是SSE?
SSE(Server-Sent Events)是一种允许服务器向客户端实时推送数据的技术。与WebSocket不同,SSE是单向通信(服务器→客户端),实现简单且浏览器兼容性好。
SSE优势:
- 简单的HTTP协议,无需额外协议
- 自动重连机制
- 浏览器原生支持
- 与现有HTTP基础设施兼容
二、项目结构概览
src/main/java/cn/bugstack/ai/trigger/http/
└── AiAgentController.java # 流式输出控制器
三、核心代码实现
3.1 控制器层实现
@Slf4j
@RestController
@RequestMapping("/api/v1/agent")
@CrossOrigin(origins = "*", allowedHeaders = "*", methods = {RequestMethod.GET, RequestMethod.POST, RequestMethod.OPTIONS})
public class AiAgentController implements IAiAgentService {@Resource(name = "autoAgentExecuteStrategy")private IExecuteStrategy autoAgentExecuteStrategy;@Resourceprivate ThreadPoolExecutor threadPoolExecutor;@PostMapping("/autoAgent")public ResponseBodyEmitter autoAgent(AutoAgentRequestDTO request, HttpServletResponse response) {log.info("请求参数:{}", request);try {// 设置SSE响应头setupSSEHeaders(response);// 创建流式输出对象ResponseBodyEmitter emitter = new ResponseBodyEmitter();// 构建执行命令实体ExecuteCommandEntity executeCommandEntity = buildExecuteCommand(request);// 异步执行AutoAgentexecuteAutoAgentAsync(executeCommandEntity, emitter);return emitter;} catch (Exception e) {log.error("AutoAgent请求处理异常:{}", e.getMessage(), e);return createErrorEmitter(e);}}
}
3.2 关键方法详解
3.2.1 SSE响应头设置
private void setupSSEHeaders(HttpServletResponse response) {response.setContentType("text/event-stream"); // SSE媒体类型response.setCharacterEncoding("UTF-8");response.setHeader("Cache-Control", "no-cache"); // 禁用缓存response.setHeader("Connection", "keep-alive"); // 保持长连接
}
参数说明:
text/event-stream
:SSE标准媒体类型no-cache
:确保实时数据不被缓存keep-alive
:维持HTTP连接
3.2.2 异步执行逻辑
private void executeAutoAgentAsync(ExecuteCommandEntity command, ResponseBodyEmitter emitter) {threadPoolExecutor.execute(() -> {try {autoAgentExecuteStrategy.execute(command, emitter);} catch (Exception e) {log.error("AutoAgent执行异常:{}", e.getMessage(), e);sendErrorMessage(emitter, "执行异常:" + e.getMessage());} finally {completeEmitter(emitter);}});
}
3.3 业务层Emitter使用示例
在策略实现类中,我们可以这样使用Emitter进行流式输出:
@Service
public class AutoAgentExecuteStrategy implements IExecuteStrategy {@Overridepublic void execute(ExecuteCommandEntity command, ResponseBodyEmitter emitter) {try {// 步骤1:任务分析emitter.send("🎯 开始分析任务...");String analysis = analyzeTask(command.getMessage());emitter.send("📊 分析结果: " + analysis);// 步骤2:分步执行for (int step = 1; step <= command.getMaxStep(); step++) {emitter.send("🚀 执行第 " + step + " 步...");String stepResult = executeStep(step, command);emitter.send("✅ 步骤 " + step + " 完成: " + stepResult);// 模拟处理时间Thread.sleep(1000);}// 最终结果emitter.send("🎉 任务执行完成!");} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("任务执行被中断", e);} catch (Exception e) {throw new RuntimeException("任务执行失败", e);}}
}
3.4 异常处理机制
private void sendErrorMessage(ResponseBodyEmitter emitter, String message) {try {emitter.send(message);} catch (Exception ex) {log.error("发送异常信息失败:{}", ex.getMessage(), ex);}
}private void completeEmitter(ResponseBodyEmitter emitter) {try {emitter.complete();} catch (Exception e) {log.error("完成流式输出失败:{}", e.getMessage(), e);}
}private ResponseBodyEmitter createErrorEmitter(Exception e) {ResponseBodyEmitter errorEmitter = new ResponseBodyEmitter();try {errorEmitter.send("请求处理异常:" + e.getMessage());errorEmitter.complete();} catch (Exception ex) {log.error("发送错误信息失败:{}", ex.getMessage(), ex);}return errorEmitter;
}
四、前端对接示例
4.1 使用EventSource接收流数据
<!DOCTYPE html>
<html>
<head><title>AI智能体流式输出演示</title>
</head>
<body><div id="output" style="white-space: pre-wrap; border: 1px solid #ccc; padding: 10px; height: 400px; overflow-y: auto;"></div><script>const output = document.getElementById('output');function startAgent() {const eventSource = new EventSource('/api/v1/agent/autoAgent?message=帮我分析数据&sessionId=123');eventSource.onmessage = function(event) {output.innerHTML += event.data + '\n';output.scrollTop = output.scrollHeight; // 自动滚动到底部};eventSource.onerror = function(event) {output.innerHTML += '❌ 连接错误或已完成\n';eventSource.close();};eventSource.onopen = function(event) {output.innerHTML += '🔗 连接已建立,开始接收数据...\n';};}// 页面加载后自动开始window.onload = startAgent;</script>
</body>
</html>
4.2 使用Fetch API的流式读取
async function startAgentWithFetch() {const response = await fetch('/api/v1/agent/autoAgent', {method: 'POST',headers: {'Content-Type': 'application/json',},body: JSON.stringify({message: '帮我分析数据',sessionId: '123',maxStep: 5})});const reader = response.body.getReader();const decoder = new TextDecoder();while (true) {const { done, value } = await reader.read();if (done) break;const chunk = decoder.decode(value);console.log('收到数据:', chunk);// 处理数据更新UI}
}
五、关键技术要点
5.1 线程池配置
@Configuration
public class ThreadPoolConfig {@Bean("agentThreadPoolExecutor")public ThreadPoolExecutor threadPoolExecutor() {return new ThreadPoolExecutor(10, // 核心线程数50, // 最大线程数60L, // 空闲线程存活时间TimeUnit.SECONDS,new LinkedBlockingQueue<>(100), // 任务队列new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略);}
}
5.2 数据传输格式
SSE支持多种数据格式:
// 发送简单文本
emitter.send("Hello World");// 发送JSON数据
Map<String, Object> data = new HashMap<>();
data.put("step", 1);
data.put("status", "processing");
data.put("message", "正在执行第一步");
emitter.send(ResponseBodyEmitter.from(objectMapper.writeValueAsString(data)));// 发送多个数据块
emitter.send(ResponseBodyEmitter.from("第一部分", "第二部分"));
六、实际应用场景
6.1 AI对话场景
public void streamAIConversation(String question, ResponseBodyEmitter emitter) {// 模拟AI思考过程emitter.send("🤔 正在思考您的问题...");// 分步输出回答String[] answerParts = generateAIAnswer(question);for (String part : answerParts) {emitter.send(part);sleep(500); // 模拟AI思考间隔}emitter.send("💡 回答完成,还有其他问题吗?");
}
6.2 长任务进度反馈
public void processLargeFile(File file, ResponseBodyEmitter emitter) {long totalSize = file.length();long processed = 0;try (BufferedReader reader = new BufferedReader(new FileReader(file))) {String line;while ((line = reader.readLine()) != null) {// 处理每一行processLine(line);processed += line.length();// 计算并发送进度int progress = (int) ((processed * 100) / totalSize);emitter.send("进度: " + progress + "%");}emitter.send("✅ 文件处理完成");} catch (IOException e) {emitter.send("❌ 文件处理失败: " + e.getMessage());}
}
七、注意事项和最佳实践
- 资源清理:确保在任务完成或异常时调用
emitter.complete()
- 超时处理:可以设置Emitter的超时时间
new ResponseBodyEmitter(30000L)
- 背压控制:在处理大量数据时要注意控制发送频率
- 错误处理:完善的异常处理机制确保连接正常关闭
- 连接管理:监控活跃连接数,避免资源耗尽
八、总结
通过本文的介绍,我们详细学习了Spring Boot中SSE流式输出的实现方式。这种技术特别适合需要实时数据推送的场景,如:
- AI对话系统 - 实时显示AI思考过程
- 实时监控 - 持续推送系统状态
- 文件处理 - 实时显示处理进度
- 数据同步 - 实时同步数据变更
SSE技术以其简单易用、浏览器兼容性好等优势,成为实现服务器推送的首选方案。结合Spring Boot的ResponseBodyEmitter
,我们可以轻松构建出功能强大的流式应用。