当前位置: 首页 > news >正文

507-Spring AI Alibaba Graph Human Node 功能完整案例

本案例基于 Spring AI Alibaba Graph 框架实现,演示了如何在 AI 工作流中引入人类反馈节点,实现人机协作的工作流程。在实际业务场景中,经常会遇到需要人类介入的场景,人类的不同操作将影响工作流不同的走向。

1. 案例目标

我们将创建一个包含三个核心节点的 Web 应用:

  1. 扩展节点 (expander):AI 模型流式对问题进行扩展输出,生成多个不同版本的查询变体。
  2. 人类反馈节点 (human_feedback):通过对用户的反馈,决定是直接结束工作流,还是接着执行翻译节点。
  3. 翻译节点 (translate):将问题翻译为英文或其他指定语言。

2. 技术栈与核心依赖

  • Spring Boot 3.x
  • Spring AI Alibaba (用于对接阿里云 DashScope 通义大模型)
  • Spring AI Alibaba Graph (用于构建 AI 工作流)
  • Maven (项目构建工具)

在 pom.xml 中,你需要引入以下核心依赖:

<dependencies><!-- Spring AI Alibaba 核心启动器,集成 DashScope --><dependency><groupId>com.alibaba.cloud.ai</groupId><artifactId>spring-ai-alibaba-starter-dashscope</artifactId></dependency><!-- Spring AI Chat 客户端自动配置 --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-autoconfigure-model-chat-client</artifactId></dependency><!-- Spring AI Alibaba Graph 核心库 --><dependency><groupId>com.alibaba.cloud.ai</groupId><artifactId>spring-ai-alibaba-graph-core</artifactId><version>1.0.0.3</version></dependency><!-- Spring Web 用于构建 RESTful API --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>

3. 项目配置

在 src/main/resources/application.yml 文件中,配置你的 DashScope API Key 和模型选项。

server:port: 8080
spring:application:name: human-nodeai:dashscope:api-key: ${AI_DASHSCOPE_API_KEY} # 建议使用环境变量,更安全chat:options:model: qwen-max

重要提示:请将 AI_DASHSCOPE_API_KEY 环境变量设置为你从阿里云获取的有效 API Key。你也可以直接将其写在配置文件中,但这不推荐用于生产环境。

4. 工作流设计

本案例实现了一个简单的人机协作工作流,包含三个节点,流程如下:

  1. 从 START 开始,首先进入 expander 节点
  2. expander 节点执行完毕后,进入 human_feedback 节点
  3. 在 human_feedback 节点,根据用户反馈决定下一步:
    • 如果反馈为 true,则进入 translate 节点
    • 如果反馈为 false,则直接进入 END 结束流程
  4. translate 节点执行完毕后,进入 END 结束流程

4.1 工作流配置

在 GraphHumanConfiguration.java 中,我们定义了整个工作流的结构和节点之间的关系:

@Bean
public StateGraph humanGraph(ChatClient.Builder chatClientBuilder) throws GraphStateException {KeyStrategyFactory keyStrategyFactory = () -> {HashMap<String, KeyStrategy> keyStrategyHashMap = new HashMap<>();// 用户输入keyStrategyHashMap.put("query", new ReplaceStrategy());keyStrategyHashMap.put("thread_id", new ReplaceStrategy());keyStrategyHashMap.put("expander_number", new ReplaceStrategy());keyStrategyHashMap.put("expander_content", new ReplaceStrategy());// 人类反馈keyStrategyHashMap.put("feed_back", new ReplaceStrategy());keyStrategyHashMap.put("human_next_node", new ReplaceStrategy());// 是否需要翻译keyStrategyHashMap.put("translate_language", new ReplaceStrategy());keyStrategyHashMap.put("translate_content", new ReplaceStrategy());return keyStrategyHashMap;};StateGraph stateGraph = new StateGraph(keyStrategyFactory).addNode("expander", node_async(new ExpanderNode(chatClientBuilder))).addNode("translate", node_async(new TranslateNode(chatClientBuilder))).addNode("human_feedback", node_async(new HumanFeedbackNode())).addEdge(StateGraph.START, "expander").addEdge("expander", "human_feedback").addConditionalEdges("human_feedback", AsyncEdgeAction.edge_async((new HumanFeedbackDispatcher())), Map.of("translate", "translate", StateGraph.END, StateGraph.END)).addEdge("translate", StateGraph.END);return stateGraph;
}

5. 节点实现

5.1 扩展节点 (ExpanderNode)

扩展节点接收用户查询,并使用 AI 模型生成多个不同版本的查询变体:

public class ExpanderNode implements NodeAction {private static final PromptTemplate DEFAULT_PROMPT_TEMPLATE = new PromptTemplate("You are an expert at information retrieval and search optimization.\n" +"Your task is to generate {number} different versions of the given query.\n\n" +"Each variant must cover different perspectives or aspects of the topic,\n" +"while maintaining the core intent of the original query. The goal is to\n" +"expand the search space and improve the chances of finding relevant information.\n\n" +"Do not explain your choices or add any other text.\n" +"Provide the query variants separated by newlines.\n\n" +"Original query: {query}\n\n" +"Query variants:\n");private final ChatClient chatClient;public ExpanderNode(ChatClient.Builder chatClientBuilder) {this.chatClient = chatClientBuilder.build();}@Overridepublic Map<String, Object> apply(OverAllState state) {String query = state.value("query", "");Integer expanderNumber = state.value("expander_number", this.NUMBER);Flux<ChatResponse> chatResponseFlux = this.chatClient.prompt().user((user) -> user.text(DEFAULT_PROMPT_TEMPLATE.getTemplate()).param("number", expanderNumber).param("query", query)).stream().chatResponse();AsyncGenerator<? extends NodeOutput> generator = StreamingChatGenerator.builder().startingNode("expander_llm_stream").startingState(state).mapResult(response -> {String text = response.getResult().getOutput().getText();List<String> queryVariants = Arrays.asList(text.split("\n"));return Map.of("expander_content", queryVariants);}).build(chatResponseFlux);return Map.of("expander_content", generator);}
}

5.2 人类反馈节点 (HumanFeedbackNode)

人类反馈节点根据用户输入的反馈,决定工作流的下一步走向:

public class HumanFeedbackNode implements NodeAction {@Overridepublic Map<String, Object> apply(OverAllState state) {HashMap<String, Object> resultMap = new HashMap<>();String nextStep = StateGraph.END;Map<String, Object> feedBackData = state.humanFeedback().data();boolean feedback = (boolean) feedBackData.getOrDefault("feed_back", true);if (feedback) {nextStep = "translate";}resultMap.put("human_next_node", nextStep);return resultMap;}
}

5.3 翻译节点 (TranslateNode)

翻译节点将用户查询翻译为指定的目标语言(默认为英文):

public class TranslateNode implements NodeAction {private static final PromptTemplate DEFAULT_PROMPT_TEMPLATE = new PromptTemplate("Given a user query, translate it to {targetLanguage}.\n" +"If the query is already in {targetLanguage}, return it unchanged.\n" +"If you don't know the language of the query, return it unchanged.\n" +"Do not add explanations nor any other text.\n\n" +"Original query: {query}\n\n" +"Translated query:\n");private final ChatClient chatClient;private final String TARGET_LANGUAGE = "English";public TranslateNode(ChatClient.Builder chatClientBuilder) {this.chatClient = chatClientBuilder.build();}@Overridepublic Map<String, Object> apply(OverAllState state) {String query = state.value("query", "");String targetLanguage = state.value("translate_language", TARGET_LANGUAGE);Flux<ChatResponse> chatResponseFlux = this.chatClient.prompt().user((user) -> user.text(DEFAULT_PROMPT_TEMPLATE.getTemplate()).param("targetLanguage", targetLanguage).param("query", query)).stream().chatResponse();AsyncGenerator<? extends NodeOutput> generator = StreamingChatGenerator.builder().startingNode("translate_llm_stream").startingState(state).mapResult(response -> {String text = response.getResult().getOutput().getText();List<String> queryVariants = Arrays.asList(text.split("\n"));return Map.of("translate_content", queryVariants);}).build(chatResponseFlux);return Map.of("translate_content", generator);}
}

6. 控制器实现

6.1 GraphHumanController

控制器提供了两个主要的 API 端点,用于启动和恢复工作流:

@RestController
@RequestMapping("/graph/human")
public class GraphHumanController {private final CompiledGraph compiledGraph;@Autowiredpublic GraphHumanController(@Qualifier("humanGraph") StateGraph stateGraph) throws GraphStateException {SaverConfig saverConfig = SaverConfig.builder().register(SaverConstant.MEMORY, new MemorySaver()).build();this.compiledGraph = stateGraph.compile(CompileConfig.builder().saverConfig(saverConfig).interruptBefore("human_feedback").build());}// 启动工作流@GetMapping(value = "/expand", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<ServerSentEvent<String>> expand(@RequestParam(value = "query", defaultValue = "你好,很高兴认识你,能简单介绍一下自己吗?", required = false) String query,@RequestParam(value = "expander_number", defaultValue = "3", required = false) Integer expanderNumber,@RequestParam(value = "thread_id", defaultValue = "yingzi", required = false) String threadId) throws GraphRunnerException {RunnableConfig runnableConfig = RunnableConfig.builder().threadId(threadId).build();Map<String, Object> objectMap = new HashMap<>();objectMap.put("query", query);objectMap.put("expander_number", expanderNumber);GraphProcess graphProcess = new GraphProcess(this.compiledGraph);Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();AsyncGenerator<NodeOutput> resultFuture = compiledGraph.stream(objectMap, runnableConfig);graphProcess.processStream(resultFuture, sink);return sink.asFlux().doOnCancel(() -> logger.info("Client disconnected from stream")).doOnError(e -> logger.error("Error occurred during streaming", e));}// 恢复工作流@GetMapping(value = "/resume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<ServerSentEvent<String>> resume(@RequestParam(value = "thread_id", defaultValue = "yingzi", required = false) String threadId,@RequestParam(value = "feed_back", defaultValue = "true", required = false) boolean feedBack) throws GraphRunnerException {RunnableConfig runnableConfig = RunnableConfig.builder().threadId(threadId).build();StateSnapshot stateSnapshot = this.compiledGraph.getState(runnableConfig);OverAllState state = stateSnapshot.state();state.withResume();Map<String, Object> objectMap = new HashMap<>();objectMap.put("feed_back", feedBack);state.withHumanFeedback(new OverAllState.HumanFeedback(objectMap, ""));Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();GraphProcess graphProcess = new GraphProcess(this.compiledGraph);AsyncGenerator<NodeOutput> resultFuture = compiledGraph.streamFromInitialNode(state, runnableConfig);graphProcess.processStream(resultFuture, sink);return sink.asFlux().doOnCancel(() -> logger.info("Client disconnected from stream")).doOnError(e -> logger.error("Error occurred during streaming", e));}
}

7. 运行与测试

  1. 启动应用:运行你的 Spring Boot 主程序。
  2. 使用浏览器或 API 工具(如 Postman, curl)进行测试

测试 1:启动工作流

访问以下 URL,启动工作流,让 AI 扩展查询:

http://localhost:8080/graph/human/expand?query=什么是人工智能?&expander_number=3&thread_id=test123

预期响应(流式输出)

工作流将首先执行扩展节点,生成多个不同版本的查询变体,然后在人类反馈节点处暂停,等待用户反馈。

测试 2:提供反馈并继续执行翻译节点

访问以下 URL,提供正向反馈(feed_back=true),继续执行翻译节点:

http://localhost:8080/graph/human/resume?thread_id=test123&feed_back=true

预期响应(流式输出)

工作流将从人类反馈节点继续执行,进入翻译节点,将原始查询翻译为英文。

测试 3:提供反馈并结束工作流

访问以下 URL,提供负向反馈(feed_back=false),直接结束工作流:

http://localhost:8080/graph/human/resume?thread_id=test123&feed_back=false

预期响应(流式输出)

工作流将从人类反馈节点直接进入结束状态,不执行翻译节点。

8. 实现思路与扩展建议

实现思路

本案例的核心思想是"人机协作的工作流"。通过引入人类反馈节点,我们可以在 AI 工作流中实现以下功能:

  • 流程控制:人类可以决定工作流的走向,实现更灵活的业务逻辑。
  • 质量保证:在关键节点引入人类审核,确保 AI 输出的质量。
  • 状态持久化:通过 MemorySaver 保存工作流状态,支持中断和恢复。
  • 流式处理:所有节点都支持流式输出,提供更好的用户体验。

扩展建议

  • 多人类节点:可以在工作流中添加多个人类反馈节点,实现更复杂的人机协作场景。
  • 并行处理:设计并行节点,同时执行多个任务,然后由人类节点决定如何合并结果。
  • 条件分支:增加更复杂的条件逻辑,根据不同的反馈内容走向不同的分支。
  • 集成外部系统:将人类反馈与外部系统(如审批系统、工单系统)集成,实现完整的业务流程。
  • 可视化界面:开发一个前端界面,提供更友好的方式供用户提供反馈和查看工作流状态。
http://www.dtcms.com/a/557743.html

相关文章:

  • 遥感生态指数(RSEI):理论发展、方法论争与实践进展
  • cjson 的资源释放函数
  • 第6讲:常用基础与布局Widget(一):Container, Row, Column
  • 什么是网站建设塑业东莞网站建设
  • 小企业网站建设哪里做得好深圳网站搭建
  • 婚恋网站策划页面设计好吗
  • 被禁止访问网站怎么办做招聘网站的怎么引流求职者
  • 【架构艺术】自动化测试平台架构设计的一些通用要点
  • 一个做网站的公司年收入宁波最好的推广平台
  • 建设网站0基础需要学什么海口网站建设维护
  • 农产品销售系统|农产品电商|基于SprinBoot+vue的农产品销售系统(源码+数据库+文档)
  • RAG的17种方式实现方式研究
  • 做时间轴的在线网站对网站建设的调研报告
  • 做受网站在网站上做封面
  • 网站推广优化排名seo网站模板网站
  • 有没有专做游戏脚本的网站制造业营销外贸网站建设
  • 电子商务网站开发实践品牌营销的四大策略
  • 量子机器学习框架设计:基于Cirq的变分量子算法实现
  • 怎么查看网站快照现在建站好么
  • 511-Spring AI Alibaba Graph 智能写作助手示例
  • 在局域网建设网站wordpress 旧文章 最新
  • 俄罗斯全面禁止汽油出口对俄、欧、中能源市场的多维影响分析
  • 公司网站建设费用明细表郑州嵌入式培训机构
  • 电力建设网站进不去青岛市黄岛区城市建设局网站
  • 网站赏析案例wordpress get_categories
  • 李秀满携手国际版权机构推进AI音乐公正机制:「创作者应获得合理回报」
  • 1.2 redis7.0.4安装与配置开机自启动
  • 一流的哈尔滨网站建设网站开发概要设计模板
  • 南头专业企业网站建设公司深圳东门新冠
  • 湛江网站开发公司在线网站建设活动