404-Spring AI Alibaba Graph 可观测性 Langfuse 功能完整案例

本案例将引导您一步步构建一个 Spring Boot 应用,演示如何利用 Spring AI Alibaba Graph 与 Langfuse 集成,实现 AI 图应用的全面可观测性和监控。
1. 案例目标
我们将创建一个包含复杂图结构的 Web 应用,展示以下核心功能:
- 复杂图结构处理:包含并行边、串行边、子图等多种节点类型和边类型。
- 实时流式 AI 响应:通过 StreamingChatNode 实现实时流式 AI 响应。
- Langfuse 可观测性集成:完整集成 Langfuse,实现 AI 应用的全面可观测性。
- OpenTelemetry 追踪和指标收集:通过 OpenTelemetry 收集追踪和指标数据。
- SSE 实时更新支持:支持 Server-Sent Events (SSE) 实时更新。
2. 技术栈与核心依赖
- Spring Boot 3.x
- Spring AI Alibaba (用于对接阿里云 DashScope 通义大模型)
- Spring AI Alibaba Graph (用于复杂图结构的 AI 处理流程)
- Langfuse (用于 AI 可观测性、追踪和分析)
- OpenTelemetry (用于追踪和指标收集)
- Maven (项目构建工具)
在 pom.xml 中,你需要引入以下核心依赖:
<dependencies><!-- Spring AI Alibaba 核心启动器,集成 DashScope --><dependency><groupId>com.alibaba.cloud.ai</groupId><artifactId>spring-ai-alibaba-starter-dashscope</artifactId><version>1.0.0.3</version></dependency><!-- Spring AI Alibaba Graph 可观测性启动器 --><dependency><groupId>com.alibaba.cloud.ai</groupId><artifactId>spring-ai-alibaba-starter-graph-observation</artifactId><version>1.0.0.3</version></dependency><!-- Spring WebFlux 用于响应式编程和 SSE 支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><!-- HTTP 客户端 --><dependency><groupId>org.apache.httpcomponents.client5</groupId><artifactId>httpclient5</artifactId><version>5.4.1</version></dependency><!-- OpenTelemetry Spring Boot 启动器 --><dependency><groupId>io.opentelemetry.instrumentation</groupId><artifactId>opentelemetry-spring-boot-starter</artifactId><version>2.9.0</version></dependency><!-- Micrometer Tracing Bridge OpenTelemetry --><dependency><groupId>io.micrometer</groupId><artifactId>micrometer-tracing-bridge-otel</artifactId><exclusions><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><!-- OpenTelemetry OTLP 导出器 --><dependency><groupId>io.opentelemetry</groupId><artifactId>opentelemetry-exporter-otlp</artifactId></dependency><!-- Spring Boot Actuator 用于健康检查和监控 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
</dependencies>
3. 项目配置
在 src/main/resources/application.yml 文件中,配置你的 DashScope API Key 和 Langfuse 连接信息。
server:port: 8080
spring:application:name: graph-observation-langfuseai:dashscope:api-key: ${AI_DASHSCOPE_API_KEY}chat:options:model: qwen-maxalibaba:graph:observation:enabled: truemanagement:endpoints:web:exposure:include: "*"endpoint:health:# health status check with detailed messagesshow-details: alwaystracing:sampling:# trace information with every requestprobability: 1.0observations:annotations:enabled: trueotel:service:name: spring-ai-alibaba-graph-langfuseresource:attributes:deployment.environment: development# configure exportertraces:exporter: otlpsampler: always_onmetrics:exporter: otlp# logs exportation inhibited for langfuse currently cannot supportlogs:exporter: noneexporter:otlp:endpoint: "https://cloud.langfuse.com/api/public/otel"headers:Authorization: "Basic ${YOUR_BASE64_ENCODED_CREDENTIALS}"protocol: http/protobuf
重要提示:
- 请将
AI_DASHSCOPE_API_KEY环境变量设置为你从阿里云获取的有效 API Key。 - 请将
YOUR_BASE64_ENCODED_CREDENTIALS替换为你的 Base64 编码的 Langfuse 凭据(格式:public_key:secret_key)。
4. Langfuse 配置
4.1 使用 Langfuse 云端服务
- 在 https://cloud.langfuse.com 注册账户
- 创建新项目
- 导航到 Settings → API Keys
- 生成新的 API 密钥对(公钥和私钥)
- 将凭据编码为 Base64:
# Linux/Mac
echo -n "public_key:secret_key" | base64# Windows PowerShell
[System.Convert]::ToBase64String([System.Text.Encoding]::UTF8.GetBytes("public_key:secret_key"))
4.2 使用自托管 Langfuse
- 使用 Docker 部署 Langfuse:
docker compose up -d
- 访问
http://localhost:3000 - 创建项目并生成 API 密钥
- 更新
application.yml中的端点
5. 图结构设计
本示例实现了一个复杂的图结构,包含多种节点类型和边类型:
开始节点 → 并行处理 → 子图处理 → 流式节点 → 汇总节点 → 结束节点
5.1 节点类型
- StartNode:初始处理节点
- ParallelNode1/ParallelNode2:并行处理节点(情感分析和主题分析)
- MergeNode:合并节点,将并行输出合并为子图输入
- SimpleSubGraph:子图节点,包含内部串行处理流程
- StreamingNode:流式节点,实时流式 AI 响应
- SummaryNode:汇总节点,聚合流式输出
- EndNode:结束节点,最终输出格式化
5.2 边类型
- 串行边:严格按顺序执行(如 START → start)
- 并行边:同时执行多个节点(如 start → parallel1 和 start → parallel2)
- 聚合边:等待多个节点完成后再执行下一个节点(如 parallel1/parallel2 → merge)
6. 编写 Java 代码
6.1 GraphConfiguration.java
图配置类,定义图结构和节点关系。
@Configuration
public class GraphConfiguration {@Beanpublic StateGraph observabilityGraph(ChatClient chatClient) throws GraphStateException {// Start node - initial processingChatNode startNode = ChatNode.create("StartNode", "input", "start_output", chatClient,"Please perform initial processing on the input content:");// Parallel nodes - concurrent processingChatNode parallelNode1 = ChatNode.create("ParallelNode1", "start_output", "parallel_output1", chatClient,"Please perform sentiment analysis on the content:");ChatNode parallelNode2 = ChatNode.create("ParallelNode2", "start_output", "parallel_output2", chatClient,"Please perform topic analysis on the content:");// Summary node - aggregates streaming outputChatNode summaryNode = ChatNode.create("SummaryNode", "streaming_output", "summary_output", chatClient,"Please summarize the streaming analysis results:");// Merge node - combine parallel outputs for subgraph inputMergeNode mergeNode = new MergeNode(Lists.newArrayList("parallel_output1", "parallel_output2"), "sub_input");// Streaming node - real-time AI responseStreamingChatNode streamingNode = StreamingChatNode.create("StreamingNode", "final_output", "streaming_output",chatClient, "Please perform detailed analysis on the subgraph results:");// End node - final output formattingChatNode endNode = ChatNode.create("EndNode", "summary_output", "end_output", chatClient,"Please format the final results for output:");// Create subgraphSimpleSubGraph subGraph = new SimpleSubGraph(chatClient);// Build the main graphStateGraph graph = new StateGraph(keyStrategyFactory).addNode("start", node_async(startNode)).addNode("parallel1", node_async(parallelNode1)).addNode("parallel2", node_async(parallelNode2)).addNode("merge", node_async(mergeNode)).addNode("subgraph", subGraph.subGraph()).addNode("streaming", node_async(streamingNode)).addNode("summary", node_async(summaryNode)).addNode("end", node_async(endNode)).addEdge(START, "start").addEdge("start", "parallel1").addEdge("start", "parallel2").addEdge("parallel1", "merge").addEdge("parallel2", "merge").addEdge("merge", "subgraph").addEdge("subgraph", "streaming").addEdge("streaming", "summary").addEdge("summary", "end").addEdge("end", END);return graph;}@Beanpublic CompiledGraph compiledGraph(StateGraph observabilityGraph, CompileConfig observationCompileConfig)throws GraphStateException {return observabilityGraph.compile(observationCompileConfig);}
}
6.2 GraphController.java
图控制器,提供同步图执行接口。
@RestController
@RequestMapping("/graph/observation")
public class GraphController {@Autowiredprivate CompiledGraph compiledGraph;@GetMapping("/execute")public Map<String, Object> execute(@RequestParam(value = "prompt", defaultValue = "Hello World") String input) {try {// Create initial stateMap<String, Object> initialState = new HashMap<>();initialState.put("input", input);// Execute graphOverAllState result = compiledGraph.invoke(initialState).get();// Return resultMap<String, Object> response = new HashMap<>();response.put("success", true);response.put("input", input);response.put("output", result.value("end_output").orElse("No output"));response.put("logs", result.value("logs").orElse("No logs"));return response;}catch (Exception e) {Map<String, Object> errorResponse = new HashMap<>();errorResponse.put("success", false);errorResponse.put("error", e.getMessage());return errorResponse;}}
}
6.3 GraphStreamController.java
图流控制器,提供流式图执行接口。
@RestController
@RequestMapping("/graph/observation")
public class GraphStreamController {@Autowiredprivate CompiledGraph compiledGraph;@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<ServerSentEvent<String>> stream(@RequestParam(value = "prompt", defaultValue = "Hello World") String input,@RequestParam(value = "thread_id", defaultValue = "observability", required = false) String threadId)throws GraphRunnerException {// Create runnable configurationRunnableConfig runnableConfig = RunnableConfig.builder().threadId(threadId).build();// Create initial stateMap<String, Object> initialState = new HashMap<>();initialState.put("input", input);// Create graph processorGraphProcess graphProcess = new GraphProcess();// Get streaming outputAsyncGenerator<NodeOutput> resultStream = compiledGraph.stream(initialState, runnableConfig);// Process and return streaming outputreturn graphProcess.processStream(resultStream).doOnCancel(() -> logger.info("Client disconnected from streaming")).doOnError(e -> logger.error("Error occurred during streaming output", e)).doOnComplete(() -> logger.info("Streaming output completed"));}
}
6.4 SimpleSubGraph.java
子图实现,包含内部串行处理流程。
public class SimpleSubGraph implements SubGraphNode {private final ChatClient chatClient;public SimpleSubGraph(ChatClient chatClient) {this.chatClient = chatClient;this.subGraph = createSubGraph();}@Overridepublic String id() {return "simple_subgraph";}@Overridepublic StateGraph subGraph() {return this.subGraph;}private StateGraph createSubGraph() {try {// Create internal nodes for the subgraph (serial processing)ChatNode subNode1 = ChatNode.create("SubGraphNode1", "sub_input", "sub_output1", chatClient,"Please perform the first step processing on the following content:");ChatNode subNode2 = ChatNode.create("SubGraphNode2", "sub_output1", "sub_output2", chatClient,"Please perform the second step processing on the following content:");ChatNode subNode3 = ChatNode.create("SubGraphNode3", "sub_output2", "subgraph_final_output", chatClient,"Please perform the final processing on the following content:");// Build subgraph (pure serial structure)return new StateGraph("Simple SubGraph", keyStrategyFactory).addNode("sub_node1", node_async(subNode1)).addNode("sub_node2", node_async(subNode2)).addNode("sub_node3", node_async(subNode3)).addEdge(START, "sub_node1").addEdge("sub_node1", "sub_node2").addEdge("sub_node2", "sub_node3").addEdge("sub_node3", END);}catch (GraphStateException e) {throw new RuntimeException("Failed to create subgraph", e);}}
}
7. 运行与测试
- 设置环境变量:
# 必需:DashScope API 密钥
export AI_DASHSCOPE_API_KEY=your_dashscope_api_key
- 编译和运行:
mvn clean compile
mvn spring-boot:run
应用将在 http://localhost:8080 启动
测试 1:同步图执行
访问以下 URL,执行同步图处理:
curl "http://localhost:8080/graph/observation/execute?prompt=请分析这段文本:人工智能的发展"
预期响应:
{"success": true,"input": "请分析这段文本:人工智能的发展","output": "经过完整图处理的最终结果","logs": "执行日志信息"
}
测试 2:流式图执行
访问以下 URL,执行流式图处理:
curl "http://localhost:8080/graph/observation/stream?prompt=请分析这段文本:人工智能的发展&thread_id=demo"
预期响应(流式输出):
data: {"type":"node_output","node":"start","data":{...},"timestamp":...}
event: node_output
id: start_...data: {"type":"node_output","node":"parallel1","data":{...},"timestamp":...}
event: node_output
id: parallel1_...data: {"type":"node_output","node":"parallel2","data":{...},"timestamp":...}
event: node_output
id: parallel2_...data: {"type":"node_output","node":"merge","data":{...},"timestamp":...}
event: node_output
id: merge_...data: {"type":"node_output","node":"subgraph","data":{...},"timestamp":...}
event: node_output
id: subgraph_...data: {"type":"streaming","node":"streaming","chunk":"...","timestamp":...}
event: node_output
id: streaming_...data: {"type":"streaming","node":"streaming","chunk":"...","timestamp":...}
event: node_output
id: streaming_...data: {"type":"node_output","node":"summary","data":{...},"timestamp":...}
event: node_output
id: summary_...data: {"type":"node_output","node":"end","data":{...},"timestamp":...}
event: node_output
id: end_...data: {"type":"completed","message":"Graph processing completed"}
event: completed
8. Langfuse 可观测性集成
8.1 启用 Graph 可观测性
在 application.yml 中启用 Graph 可观测性:
spring:ai:alibaba:graph:observation:enabled: true
8.2 注入 observationCompileConfig
在图配置中注入 observationCompileConfig:
@Bean
public CompiledGraph compiledGraph(StateGraph observabilityGraph, CompileConfig observationCompileConfig)throws GraphStateException {return observabilityGraph.compile(observationCompileConfig);
}
8.3 查看 Langfuse 仪表板
- 登录 Langfuse 云端服务或本地部署的 Langfuse 实例
- 导航到 Traces 页面查看图执行追踪
- 导航到 Sessions 页面查看会话数据
- 导航到 Scores 页面查看评分数据
- 导航到 Analytics 页面查看分析报告
9. 实现思路与扩展建议
实现思路
本案例的核心思想是"复杂图结构与可观测性结合"。我们将:
- 复杂图结构:通过 Spring AI Alibaba Graph 实现复杂的图结构,包含并行边、串行边、子图等多种节点类型和边类型。
- 可观测性集成:通过 Langfuse 集成,实现 AI 应用的全面可观测性,包括追踪、指标和日志。
- 实时流式处理:通过 StreamingChatNode 实现实时流式 AI 响应,提供更好的用户体验。
- OpenTelemetry 集成:通过 OpenTelemetry 收集追踪和指标数据,实现与 Langfuse 的无缝集成。
扩展建议
- 自定义节点类型:根据业务需求,实现自定义节点类型,扩展图处理能力。
- 动态图结构:实现动态图结构,根据运行时条件动态调整图结构。
- 多模态处理:集成多模态处理能力,支持文本、图像、音频等多种输入类型。
- 高级可观测性:集成更多可观测性工具,如 Prometheus、Grafana 等,实现更全面的监控和分析。
- 分布式图处理:实现分布式图处理,支持大规模图计算任务。
- 图优化算法:集成图优化算法,提高图处理效率和性能。
