507 - Spring AI Alibaba Graph Human Node: A Complete Example
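This example is built on the Spring AI Alibaba Graph framework. It shows how to add a human feedback node to an AI workflow so that people and the model work together. Real business processes often need a person to step in at some point, and the action that person takes decides which path the workflow follows next.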
1. Goals
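We will build a web application whose workflow contains three core nodes:
- Expander node (expander): the AI model streams an expansion of the question, generating several different versions of the query.
- Human feedback node (human_feedback): uses the user's feedback to decide whether to end the workflow immediately or continue to the translate node.
- Translate node (translate): translates the question into English or another specified language.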
2. Tech Stack and Core Dependencies
- Spring Boot 3.x
- Spring AI Alibaba (connects to the Alibaba Cloud DashScope Tongyi Qwen models)
- Spring AI Alibaba Graph (builds the AI workflow)
- Maven (build tool)
Add the following core dependencies to pom.xml:

<dependencies>
    <!-- Spring AI Alibaba core starter, integrates DashScope -->
    <dependency>
        <groupId>com.alibaba.cloud.ai</groupId>
        <artifactId>spring-ai-alibaba-starter-dashscope</artifactId>
    </dependency>
    <!-- Spring AI ChatClient auto-configuration -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-autoconfigure-model-chat-client</artifactId>
    </dependency>
    <!-- Spring AI Alibaba Graph core library -->
    <dependency>
        <groupId>com.alibaba.cloud.ai</groupId>
        <artifactId>spring-ai-alibaba-graph-core</artifactId>
        <version>1.0.0.3</version>
    </dependency>
    <!-- Spring Web, for building the RESTful API -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

Any dependency declared without a <version> must get its version from dependencyManagement, for example an imported BOM. For spring-boot-starter-web, the Spring Boot parent POM can supply it instead.
3. Project Configuration
In src/main/resources/application.yml, configure your DashScope API key and model options:
server:
  port: 8080
spring:
  application:
    name: human-node
  ai:
    dashscope:
      api-key: ${AI_DASHSCOPE_API_KEY} # using an environment variable is recommended for security
      chat:
        options:
          model: qwen-max
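Important: set the AI_DASHSCOPE_API_KEY environment variable to a valid API key obtained from Alibaba Cloud. You can also write the key directly into the configuration file, but this is not recommended for production.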
4. Workflow Design
This example implements a simple human-in-the-loop workflow with three nodes. The flow is as follows:
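- Start at START and enter the expander node.
- When expander finishes, enter the human_feedback node.
- At human_feedback, the user's feedback decides the next step:
  - If the feedback is true, enter the translate node.
  - If the feedback is false, go directly to END and finish.
- When translate finishes, go to END and finish.

The graph is compiled with interruptBefore("human_feedback") (see section 6). As a result, execution pauses just before human_feedback and waits for the feedback supplied through the resume endpoint.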
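The routing above can be modeled without the framework. The following is a library-free Java sketch, not the Spring AI Alibaba Graph API. WorkflowPathSketch, its START/END placeholder strings and the path method are hypothetical names. The interrupt before human_feedback is deliberately left out so that only the edge logic is shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Library-free sketch of the flow in section 4; it does NOT use Spring AI Alibaba Graph.
class WorkflowPathSketch {

    // Placeholder names chosen for this sketch only
    static final String START = "__START__";
    static final String END = "__END__";

    // Fixed edges: START -> expander -> human_feedback, translate -> END
    static final Map<String, String> EDGES = Map.of(
            START, "expander",
            "expander", "human_feedback",
            "translate", END);

    // Conditional edge out of human_feedback: true -> translate, false -> END
    static String routeHumanFeedback(boolean feedback) {
        return feedback ? "translate" : END;
    }

    // Returns the nodes visited (excluding START and END) for a given feedback value
    static List<String> path(boolean feedback) {
        List<String> visited = new ArrayList<>();
        String current = EDGES.get(START);
        while (!END.equals(current)) {
            visited.add(current);
            current = "human_feedback".equals(current)
                    ? routeHumanFeedback(feedback)
                    : EDGES.get(current);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(path(true));  // [expander, human_feedback, translate]
        System.out.println(path(false)); // [expander, human_feedback]
    }
}
```

Each call returns the node sequence for one of the two branches listed above.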
4.1 Workflow Configuration
In GraphHumanConfiguration.java we define the structure of the whole workflow and the relationships between its nodes:
@Bean
public StateGraph humanGraph(ChatClient.Builder chatClientBuilder) throws GraphStateException {
    KeyStrategyFactory keyStrategyFactory = () -> {
        HashMap<String, KeyStrategy> keyStrategyHashMap = new HashMap<>();
        // User input
        keyStrategyHashMap.put("query", new ReplaceStrategy());
        keyStrategyHashMap.put("thread_id", new ReplaceStrategy());
        keyStrategyHashMap.put("expander_number", new ReplaceStrategy());
        keyStrategyHashMap.put("expander_content", new ReplaceStrategy());
        // Human feedback
        keyStrategyHashMap.put("feed_back", new ReplaceStrategy());
        keyStrategyHashMap.put("human_next_node", new ReplaceStrategy());
        // Translation: target language and result
        keyStrategyHashMap.put("translate_language", new ReplaceStrategy());
        keyStrategyHashMap.put("translate_content", new ReplaceStrategy());
        return keyStrategyHashMap;
    };

    StateGraph stateGraph = new StateGraph(keyStrategyFactory)
            .addNode("expander", node_async(new ExpanderNode(chatClientBuilder)))
            .addNode("translate", node_async(new TranslateNode(chatClientBuilder)))
            .addNode("human_feedback", node_async(new HumanFeedbackNode()))
            .addEdge(StateGraph.START, "expander")
            .addEdge("expander", "human_feedback")
            // HumanFeedbackDispatcher (not shown in this article) returns the key of the next node
            .addConditionalEdges("human_feedback",
                    AsyncEdgeAction.edge_async(new HumanFeedbackDispatcher()),
                    Map.of("translate", "translate", StateGraph.END, StateGraph.END))
            .addEdge("translate", StateGraph.END);

    return stateGraph;
}
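Every key in the KeyStrategyFactory above is registered with ReplaceStrategy. As a rough mental model, "replace" means that when a node returns a map, each returned value overwrites whatever the state held under that key. The sketch below is library-free and is not the framework's implementation. MergeSketch and its merge method are hypothetical names. Ignoring keys that have no registered strategy is an assumption of this sketch only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of per-key merge strategies; not Spring AI Alibaba Graph code.
class MergeSketch {

    // "Replace": the value written by a node overwrites the previous value
    static final BinaryOperator<Object> REPLACE = (oldValue, newValue) -> newValue;

    // Merge a node's output into a copy of the current state using the registered strategies.
    // Assumption of this sketch: keys without a registered strategy are ignored.
    static Map<String, Object> merge(Map<String, Object> state,
                                     Map<String, Object> update,
                                     Map<String, BinaryOperator<Object>> strategies) {
        Map<String, Object> next = new HashMap<>(state);
        update.forEach((key, value) -> {
            BinaryOperator<Object> strategy = strategies.get(key);
            if (strategy != null) {
                next.put(key, strategy.apply(next.get(key), value));
            }
        });
        return next;
    }

    public static void main(String[] args) {
        Map<String, BinaryOperator<Object>> strategies = new HashMap<>();
        for (String key : new String[] {"query", "expander_number", "human_next_node"}) {
            strategies.put(key, REPLACE);
        }
        Map<String, Object> state = Map.of("query", "What is AI?", "expander_number", 3);
        // A later value for an existing key replaces the earlier one
        Map<String, Object> next = merge(state,
                Map.of("expander_number", 5, "human_next_node", "translate"), strategies);
        System.out.println(next.get("expander_number")); // 5
        System.out.println(next.get("human_next_node")); // translate
    }
}
```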
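5. Node Implementations
5.1 Expander Node (ExpanderNode)
The expander node receives the user's query and uses the AI model to generate several different versions of it: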
public class ExpanderNode implements NodeAction {

    private static final PromptTemplate DEFAULT_PROMPT_TEMPLATE = new PromptTemplate(
            "You are an expert at information retrieval and search optimization.\n" +
            "Your task is to generate {number} different versions of the given query.\n\n" +
            "Each variant must cover different perspectives or aspects of the topic,\n" +
            "while maintaining the core intent of the original query. The goal is to\n" +
            "expand the search space and improve the chances of finding relevant information.\n\n" +
            "Do not explain your choices or add any other text.\n" +
            "Provide the query variants separated by newlines.\n\n" +
            "Original query: {query}\n\n" +
            "Query variants:\n");

    // Default number of variants when "expander_number" is absent from the state
    private static final Integer NUMBER = 3;

    private final ChatClient chatClient;

    public ExpanderNode(ChatClient.Builder chatClientBuilder) {
        this.chatClient = chatClientBuilder.build();
    }

    @Override
    public Map<String, Object> apply(OverAllState state) {
        String query = state.value("query", "");
        Integer expanderNumber = state.value("expander_number", NUMBER);

        // Ask the model for a streamed response
        Flux<ChatResponse> chatResponseFlux = this.chatClient.prompt()
                .user(user -> user.text(DEFAULT_PROMPT_TEMPLATE.getTemplate())
                        .param("number", expanderNumber)
                        .param("query", query))
                .stream()
                .chatResponse();

        // Wrap the stream for the graph; mapResult turns the response text into a list of variants
        AsyncGenerator<? extends NodeOutput> generator = StreamingChatGenerator.builder()
                .startingNode("expander_llm_stream")
                .startingState(state)
                .mapResult(response -> {
                    String text = response.getResult().getOutput().getText();
                    List<String> queryVariants = Arrays.asList(text.split("\n"));
                    return Map.of("expander_content", queryVariants);
                })
                .build(chatResponseFlux);

        return Map.of("expander_content", generator);
    }
}
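ExpanderNode turns the model output into a list with text.split("\n"). That call keeps empty lines in the middle of the text, and with Windows line endings it leaves a "\r" on each line. A hypothetical, more defensive parser could look like the sketch below. VariantParserSketch and parseVariants are illustrative names; such a helper would be called from mapResult in place of the raw split.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the article's code.
class VariantParserSketch {

    // Split the model output into one query per line. "\\R" matches \n, \r\n and
    // other line breaks; each line is trimmed and empty lines are dropped.
    static List<String> parseVariants(String text) {
        if (text == null || text.isBlank()) {
            return List.of();
        }
        return Arrays.stream(text.split("\\R"))
                .map(String::trim)
                .filter(line -> !line.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String modelOutput = "What is AI?\n\nHow does artificial intelligence work?\r\nHistory of AI\n";
        System.out.println(parseVariants(modelOutput));
        // [What is AI?, How does artificial intelligence work?, History of AI]
    }
}
```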
5.2 Human Feedback Node (HumanFeedbackNode)
The human feedback node reads the feedback the user provided and decides where the workflow goes next:
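public class HumanFeedbackNode implements NodeAction {

    @Override
    public Map<String, Object> apply(OverAllState state) {
        HashMap<String, Object> resultMap = new HashMap<>();
        // End the workflow unless the feedback approves translation
        String nextStep = StateGraph.END;

        // Feedback attached by the /resume endpoint; a missing value counts as true
        Map<String, Object> feedBackData = state.humanFeedback().data();
        boolean feedback = (boolean) feedBackData.getOrDefault("feed_back", true);
        if (feedback) {
            nextStep = "translate";
        }

        // Next node name, presumably read by HumanFeedbackDispatcher to pick the edge
        resultMap.put("human_next_node", nextStep);
        return resultMap;
    }
}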
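The (boolean) cast in HumanFeedbackNode works because the /resume endpoint stores a real Boolean under feed_back. If the value ever arrived as a string such as "false", the cast would throw a ClassCastException. The library-free sketch below models the node's decision with a hypothetical tolerant reader. FeedbackDecisionSketch and the "__END__" placeholder for StateGraph.END are assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical model of HumanFeedbackNode's decision; not framework code.
class FeedbackDecisionSketch {

    static final String END = "__END__"; // placeholder for StateGraph.END in this sketch

    // Accept a Boolean or a "true"/"false" string; missing feedback defaults to true, as in the article
    static boolean readFeedback(Map<String, Object> data) {
        Object raw = data.getOrDefault("feed_back", true);
        if (raw instanceof Boolean) {
            return (Boolean) raw;
        }
        return Boolean.parseBoolean(String.valueOf(raw).trim());
    }

    // Returns the value the node would store under "human_next_node"
    static String nextNode(Map<String, Object> data) {
        return readFeedback(data) ? "translate" : END;
    }

    public static void main(String[] args) {
        System.out.println(nextNode(Map.of()));                     // translate
        System.out.println(nextNode(Map.of("feed_back", false)));   // __END__
        System.out.println(nextNode(Map.of("feed_back", "FALSE"))); // __END__
    }
}
```

Boolean.parseBoolean treats any string other than a case-insensitive "true" as false, so unrecognized values end the workflow.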
5.3 Translate Node (TranslateNode)
The translate node translates the user's query into the target language (English by default):
public class TranslateNode implements NodeAction {

    private static final PromptTemplate DEFAULT_PROMPT_TEMPLATE = new PromptTemplate(
            "Given a user query, translate it to {targetLanguage}.\n" +
            "If the query is already in {targetLanguage}, return it unchanged.\n" +
            "If you don't know the language of the query, return it unchanged.\n" +
            "Do not add explanations nor any other text.\n\n" +
            "Original query: {query}\n\n" +
            "Translated query:\n");

    // Target language used when "translate_language" is absent from the state
    private static final String TARGET_LANGUAGE = "English";

    private final ChatClient chatClient;

    public TranslateNode(ChatClient.Builder chatClientBuilder) {
        this.chatClient = chatClientBuilder.build();
    }

    @Override
    public Map<String, Object> apply(OverAllState state) {
        String query = state.value("query", "");
        String targetLanguage = state.value("translate_language", TARGET_LANGUAGE);

        // Ask the model for a streamed translation
        Flux<ChatResponse> chatResponseFlux = this.chatClient.prompt()
                .user(user -> user.text(DEFAULT_PROMPT_TEMPLATE.getTemplate())
                        .param("targetLanguage", targetLanguage)
                        .param("query", query))
                .stream()
                .chatResponse();

        // Wrap the stream for the graph; mapResult stores the translated lines
        AsyncGenerator<? extends NodeOutput> generator = StreamingChatGenerator.builder()
                .startingNode("translate_llm_stream")
                .startingState(state)
                .mapResult(response -> {
                    String text = response.getResult().getOutput().getText();
                    List<String> queryVariants = Arrays.asList(text.split("\n"));
                    return Map.of("translate_content", queryVariants);
                })
                .build(chatResponseFlux);

        return Map.of("translate_content", generator);
    }
}
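The sketch below uses plain Java to show what TranslateNode asks the model. It renders a shortened copy of the node's prompt template. When translate_language is absent it falls back to English, the same default that state.value("translate_language", TARGET_LANGUAGE) applies. Spring AI performs the real template rendering, so the single-pass {name} substitution here is only an approximation. TranslatePromptSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical approximation of TranslateNode's prompt; Spring AI renders the real template.
class TranslatePromptSketch {

    // Shortened copy of the article's template
    static final String TEMPLATE =
            "Given a user query, translate it to {targetLanguage}.\n"
            + "If the query is already in {targetLanguage}, return it unchanged.\n"
            + "Original query: {query}\n";

    static final Pattern PLACEHOLDER = Pattern.compile("\\{(\\w+)\\}");

    // Single pass: each {name} is replaced once and inserted values are not rescanned;
    // unknown placeholders are left unchanged
    static String render(String template, Map<String, String> params) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder sb = new StringBuilder();
        while (m.find()) {
            String value = params.getOrDefault(m.group(1), m.group());
            m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    // Fall back to English when "translate_language" is not in the state
    static String buildPrompt(Map<String, Object> state) {
        String query = (String) state.getOrDefault("query", "");
        String language = (String) state.getOrDefault("translate_language", "English");
        return render(TEMPLATE, Map.of("targetLanguage", language, "query", query));
    }

    public static void main(String[] args) {
        System.out.println(buildPrompt(Map.of("query", "Was ist KI?")));
        System.out.println(buildPrompt(Map.of("query", "Was ist KI?", "translate_language", "Japanese")));
    }
}
```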
6. Controller Implementation
6.1 GraphHumanController
The controller exposes two main API endpoints, one to start the workflow and one to resume it:
@RestController
@RequestMapping("/graph/human")
public class GraphHumanController {

    // SLF4J logger used by the stream callbacks below
    private static final Logger logger = LoggerFactory.getLogger(GraphHumanController.class);

    private final CompiledGraph compiledGraph;

    @Autowired
    public GraphHumanController(@Qualifier("humanGraph") StateGraph stateGraph) throws GraphStateException {
        // Keep checkpoints in memory so an interrupted run can be resumed by thread_id
        SaverConfig saverConfig = SaverConfig.builder()
                .register(SaverConstant.MEMORY, new MemorySaver())
                .build();
        // Pause execution before the human_feedback node
        this.compiledGraph = stateGraph.compile(CompileConfig.builder()
                .saverConfig(saverConfig)
                .interruptBefore("human_feedback")
                .build());
    }

    // Start the workflow
    @GetMapping(value = "/expand", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> expand(
            // Default query: "Hello, nice to meet you. Could you briefly introduce yourself?"
            @RequestParam(value = "query", defaultValue = "你好,很高兴认识你,能简单介绍一下自己吗?", required = false) String query,
            @RequestParam(value = "expander_number", defaultValue = "3", required = false) Integer expanderNumber,
            @RequestParam(value = "thread_id", defaultValue = "yingzi", required = false) String threadId)
            throws GraphRunnerException {
        RunnableConfig runnableConfig = RunnableConfig.builder().threadId(threadId).build();

        Map<String, Object> objectMap = new HashMap<>();
        objectMap.put("query", query);
        objectMap.put("expander_number", expanderNumber);

        // GraphProcess is a helper not defined in this article; it forwards graph output to the SSE sink
        GraphProcess graphProcess = new GraphProcess(this.compiledGraph);
        Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();
        AsyncGenerator<NodeOutput> resultFuture = compiledGraph.stream(objectMap, runnableConfig);
        graphProcess.processStream(resultFuture, sink);

        return sink.asFlux()
                .doOnCancel(() -> logger.info("Client disconnected from stream"))
                .doOnError(e -> logger.error("Error occurred during streaming", e));
    }

    // Resume the workflow with human feedback
    @GetMapping(value = "/resume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> resume(
            @RequestParam(value = "thread_id", defaultValue = "yingzi", required = false) String threadId,
            @RequestParam(value = "feed_back", defaultValue = "true", required = false) boolean feedBack)
            throws GraphRunnerException {
        RunnableConfig runnableConfig = RunnableConfig.builder().threadId(threadId).build();

        // Load the state saved when the run was interrupted
        StateSnapshot stateSnapshot = this.compiledGraph.getState(runnableConfig);
        OverAllState state = stateSnapshot.state();
        state.withResume();

        // Attach the feedback that HumanFeedbackNode reads via state.humanFeedback()
        Map<String, Object> objectMap = new HashMap<>();
        objectMap.put("feed_back", feedBack);
        state.withHumanFeedback(new OverAllState.HumanFeedback(objectMap, ""));

        Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();
        GraphProcess graphProcess = new GraphProcess(this.compiledGraph);
        AsyncGenerator<NodeOutput> resultFuture = compiledGraph.streamFromInitialNode(state, runnableConfig);
        graphProcess.processStream(resultFuture, sink);

        return sink.asFlux()
                .doOnCancel(() -> logger.info("Client disconnected from stream"))
                .doOnError(e -> logger.error("Error occurred during streaming", e));
    }
}
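Together, the two endpoints implement an interrupt-and-resume pattern:
- /expand runs until the interruptBefore("human_feedback") point, and MemorySaver keeps the checkpoint under the thread_id.
- /resume continues from that checkpoint with the feedback attached.

The library-free sketch below models only that control flow. InterruptResumeSketch, its maps and the stand-in node outputs are hypothetical. They do not reflect how MemorySaver stores checkpoints internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of expand/resume keyed by thread_id; not Spring AI Alibaba Graph code.
class InterruptResumeSketch {

    static final String END = "__END__"; // placeholder, not StateGraph.END

    // In-memory "saver": thread_id -> state, and thread_id -> node to run next
    private final Map<String, Map<String, Object>> states = new HashMap<>();
    private final Map<String, String> nextNodes = new HashMap<>();

    // Like /expand: run expander, then pause before human_feedback
    List<String> expand(String threadId, String query) {
        Map<String, Object> state = new HashMap<>();
        state.put("query", query);
        state.put("expander_content", List.of(query)); // stand-in for the LLM output
        states.put(threadId, state);
        nextNodes.put(threadId, "human_feedback");
        return List.of("expander");
    }

    // Like /resume: continue from the saved checkpoint using the feedback
    List<String> resume(String threadId, boolean feedback) {
        if (!"human_feedback".equals(nextNodes.get(threadId))) {
            throw new IllegalStateException("thread " + threadId + " is not waiting for feedback");
        }
        Map<String, Object> state = states.get(threadId);
        List<String> visited = new ArrayList<>();
        visited.add("human_feedback");
        if (feedback) {
            state.put("translate_content", List.of("[translated] " + state.get("query"))); // stand-in
            visited.add("translate");
        }
        nextNodes.put(threadId, END); // this run is finished
        return visited;
    }

    public static void main(String[] args) {
        InterruptResumeSketch graph = new InterruptResumeSketch();
        System.out.println(graph.expand("test123", "What is AI?")); // [expander]
        System.out.println(graph.resume("test123", true));          // [human_feedback, translate]
        graph.expand("test123", "What is AI?");                     // start a new run on the same thread
        System.out.println(graph.resume("test123", false));         // [human_feedback]
    }
}
```

In this sketch, resuming a thread that has already finished throws an exception. That is why main calls expand again before the second resume.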
7. Running and Testing
- Start the application: run your Spring Boot main class.
- Test with a browser or an API tool such as Postman or curl.
Test 1: Start the workflow
Open the following URL to start the workflow and have the AI expand the query:
http://localhost:8080/graph/human/expand?query=什么是人工智能?&expander_number=3&thread_id=test123
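Expected response (streamed):
The workflow first runs the expander node and generates several versions of the query. It then pauses before the human feedback node and waits for the user's feedback.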
Test 2: Give feedback and continue to the translate node
Open the following URL to give positive feedback (feed_back=true) and continue to the translate node:
http://localhost:8080/graph/human/resume?thread_id=test123&feed_back=true
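Expected response (streamed):
The workflow resumes at the human feedback node, enters the translate node, and translates the original query into English.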
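Test 3: Give feedback and end the workflow
Run Test 1 again first, so that thread test123 is once more paused before the human feedback node. Then open the following URL to give negative feedback (feed_back=false) and end the workflow directly: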
http://localhost:8080/graph/human/resume?thread_id=test123&feed_back=false
Expected response (streamed):
The workflow goes from the human feedback node straight to the end state without running the translate node.
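The same tests can be run from Java using only the JDK (Java 11 or later). The sketch below is a hypothetical client named HumanNodeClientSketch. It percent-encodes the parameters, which matters for non-ASCII queries like the one in Test 1, and it prints the server-sent event lines as they arrive. The base URL assumes the port 8080 configured in section 3.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical JDK-only client for the /graph/human endpoints.
class HumanNodeClientSketch {

    static final String BASE = "http://localhost:8080/graph/human";

    private static String enc(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    // Test 1: percent-encode the query so non-ASCII text and '?' are sent safely
    static URI expandUri(String query, int expanderNumber, String threadId) {
        return URI.create(BASE + "/expand?query=" + enc(query)
                + "&expander_number=" + expanderNumber
                + "&thread_id=" + enc(threadId));
    }

    // Tests 2 and 3: resume the paused thread with positive or negative feedback
    static URI resumeUri(String threadId, boolean feedBack) {
        return URI.create(BASE + "/resume?thread_id=" + enc(threadId) + "&feed_back=" + feedBack);
    }

    // Print each line of the event stream as it arrives (requires the app to be running)
    static void stream(URI uri) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Accept", "text/event-stream")
                .GET()
                .build();
        HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofLines())
                .body()
                .forEach(System.out::println);
    }

    public static void main(String[] args) throws Exception {
        stream(expandUri("What is artificial intelligence?", 3, "test123"));
        stream(resumeUri("test123", true));
    }
}
```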
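8. Implementation Notes and Extension Ideas
Implementation notes
The core idea of this example is a human-in-the-loop workflow. Adding a human feedback node gives the AI workflow the following capabilities:
- Flow control: a person decides which path the workflow takes, which allows more flexible business logic.
- Quality assurance: human review at key points helps ensure the quality of the AI output.
- State persistence: MemorySaver saves the workflow state, so a run can be interrupted and later resumed. Because MemorySaver keeps checkpoints in memory, they are lost when the application restarts.
- Streaming: the LLM-backed nodes (expander and translate) stream their output, which gives a more responsive user experience.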
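Extension ideas
- Multiple human nodes: add several human feedback nodes to the workflow to support more complex collaboration scenarios.
- Parallel processing: design parallel nodes that run several tasks at once, then let a human node decide how to merge the results.
- Conditional branches: add richer conditional logic that routes to different branches depending on the content of the feedback.
- External system integration: connect human feedback to external systems, such as approval or ticketing systems, to cover a complete business process.
- Visual interface: build a front end that gives users a friendlier way to submit feedback and view the workflow state.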
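As an illustration of the "conditional branches" idea, the feedback could carry an action string instead of a boolean. The dispatcher would then map that action to one of several nodes. The sketch below is hypothetical throughout: the action key, the retry branch back to expander and the MultiBranchDispatcherSketch class are all assumptions. It is written as plain Java rather than against the framework's edge-action interface.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical multi-branch dispatcher; not part of the article's code.
class MultiBranchDispatcherSketch {

    static final String END = "__END__"; // placeholder for StateGraph.END

    // Action -> target node, analogous to the Map passed to addConditionalEdges
    static final Map<String, String> ROUTES = Map.of(
            "translate", "translate",
            "retry", "expander",
            "end", END);

    // Unknown or missing actions finish the run
    static String route(Map<String, Object> feedbackData) {
        Object action = feedbackData.get("action");
        if (action == null) {
            return END;
        }
        String key = action.toString().trim().toLowerCase(Locale.ROOT);
        return ROUTES.getOrDefault(key, END);
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("action", "retry")));     // expander
        System.out.println(route(Map.of("action", "Translate"))); // translate
        System.out.println(route(Map.of("action", "unknown")));   // __END__
    }
}
```

To wire this into the real graph, every target node would also need to appear in the map passed to addConditionalEdges, as translate and StateGraph.END do in section 4.1.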