目录: 一、整体架构概览 二、客户端传 Token 的实现 1. 客户端获取 Token(登录流程) 2. 客户端调用 API 时携带 Token 三、服务端校验 Token 的完整实现 1. 服务端配置文件 2. JWT 工具类 3. 认证拦截器(Middleware) 四、完整可运行的服务端代码 五、客户端的完整实现 七、完整流程总结 1、客户端传 Token 的10个步骤: 2、服务端校验 Token 的8个步骤:
一、整体架构概览
二、客户端传 Token 的实现
1. 客户端获取 Token(登录流程)
import requests
import json


class MCPClient:
    """Minimal client that logs in to the server and keeps the issued JWT."""

    # Seconds to wait for the server. Without a timeout a stalled connection
    # would block the caller indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self, base_url):
        """Store the server root URL; no token is held until login() succeeds."""
        self.base_url = base_url
        self.token = None

    def login(self, username: str, password: str) -> bool:
        """Step 1: exchange a username and password for a JWT access token.

        Posts the credentials as JSON to ``{base_url}/auth/login``. On a 200
        response the ``access_token`` field of the JSON body is stored in
        ``self.token``.

        Returns:
            True if a token was stored; False if the request failed, the
            status was not 200, or the body carried no usable token.
        """
        login_url = f"{self.base_url}/auth/login"
        login_data = {"username": username, "password": password}

        try:
            response = requests.post(
                login_url, json=login_data, timeout=self.REQUEST_TIMEOUT
            )
        except requests.exceptions.RequestException as e:
            # Keep the boolean contract: a network failure is a failed login.
            print(f"❌ 登录失败: {e}")
            return False

        if response.status_code != 200:
            print(f"❌ 登录失败: {response.text}")
            return False

        try:
            self.token = response.json()["access_token"]
        except (ValueError, KeyError, TypeError) as e:
            # Body was not JSON, or did not contain the expected field.
            print(f"❌ 登录失败: {e}")
            return False

        print(f"✅ 登录成功,用户: {username}")
        return True
2. 客户端调用 API 时携带 Token
class MCPClient:
    """Client fragment showing how a stored token is attached to API calls.

    This snippet only defines the request method. It reads ``self.base_url``
    and ``self.token``, which are not set anywhere inside this snippet.
    """

    # Seconds to wait for the server before giving up on a request.
    REQUEST_TIMEOUT = 10

    def call_tool_with_auth(self, tool_name: str, params: dict):
        """Step 2: send the token with every request that needs authentication.

        Posts ``params`` as JSON to ``{base_url}/api/tools/{tool_name}`` with
        an ``Authorization: Bearer <token>`` header. On a 401 response it
        tries one token refresh and, if that succeeds, repeats the request.

        Returns:
            The decoded JSON body of the last response received.

        Raises:
            Exception: if no token is stored yet.
        """
        if not self.token:
            raise Exception("请先登录获取 Token")

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.token}",
        }
        api_url = f"{self.base_url}/api/tools/{tool_name}"

        response = requests.post(
            api_url, headers=headers, json=params, timeout=self.REQUEST_TIMEOUT
        )

        if response.status_code == 401:
            print("🔄 Token 过期,尝试刷新...")
            # No refresh_token() method is defined in this snippet, so look it
            # up defensively: a missing hook must not turn a 401 into an
            # AttributeError.
            refresh = getattr(self, "refresh_token", None)
            if callable(refresh) and refresh():
                headers["Authorization"] = f"Bearer {self.token}"
                response = requests.post(
                    api_url,
                    headers=headers,
                    json=params,
                    timeout=self.REQUEST_TIMEOUT,
                )

        return response.json()
# Usage example: log in, then call a protected tool.
client = MCPClient("http://localhost:8000")

# login() reports success as a bool; only call the tool once a token exists.
if client.login("zhangsan", "password123"):
    try:
        result = client.call_tool_with_auth(
            "analyze_data", {"file_path": "sales.csv"}
        )
        print(f"📊 分析结果: {result}")
    except Exception as e:
        print(f"调用失败: {e}")
三、服务端校验 Token 的完整实现
1. 服务端配置文件
import os
from dataclasses import dataclass, field


@dataclass
class JWTConfig:
    """Settings used when issuing and verifying JWTs."""

    # Signing secret. It is read from JWT_SECRET_KEY each time a config object
    # is created, so a value set after this module is imported is still used.
    # SECURITY: the fallback literal is labelled as a development key; set
    # JWT_SECRET_KEY explicitly in any real deployment.
    secret_key: str = field(
        default_factory=lambda: os.getenv(
            "JWT_SECRET_KEY", "fallback-secret-key-for-dev"
        )
    )
    # Signing algorithm name passed to the JWT library.
    algorithm: str = "HS256"
    # Value written to and required in the "iss" claim.
    issuer: str = "https://mcp-auth.yuanshi-tech.com"
    # Value written to and required in the "aud" claim.
    audience: str = "fastmcp-server"
    # Token lifetime in hours.
    expires_hours: int = 24
2. JWT 工具类
import jwt
import time
from datetime import datetime, timedelta
from typing import Optional, Dict, Any


class JWTManager:
    """Creates and verifies JWTs using the settings in a JWTConfig."""

    def __init__(self, config: "JWTConfig"):
        """Keep the config whose secret, algorithm, issuer and audience are used."""
        self.config = config

    def create_access_token(self, user_info: Dict[str, Any]) -> str:
        """Build and sign a JWT for the given user.

        Args:
            user_info: must contain ``user_id`` and ``username``; may contain
                ``scopes`` (defaults to an empty list).

        Returns:
            The encoded token produced by ``jwt.encode``.

        Raises:
            KeyError: if ``user_id`` or ``username`` is missing.
        """
        # Use epoch seconds directly. Calling .timestamp() on a naive
        # datetime.utcnow() interprets it as local time and shifts iat/exp by
        # the host's UTC offset.
        now = int(time.time())
        expire = now + int(self.config.expires_hours * 3600)

        payload = {
            "iss": self.config.issuer,
            "aud": self.config.audience,
            "sub": user_info["user_id"],
            "username": user_info["username"],
            "scopes": user_info.get("scopes", []),
            "exp": expire,
            "iat": now,
        }
        return jwt.encode(
            payload,
            key=self.config.secret_key,
            algorithm=self.config.algorithm,
        )

    def verify_token(self, token: str) -> Dict[str, Any]:
        """Step 3: verify a token on the server side.

        Decodes the token with the configured key and algorithm, and requires
        the configured issuer and audience.

        Returns:
            On success: ``{"success": True, "payload", "user_id", "scopes"}``.
            On failure: ``{"success": False, "error": <message>}``. This
            method reports every failure through the dict and does not raise.
        """
        try:
            payload = jwt.decode(
                token,
                key=self.config.secret_key,
                algorithms=[self.config.algorithm],
                issuer=self.config.issuer,
                audience=self.config.audience,
            )
            return {
                "success": True,
                "payload": payload,
                "user_id": payload["sub"],
                "scopes": payload.get("scopes", []),
            }
        except jwt.ExpiredSignatureError:
            return {"success": False, "error": "Token 已过期"}
        except jwt.InvalidIssuerError:
            return {"success": False, "error": "签发方验证失败"}
        except jwt.InvalidAudienceError:
            return {"success": False, "error": "受众验证失败"}
        except jwt.InvalidSignatureError:
            return {"success": False, "error": "签名验证失败"}
        except Exception as e:
            # Deliberate catch-all at the verification boundary: callers
            # branch on the "success" flag instead of handling exceptions.
            return {"success": False, "error": f"Token 验证失败: {str(e)}"}
3. 认证拦截器(Middleware)
from flask import request, jsonify, g
import re


class AuthMiddleware:
    """Checks the Authorization header of the current Flask request."""

    # "Bearer", whitespace, then three dot-separated segments. The hyphen is
    # placed last in each character class so it is a literal: written as
    # "0-9-_" it forms the range '9'..'_', which rejects '-' and accepts
    # characters such as ':' and '@'.
    _BEARER_PATTERN = re.compile(
        r"^Bearer\s+([A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+)$"
    )

    def __init__(self, jwt_manager: "JWTManager"):
        """Keep the manager whose verify_token() validates extracted tokens."""
        self.jwt_manager = jwt_manager

    @classmethod
    def _extract_bearer_token(cls, auth_header: str) -> "Optional[str]":
        """Return the token from a ``Bearer <token>`` header, or None if malformed."""
        match = cls._BEARER_PATTERN.match(auth_header)
        return match.group(1) if match else None

    def authenticate_request(self):
        """Step 4: authenticate a request before it reaches its handler.

        Requests to ``/auth/login`` are let through unchecked. Otherwise the
        Authorization header must be present, well-formed, and carry a token
        that ``jwt_manager.verify_token`` accepts.

        Returns:
            None when the request may proceed. In that case ``g.user_id`` and
            ``g.user_scopes`` have been set, except for the login path.
            Otherwise a ``(json_response, 401)`` tuple describing the failure.
        """
        if request.path == "/auth/login":
            return None

        auth_header = request.headers.get("Authorization")
        if not auth_header:
            return jsonify({"error": "Missing Authorization header"}), 401

        token = self._extract_bearer_token(auth_header)
        if token is None:
            return jsonify({"error": "Invalid Authorization header format"}), 401

        verification_result = self.jwt_manager.verify_token(token)
        if not verification_result["success"]:
            return (
                jsonify(
                    {
                        "error": "Authentication failed",
                        "detail": verification_result["error"],
                    }
                ),
                401,
            )

        # Expose the caller's identity to the route handler.
        g.user_id = verification_result["user_id"]
        g.user_scopes = verification_result["scopes"]
        return None
四、完整可运行的服务端代码
from flask import Flask, request, jsonify, g
import os
from config import JWTConfig
from jwt_utils import JWTManager
from functools import wraps

app = Flask(__name__)
jwt_config = JWTConfig()
jwt_manager = JWTManager(jwt_config)

# In-memory demo user store.
# SECURITY: passwords are stored and compared in plaintext here; a real
# deployment should store salted hashes instead.
users_db = {
    "zhangsan": {
        "user_id": "001",
        "password": "password123",
        "scopes": ["tools:read", "tools:write", "files:access"],
    },
    "lisi": {
        "user_id": "002",
        "password": "password456",
        "scopes": ["tools:read"],
    },
}

# Scopes a caller must hold to run each tool.
# NOTE(review): the original snippet calls get_required_scopes_for_tool()
# without defining it. This mapping is a placeholder so the server runs;
# confirm the real per-tool requirements.
TOOL_REQUIRED_SCOPES = {
    "analyze_data": ["tools:read"],
}


def get_required_scopes_for_tool(tool_name: str) -> list:
    """Return the scopes required for ``tool_name`` (empty list if unlisted)."""
    return TOOL_REQUIRED_SCOPES.get(tool_name, [])


@app.route('/auth/login', methods=['POST'])
def login():
    """Step 5: login endpoint that checks credentials and issues a token.

    Returns a JSON body with ``access_token``, ``token_type`` and
    ``expires_in`` on success, or a 401 JSON error when the credentials do
    not match an entry in ``users_db``.
    """
    # A missing, invalid or non-object JSON body is treated as empty
    # credentials, so the client gets a clean 401 instead of a server error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    username = data.get('username')
    password = data.get('password')

    user = users_db.get(username) if isinstance(username, str) else None
    if not user or user['password'] != password:
        return jsonify({"error": "用户名或密码错误"}), 401

    access_token = jwt_manager.create_access_token(
        {
            "user_id": user["user_id"],
            "username": username,
            "scopes": user["scopes"],
        }
    )

    return jsonify(
        {
            "access_token": access_token,
            "token_type": "bearer",
            "expires_in": jwt_config.expires_hours * 3600,
        }
    )


def require_auth(f):
    """Step 6: decorator that protects routes requiring authentication.

    Runs ``AuthMiddleware.authenticate_request()`` first. If it returns an
    error response, that response is sent and ``f`` is not called.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # NOTE: AuthMiddleware is not among this snippet's imports; it must
        # already be in scope where this code runs.
        auth_middleware = AuthMiddleware(jwt_manager)
        result = auth_middleware.authenticate_request()
        if result:
            return result
        return f(*args, **kwargs)

    return decorated_function


@app.route('/api/tools/<tool_name>', methods=['POST'])
@require_auth
def call_tool(tool_name):
    """Run a tool for an authenticated caller after checking its scopes.

    Returns 403 with a detail message when the caller lacks any scope the
    tool requires; otherwise the JSON-encoded result of ``execute_tool``.
    """
    user_scopes = g.user_scopes
    required_scopes = get_required_scopes_for_tool(tool_name)

    if not all(scope in user_scopes for scope in required_scopes):
        return jsonify(
            {
                "error": "Permission denied",
                "detail": f"需要的权限: {required_scopes}, 当前权限: {user_scopes}",
            }
        ), 403

    params = request.get_json()
    result = execute_tool(tool_name, params)
    return jsonify(result)


def execute_tool(tool_name: str, params: dict):
    """Step 7: run the business logic (authentication and scopes already checked).

    Returns a result dict. Unknown tool names yield an explicit error dict
    instead of falling through and returning None.
    """
    if tool_name == "analyze_data":
        return {"status": "success", "message": "数据分析完成"}
    return {"status": "error", "message": f"Unknown tool: {tool_name}"}


if __name__ == '__main__':
    # SECURITY: Flask's debug mode enables an interactive debugger that can
    # execute arbitrary code, so it must not be always-on while listening on
    # all interfaces. It is opt-in via FLASK_DEBUG=1.
    app.run(host='0.0.0.0', port=8000, debug=os.getenv("FLASK_DEBUG") == "1")
五、客户端的完整实现
import requests
import json
import time


class AuthenticatedMCPClient:
    """Client that logs in, sends its token with each request, and retries once on 401."""

    def __init__(self, base_url: str, timeout: float = 10.0):
        """Store the server root URL and the per-request timeout in seconds.

        No tokens are held until login() succeeds.
        """
        self.base_url = base_url
        self.timeout = timeout
        self.access_token = None
        self.refresh_token = None

    def login(self, username: str, password: str) -> bool:
        """Log in and store the tokens returned by the server.

        Posts the credentials as JSON to ``{base_url}/auth/login``. On a 200
        response it stores ``access_token`` and, when present,
        ``refresh_token``.

        Returns:
            True on success; False on a non-200 status, a network error, or
            a response body without a usable ``access_token``.
        """
        login_url = f"{self.base_url}/auth/login"
        login_data = {"username": username, "password": password}

        try:
            response = requests.post(
                login_url, json=login_data, timeout=self.timeout
            )
        except requests.exceptions.RequestException as e:
            print(f"网络错误: {e}")
            return False

        if response.status_code != 200:
            print(f"登录失败: {response.status_code} - {response.text}")
            return False

        try:
            result = response.json()
            self.access_token = result["access_token"]
        except (ValueError, KeyError, TypeError) as e:
            # Body was not JSON, or lacked the expected field.
            print(f"登录失败: {response.status_code} - {e}")
            return False
        self.refresh_token = result.get("refresh_token")
        return True

    def refresh_access_token(self) -> bool:
        """Ask the server for a new access token.

        Returns False at once when no refresh token is stored. Otherwise it
        posts to ``{base_url}/auth/refresh`` and stores the returned
        ``access_token`` on a 200 response.

        Returns:
            True if a new access token was stored, False otherwise.
        """
        if not self.refresh_token:
            return False

        refresh_url = f"{self.base_url}/auth/refresh"
        # NOTE(review): the request authenticates with the current access
        # token, not the stored refresh token. Confirm this matches what the
        # /auth/refresh endpoint expects.
        headers = {"Authorization": f"Bearer {self.access_token}"}

        try:
            response = requests.post(
                refresh_url, headers=headers, timeout=self.timeout
            )
        except requests.exceptions.RequestException as e:
            print(f"刷新网络错误: {e}")
            return False

        if response.status_code != 200:
            print(f"Token 刷新失败: {response.status_code}")
            return False

        try:
            self.access_token = response.json()["access_token"]
        except (ValueError, KeyError, TypeError):
            print(f"Token 刷新失败: {response.status_code}")
            return False
        return True

    def _make_authenticated_request(self, method: str, endpoint: str, data=None):
        """Step 8: single entry point for authenticated requests.

        Sends a GET when ``method`` is "GET" (case-insensitive), otherwise a
        POST with ``data`` as the JSON body. At most two attempts are made:
        the second happens after a network error, or after a 401 followed by
        a successful token refresh.

        Returns:
            The last ``requests`` response object (not decoded).

        Raises:
            Exception: if no access token is stored, or if the final attempt
                fails with a network error.
        """
        if not self.access_token:
            raise Exception("请先登录")

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.access_token}",
        }
        url = f"{self.base_url}{endpoint}"
        max_retries = 2  # total attempts, not additional retries

        for attempt in range(max_retries):
            try:
                if method.upper() == 'GET':
                    response = requests.get(
                        url, headers=headers, timeout=self.timeout
                    )
                else:
                    response = requests.post(
                        url, headers=headers, json=data, timeout=self.timeout
                    )
            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    # Chain the cause so the original error stays visible.
                    raise Exception(f"请求失败: {e}") from e
                continue

            if response.status_code == 401 and attempt < max_retries - 1:
                if self.refresh_access_token():
                    headers["Authorization"] = f"Bearer {self.access_token}"
                    continue
            break

        return response

    def call_tool(self, tool_name: str, params: dict):
        """Step 9: call a tool by POSTing ``params`` to ``/api/tools/{tool_name}``."""
        return self._make_authenticated_request(
            'POST', f"/api/tools/{tool_name}", data=params
        )

    def close(self):
        """Step 10: clean up by dropping the stored tokens."""
        self.access_token = None
        self.refresh_token = None
if __name__ == '__main__':
    # Demo: log in, call one tool, and always release the client afterwards.
    client = AuthenticatedMCPClient("http://localhost:8000")

    if client.login("zhangsan", "password123"):
        print("✅ 登录成功")
        try:
            result = client.call_tool(
                "analyze_data", {"file_path": "/data/sales.csv"}
            )
            # call_tool returns the raw response object; decode it so the
            # tool's result is printed, not the response's repr.
            print(f"🔧 工具调用结果: {result.json()}")
        except Exception as e:
            print(f"❌ 调用失败: {e}")
        finally:
            client.close()
七、完整流程总结
1、客户端传 Token 的10个步骤:
- 登录请求 → 发送用户名密码
- 获取 Token → 接收服务端颁发的 JWT
- 存储 Token → 保存在内存或安全存储中
- 构造请求头 → Authorization: Bearer xxx
- 发送认证请求 → 携带 Token 调用 API
- 处理过期 → 检测 401 错误并自动刷新
- 重试机制 → 最多尝试 2 次(即首次请求失败后最多重试 1 次)
- 返回结果 → 正常业务响应
- 清理 Token → 登出时清除
2、服务端校验 Token 的8个步骤:
1. 检查请求头存在性
2. 验证 Bearer 格式
3. 提取 Token 字符串
4. 验证 Token 签名
5. 校验签发方 (iss)
6. 校验受众 (aud)
7. 检查过期时间 (exp)
8. 提取用户信息和权限