当前位置: 首页 > news >正文

Java SpringBoot 扣子CozeAI SseEmitter流式对话完整实战 打字机效果

书接上回:springBoot 整合 扣子cozeAI 智能体 对话https://blog.csdn.net/weixin_44548582/article/details/147457236

上文实现的是一次性等待得到完整的AI回复内容,但随着问题和AI的逻辑日趋复杂,会明显增加这个等待时间,这对前端用户并不友好,所以需要实现与coze对话的流式、打字机效果。

核心工具:SseEmitter

基本概念

SseEmitter 是 Spring Framework 提供的一个类,用于实现服务器发送事件(Server-Sent Events, SSE)。SSE 是一种允许服务器向客户端推送实时更新的技术,通常用于实现实时通知、数据流传输等功能。SseEmitter 通过 HTTP 长连接保持与客户端的通信,服务器可以持续向客户端发送数据,而客户端则通过 EventSource API 接收这些数据。

实现流式传输的原理

SseEmitter 实现流式传输的核心在于它使用了 HTTP 长连接和分块传输编码(Chunked Transfer Encoding)。当客户端发起 SSE 请求时,服务器会保持连接打开,并通过分块传输的方式逐步发送数据。每个数据块都是一个独立的事件,客户端可以实时接收并处理这些事件。

实现打字机效果的原理

打字机效果是指文本逐字或逐行显示的效果。通过 SseEmitter,可以实现这种效果。服务器可以逐步发送文本的每个字符或每行,客户端接收到数据后立即追加显示,从而模拟出打字机的效果。

实战代码

application.yml配置
# Tomcat
server:port: 9210#扣子参数
coze:clientId: xxxxxxxxxxxxxpublicKeyId: yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyprivateKeyFilePath: 本地或服务器绝对路径/private_key.pemwwwBase: https://www.coze.cnapiBase: https://api.coze.cn# 智能体IDbotId: zzzzzzzzzzzzzzzzzzzzzzzzzzzz
扣子参数配置类 
/*** 扣子参数配置类* @Author: Tenk*/
@Component
@ConfigurationProperties(prefix = "coze") // 通过yml读取
public class CozeConfig {//OAuth应用IDprivate String clientId;//公钥private String publicKeyId;//私钥证书private String privateKeyFilePath;//coze官网private String wwwBase;//cozeApi请求地址private String apiBase;//智能体botIdprivate String botId;
}
Coze授权工具类
/*** 初始化CozeJWTOAuth授权工具** @url https://github.com/coze-dev/coze-java/blob/main/example/src/main/java/example/auth/JWTOAuthExample.java* @Author: Tenk*/
@Component
public class CozeJWTOAuthUtil {private static final Logger log = LoggerFactory.getLogger(CozeJWTOAuthUtil.class);@Autowiredprivate CozeConfig cozeConfig;@Autowiredprivate RedisService redisService;//CozeAPIprivate JWTOAuthClient oauth;public OAuthToken getAccessToken(Long userId) {OAuthToken accessToken;if (redisService.hasKey(CozeConstants.JWT_TOKEN + userId)) {accessToken = JSONObject.parseObject(redisService.getCacheObject(CozeConstants.JWT_TOKEN + userId).toString(), OAuthToken.class);} else {accessToken = oauth.getAccessToken(userId.toString());redisService.setCacheObject(CozeConstants.JWT_TOKEN + userId, accessToken, 14L, TimeUnit.MINUTES);}return accessToken;}public CozeConfig getCozeConfig() {return cozeConfig;}@PostConstructpublic void init() {this.oauth = createJWTOAuthClient();}public JWTOAuthClient getJWTOAuthClient() {return oauth;}/*** 初始化CozeJWTOAuth** @return*/public CozeAPI createCozeAPIByUser(String accessToken) {return new CozeAPI.Builder().auth(new TokenAuth(accessToken)).baseURL(cozeConfig.getApiBase()).readTimeout(60000).connectTimeout(60000).build();}public JWTOAuthClient createJWTOAuthClient() {try {//读取coze_private_key_pemString jwtOAuthPrivateKey = new String(Files.readAllBytes(Paths.get(cozeConfig.getPrivateKeyFilePath())), StandardCharsets.UTF_8);oauth = new JWTOAuthClient.JWTOAuthBuilder().clientID(cozeConfig.getClientId()).privateKey(jwtOAuthPrivateKey).publicKey(cozeConfig.getPublicKeyId()).baseURL(cozeConfig.getApiBase()).readTimeout(60000).connectTimeout(60000).build();} catch (Exception e) {log.error("初始化CozeJWTOAuth失败", e);return null;}return oauth;}
}
SSE服务类 
/*** SSE服务类** @Author: Tenk*/
@Service
public class SseServiceImpl implements SseService {private static final Logger log = LoggerFactory.getLogger(SseServiceImpl.class);/*** k:扣子会话id  v:SseEmitter* 这里一定是使用ConcurrentHashMap,因为他是多线程安全的。*/private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();/*** k:会话id  v:userId* 这里一定是使用ConcurrentHashMap,因为他是多线程安全的。*/private static Map<String, Long> sseUserMap = new ConcurrentHashMap<>();@Overridepublic SseEmitter connect(String conversationId, Long userId) {SseEmitter sseEmitter;// 判断是否已经存在if (sseEmitterMap.containsKey(conversationId)) {sseEmitter = sseEmitterMap.get(conversationId);} else {// 最多6小时断开连接sseEmitter = new SseEmitter(6 * 60 * 60 * 1000L);}// 连接断开sseEmitter.onCompletion(() -> {disconnect("连接断开", conversationId);});// 连接超时sseEmitter.onTimeout(() -> {disconnect("连接超时", conversationId);});// 连接报错sseEmitter.onError((throwable) -> {disconnect("连接报错", conversationId);});sseEmitterMap.put(conversationId, sseEmitter);sseUserMap.put(conversationId, userId);return sseEmitter;}private static void disconnect(String action, String conversationId) {Long value = sseUserMap.get(conversationId);log.info("sse{},用户userId:{}", action, value);sseEmitterMap.remove(conversationId);sseUserMap.remove(conversationId);}
}
AI 接口 Controller 
/*** AI 接口 Controller* @Author: Tenk*/
@RestController
@RequestMapping("/ai/chats")
public class CozeController {@Autowiredprivate CozeService cozeService;@Autowiredprivate CozeJWTOAuthUtil cozeJWTOAuthUtil;@Autowiredprivate SseService sseService;/*** 向AI发起流式对话请求** @param conversationId 会话ID* @param content        对话内容* @return 对话流*/@GetMapping(value = "/send", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter sendFlowMessage(@RequestParam String conversationId,@RequestParam String content) throws IOException {if (StringUtils.isEmpty(conversationId)) {throw new ServiceException("会话信息缺失", 500);}ChatBo bo = new ChatBo();bo.setUserId(userId);bo.setConversationId(conversationId);bo.setContent(content);SseEmitter emitter = sseService.connect(conversationId, userId);try{cozeService.sendFlowMessage(bo, emitter);}catch (Exception e){e.printStackTrace();// 捕获并发送 SSE 格式的错误emitter.send("{\"status\":\"fail\",\"data\":\""+e.getMessage()+"\"}");emitter.completeWithError(e);}return emitter;}
}
方法实现 Service 
/*** 方法具体实现 Service* @Author: Tenk*/
@Service
public class CozeServiceImpl implements CozeService {// @Autowired Mapper Service ……@Autowiredprivate CozeJWTOAuthUtil cozeJWTOAuthUtil;// 流式消息状态private static final String inProgress = "in-progress"; // 进行中private static final String done = "done";              // 完成/*** 构建 SSE 返回格式** @param status 响应状态(in-progress / done)* @param data   数据内容,可以是 textBuffer 或最终 data 对象* @return 构造好的 JSON 对象*/private JSONObject buildSseResult(String status, Object data) {JSONObject result = new JSONObject();result.put("status", status);result.put("data", data);return result;}@Override@Transactionalpublic void sendFlowMessage(ChatBo bo, SseEmitter emitter) {// 1. 初始化 Coze API 客户端CozeAPI cozeAPI = cozeJWTOAuthUtil.createCozeAPIByUser(cozeJWTOAuthUtil.getAccessToken(bo.getUserId()).getAccessToken());// 2. 构造用户发送的消息CreateMessageReq msgReq = CreateMessageReq.builder().conversationID(bo.getConversationId()).role(MessageRole.USER).content(bo.getContent()).contentType(MessageContentType.TEXT).build();// 整理用户消息,插入消息历史数据表Message userMsg = cozeAPI.conversations().messages().create(msgReq).getMessage();CozeMsgLog userMsgLog = new CozeMsgLog(bo.getUserId(),bo.getConversationId(),userMsg.getBotId(),userMsg.getChatId(),userMsg.getId(),null,bo.getContent(),userMsg.getContentType().getValue(),userMsg.getMetaData().toString(),userMsg.getReasoningContent(),userMsg.getRole().getValue(),userMsg.getSectionId(),MessageType.QUESTION.getValue(),new Date(userMsg.getCreatedAt() * 1000),new Date(userMsg.getUpdatedAt() * 1000));cozeMsgLogService.insertCozeMsgLog(userMsgLog);// 4. 打开 Coze 流式对话Flowable<ChatEvent> chatStream = cozeAPI.chat().stream(CreateChatReq.builder().botID(cozeJWTOAuthUtil.getCozeConfig().getTripBotId()).stream(true).autoSaveHistory(true).conversationID(bo.getConversationId()).userID(bo.getUserId().toString()).messages(Collections.singletonList(userMsg)).build());// 5. 发送初始提示信息try {JSONObject delayJson = new JSONObject();delayJson.put("type", "delay");delayJson.put("delayReason", "开始思考……");emitter.send(buildSseResult(inProgress, delayJson));} catch (IOException e) {log.error("规划行程开始错误", e);throw new ServiceException("规划行程开始错误");}StringBuffer fullContent = new StringBuffer();// 完整 AI 回复文本,包含一些不想给前端的特殊符号List<JSONObject> textBuffer = new ArrayList<>();     // 缓冲 SSE 数据int bufferThreshold = 3;                             // 缓冲阈值,当缓冲列表长度超过此值时,发送给前端// 7. 订阅流式对话事件chatStream.timeout(10, TimeUnit.MINUTES).observeOn(Schedulers.io()).subscribe(event -> {// 增量消息(例如:['H','e','l','l','o',' ','W','o','r','l',……])if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) {// 提取增量消息String delta = event.getMessage().getContent();// 逐步拼接成完整消息fullContent.append(delta);// 清洗输出给前端的文本:去除 <、>、[、] 特殊符号,发送给前端,视情况而定,非必须String cleanText= delta.replaceAll("[<>\\[\\]]", "");// TODO 自定义业务逻辑// 实际发送if (!cleanText.isEmpty()) {JSONObject textJson = new JSONObject();textJson.put("type", "text");textJson.put("text", cleanText);synchronized (textBuffer) {// 添加到缓冲列表textBuffer.add(textJson);// 发送缓冲列表if (textBuffer.size() >= bufferThreshold) {/** 示例* {*     "data": [*         {*             "text": "hello world\n",*             "type": "text"*         },*         {*             "text": "### title three\n",*             "type": "text"*         },*         {*             "text": "#### title four\n",*             "type": "text"*         }*     ],*     "status": "in-progress"* }*/emitter.send(buildSseResult(inProgress, new ArrayList<>(textBuffer)));textBuffer.clear();}}}// TODO 自定义业务逻辑}// AI处理、回复完成// event:conversation.message.completed会有两次,&&后的条件是取其中一次,详见 https://www.coze.cn/open/docs/developer_guides/chat_v3#70a1d1bdif (ChatEventType.CONVERSATION_MESSAGE_COMPLETED.equals(event.getEvent()) && MessageType.ANSWER.getValue().equals(event.getMessage().getType().getValue())) {// === 最后一批textBuffer没发的统一发出 ===synchronized (textBuffer) {if (!textBuffer.isEmpty()) {emitter.send(buildSseResult(inProgress, new ArrayList<>(textBuffer)));textBuffer.clear();}}// === 构造最终完成的数据包 ===JSONObject finalData = new JSONObject();finalData.put("xxx", "自定义数据");finalData.put("yyy", "自定义内容");finalData.put("botMessage", fullContent.toString());// 发送状态为 done 的 SSEemitter.send(buildSseResult(done, finalData));// AI回复的内容,插入消息历史数据表Message message = event.getMessage();CozeMsgLog aiMsgLog = new CozeMsgLog(bo.getUserId(),message.getConversationId(),message.getBotId(),message.getChatId(),message.getId(),message.getContent(),finalData.toString(),message.getContentType().getValue(),message.getMetaData() == null ? null : message.getMetaData().toString(),message.getReasoningContent(),message.getRole().getValue(),message.getSectionId(),message.getType().getValue(),new Date(message.getCreatedAt() * 1000),new Date());cozeMsgLogService.insertCozeMsgLog(aiMsgLog);}},error -> {log.error("AI对话异常:{}", error.getMessage(), error);emitter.send(buildSseResult(done, "AI思考失败"));emitter.completeWithError(error);cozeAPI.shutdownExecutor();},() -> {// 释放资源emitter.complete();cozeAPI.shutdownExecutor();});}
}

相关文章:

  • Android 网络全栈攻略(五)—— 从 OkHttp 拦截器来看 HTTP 协议二
  • 使用OpenSSL生成根证书并自签署证书
  • 数据结构(6)线性表-队列
  • 【leetcode】3356. 零数组变换②
  • 字节跳动旗下火山引擎都覆盖哪些领域
  • 四、GPU是如何成为当前电脑中不可或缺的一部分的,opengl在其中起到了什么效果
  • 基于SpringMVC的动态时钟设计
  • youyu:91501
  • 学习黑客Nmap 是什么?
  • React从基础入门到高级实战:React 基础入门 - 简介与开发环境搭建
  • [逻辑回归]机器学习-part11
  • 相机--基础
  • Widget进阶
  • Python训练营打卡 Day25
  • 5.2.3 使用配置文件方式整合MyBatis
  • 《数据结构初阶》【八大排序——巅峰决战】
  • 【论文精读】2022 CVPR--RealBasicVSR现实世界视频超分辨率(RealWorld VSR)
  • 【Harmony OS】作业四 布局
  • Pluto实验报告——基于FM的音频信号传输并解调恢复
  • 机器学习中的维度、过拟合、降维
  • 360搜索首页/优化推广网站怎么做
  • 上海做电缆桥架的公司网站/今日国际新闻摘抄
  • 邹城做网站/百度下载安装2019
  • 什么网站可以做自考试题/qq排名优化网站
  • 怎么更换网站logo/广告公司注册
  • 网站建设报告实训步骤/seo专业技术培训