当前位置: 首页 > news >正文

Spring Boot SSE 流式输出,智能体的实时响应

前言

在现代Web应用中,实时数据推送已经成为提升用户体验的关键技术。特别是在AI对话、实时监控、长任务执行等场景中,传统的请求-响应模式往往无法满足实时性需求。本文将详细介绍如何使用Spring Boot的SSE(Server-Sent Events)技术实现流式输出,并以AI智能体为例展示具体实现。

一、什么是SSE?

SSE(Server-Sent Events)是一种允许服务器向客户端实时推送数据的技术。与WebSocket不同,SSE是单向通信(服务器→客户端),实现简单且浏览器兼容性好。

SSE优势:

  • 简单的HTTP协议,无需额外协议
  • 自动重连机制
  • 浏览器原生支持
  • 与现有HTTP基础设施兼容

二、项目结构概览

src/main/java/cn/bugstack/ai/trigger/http/
└── AiAgentController.java     # 流式输出控制器

三、核心代码实现

3.1 控制器层实现

@Slf4j
@RestController
@RequestMapping("/api/v1/agent")
@CrossOrigin(origins = "*", allowedHeaders = "*", methods = {RequestMethod.GET, RequestMethod.POST, RequestMethod.OPTIONS})
public class AiAgentController implements IAiAgentService {@Resource(name = "autoAgentExecuteStrategy")private IExecuteStrategy autoAgentExecuteStrategy;@Resourceprivate ThreadPoolExecutor threadPoolExecutor;@PostMapping("/autoAgent")public ResponseBodyEmitter autoAgent(AutoAgentRequestDTO request, HttpServletResponse response) {log.info("请求参数:{}", request);try {// 设置SSE响应头setupSSEHeaders(response);// 创建流式输出对象ResponseBodyEmitter emitter = new ResponseBodyEmitter();// 构建执行命令实体ExecuteCommandEntity executeCommandEntity = buildExecuteCommand(request);// 异步执行AutoAgentexecuteAutoAgentAsync(executeCommandEntity, emitter);return emitter;} catch (Exception e) {log.error("AutoAgent请求处理异常:{}", e.getMessage(), e);return createErrorEmitter(e);}}
}

3.2 关键方法详解

3.2.1 SSE响应头设置
private void setupSSEHeaders(HttpServletResponse response) {response.setContentType("text/event-stream");  // SSE媒体类型response.setCharacterEncoding("UTF-8");response.setHeader("Cache-Control", "no-cache"); // 禁用缓存response.setHeader("Connection", "keep-alive");  // 保持长连接
}

参数说明:

  • text/event-stream:SSE标准媒体类型
  • no-cache:确保实时数据不被缓存
  • keep-alive:维持HTTP连接
3.2.2 异步执行逻辑
private void executeAutoAgentAsync(ExecuteCommandEntity command, ResponseBodyEmitter emitter) {threadPoolExecutor.execute(() -> {try {autoAgentExecuteStrategy.execute(command, emitter);} catch (Exception e) {log.error("AutoAgent执行异常:{}", e.getMessage(), e);sendErrorMessage(emitter, "执行异常:" + e.getMessage());} finally {completeEmitter(emitter);}});
}

3.3 业务层Emitter使用示例

在策略实现类中,我们可以这样使用Emitter进行流式输出:

@Service
public class AutoAgentExecuteStrategy implements IExecuteStrategy {@Overridepublic void execute(ExecuteCommandEntity command, ResponseBodyEmitter emitter) {try {// 步骤1:任务分析emitter.send("🎯 开始分析任务...");String analysis = analyzeTask(command.getMessage());emitter.send("📊 分析结果: " + analysis);// 步骤2:分步执行for (int step = 1; step <= command.getMaxStep(); step++) {emitter.send("🚀 执行第 " + step + " 步...");String stepResult = executeStep(step, command);emitter.send("✅ 步骤 " + step + " 完成: " + stepResult);// 模拟处理时间Thread.sleep(1000);}// 最终结果emitter.send("🎉 任务执行完成!");} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("任务执行被中断", e);} catch (Exception e) {throw new RuntimeException("任务执行失败", e);}}
}

3.4 异常处理机制

private void sendErrorMessage(ResponseBodyEmitter emitter, String message) {try {emitter.send(message);} catch (Exception ex) {log.error("发送异常信息失败:{}", ex.getMessage(), ex);}
}private void completeEmitter(ResponseBodyEmitter emitter) {try {emitter.complete();} catch (Exception e) {log.error("完成流式输出失败:{}", e.getMessage(), e);}
}private ResponseBodyEmitter createErrorEmitter(Exception e) {ResponseBodyEmitter errorEmitter = new ResponseBodyEmitter();try {errorEmitter.send("请求处理异常:" + e.getMessage());errorEmitter.complete();} catch (Exception ex) {log.error("发送错误信息失败:{}", ex.getMessage(), ex);}return errorEmitter;
}

四、前端对接示例

4.1 使用EventSource接收流数据

<!DOCTYPE html>
<html>
<head><title>AI智能体流式输出演示</title>
</head>
<body><div id="output" style="white-space: pre-wrap; border: 1px solid #ccc; padding: 10px; height: 400px; overflow-y: auto;"></div><script>const output = document.getElementById('output');function startAgent() {const eventSource = new EventSource('/api/v1/agent/autoAgent?message=帮我分析数据&sessionId=123');eventSource.onmessage = function(event) {output.innerHTML += event.data + '\n';output.scrollTop = output.scrollHeight; // 自动滚动到底部};eventSource.onerror = function(event) {output.innerHTML += '❌ 连接错误或已完成\n';eventSource.close();};eventSource.onopen = function(event) {output.innerHTML += '🔗 连接已建立,开始接收数据...\n';};}// 页面加载后自动开始window.onload = startAgent;</script>
</body>
</html>

4.2 使用Fetch API的流式读取

async function startAgentWithFetch() {const response = await fetch('/api/v1/agent/autoAgent', {method: 'POST',headers: {'Content-Type': 'application/json',},body: JSON.stringify({message: '帮我分析数据',sessionId: '123',maxStep: 5})});const reader = response.body.getReader();const decoder = new TextDecoder();while (true) {const { done, value } = await reader.read();if (done) break;const chunk = decoder.decode(value);console.log('收到数据:', chunk);// 处理数据更新UI}
}

五、关键技术要点

5.1 线程池配置

@Configuration
public class ThreadPoolConfig {@Bean("agentThreadPoolExecutor")public ThreadPoolExecutor threadPoolExecutor() {return new ThreadPoolExecutor(10, // 核心线程数50, // 最大线程数60L, // 空闲线程存活时间TimeUnit.SECONDS,new LinkedBlockingQueue<>(100), // 任务队列new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略);}
}

5.2 数据传输格式

SSE支持多种数据格式:

// 发送简单文本
emitter.send("Hello World");// 发送JSON数据
Map<String, Object> data = new HashMap<>();
data.put("step", 1);
data.put("status", "processing");
data.put("message", "正在执行第一步");
emitter.send(ResponseBodyEmitter.from(objectMapper.writeValueAsString(data)));// 发送多个数据块
emitter.send(ResponseBodyEmitter.from("第一部分", "第二部分"));

六、实际应用场景

6.1 AI对话场景

public void streamAIConversation(String question, ResponseBodyEmitter emitter) {// 模拟AI思考过程emitter.send("🤔 正在思考您的问题...");// 分步输出回答String[] answerParts = generateAIAnswer(question);for (String part : answerParts) {emitter.send(part);sleep(500); // 模拟AI思考间隔}emitter.send("💡 回答完成,还有其他问题吗?");
}

6.2 长任务进度反馈

public void processLargeFile(File file, ResponseBodyEmitter emitter) {long totalSize = file.length();long processed = 0;try (BufferedReader reader = new BufferedReader(new FileReader(file))) {String line;while ((line = reader.readLine()) != null) {// 处理每一行processLine(line);processed += line.length();// 计算并发送进度int progress = (int) ((processed * 100) / totalSize);emitter.send("进度: " + progress + "%");}emitter.send("✅ 文件处理完成");} catch (IOException e) {emitter.send("❌ 文件处理失败: " + e.getMessage());}
}

七、注意事项和最佳实践

  1. 资源清理:确保在任务完成或异常时调用emitter.complete()
  2. 超时处理:可以设置Emitter的超时时间new ResponseBodyEmitter(30000L)
  3. 背压控制:在处理大量数据时要注意控制发送频率
  4. 错误处理:完善的异常处理机制确保连接正常关闭
  5. 连接管理:监控活跃连接数,避免资源耗尽

八、总结

通过本文的介绍,我们详细学习了Spring Boot中SSE流式输出的实现方式。这种技术特别适合需要实时数据推送的场景,如:

  • AI对话系统 - 实时显示AI思考过程
  • 实时监控 - 持续推送系统状态
  • 文件处理 - 实时显示处理进度
  • 数据同步 - 实时同步数据变更

SSE技术以其简单易用、浏览器兼容性好等优势,成为实现服务器推送的首选方案。结合Spring Boot的ResponseBodyEmitter,我们可以轻松构建出功能强大的流式应用。

http://www.dtcms.com/a/445486.html

相关文章:

  • Linux系统性能监控—sar命令
  • PostgreSQL备份不是复制文件?物理vs逻辑咋选?误删还能精准恢复到1分钟前?
  • 网站开发主管招聘wordpress 手机悬浮
  • 描述逻辑对人工智能自然语言处理中深层语义分析的影响与启示
  • 首屏加载耗时从5秒优化到1秒内:弱网与低端安卓机下的前端优化秘笈
  • 【新版】Elasticsearch 8.15.2 完整安装流程(Linux国内镜像提速版)
  • LeetCode 分类刷题:74. 搜索二维矩阵
  • 网站建设项目职责memcache安装wordpress
  • MySQL查看数据表锁定情况
  • sq网站推广用jsp做的网站源代码下载
  • 玩转ClaudeCode:通过Chrome DevTools MCP实现高级调试与反反爬策略
  • 国内做焊接机器人平台网站网络营销的方法是什么
  • 网站建设一般用什么软件敏捷模型是软件开发模型吗
  • 做网站好的品牌泰安房产网签查询
  • No商业网站建设wordpress 调用插件
  • 免费模板网站都有什么区别合肥网络seo
  • 什么是网站地址云服务器上放多个网站
  • 电子商务网站费用预算必须在当地网站备案
  • 遵义市播州区建设厅网站镇江网站建设和优化推广多少钱
  • 安阳建设网站哪家好久久项目咨询有限公司
  • 3g门户网站无锡企业网站制作
  • 手表回收网网站个人网页设计作品介绍
  • 如何用vps系统搭建企业网站以及邮箱系统网站建设运营预算
  • 贵阳网站建设app开发修仙网页游戏大全
  • 广东省建设网站深圳市网站建设科技公司
  • 一流的手机网站建设广州推广系统
  • 网站建设广州北京怎么做网站推广
  • php做网站的公司有哪些网站如何免费做SEO优化
  • 佛山优化网站网站开发虚拟主机是什么
  • 洛阳青峰做网站教育网站制作价格