当前位置: 首页 > news >正文

Spring AI Alibaba(2)——通过Graph实现工作流

Spring AI Alibaba Graph介绍

基于 Spring AI Alibaba Graph 进行工作流的开发。工作流是以相对固化的模式来人为地拆解任务,将一个大任务拆解为包含多个分支的固化流程。工作流的优势是确定性强,模型作为流程中的一个节点起到的更多是一个分类决策、内容生成的职责,因此它更适合意图识别等类别属性强的应用场景。

Spring AI Alibaba Graph 在设计理念上借鉴 LangGraph,并且增加了大量预置 Node、简化了 State 定义过程等,让开发者更容易编写对等低代码平台的工作流、多智能体等。

Graph框架核心概念包括:StateGraph(状态图,用于定义节点和边)、Node(节点,封装具体操作或模型调用)、Edge(边,表示节点间的跳转关系)以及 OverAllState(全局状态,贯穿流程共享数据)。

 本文参考官方文档:工作流-阿里云Spring AI Alibaba官网官网,演示工作流的开发。

 商品评价分类系统的流程

(1)根据评价内容,通过feedback_classifier,确定是正面(positive)还是负面(negative)评价。

(2)如果是正面评价,结束。

(3)如果是负面评价,执行specific_question_classifier,对负面评价进行分类,包括售后(after-sale)、质量(quality)、运输(transportation)、其他(others)。

(4)确定分类后,结束。

代码实现

导入jar

        <!-- 引入 Graph 核心依赖 --><dependency><groupId>com.alibaba.cloud.ai</groupId><artifactId>spring-ai-alibaba-graph-core</artifactId></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.10.1</version></dependency>

yml配置

spring:ai:dashscope:api-key: 申请的keychat:options:model: qwen-maxlogging:level:org.springframework.ai.chat.client.advisor: DEBUG

配置类

package com.qfedu.springaialibaba.config;import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.OverAllStateFactory;
import com.alibaba.cloud.ai.graph.StateGraph;
import com.alibaba.cloud.ai.graph.action.EdgeAction;
import com.alibaba.cloud.ai.graph.exception.GraphStateException;
import com.alibaba.cloud.ai.graph.node.QuestionClassifierNode;
import com.alibaba.cloud.ai.graph.state.strategy.ReplaceStrategy;
import com.qfedu.springaialibaba.workflow.RecordingNode;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.client.advisor.SimpleLoggerAdvisor;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.List;
import java.util.Map;import static com.alibaba.cloud.ai.graph.StateGraph.END;
import static com.alibaba.cloud.ai.graph.StateGraph.START;
import static com.alibaba.cloud.ai.graph.action.AsyncEdgeAction.edge_async;
import static com.alibaba.cloud.ai.graph.action.AsyncNodeAction.node_async;@Slf4j
@Configuration
public class AiConfig {@Resourceprivate ChatModel chatModel;@Beanpublic ChatClient chatClient() {ChatClient chatClient = ChatClient.builder(chatModel).defaultAdvisors(new SimpleLoggerAdvisor()).build();return chatClient;}@Beanpublic StateGraph workflowGraph() throws GraphStateException {// 用于在每次执行工作流时创建初始的全局状态对象。通过注册若干 Key 及其更新策略来管理上下文数据// 注册了三个状态键:input(输入文本)、classifier_output(分类结果)和 solution(最终处理结论),// 均采用 ReplaceStrategy(每次写入替换旧值)OverAllStateFactory stateFactory = () -> {OverAllState state = new OverAllState();state.registerKeyAndStrategy("input", new ReplaceStrategy());state.registerKeyAndStrategy("classifier_output", new ReplaceStrategy());state.registerKeyAndStrategy("solution", new ReplaceStrategy());return state;};// 定义节点 (Node):创建工作流中的核心节点,包括两个文本分类节点和一个记录节点// QuestionClassifierNode 类用于文本分类任务// 评价正负分类节点,判断用户反馈是正面还是负面。// 它利用 LLM 对输入文本(存储在 "input" 键)进行语义理解,并输出类别结果// 分类结果会写入全局状态的 "classifier_output" 键QuestionClassifierNode feedbackClassifier = QuestionClassifierNode.builder().chatClient(chatClient()).inputTextKey("input").categories(List.of("正面反馈", "负面反馈"))  // 分类内容.classificationInstructions( // 分类的说明List.of("尝试理解用户提供反馈时的感受")).build();// 负面评价具体问题分类节点// 在检测到反馈为负面时被执行。它会根据负面反馈的内容,将问题归类为 售后服务、运输物流、产品质量 或 其他 四种类型之一。// 这个节点复用了输入文本 "input",并将更具体的分类结果写入 "classifier_output"QuestionClassifierNode specificQuestionClassifier = QuestionClassifierNode.builder().chatClient(chatClient()).inputTextKey("input").categories(List.of("售后服务", "运输", "产品质量", "其他")).classificationInstructions(List.of("客户试图从我们这里获得什么样的服务或帮助?根据您的理解对问题进行分类")).build();// 记录结果的节点// 按需自定义的 NodeAction,实现对最终结果的记录和决策RecordingNode recorderNode = new RecordingNode();// 使用 StateGraph 定义工作流StateGraph graph = new StateGraph("Consumer Service Workflow Demo", stateFactory)// 定义节点.addNode("feedback_classifier", node_async(feedbackClassifier)).addNode("specific_question_classifier", node_async(specificQuestionClassifier)).addNode("recorder", node_async(recorderNode))// 定义边(流程顺序),本例表示start节点到feedback_classifier节点的边// 第一个参数表示源节点,第二个参数表示目的节点.addEdge(START, "feedback_classifier")  // 起始节点// 定义带条件的边,本例中,.addConditionalEdges("feedback_classifier",// 边的调度逻辑edge_async(new FeedbackQuestionDispatcher()),// 不同的调度结果,走不同的边,跳转到不同的节点// 本例中,正面的结果调转到recorder节点;负面的结果跳转到specific_question_classifier节点Map.of("正面的", "recorder", "负面的", "specific_question_classifier")).addConditionalEdges("specific_question_classifier",edge_async(new SpecificQuestionDispatcher()),Map.of("售后相关", "recorder", "运输相关", "recorder","质量相关", "recorder", "其他的", "recorder")).addEdge("recorder", END);  // 结束节点return graph;}// 两个分类节点之间的转接逻辑由 FeedbackQuestionDispatcher 和 SpecificQuestionDispatcher 来完成// 作用是在节点执行完后读取全局状态,决定下一步该走哪条边public class FeedbackQuestionDispatcher implements EdgeAction {@Overridepublic String apply(OverAllState state) throws Exception {String classifierOutput = (String) state.value("classifier_output").orElse("");// log.info("classifierOutput: {}", classifierOutput);// 如果反馈结果包含“正面”,走"正面的"对应的边// 不同的边跳转到哪个节点,由StateGraph的addConditionalEdges方法决定if (classifierOutput.contains("正面")) {return "正面的";}return "负面的";}}public class SpecificQuestionDispatcher implements EdgeAction {@Overridepublic String apply(OverAllState state) throws Exception {String classifierOutput = (String) state.value("classifier_output").orElse("");// log.info("classifierOutput: {}", classifierOutput);Map<String, String> classifierMap = new HashMap<>();classifierMap.put("售后", "售后相关");classifierMap.put("质量", "质量相关");classifierMap.put("运输", "运输相关");for (Map.Entry<String, String> entry : classifierMap.entrySet()) {if (classifierOutput.contains(entry.getKey())) {return entry.getValue();}}return "其他的";}}}

注意:重点看workflowGraph()方法

该方法中我们定义了三个状态键,用于存储上下文的数据;

定义了feedbackClassifier节点,用于对商品的评论进行正面反馈或者负面反馈的分类;

定义了specificQuestionClassifier节点,用于对负面反馈的评论,进行进一步分类,确定反馈的评论是"售后服务"、"运输"、 "产品质量"或者"其他"中的哪种问题;

定义了recorderNode,对最终的反馈结果进行决策;

StateGraph对象,用于描述商品评价系统流程图的各个节点的关系和节点间流转条件

记录节点的类

package com.qfedu.springaialibaba.workflow;import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.HashMap;
import java.util.Map;public class RecordingNode implements NodeAction {private static final Logger logger = LoggerFactory.getLogger(RecordingNode.class);// 读取全局状态中的 "classifier_output" 字段值,// 根据不同的值,向solution键写入不同的数据@Overridepublic Map<String, Object> apply(OverAllState state) throws Exception {String feedback = state.value("classifier_output").get().toString();Map<String, Object> updatedState = new HashMap<>();if (feedback.contains("正面")) {// logger.info("Received positive feedback: {}", feedback);updatedState.put("solution", "正面评论,不用进一步处理");} else {// logger.info("Received negative feedback: {}", feedback);updatedState.put("solution", feedback);}return updatedState;}}

控制器类

package com.qfedu.springaialibaba.workflow;import com.alibaba.cloud.ai.graph.CompiledGraph;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.StateGraph;
import com.alibaba.cloud.ai.graph.exception.GraphStateException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.Map;
import java.util.Optional;@RestController
@RequestMapping("/graph")
public class GraphController {private CompiledGraph compiledGraph;public GraphController(@Qualifier("workflowGraph") StateGraph stateGraph) throws GraphStateException {this.compiledGraph = stateGraph.compile();}@GetMapping("/chat")public String simpleChat(@RequestParam("query") String query) throws Exception {Optional<OverAllState> input = compiledGraph.invoke(Map.of("input", query));OverAllState overAllState = input.get();return overAllState.value("solution").toString();}}

正面的反馈

负面的反馈

通过测试,执行结果符合预期

http://www.dtcms.com/a/272585.html

相关文章:

  • Flutter 与 Android 的互通几种方式
  • Linux 中 sed 命令
  • RedisJSON 路径语法深度解析与实战
  • Spring Boot + Javacv-platform:解锁音视频处理的多元场景
  • 【TCP/IP】12. 文件传输协议
  • MySQL索引操作全指南:创建、查看、优化
  • Debian-10编译安装Mysql-5.7.44 笔记250706
  • macOS 上安装 Miniconda + Conda-Forge
  • Jekyll + Chirpy + GitHub Pages 搭建博客
  • 如何使用Java WebSocket API实现客户端和服务器端的通信?
  • 蓝桥杯第十六届(2025)真题深度解析:思路复盘与代码实战
  • MinerU将PDF转成md文件,并分拣图片
  • Alibaba Druid主要配置
  • 图片合并pdf
  • 新手向:实现ATM模拟系统
  • TDengine 数据库建模最佳实践
  • Oracle 视图
  • Tomcat:Java Web应用的幕后英雄
  • 线性探针是什么:是一种用于探测神经网络中特定特征的工具
  • 从零开始搭建深度学习大厦系列-3.卷积神经网络基础(5-9)
  • 李宏毅(深度学习)--(2)
  • 数据库复合索引设计:为什么等值查询列应该放在范围查询列前面?
  • 区间动态规划详解
  • 【JMeter】跨线程组传递参数
  • 在Docker中运行macOS的超方便体验!
  • SpringAI×Ollama:Java生态无缝集成本地大模型实践指南
  • Redis数据库基础概述
  • 8.2.3希尔排序
  • Spring for Apache Pulsar->Reactive Support->Message Production
  • KV Cache原理详解 + 代码理解