智能化问题分析(Cherry Stdio+ MCP)
背景
当前系统日志分散在Kibana和SkyWalking中,故障排查需要人工关联分析,效率低下,需要建立相关的智能体进行快速定位。
涉及开发语言
java 、python
业务目标
- 实现跨系统的日志追踪关联分析
- 将平均故障定位时间(MTTD)缩短40%
- 建立智能化的根因分析能力
分层架构图
功能流程图
工具:Cherry Stdio 工具作为MCP的客户端。
整体项目步骤介绍
1,了解整体项目的路由策略,目的后续根据url需要查询出所有的代码,例如controller->service->dao
2,将工程代码进行解析,目的了解项目的结构和为库系统做数据准备
3, 了解kinbana 和skywalking的数据存储位置,需要获取相应的链接。
4,涉及到两个项目,一个是java项目(analyseProjectCode),一个是python项目(logAnalyse-MCP)
`analyseProjectCode` 是一个 Java 项目分析工具,能够扫描和解析 Java 源代码及 MyBatis XML 映射文件,并将分析结果存储到数据库中。它还支持建立方法之间的调用关系。
logAnalyse-MCP是MCP Server 端,向LLM提供工具
项目整体演示
1,将工程数据利用java项目进行项目分析
2,启动MCP Server端
3,打开cherry stdio 客户端,并添加MCP服务器
4,根据场景进行分析
1,错误日志分析
在kinbana上获取到traceId,前提是日志中有traceId。就不截图了。
19f0ab40118e4c198cff8aa2d729fe32.1838.17515966802584869
直接发送给LLM,【这是我的traceId19f0ab40118e4c198cff8aa2d729fe32.1838.17515966802584869 帮我分析错误】
后续的建议修复都会有~~
2,性能日志分析
在skywalking 上获取到耗时最长的接口的traceId
764ffdf3596445ea9d21e44c1f913c1e.303.17534333595660373
向LLM发送【请帮分析一下764ffdf3596445ea9d21e44c1f913c1e.303.17534333595660373 性能报告】
跨系统的性能排查:这个接口是跨系统的。
其他的性能排查:
python核心logAnalyseHttp.py 代码:
from __future__ import annotations
import argparse
import asyncio
import json
from typing import Any, AsyncIterator, Callable
import httpx
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import StreamingResponseimport logAnalyse2# ---------------------------------------------------------------------------
# Server constants
# ---------------------------------------------------------------------------
SERVER_NAME = "LogAnalyse"
SERVER_VERSION = "1.0.0"
PROTOCOL_VERSION = "2024-11-05"# ---------------------------------------------------------------------------
# FastAPI app and tools registry
# ---------------------------------------------------------------------------
app = FastAPI(title="LogAnalyse HTTP-Stream v8")# ---------------------------------------------------------------------------
# Dummy tool implementations (to be replaced with actual logic)
# ---------------------------------------------------------------------------
async def query_error_logs(trace_id: str) -> dict:print(f"traceId:{trace_id}")result = await logAnalyse2.query_error_logs(trace_id)print(result)return resultasync def query_skywalking_spans(trace_id: str) -> dict:return await logAnalyse2.query_skywalking_spans(trace_id)
async def query_common_code(project_code: str = None, humanFlag: str = None) -> dict:return await logAnalyse2.query_common_code(project_code, humanFlag)async def query_code_by_className_method(project_code: str = None, fullClassName_method: str = None) -> dict:return await logAnalyse2.query_code_by_className_method(project_code, fullClassName_method)async def query_table_create_sql(dbName: str = None) -> dict:return await logAnalyse2.query_table_create_sql(dbName)async def query_url_code(project_code: str = None, url: str = None, http_method: str = None) -> dict:return await logAnalyse2.query_url_code(project_code, url, http_method)# Tool registry for dispatch
TOOL_FUNCTIONS: dict[str, Callable[..., Any]] = {"query_error_logs": query_error_logs,"query_skywalking_spans": query_skywalking_spans,"query_common_code": query_common_code,"query_code_by_className_method": query_code_by_className_method,"query_table_create_sql": query_table_create_sql,"query_url_code": query_url_code
}TOOLS_REGISTRY = {"tools": [{"name": "query_error_logs","description": "通过 trace_id 获取错误日志信息,主要用途是收集错误日志。","inputSchema": {"type": "object","properties": {"trace_id": {"type": "string","description": "SkyWalking 的 trace_id"}},"required": ["trace_id"]}},{"name": "query_skywalking_spans","description": "通过 trace_id 查询 SkyWalking GraphQL 接口,返回调用链中的数据,主要用途是做性能分析。","inputSchema": {"type": "object","properties": {"trace_id": {"type": "string","description": "SkyWalking 的 trace_id"}},"required": ["trace_id"]}},{"name": "query_common_code","description": "如果错误日志中检查到使用 Feign 拦截器或异步线程池调用(Async),则传入 FeignConfig 或 AsyncConfig 获取相关代码实现。","inputSchema": {"type": "object","properties": {"project_code": {"type": "string","description": "项目编号"},"humanFlag": {"type": "string","description": "类名,如 FeignConfig 或 AsyncConfig"}},"required": ["humanFlag"]}},{"name": "query_code_by_className_method","description": "根据类全路径和方法名查询方法实现代码,并递归查询其调用的方法实现。","inputSchema": {"type": "object","properties": {"project_code": {"type": "string","description": "项目编号"},"fullClassName_method": {"type": "string","description": "类全路径与方法名,格式如 com.xxx.UserService.findById"}},"required": ["fullClassName_method"]}},{"name": "query_table_create_sql","description": "根据数据库名称查询所有表的建表语句。","inputSchema": {"type": "object","properties": {"dbName": {"type": "string","description": "数据库名称"}},"required": ["dbName"]}},{"name": "query_url_code","description": "根据项目编号、URL 和 HTTP 方法查询接口实现代码路径信息。","inputSchema": {"type": "object","properties": {"project_code": {"type": "string","description": "项目编号"},"url": {"type": "string","description": "请求 URL,如 /v1/jyht/cad/anno/modify/users"},"http_method": {"type": "string","description": "HTTP 方法,如 POST、GET"}},"required": ["url", "http_method"]}}],"nextCursor": "qinqing"
}@app.get("/mcp")
async def mcp_initialize_via_get():return {"jsonrpc": "2.0","id": 0,"result": {"protocolVersion": PROTOCOL_VERSION,"capabilities": {"streaming": True, "tools": {"listChanged": True}},"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},"instructions": "Use the tool to query logs, spans, or code."}}@app.post("/mcp")
async def mcp_endpoint(request: Request):try:body = await request.json()print("💡 收到请求:", json.dumps(body, ensure_ascii=False, indent=2))except Exception:return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700,"message": "Parse error"}}req_id = body.get("id", 1)method = body.get("method")print(f"🔧 方法: {method}")if method == "notifications/initialized":return Response(status_code=status.HTTP_204_NO_CONTENT)if method is None:return {"jsonrpc": "2.0", "id": req_id, "result": {"status": "MCP server online."}}if method == "initialize":return {"jsonrpc": "2.0","id": req_id,"result": {"protocolVersion": PROTOCOL_VERSION,"capabilities": {"streaming": True, "tools": {"listChanged": True}},"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},"instructions": "Use the tool to query logs, spans, or code."}}if method == "tools/list":return {"jsonrpc": "2.0", "id": req_id, "result": TOOLS_REGISTRY}if method == "tools/call":params = body.get("params", {})tool_name = params.get("name")args = params.get("arguments", {})if tool_name not in TOOL_FUNCTIONS:return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Tool '{tool_name}' not found"}}try:result = await TOOL_FUNCTIONS[tool_name](**args)return {"jsonrpc": "2.0","id": req_id,"result": {"content": [{"type": "text", "text": json.dumps(result, ensure_ascii=False)}],"isError": False}}except Exception as e:return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32000, "message": str(e)}}return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": "Method not found"}}# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def main() -> None:parser = argparse.ArgumentParser(description="LogAnalyse MCP HTTP-Stream v8")parser.add_argument("--host", default="127.0.0.1")parser.add_argument("--port", type=int, default=8000)args = parser.parse_args()import uvicornuvicorn.run(app, host=args.host, port=args.port, log_level="info")if __name__ == "__main__":main()
java 工程代码:analyseProjectCode
待完善的点:
- url 根据通配符进行匹配