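510 - Spring AI Alibaba Graph Stream Node Example

This example walks you step by step through building a Spring Boot application that uses the Graph Stream Node feature of Spring AI Alibaba to implement streaming graph processing and node-based query expansion.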
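1. Objectives
We will build a web application with the following core features:
- Query expansion (/graph/stream/expand): a graph node expands the user's query into several variants, widening the search space and improving the chances of finding relevant information.
- Streaming response handling: node output is returned in real time through Spring AI Alibaba's streaming support, which gives a better user experience.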
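2. Tech Stack and Core Dependencies
- Spring Boot 3.x
- Spring AI Alibaba (connects to the Alibaba Cloud DashScope Tongyi models)
- Spring AI Alibaba Graph Core (graph processing and node management)
- Maven (build tool)
Add the following core dependencies to pom.xml: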

<dependencies>
    <!-- Spring AI Alibaba core starter with DashScope integration -->
    <dependency>
        <groupId>com.alibaba.cloud.ai</groupId>
        <artifactId>spring-ai-alibaba-starter-dashscope</artifactId>
    </dependency>

    <!-- Spring AI Chat client auto-configuration -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-autoconfigure-model-chat-client</artifactId>
    </dependency>

    <!-- Spring AI Alibaba Graph core library -->
    <dependency>
        <groupId>com.alibaba.cloud.ai</groupId>
        <artifactId>spring-ai-alibaba-graph-core</artifactId>
        <version>1.0.0.3</version>
    </dependency>

    <!-- Spring Web, used to build the RESTful API -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

3. Project Configuration
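In src/main/resources/application.yml, configure your DashScope API key and the other settings.

server:
  port: 8080

spring:
  application:
    name: stream-node
  ai:
    dashscope:
      api-key: ${AI_DASHSCOPE_API_KEY}
      chat:
        options:
          model: qwen-max

Important: set the AI_DASHSCOPE_API_KEY environment variable to a valid API key obtained from Alibaba Cloud. You can also write the key directly into the configuration file, but this is not recommended for production.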
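4. Core Implementation
4.1 Graph Node Configuration (GraphNodeStreamConfiguration.java)
Configures the graph's nodes and edges, which together define the structure of the state graph.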
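The configuration registers a ReplaceStrategy for each state key. As a rough mental model of what a per-key strategy does when a node returns a partial update, here is a dependency-free sketch. It is a simplification under stated assumptions, not the library's implementation: `KeyStrategySketch`, `merge`, `REPLACE`, and `APPEND` are hypothetical names introduced only for illustration, and the append variant is shown only for contrast.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Simplified, hypothetical model of per-key merge strategies (not the library's classes).
class KeyStrategySketch {

    // "Replace": the newer value overwrites the older one.
    static final BinaryOperator<Object> REPLACE = (oldValue, newValue) -> newValue;

    // "Append": values accumulate in a list (shown only for contrast).
    static final BinaryOperator<Object> APPEND = (oldValue, newValue) -> {
        List<Object> merged = new ArrayList<>();
        if (oldValue instanceof List<?> list) {
            merged.addAll(list);
        } else if (oldValue != null) {
            merged.add(oldValue);
        }
        merged.add(newValue);
        return merged;
    };

    // Merge a node's partial update into the current state, key by key.
    // Keys without a registered strategy fall back to REPLACE (an assumption of this sketch).
    static Map<String, Object> merge(Map<String, Object> state,
                                     Map<String, Object> update,
                                     Map<String, BinaryOperator<Object>> strategies) {
        Map<String, Object> result = new HashMap<>(state);
        update.forEach((key, value) -> {
            BinaryOperator<Object> strategy = strategies.getOrDefault(key, REPLACE);
            result.put(key, strategy.apply(result.get(key), value));
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, BinaryOperator<Object>> strategies = Map.of(
                "query", REPLACE,
                "expander_content", REPLACE);
        Map<String, Object> state = Map.of("query", "old question");
        Map<String, Object> next = merge(state, Map.of("query", "new question"), strategies);
        System.out.println(next.get("query"));
    }
}
```

With replace semantics, a later write to a key simply overwrites the earlier value, which is the behavior this example configures for all three keys. The actual configuration class follows.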

package com.alibaba.cloud.ai.graph.config;

import com.alibaba.cloud.ai.graph.GraphRepresentation;
import com.alibaba.cloud.ai.graph.KeyStrategy;
import com.alibaba.cloud.ai.graph.KeyStrategyFactory;
import com.alibaba.cloud.ai.graph.StateGraph;
import com.alibaba.cloud.ai.graph.exception.GraphStateException;
import com.alibaba.cloud.ai.graph.node.ExpanderNode;
import com.alibaba.cloud.ai.graph.state.strategy.ReplaceStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import static com.alibaba.cloud.ai.graph.action.AsyncNodeAction.node_async;

@Configuration
public class GraphNodeStreamConfiguration {

    private static final Logger logger = LoggerFactory.getLogger(GraphNodeStreamConfiguration.class);

    @Bean
    public StateGraph streamGraph(ChatClient.Builder chatClientBuilder) throws GraphStateException {
        // Each state key is registered with a strategy that decides how updates are merged.
        KeyStrategyFactory keyStrategyFactory = () -> {
            HashMap<String, KeyStrategy> keyStrategyHashMap = new HashMap<>();
            // User input
            keyStrategyHashMap.put("query", new ReplaceStrategy());
            keyStrategyHashMap.put("expander_number", new ReplaceStrategy());
            // Node output
            keyStrategyHashMap.put("expander_content", new ReplaceStrategy());
            return keyStrategyHashMap;
        };

        // Linear flow: START -> expander -> END
        StateGraph stateGraph = new StateGraph(keyStrategyFactory)
            .addNode("expander", node_async(new ExpanderNode(chatClientBuilder)))
            .addEdge(StateGraph.START, "expander")
            .addEdge("expander", StateGraph.END);

        // Log the graph as a PlantUML diagram
        GraphRepresentation representation = stateGraph.getGraph(GraphRepresentation.Type.PLANTUML,
            "expander flow");
        logger.info("\n=== expander UML Flow ===");
        logger.info(representation.content());
        logger.info("==================================\n");

        return stateGraph;
    }
}

4.2 Expander Node (ExpanderNode.java)
Implements the query expansion node, which expands the user's query into several variants.
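The node relies on two plain string operations: filling the {number} and {query} placeholders of a prompt, and splitting the model's reply into one variant per line. The sketch below isolates both without any Spring dependency. `QueryVariantSketch`, `render`, and `parseVariants` are hypothetical names, the template is a shortened stand-in for the real prompt, and trimming and dropping blank lines is a small refinement that the node itself does not perform.

```java
import java.util.Arrays;
import java.util.List;

// Dependency-free illustration of prompt filling and variant splitting (hypothetical helper).
class QueryVariantSketch {

    // Shortened stand-in for the node's prompt; placeholders use the same {name} form.
    static final String TEMPLATE =
            "Generate {number} different versions of the given query.\nOriginal query: {query}";

    // Naive placeholder substitution, for illustration only.
    static String render(String template, int number, String query) {
        return template
                .replace("{number}", String.valueOf(number))
                .replace("{query}", query);
    }

    // Split the model text on line breaks, trim each line, and drop empty lines.
    static List<String> parseVariants(String modelText) {
        return Arrays.stream(modelText.split("\\R"))
                .map(String::trim)
                .filter(line -> !line.isEmpty())
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(render(TEMPLATE, 3, "How do I learn AI?"));
        System.out.println(parseVariants("variant one\n\n variant two \nvariant three\n"));
    }
}
```

In the node itself the prompt is filled by the ChatClient and the split happens inside the streaming generator's result mapper. The full class follows.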

package com.alibaba.cloud.ai.graph.node;

import com.alibaba.cloud.ai.graph.NodeOutput;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
import com.alibaba.cloud.ai.graph.streaming.StreamingChatGenerator;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.PromptTemplate;
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ExpanderNode implements NodeAction {

    private static final PromptTemplate DEFAULT_PROMPT_TEMPLATE = new PromptTemplate(
        "You are an expert at information retrieval and search optimization.\n"
            + "Your task is to generate {number} different versions of the given query.\n\n"
            + "Each variant must cover different perspectives or aspects of the topic,\n"
            + "while maintaining the core intent of the original query. The goal is to\n"
            + "expand the search space and improve the chances of finding relevant information.\n\n"
            + "Do not explain your choices or add any other text.\n"
            + "Provide the query variants separated by newlines.\n\n"
            + "Original query: {query}\n\n"
            + "Query variants:\n");

    private final ChatClient chatClient;

    // Default number of variants when the state carries no "expander_number"
    private final Integer NUMBER = 3;

    public ExpanderNode(ChatClient.Builder chatClientBuilder) {
        this.chatClient = chatClientBuilder.build();
    }

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        String query = state.value("query", "");
        Integer expanderNumber = state.value("expander_number", this.NUMBER);

        // Ask the model for the variants as a stream of partial responses
        Flux<ChatResponse> chatResponseFlux = this.chatClient.prompt()
            .user((user) -> user.text(DEFAULT_PROMPT_TEMPLATE.getTemplate())
                .param("number", expanderNumber)
                .param("query", query))
            .stream()
            .chatResponse();

        // Wrap the stream so the graph can emit chunks and then store the final result
        AsyncGenerator<? extends NodeOutput> generator = StreamingChatGenerator.builder()
            .startingNode("expander_llm_stream")
            .startingState(state)
            .mapResult(response -> {
                String text = response.getResult().getOutput().getText();
                List<String> queryVariants = Arrays.asList(text.split("\n"));
                return Map.of("expander_content", queryVariants);
            })
            .build(chatResponseFlux);

        return Map.of("expander_content", generator);
    }
}

4.3 Streaming Controller (GraphStreamController.java)
Exposes the REST endpoint and returns the graph output as a streaming response.
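The endpoint produces text/event-stream, so every item it emits reaches the client as one Server-Sent Event. As a reminder of what that looks like on the wire, the sketch below frames a payload by hand: each payload line gets a "data:" prefix and a blank line ends the event. `SseFrameSketch` and `frame` are hypothetical names. In the application this framing is done by Spring, which may omit the optional space after the colon.

```java
// Hand-rolled framing of one Server-Sent Event, for illustration only.
class SseFrameSketch {

    // Prefix every payload line with "data: " and terminate the event with a blank line.
    static String frame(String payload) {
        StringBuilder event = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            event.append("data: ").append(line).append('\n');
        }
        return event.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(frame("{\"expander_llm_stream\":\"How can I\"}"));
    }
}
```

The controller never builds these frames itself. It only returns a Flux of ServerSentEvent objects, as the listing below shows.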

package com.alibaba.cloud.ai.graph.controller;

import com.alibaba.cloud.ai.graph.CompiledGraph;
import com.alibaba.cloud.ai.graph.NodeOutput;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.StateGraph;
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
import com.alibaba.cloud.ai.graph.controller.GraphProcess.GraphProcess;
import com.alibaba.cloud.ai.graph.exception.GraphRunnerException;
import com.alibaba.cloud.ai.graph.exception.GraphStateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/graph/stream")
public class GraphStreamController {

    private static final Logger logger = LoggerFactory.getLogger(GraphStreamController.class);

    private final CompiledGraph compiledGraph;

    public GraphStreamController(@Qualifier("streamGraph") StateGraph stateGraph) throws GraphStateException {
        // Compile the graph once and reuse it for every request
        this.compiledGraph = stateGraph.compile();
    }

    @GetMapping(value = "/expand", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> expand(
            @RequestParam(value = "query", defaultValue = "你好,很高兴认识你,能简单介绍一下自己吗?", required = false) String query,
            @RequestParam(value = "expander_number", defaultValue = "3", required = false) Integer expanderNumber,
            @RequestParam(value = "thread_id", defaultValue = "yingzi", required = false) String threadId)
            throws GraphRunnerException {
        RunnableConfig runnableConfig = RunnableConfig.builder().threadId(threadId).build();

        // Initial graph state
        Map<String, Object> objectMap = new HashMap<>();
        objectMap.put("query", query);
        objectMap.put("expander_number", expanderNumber);

        GraphProcess graphProcess = new GraphProcess(this.compiledGraph);
        Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();

        // Run the graph in streaming mode and forward each output to the sink
        AsyncGenerator<NodeOutput> resultFuture = compiledGraph.stream(objectMap, runnableConfig);
        graphProcess.processStream(resultFuture, sink);

        return sink.asFlux()
            .doOnCancel(() -> logger.info("Client disconnected from stream"))
            .doOnError(e -> logger.error("Error occurred during streaming", e));
    }
}

4.4 Stream Processing Class (GraphProcess.java)
Formats the streamed graph output and pushes it to the client.
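The class bridges an asynchronous source to a sink with three signals: next, complete, and error. The sketch below shows the same control flow using only the JDK, with CompletableFuture standing in for the generator's asynchronous iteration. `StreamBridgeSketch`, `EventSink`, `RecordingSink`, and `bridge` are hypothetical names, and this is an analogy under those assumptions rather than how the library iterates internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// JDK-only analogy of "iterate asynchronously, then signal completion or error".
class StreamBridgeSketch {

    // Hypothetical sink exposing the three signals used by the real class.
    interface EventSink {
        void next(String event);
        void complete();
        void error(Throwable cause);
    }

    // Sink that records every signal, useful for trying the bridge out.
    static class RecordingSink implements EventSink {
        final List<String> log = Collections.synchronizedList(new ArrayList<>());
        public void next(String event) { log.add("next:" + event); }
        public void complete() { log.add("complete"); }
        public void error(Throwable cause) { log.add("error"); }
    }

    // Drain the source on another thread; complete the sink on success, fail it otherwise.
    static CompletableFuture<Void> bridge(Iterator<String> source, EventSink sink) {
        return CompletableFuture
                .runAsync(() -> source.forEachRemaining(sink::next))
                .thenRun(sink::complete)
                .exceptionally(cause -> {
                    sink.error(cause);
                    return null;
                });
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        bridge(List.of("chunk-1", "chunk-2").iterator(), sink).join();
        System.out.println(sink.log);
    }
}
```

The real class follows the same shape: it emits one event per graph output, then completes the sink or forwards the error.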

package com.alibaba.cloud.ai.graph.controller.GraphProcess;

import com.alibaba.cloud.ai.graph.CompiledGraph;
import com.alibaba.cloud.ai.graph.NodeOutput;
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Sinks;

import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GraphProcess {

    private static final Logger logger = LoggerFactory.getLogger(GraphProcess.class);

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    private CompiledGraph compiledGraph;

    public GraphProcess(CompiledGraph compiledGraph) {
        this.compiledGraph = compiledGraph;
    }

    public void processStream(AsyncGenerator<NodeOutput> generator, Sinks.Many<ServerSentEvent<String>> sink) {
        executor.submit(() -> {
            generator.forEachAsync(output -> {
                try {
                    logger.info("output = {}", output);
                    String nodeName = output.node();
                    String content;
                    if (output instanceof StreamingOutput streamingOutput) {
                        // Streaming chunk from the model: {"<node>": "<chunk>"}
                        content = JSON.toJSONString(Map.of(nodeName, streamingOutput.chunk()));
                    } else {
                        // Regular node output: {"data": <state>, "node": "<node>"}
                        JSONObject nodeOutput = new JSONObject();
                        nodeOutput.put("data", output.state().data());
                        nodeOutput.put("node", nodeName);
                        content = JSON.toJSONString(nodeOutput);
                    }
                    sink.tryEmitNext(ServerSentEvent.builder(content).build());
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }).thenAccept(v -> {
                // Completed normally
                sink.tryEmitComplete();
            }).exceptionally(e -> {
                sink.tryEmitError(e);
                return null;
            });
        });
    }
}

5. Running and Testing
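- Start the application: run the StreamNodeApplication main class.
- Test with a browser or an API tool such as Postman or curl.
Test: query expansion
Open the following URL to test the query expansion feature:
http://localhost:8080/graph/stream/expand?query=如何学习人工智能?&expander_number=3
Expected response (streamed):
The response contains several variants of the query, each covering the topic from a different angle while preserving the core intent of the original query.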
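Browsers usually percent-encode the non-ASCII query for you, but command-line tools and hand-written clients may not. The sketch below builds the encoded URL and reads the streamed "data:" lines with the JDK's HttpClient. `ExpandClientSketch`, `expandUri`, `dataOf`, and `streamExpand` are hypothetical names, and the base URL assumes the application is running locally on port 8080 as configured above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.stream.Stream;

// Minimal client sketch for the /graph/stream/expand endpoint (hypothetical helper class).
class ExpandClientSketch {

    // Build the request URI with the query parameter encoded as UTF-8.
    static URI expandUri(String baseUrl, String query, int expanderNumber) {
        String encoded = URLEncoder.encode(query, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/graph/stream/expand?query=" + encoded
                + "&expander_number=" + expanderNumber);
    }

    // Extract the payload of an SSE "data:" line; other lines yield an empty Optional.
    static Optional<String> dataOf(String line) {
        if (!line.startsWith("data:")) {
            return Optional.empty();
        }
        String value = line.substring("data:".length());
        return Optional.of(value.startsWith(" ") ? value.substring(1) : value);
    }

    // Print each event payload as it arrives. Requires the application to be running.
    static void streamExpand(URI uri) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Accept", "text/event-stream")
                .GET()
                .build();
        HttpResponse<Stream<String>> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofLines());
        try (Stream<String> lines = response.body()) {
            lines.map(ExpandClientSketch::dataOf)
                    .flatMap(Optional::stream)
                    .forEach(System.out::println);
        }
    }

    public static void main(String[] args) {
        // Only prints the URI; call streamExpand(uri) once the application is up.
        System.out.println(expandUri("http://localhost:8080", "如何学习人工智能?", 3));
    }
}
```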
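6. Implementation Approach and Extension Ideas
Implementation approach
The core idea of this example is streaming processing through graph nodes. Using the graph capabilities of Spring AI Alibaba, we built a system with the following characteristics:
- Node-based processing: query expansion is encapsulated in a standalone node, which makes it easier to maintain and extend.
- Streaming responses: Server-Sent Events (SSE) deliver data in real time, improving the user experience.
- State management: StateGraph manages the state passed between nodes, keeping the data consistent.
- Asynchronous processing: Reactor provides non-blocking asynchronous processing, which increases throughput.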
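Extension ideas
- Multi-node collaboration: add more nodes, such as retrieval, ranking, and filtering nodes, to build more complex graph flows.
- Conditional branching: choose the processing path dynamically based on the query content or user needs.
- Node caching: add a cache layer for frequently used node results to avoid recomputation.
- Performance monitoring: record metrics such as node processing time and success rate to guide optimization.
- Vector database integration: combine the expanded queries with a vector database to build a smarter retrieval-augmented generation (RAG) system.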
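
Two of these ideas, node caching and performance monitoring, can be prototyped as plain decorators before touching the graph. The sketch below wraps any function from input to output. `CachedNodeSketch`, `cached`, and `timed` are hypothetical names, and a real node would need adapting to the library's node interface, which is not shown here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Decorators for a node-like function: in-memory caching and simple timing (illustration only).
class CachedNodeSketch {

    // Serve repeated inputs from memory instead of calling the delegate again.
    static <K, V> Function<K, V> cached(Function<K, V> delegate) {
        Map<K, V> cache = new ConcurrentHashMap<>();
        return key -> cache.computeIfAbsent(key, delegate);
    }

    // Print how long each call to the delegate takes, even when it throws.
    static <K, V> Function<K, V> timed(String name, Function<K, V> delegate) {
        return key -> {
            long start = System.nanoTime();
            try {
                return delegate.apply(key);
            } finally {
                long micros = (System.nanoTime() - start) / 1_000;
                System.out.printf("%s took %d us%n", name, micros);
            }
        };
    }

    public static void main(String[] args) {
        Function<String, String> expand = query -> query + " (expanded)";
        Function<String, String> node = cached(timed("expander", expand));
        System.out.println(node.apply("How do I learn AI?"));
        // Second call with the same input is answered from the cache, so no timing line is printed.
        System.out.println(node.apply("How do I learn AI?"));
    }
}
```

The cache here is unbounded and never expires, which is acceptable for a prototype but would need an eviction policy in a long-running service.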
