当前位置: 首页 > news >正文

SpringAI Alibaba Graph 流式对话

Spring AI Graph 项目技术博客

项目概述

本次Demo演示的是一个基于 Spring AI 和 Alibaba Cloud AI Graph 的智能对话系统,展示了如何使用图计算的方式构建 AI 应用流程。项目采用响应式编程和流式处理,实现了高效的 AI 对话服务。

核心架构

1. 图计算架构

使用 StateGraph 来定义 AI 处理流程,通过节点(Node)和边(Edge)的方式组织业务逻辑:

START → query → result → END
  • query 节点: 负责调用大语言模型处理用户查询
  • result 节点: 处理最终结果并输出

2. 流式处理

采用 Server-Sent Events (SSE) 技术实现实时流式响应,用户可以实时看到 AI 的回复过程。

3. 异步处理

使用 AsyncGeneratorFlux 实现异步非阻塞的数据处理,提高系统性能。

核心组件详解

DemoBoxOneGraph - 图定义配置

/*** DemoBoxOneGraph - AI对话图计算配置类* * 该类负责配置和构建一个基于Spring AI的智能对话流程图。* 使用Alibaba Cloud AI Graph框架实现节点编排和状态管理。* * 流程图结构:* START → query(查询节点) → result(结果节点) → END* * @author xinggui* @version 1.0* @since 2024*/
@Configuration
@Slf4j
public class DemoBoxOneGraph {/*** 注入OpenAI聊天模型,用于AI对话生成*/@Resourceprivate OpenAiChatModel openAiChatModel;/*** 创建并配置AI对话状态图* * 该方法构建一个完整的对话流程图,包含:* 1. 查询节点:调用大语言模型处理用户输入* 2. 结果节点:处理和输出最终结果* 3. 状态管理:使用ReplaceStrategy策略管理状态更新* 4. 流程可视化:生成Mermaid格式的流程图* * @param openAiChatModel OpenAI聊天模型实例* @return 配置完成的状态图* @throws GraphStateException 当图状态配置出现错误时抛出*/@Bean("demoBoxOnesGraph")public StateGraph stateGraph(OpenAiChatModel openAiChatModel) throws GraphStateException {// 构建聊天客户端,添加日志记录器ChatClient chatClient = ChatClient.builder(openAiChatModel).defaultAdvisors(new SimpleLoggerAdvisor()).build();// 创建状态工厂,定义状态管理策略OverAllStateFactory factory = () -> {OverAllState state = new OverAllState();// 注册查询状态,使用替换策略(新值覆盖旧值)state.registerKeyAndStrategy("query", new ReplaceStrategy());// 注册结果状态,使用替换策略state.registerKeyAndStrategy("result", new ReplaceStrategy());return state;};// 构建状态图,定义节点和边的关系StateGraph stateGraph = new StateGraph("demoBoxOne", factory)// 添加查询节点,使用异步执行方式.addNode("query", AsyncNodeAction.node_async(new DemoBoxOneNode(chatClient)))// 添加结果节点,使用异步执行方式.addNode("result", AsyncNodeAction.node_async(new DemoBoxOneNode.DemoBoxResultNode()))// 定义流程边:START → query → result → END.addEdge(StateGraph.START, "query")      // 从开始到查询节点.addEdge("query", "result")              // 从查询节点到结果节点.addEdge("result", StateGraph.END);      // 从结果节点到结束// 生成并打印PlantUML格式的流程图,便于开发和调试GraphRepresentation representation = stateGraph.getGraph(GraphRepresentation.Type.MERMAID,"expander flow");log.info("\n=== expander UML Flow ===");log.info(representation.content());log.info("==================================\n");return stateGraph;}
}

关键特性:

  • 状态管理策略:使用 ReplaceStrategy 确保状态正确更新
  • 异步节点:所有节点都配置为异步执行
  • 流程可视化:自动生成 Mermaid 格式的流程图

DemoBoxOneNode - 业务逻辑节点

/*** DemoBoxOneNode - AI对话处理节点* * 该类实现了图计算中的核心业务逻辑节点,负责:* 1. 调用大语言模型处理用户查询* 2. 生成流式响应* 3. 管理节点间的状态传递* * 包含两个内部类:* - DemoBoxOneNode: 主要的查询处理节点* - DemoBoxResultNode: 结果输出节点* * @author xinggui* @version 1.0* @since 2024*/
@Slf4j
public class DemoBoxOneNode implements NodeAction {/*** 聊天客户端,用于与大语言模型进行交互*/private final ChatClient chatClient;/*** 构造函数,注入聊天客户端* * @param chatClient 配置好的聊天客户端实例*/public DemoBoxOneNode(ChatClient chatClient) {this.chatClient = chatClient;}/*** 执行节点的主要业务逻辑* * 该方法实现了以下功能:* 1. 从状态中获取用户查询内容* 2. 构建系统提示词和用户查询* 3. 调用大语言模型生成流式响应* 4. 将响应转换为异步生成器* 5. 返回包含生成器的状态映射* * @param state 当前图的状态对象,包含所有节点的共享状态* @return 包含异步生成器的状态映射* @throws Exception 当处理过程中出现错误时抛出*/@Overridepublic Map<String, Object> apply(OverAllState state) throws Exception {// 从状态中获取用户查询,如果没有则使用空字符串String query = (String) state.value("query").orElse("");// 构建流式聊天请求// 系统提示词:定义AI助手的角色和行为// 用户查询:将用户问题传递给AI模型Flux<ChatResponse> chatResponseFlux = chatClient.prompt().system("你是一个Java架构师,主要回答一些Java架构设计选型技术方面的事情。请接下来保持这样方式来回答客户").user((user) -> user.text("请回答用户的问题:").param("query", query)).stream().chatResponse();// 构建流式聊天生成器// 该生成器负责将聊天响应转换为节点输出AsyncGenerator<? extends NodeOutput> generator = StreamingChatGenerator.builder().startingNode("data")                    // 设置起始节点名称.startingState(state)                     // 设置起始状态.mapResult(chatResponse -> {              // 映射聊天响应到节点输出// 提取AI回复的文本内容String text = chatResponse.getResult().getOutput().getText();// 将文本内容包装在context字段中return Map.of("context", text);}).buildWithChatResponse(chatResponseFlux); // 使用聊天响应流构建生成器// 返回包含生成器的状态映射// 其他节点可以通过"data"键访问这个生成器return Map.of("data", generator);}/*** DemoBoxResultNode - 结果输出节点* * 该内部类负责处理最终的结果输出,主要功能:* 1. 从状态中提取处理结果* 2. 记录日志信息* 3. 返回最终结果状态*/static class DemoBoxResultNode implements NodeAction {/*** 执行结果节点的业务逻辑* * @param state 当前图的状态对象* @return 包含最终结果的状态映射* @throws Exception 当处理过程中出现错误时抛出*/@Overridepublic Map<String, Object> apply(OverAllState state) throws Exception {// 从状态中获取结果内容String result = (String) state.value("result").orElse("");// 记录最终结果到日志log.info("最终的结果result是: {}", result);// 返回包含结果的状态映射return Map.of("result", result);}}
}

核心功能:

  • 大模型调用:集成 OpenAI 模型进行智能对话
  • 流式生成:支持流式响应,实时返回结果
  • 状态传递:在节点间传递处理状态和数据

StreamUtils - 流式处理工具

/*** StreamUtils - 流式处理工具类* * 该类负责将异步生成器的流式输出转换为Server-Sent Events (SSE)格式,* 实现实时流式响应,支持客户端实时接收AI对话的生成过程。* * 主要功能:* 1. 异步处理流式输出* 2. 转换为SSE事件流* 3. 过滤和格式化输出内容* 4. 错误处理和资源管理* * @author xinggui* @version 1.0* @since 2024*/
@Slf4j
public class StreamUtils {/*** 单线程执行器,用于处理异步流式输出* 使用单线程确保事件处理的顺序性和一致性*/private static final ExecutorService executor = Executors.newSingleThreadExecutor();/*** 处理异步生成器的流式输出,转换为SSE事件流* * 该方法实现了以下核心功能:* 1. 异步处理AsyncGenerator的输出* 2. 区分不同类型的节点输出(流式输出和普通输出)* 3. 过滤掉不需要的中间节点事件* 4. 将输出转换为JSON格式的SSE事件* 5. 优雅处理完成和异常情况* * @param resultFuture 异步生成器,包含图计算的流式输出* @param sink SSE事件接收器,用于向客户端发送事件流*/public static void processStream(AsyncGenerator<NodeOutput> resultFuture, Sinks.Many<ServerSentEvent<String>> sink) {// 提交任务到执行器,异步处理流式输出executor.submit(() -> {// 遍历异步生成器的所有输出resultFuture.forEachAsync(output -> {try {// 获取当前输出节点的名称String nodeName = output.node();String content;// 判断输出类型,处理流式输出和普通输出if (output instanceof StreamingOutput streamingOutput) {// 处理流式输出(如聊天响应)// 提取聊天响应的元数据信息ChatResponse chatResponse = streamingOutput.chatResponse();// 将元数据转换为JSON格式content = JSON.toJSONString(Map.of(nodeName, chatResponse.getMetadata()));} else {// 处理普通节点输出JSONObject nodeOutput = new JSONObject();// 提取节点状态数据nodeOutput.put("data", output.state().data());// 记录节点名称nodeOutput.put("node", nodeName);// 转换为JSON字符串content = JSON.toJSONString(nodeOutput);}// 过滤掉"query"节点的事件,避免发送不必要的中间状态// 只发送有意义的输出事件给客户端if (!nodeName.equalsIgnoreCase("query")) {// 创建SSE事件并发送给客户端sink.tryEmitNext(ServerSentEvent.builder(content).build());}// 当节点名是"query"时,直接跳过,不发送任何SSE事件} catch (Exception e) {// 记录处理过程中的错误log.error("error", e);}}).thenAccept(v -> {// 所有输出处理完成后,发送完成事件sink.tryEmitComplete();}).exceptionally(e -> {// 处理过程中出现异常时的错误处理log.error("error", e);return null;});});}
}

主要职责:

  • 流式转换:将 AsyncGenerator 转换为 SSE 事件流
  • 事件过滤:过滤掉不需要的中间节点事件
  • 错误处理:优雅处理流式处理中的异常情况

技术亮点

1. 响应式编程

  • 使用 Spring WebFlux 和 Project Reactor
  • 非阻塞 I/O 处理
  • 背压控制机制

2. 流式 AI 响应

  • 实时流式输出
  • 支持长对话场景
  • 客户端实时接收响应

3. 图计算模式

  • 清晰的数据流向
  • 易于扩展和维护
  • 支持复杂的业务逻辑编排

使用方法

1. 调用接口

  • 流式调用: /code6 (返回 SSE 流)

总结

Spring AI Graph 这个流式响应的Demo展示了AI 应用开发的最佳实践,通过图计算、流式处理和响应式编程的结合,构建了高性能、可扩展的智能对话系统。这种架构模式特别适合需要复杂业务逻辑编排的 AI 应用场景。

http://www.dtcms.com/a/365331.html

相关文章:

  • python sqlalchemy模型的建立
  • 嵌入式硬件学习-2
  • Algorithms library
  • Qoder如何免费续杯,立即参与实践分享,赢 1000Credits
  • 解决windows下火狐浏览器开机会同时启动两个或多个页面
  • 为何quest3设备会强制更新,如何屏蔽更新
  • GoogleNet:更深的网络与更高的效率
  • 大模型的偏见:从训练数据到推理结果,如何检测与修正?
  • Voicemod-免费即时变声器
  • 【程序人生】有梦想就能了不起,就怕你没梦想
  • Redis 集群模式与高可用机制
  • 深度学习篇---Adam优化器
  • 计算机网络模型总概述
  • python抖音弹幕获取方案
  • 考研复习-计算机网络-第二章-物理层
  • 服务器安装vnc服务端
  • 深度学习篇---InceptionNet网络结构
  • Ecovadis评估认证准备期间对于公司员工培训有没有什么技巧?
  • 对轮询的理解
  • 手持式气象观测仪在短期监测项目的作用
  • 深度学习之第六课卷积神经网络 (CNN)如何保存和使用最优模型
  • GOFLY开源客服系统-处理gin框架下的session中间件
  • 【线段树 懒删除堆】P12372 [蓝桥杯 2022 省 Python B] 最优清零方案|普及+
  • 【Python接口自动化】调用飞书机器人
  • TLSF内存算法适配HTOS
  • React实现列表拖拽排序
  • PyTorch实战(6)——模型微调详解
  • 落地页测试case(Android视角)
  • Redis突然挂了,数据丢了多少?就看你用RDB还是AOF
  • SecureCRT v9.5.2 Mac SSH终端操作工具