当前位置: 首页 > news >正文

企业级Agent智能体(智能小秘)之MCP服务认证实现

目录:

    • 一、整体架构概览
    • 二、客户端传 Token 的实现
      • 1. 客户端获取 Token(登录流程)
      • 2. 客户端调用 API 时携带 Token
    • 三、服务端校验 Token 的完整实现
      • 1. 服务端配置文件
      • 2. JWT 工具类
      • 3. 认证拦截器(Middleware)
    • 四、完整可运行的服务端代码
    • 五、客户端的完整实现
    • 七、完整流程总结
      • 1、客户端传 Token 的10个步骤:
      • 2、服务端校验 Token 的8个步骤:

一、整体架构概览

在这里插入图片描述

二、客户端传 Token 的实现

1. 客户端获取 Token(登录流程)

import requests
import jsonclass MCPClient:def __init__(self, base_url):self.base_url = base_urlself.token = Nonedef login(self, username: str, password: str) -> bool:"""第一步:客户端通过用户名密码登录,获取 JWT Token"""login_url = f"{self.base_url}/auth/login"login_data = {"username": username,"password": password}response = requests.post(login_url, json=login_data)if response.status_code == 200:result = response.json()self.token = result["access_token"]print(f"✅ 登录成功,用户: {username}")return Trueelse:print(f"❌ 登录失败: {response.text}")return False

2. 客户端调用 API 时携带 Token

class MCPClient:# ... 上面的初始化代码 ...def call_tool_with_auth(self, tool_name: str, params: dict):"""第二步:在所有需要认证的请求中携带 Token"""if not self.token:raise Exception("请先登录获取 Token")headers = {"Content-Type": "application/json","Authorization": f"Bearer {self.token}"  # 👈 关键:Bearer Token}api_url = f"{self.base_url}/api/tools/{tool_name}"response = requests.post(api_url,headers=headers,json=params)# Token 过期自动刷新if response.status_code == 401:print("🔄 Token 过期,尝试刷新...")if self.refresh_token():# 重新发起请求headers["Authorization"] = f"Bearer {self.token}"response = requests.post(api_url, headers=headers, json=params)return response.json()# 使用示例
client = MCPClient("http://localhost:8000")# 1. 先登录获取 Token
client.login("zhangsan", "password123")# 2. 带着 Token 调用工具
try:result = client.call_tool_with_auth("analyze_data", {"file_path": "sales.csv"})print(f"📊 分析结果: {result}")
except Exception as e:print(f"调用失败: {e}")

三、服务端校验 Token 的完整实现

1. 服务端配置文件

# config.py
import os
from dataclasses import dataclass@dataclass
class JWTConfig:secret_key: str = os.getenv("JWT_SECRET_KEY", "fallback-secret-key-for-dev")algorithm: str = "HS256"issuer: str = "https://mcp-auth.yuanshi-tech.com"audience: str = "fastmcp-server"expires_hours: int = 24

2. JWT 工具类

# jwt_utils.py
import jwt
import time
from datetime import datetime, timedelta
from typing import Optional, Dict, Anyclass JWTManager:def __init__(self, config: JWTConfig):self.config = configdef create_access_token(self, user_info: Dict[str, Any]) -> str:"""生成 JWT Token"""now = datetime.utcnow()expire = now + timedelta(hours=config.expires_hours))payload = {"iss": config.issuer,           # 签发方"aud": config.audience,         # 受众"sub": user_info["user_id"],     # 用户ID"username": user_info["username"], # 用户名"scopes": user_info.get("scopes", []),  # 权限范围"exp": expire.timestamp(),        # 过期时间"iat": now.timestamp()             # 签发时间}token = jwt.encode(payload            payload, key=self.config.secret_key, algorithm=self.config.algorithm)return tokendef verify_token(self, token: str) -> Dict[str, Any]:"""第三步:服务端完整校验 Token(六步验证法)"""try:# 第1步:解码并验证签名payload = jwt.decode(token,key=self.config.secret_key,algorithms=[self.config.algorithm],issuer=self.config.issuer,audience=self.config.audience)# 如果到这里没抛异常,说明前4步验证通过return {"success": True,"payload": payload,"user_id": payload["sub"],"scopes": payload.get("scopes", []))except jwt.ExpiredSignatureError:return {"success": False, "error": "Token 已过期"}except jwt.InvalidIssuerError:return {"success": False, "error": "签发方验证失败"}except jwt.InvalidAudienceError:return {"success": False, "error": "受众验证失败"}except jwt.InvalidSignatureError:return {"success": False, "error": "签名验证失败"}except Exception as e:return {"success": False, "error": f"Token 验证失败: {str(e)}"}

3. 认证拦截器(Middleware)

# auth_middleware.py
from flask import request, jsonify, g
import reclass AuthMiddleware:def __init__(self, jwt_manager: JWTManager):self.jwt_manager = jwt_managerdef authenticate_request(self):"""第四步:在每个请求到达处理函数前的认证拦截"""# 跳过登录接口本身if request.path == "/auth/login":return# 第2步:检查 Authorization 头是否存在auth_header = request.headers.get('Authorization')if not auth_header:return jsonify({"error": "Missing Authorization header"}), 401# 第3步:验证 Bearer Token 格式bearer_pattern = r'^Bearer\s+([a-zA-Z0-9-_]+\.[a-zA-Z0-9-_]+\.[a-zA-Z0-9-_]+)$"match = re.match(bearer_pattern, auth_header)if not match:return jsonify({"error": "Invalid Authorization header format"}), 401token = match.group(1)  # 提取 Token 部分# 第4步:完整的 Token 验证verification_result = self.jwt_manager.verify_token(token)if not verification_result["success"]:return jsonify({"error": f"Authentication failed","detail": verification_result["error"]}), 401# 验证成功,将用户信息存入全局上下文g.user_id = verification_result verification_result["user_id"]g.user_scopes = verification_result["scopes"]

四、完整可运行的服务端代码

# app.py
from flask import Flask, request, jsonify, g
import os
from config import JWTConfig
from jwt_utils import JWTManager
from functools import wrapsapp = Flask(__name__)# 初始化配置
jwt_config = JWTConfig()
jwt_manager = JWTManager(jwt_config)# Mock 用户数据库
users_db = {"zhangsan": {"user_id": "001","password": "password123",  # 生产环境要用哈希密码"scopes": ["tools:read", "tools:write", "files:access"])},"lisi": {"user_id": "002", "password": "password456","scopes": ["tools:read"]  # 只读权限
}@app.route('/auth/login', methods=['POST'])
def login():"""第五步:登录接口,验证用户凭证并颁发 Token"""data = request.get_json()username = data.get('username')password = data.get('password')# 验证用户凭证user = users_db.get(username)if not user or user['password'] != password:return jsonify({"error": "用户名或密码错误"}), 401# 生成 Tokenaccess_token = jwt_manager.create_access_token({"user_id": user["user_id"],"username": username,"scopes": user["scopes"]})return jsonify({"access_token": access_token,"token_type": "bearer","expires_in": jwt_config.expires_hours * 3600})def require_auth(f):"""第六步:认证装饰器,保护需要认证的路由"""@wraps(f)def decorated_function(*args, **kwargs):# 这里会调用上面的认证逻辑auth_middleware = AuthMiddleware(jwt_manager)result = auth_middleware.authenticate_request()if result:  # 如果有返回值,说明认证失败return resultreturn f(*args, **kwargs)return decorated_function@app.route('/api/tools/<tool_name>', methods=['POST'])
@require_auth  # 👈 关键:为路由添加认证保护
def call_tool(tool_name):# 此时认证已完成,可以从 g 中获取用户信息user_id = g.user_iduser_scopes = g.user_scopes# 第六步:权限检查required_scopes = get_required_scopes_for_tool(tool_name)# 检查用户是否有足够的权限if not all(scope in user_scopes for scope in required_scopes)):return jsonify({"error": "Permission denied","detail": f"需要的权限: {required_scopes}, 当前权限: {user_scopes}"}), 403# 执行业务逻辑params = request.get_json()result = execute_tool(tool_name, params)return jsonify(result)def execute_tool(tool_name: str, params: dict):"""第七步:执行业务逻辑(此时已验证认证和权限)"""# 这里是具体的工具执行代码if tool_name == "analyze_data":return {"status": "success", "message": "数据分析完成"})if __name__ == '__main__':app.run(host='0.0.0.0', port=8000, debug=True)

五、客户端的完整实现

# client.py
import requests
import json
import timeclass AuthenticatedMCPClient:def __init__(self, base_url: str):self.base_url = base_urlself.access_token = Noneself.refresh_token = Nonedef login(self, username: str, password: str) -> bool:"""完整的登录流程"""login_url = f"{self.base_url}/auth/login"login_data = {"username": username,"password": password}try:response = requests.post(login_url, json=login_data)if response.status_code == 200:result = response.json()self.access_token = result["access_token"]self.refresh_token = result.get("refresh_token")return Trueelse:print(f"登录失败: {response.status_code} - {response.text}")return Falseexcept requests.exceptions.RequestException as e:print(f"网络错误: {e}")return Falsedef refresh_access_token(self) -> bool:"""Token 刷新机制"""if not self.refresh_token:return Falserefresh_url = f"{self.base_url}/auth/refresh"headers = {"Authorization": f"Bearer {self.access_token}"}try:response = requests.post(refresh_url, headers=headers)if response.status_code == 200:result = response.json()self.access_token = result["access_token"]"]return Trueelse:print(f"Token 刷新失败: {response.status_code}")return Falseexcept requests.exceptions.RequestException as e:print(f"刷新网络错误: {e}")return Falsedef _make_authenticated_request(self, method: str, endpoint: str, data=None):"""第八步:统一的认证请求方法"""if not self.access_token:raise Exception("请先登录")headers = {"Content-Type": "application/json","Authorization": f"Bearer {self.access_token}"  # 👈 关键}url = f"{self.base_url}{endpoint}"max_retries = 2for attempt in range(max_retries):try:if method.upper() == 'GET':response = requests.get(url, headers=headers)else:response = requests.post(url, headers=headers, json=data)# Token 过期,尝试刷新if response.status_code == 401 and attempt < max_retries - 1:if self.refresh_access_token():headers["Authorization"] = f"Bearer {self.access_token}"continuebreakexcept requests.exceptions.RequestException as e:if attempt == max_retries - 1:raise Exception(f"请求失败: {e}")return responsedef call_tool(self, tool_name: str, params: dict):"""第九步:调用工具的主方法"""return self._make_authenticated_request('POST',f"/api/tools/{tool_name}",data=params)def close(self):"""第十步:清理资源"""pass  # 可在此处实现登出逻辑# 使用示例
if __name__ == '__main__':client = AuthenticatedMCPClient("http://localhost:8000")# 1. 登录获取 Tokenif client.login("zhangsan", "password123"):print("✅ 登录成功")# 2. 调用需要认证的工具try:result = client.call_tool("analyze_data", {"file_path": "/data/sales.csv"})print(f"🔧 工具调用结果: {result}")except Exception as e:print(f"❌ 调用失败: {e}")

七、完整流程总结

1、客户端传 Token 的10个步骤:

  1. 登录请求 → 发送用户名密码
  2. 获取 Token → 接收服务端颁发的 JWT
  3. 存储 Token → 保存在内存或安全存储中
  4. 构造请求头 → Authorization: Bearer xxx
  5. 发送认证请求 → 携带 Token 调用 API
  6. 处理过期 → 检测 401 错误并自动刷新
  7. 重试机制 → 最多重试 2 次
  8. 返回结果 → 正常业务响应
  9. 清理 Token → 登出时清除

2、服务端校验 Token 的8个步骤:

  1. 检查请求头存在性
  2. 验证 Bearer 格式
  3. 提取 Token 字符串
  4. 验证 Token 签名
  5. 校验签发方 (iss)
  6. 校验受众 (aud)
  7. 检查过期时间 (exp)
  8. 提取用户信息和权限
http://www.dtcms.com/a/569852.html

相关文章:

  • 无极商城网站建设什么是网络营销策划书
  • 将地球上的距离转化为经纬度差
  • 华为OD机试双机位A卷 - 叠积木 (C++ Python JAVA JS GO)
  • Windows 2008 如何安装IIS?
  • wordpress後台建站赚钱项目
  • Day57 | 一文详解ThreadLocal
  • 快速判断地图上的点是否在多边形内部
  • 网站文章的作用邵阳市今天新闻
  • C#设计模式 单例模式实现方式
  • 网站是怎么搭建的简单个人博客模板网站
  • 【题解】洛谷 P10083 [GDKOI2024 提高组] 不休陀螺 [思维 + 树状数组 + st 表]
  • C语言字符串操作:手写strlen+常用库函数解析
  • 自己可以创建公司网站吗赣州网站制作培训
  • 百度优化排名软件seo交流
  • 链表相关的算法题(1)
  • 速成网站建设有哪些专业做饰品的网站app
  • 服务器负载过高的多维度诊断与性能瓶颈定位指南
  • 超云发布R2425存储服务器:以全栈自研引领国产存储新方向
  • 网站域名快速备案做网站没有高清图片怎么办
  • 【Python基础】f-string用法
  • 前端高频面试手写题——扁平化数组转树
  • 网站建设合同通用范本免费推广引流怎么做
  • 上海怎么建设网站网站建设网站制作公司
  • Flink 多流转换
  • Redis_5_单线程模型
  • 做简单网站用什么软件有哪些洛阳网站建设设计公司
  • CTF WEB入门 命令执行篇29-49
  • IDEA自定义类注释、方法注释
  • Grafana12安装部署[特殊字符]
  • 网站建设报价流程河南建设工程信息网站