Building a Complete Enterprise-Grade AI Agent Microservice Scaffold with Python and FastAPI

Step 1: Project Architecture Design

Directory structure:

ai-agent-service/
├── app/
│   ├── __init__.py
│   ├── main.py                 # FastAPI application entry point
│   ├── core/                   # Core configuration and utilities
│   │   ├── __init__.py
│   │   ├── config.py          # Configuration management
│   │   ├── security.py        # Security utilities
│   │   └── logging.py         # Logging configuration
│   ├── api/                   # API routes
│   │   ├── __init__.py
│   │   ├── endpoints/         # Individual endpoints
│   │   │   ├── __init__.py
│   │   │   ├── chat.py        # Chat endpoints
│   │   │   ├── tools.py       # Tool endpoints
│   │   │   └── admin.py       # Admin endpoints
│   │   └── dependencies.py    # Dependency injection
│   ├── services/              # Business logic layer
│   │   ├── __init__.py
│   │   ├── llm_service.py     # LLM service
│   │   ├── chat_service.py   # Chat service
│   │   └── tool_service.py   # Tool service
│   ├── models/               # Data models
│   │   ├── __init__.py
│   │   ├── schemas.py        # Pydantic models
│   │   └── database.py       # Database models
│   ├── tools/               # Tool system
│   │   ├── __init__.py
│   │   ├── base.py          # Tool base class
│   │   └── registry.py      # Tool registry
│   └── utils/               # Helper functions
│       ├── __init__.py
│       └── helpers.py
├── tests/                   # Test files
├── docker/                 # Docker configuration
├── k8s/                   # Kubernetes configuration
├── scripts/               # Deployment scripts
├── requirements.txt       # Python dependencies
├── Dockerfile
├── docker-compose.yml
└── README.md

Step 2: Core Dependencies and Configuration

requirements.txt:
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
# Required by app/core/config.py (BaseSettings lives in a separate package in Pydantic v2)
pydantic-settings==2.1.0
python-dotenv==1.0.0
python-multipart==0.0.6
sqlalchemy==2.0.23
alembic==1.12.1
psycopg2-binary==2.9.9
redis==5.0.1
httpx==0.25.2
aiofiles==23.2.1
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
# Required by app/core/logging.py
loguru==0.7.2
prometheus-client==0.19.0
opentelemetry-api==1.20.0
opentelemetry-sdk==1.20.0
opentelemetry-instrumentation-fastapi==0.41b0
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0

app/core/config.py:

import os
from typing import Optional

from dotenv import load_dotenv
from pydantic_settings import BaseSettings, SettingsConfigDict

# Load variables from a local .env file into the process environment
load_dotenv()


class Settings(BaseSettings):
    """Application settings."""

    model_config = SettingsConfigDict(case_sensitive=True, env_file=".env")

    # Application
    APP_NAME: str = "AI Agent Service"
    VERSION: str = "1.0.0"
    DEBUG: bool = False

    # API
    API_V1_STR: str = "/api/v1"
    PROJECT_NAME: str = "ai-agent-service"

    # Security (the default SECRET_KEY is a placeholder and must be overridden)
    SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key-here")
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    # Database
    DATABASE_URL: str = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost/ai_agent")
    REDIS_URL: str = os.getenv("REDIS_URL", "redis://localhost:6379")

    # LLM
    OPENAI_API_KEY: Optional[str] = os.getenv("OPENAI_API_KEY")
    OPENAI_BASE_URL: Optional[str] = os.getenv("OPENAI_BASE_URL")
    MODEL_NAME: str = os.getenv("MODEL_NAME", "gpt-3.5-turbo")

    # Rate limiting
    RATE_LIMIT_PER_MINUTE: int = 60


settings = Settings()
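
The default SECRET_KEY in the configuration is a placeholder, so a deployment that forgets to set the environment variable would sign tokens with a publicly known key. One way to guard against that is a check that runs once at startup. The sketch below is only an illustration: the helper name `ensure_secure_secret`, the placeholder list, and the 32-character minimum are assumptions, not part of the original scaffold.

```python
# Hypothetical startup guard; all names and thresholds here are illustrative.
PLACEHOLDER_SECRETS = {
    "your-secret-key-here",
    "your-super-secret-key-here-change-in-production",
    "your-production-secret-key",
}
MIN_SECRET_LENGTH = 32  # assumed minimum, adjust to your own policy


def ensure_secure_secret(secret_key: str, debug: bool) -> None:
    """Raise ValueError if a non-debug deployment uses a weak or placeholder key."""
    if debug:
        return  # placeholder keys are tolerated for local development only
    if secret_key in PLACEHOLDER_SECRETS:
        raise ValueError("SECRET_KEY is still set to a placeholder value")
    if len(secret_key) < MIN_SECRET_LENGTH:
        raise ValueError(f"SECRET_KEY must be at least {MIN_SECRET_LENGTH} characters")
```

It could be called as `ensure_secure_secret(settings.SECRET_KEY, settings.DEBUG)` right after `settings` is created, so a misconfigured production container fails immediately instead of serving traffic.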

app/core/logging.py:

import sys

from loguru import logger

from app.core.config import settings


def setup_logging():
    """Configure structured logging."""
    # Remove the default handler
    logger.remove()

    # Console handler
    logger.add(
        sys.stdout,
        level="DEBUG" if settings.DEBUG else "INFO",
        format=(
            "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
            "<level>{level: <8}</level> | "
            "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
            "<level>{message}</level>"
        ),
        backtrace=True,
        diagnose=settings.DEBUG,
    )

    # File handler (production only)
    if not settings.DEBUG:
        logger.add(
            "logs/ai_agent.log",
            rotation="10 MB",
            retention="30 days",
            level="INFO",
            format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
        )

    return logger


# Global logger instance
log = setup_logging()

Step 3: Data Model Design

app/models/schemas.py:

from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, ConfigDict, Field


class MessageRole(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"


class ToolType(str, Enum):
    FUNCTION = "function"
    API = "api"
    DATABASE = "database"


class ChatRequest(BaseModel):
    """Chat request model."""

    message: str = Field(..., description="User message")
    conversation_id: Optional[str] = Field(None, description="Conversation ID")
    tools: Optional[List[str]] = Field(None, description="Names of the tools to use")
    stream: bool = Field(False, description="Whether to stream the response")

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "message": "Hello, please check the weather for me",
                "conversation_id": "conv_123",
                "tools": ["weather", "calculator"],
                "stream": False,
            }
        }
    )


class ChatResponse(BaseModel):
    """Chat response model."""

    message: str = Field(..., description="AI reply")
    conversation_id: str = Field(..., description="Conversation ID")
    tool_calls: Optional[List[Dict[str, Any]]] = Field(None, description="Tool calls")
    usage: Optional[Dict[str, int]] = Field(None, description="Token usage statistics")

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "message": "Hello! I am an AI assistant, happy to help.",
                "conversation_id": "conv_123",
                "tool_calls": None,
                "usage": {"prompt_tokens": 10, "completion_tokens": 20},
            }
        }
    )


class ToolCall(BaseModel):
    """Tool call model."""

    tool_name: str = Field(..., description="Tool name")
    parameters: Dict[str, Any] = Field(..., description="Tool parameters")

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "tool_name": "weather",
                "parameters": {"city": "Beijing"},
            }
        }
    )


class ToolResponse(BaseModel):
    """Tool response model."""

    tool_name: str = Field(..., description="Tool name")
    result: Any = Field(..., description="Tool execution result")
    success: bool = Field(..., description="Whether execution succeeded")
    error_message: Optional[str] = Field(None, description="Error message")


class Conversation(BaseModel):
    """Conversation model."""

    id: str = Field(..., description="Conversation ID")
    user_id: str = Field(..., description="User ID")
    title: str = Field(..., description="Conversation title")
    created_at: datetime = Field(..., description="Creation time")
    updated_at: datetime = Field(..., description="Last update time")
    message_count: int = Field(..., description="Number of messages")


class HealthCheck(BaseModel):
    """Health check response."""

    status: str = Field(..., description="Service status")
    version: str = Field(..., description="Service version")
    timestamp: datetime = Field(..., description="Time of the check")


class ErrorResponse(BaseModel):
    """Error response model."""

    error: str = Field(..., description="Error type")
    message: str = Field(..., description="Error message")
    details: Optional[Dict[str, Any]] = Field(None, description="Error details")

Step 4: Routing and API Design

app/main.py:

import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone

from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

from app.api.dependencies import get_current_user
from app.api.endpoints import admin, chat, tools
from app.core.config import settings
from app.core.logging import log
from app.models.schemas import HealthCheck


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management."""
    # Runs on startup
    log.info("🚀 Starting AI Agent Service")
    yield
    # Runs on shutdown
    log.info("🛑 Shutting down AI Agent Service")


app = FastAPI(
    title=settings.PROJECT_NAME,
    version=settings.VERSION,
    description="Enterprise-grade AI Agent microservice",
    docs_url="/docs" if settings.DEBUG else None,
    redoc_url="/redoc" if settings.DEBUG else None,
    lifespan=lifespan,
)

# Middleware configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if settings.DEBUG else ["https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# With DEBUG disabled, requests whose Host header is not listed here are
# rejected with HTTP 400. "localhost" is included so that the container
# health check (curl http://localhost:8000/health) keeps working.
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["*"] if settings.DEBUG else ["yourdomain.com", "localhost"],
)


@app.middleware("http")
async def add_process_time_header(request, call_next):
    """Add a header with the request processing time."""
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response


# Health check endpoint
@app.get("/health", response_model=HealthCheck, tags=["health"])
async def health_check():
    return HealthCheck(
        status="healthy",
        version=settings.VERSION,
        timestamp=datetime.now(timezone.utc),
    )


# Register routers
app.include_router(
    chat.router,
    prefix=settings.API_V1_STR,
    tags=["chat"],
    dependencies=[Depends(get_current_user)],
)

app.include_router(
    tools.router,
    prefix=settings.API_V1_STR,
    tags=["tools"],
    dependencies=[Depends(get_current_user)],
)

app.include_router(
    admin.router,
    prefix=settings.API_V1_STR + "/admin",
    tags=["admin"],
    dependencies=[Depends(get_current_user)],
)


@app.get("/")
async def root():
    return {"message": f"Welcome to {settings.APP_NAME}"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        reload=settings.DEBUG,
        log_level="info" if settings.DEBUG else "warning",
    )

app/api/endpoints/chat.py:

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import StreamingResponse

from app.api.dependencies import get_chat_service
from app.core.logging import log
from app.models.schemas import ChatRequest, ChatResponse, ErrorResponse
from app.services.chat_service import ChatService

router = APIRouter()


@router.post(
    "/chat",
    response_model=ChatResponse,
    responses={
        400: {"model": ErrorResponse},
        429: {"model": ErrorResponse},
        500: {"model": ErrorResponse},
    },
)
async def chat_endpoint(
    request: ChatRequest,
    chat_service: ChatService = Depends(get_chat_service),
):
    """Handle a chat request."""
    try:
        log.info(f"Processing chat request for conversation: {request.conversation_id}")

        if request.stream:
            # Streaming response (server-sent events)
            return StreamingResponse(
                chat_service.stream_chat(request),
                media_type="text/event-stream",
            )

        # Regular response
        return await chat_service.process_chat(request)

    except HTTPException:
        # Let deliberate HTTP errors pass through unchanged
        raise
    except Exception as e:
        log.error(f"Chat processing error: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Chat processing failed: {str(e)}",
        )


@router.get("/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Get the conversation history."""
    # TODO: implement conversation retrieval
    return {"conversation_id": conversation_id, "messages": []}


@router.delete("/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete a conversation."""
    # TODO: implement conversation deletion
    return {"message": f"Conversation {conversation_id} deleted"}

Step 5: Service Layer Design

app/services/chat_service.py:

import json
from typing import AsyncGenerator

from app.core.logging import log
from app.models.schemas import ChatRequest, ChatResponse
from app.services.llm_service import LLMService


class ChatService:
    """Chat service."""

    def __init__(self, llm_service: LLMService):
        self.llm_service = llm_service

    async def process_chat(self, request: ChatRequest) -> ChatResponse:
        """Process a chat request."""
        try:
            # Delegate message handling to the LLM service
            response = await self.llm_service.generate_response(
                message=request.message,
                conversation_id=request.conversation_id,
                tools=request.tools,
            )
            return ChatResponse(
                message=response["content"],
                conversation_id=response["conversation_id"],
                tool_calls=response.get("tool_calls"),
                usage=response.get("usage"),
            )
        except Exception as e:
            log.error(f"Chat processing error: {str(e)}")
            raise

    async def stream_chat(self, request: ChatRequest) -> AsyncGenerator[str, None]:
        """Stream a chat response as server-sent events."""
        try:
            async for chunk in self.llm_service.stream_response(
                message=request.message,
                conversation_id=request.conversation_id,
                tools=request.tools,
            ):
                yield f"data: {chunk}\n\n"
        except Exception as e:
            log.error(f"Stream chat error: {str(e)}")
            # json.dumps keeps the error event valid JSON even if the message contains quotes
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

app/services/llm_service.py:

import json
import uuid
from typing import Any, AsyncGenerator, Dict, List, Optional

import httpx

from app.core.config import settings
from app.core.logging import log


class LLMService:
    """Base class for LLM services."""

    async def generate_response(
        self,
        message: str,
        conversation_id: Optional[str] = None,
        tools: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """Generate a response."""
        raise NotImplementedError

    async def stream_response(
        self,
        message: str,
        conversation_id: Optional[str] = None,
        tools: Optional[List[str]] = None,
    ) -> AsyncGenerator[str, None]:
        """Generate a streamed response."""
        raise NotImplementedError


class OpenAIService(LLMService):
    """OpenAI implementation."""

    def __init__(self):
        self.api_key = settings.OPENAI_API_KEY
        self.base_url = settings.OPENAI_BASE_URL or "https://api.openai.com/v1"
        self.model = settings.MODEL_NAME
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=30.0,
        )

    async def generate_response(
        self,
        message: str,
        conversation_id: Optional[str] = None,
        tools: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        try:
            messages = [{"role": "user", "content": message}]
            response = await self.client.post(
                "/chat/completions",
                json={
                    "model": self.model,
                    "messages": messages,
                    "max_tokens": 1000,
                    "temperature": 0.7,
                },
            )
            if response.status_code != 200:
                raise Exception(f"OpenAI API error: {response.text}")

            data = response.json()
            return {
                "content": data["choices"][0]["message"]["content"],
                # hash() is randomized per process and may be negative,
                # so a UUID is used for newly created conversation IDs
                "conversation_id": conversation_id or f"conv_{uuid.uuid4().hex}",
                "usage": data.get("usage"),
            }
        except Exception as e:
            log.error(f"OpenAI service error: {str(e)}")
            raise

    async def stream_response(
        self,
        message: str,
        conversation_id: Optional[str] = None,
        tools: Optional[List[str]] = None,
    ) -> AsyncGenerator[str, None]:
        try:
            messages = [{"role": "user", "content": message}]
            async with self.client.stream(
                "POST",
                "/chat/completions",
                json={
                    "model": self.model,
                    "messages": messages,
                    "max_tokens": 1000,
                    "temperature": 0.7,
                    "stream": True,
                },
            ) as response:
                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        data = line[6:]
                        if data != "[DONE]":
                            yield data
        except Exception as e:
            log.error(f"OpenAI stream error: {str(e)}")
            yield json.dumps({"error": str(e)})
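
The streaming path forwards each `data:` payload to the client as a raw JSON string, so the consumer still has to pull the text out of every chunk. The following is a minimal sketch of that step, assuming the chunks follow the OpenAI chat-completions streaming shape (`choices[0].delta.content`). The helper name `extract_delta_text` is hypothetical and not part of the scaffold.

```python
import json
from typing import Optional


def extract_delta_text(chunk: str) -> Optional[str]:
    """Return the text fragment carried by one streamed chunk, or None."""
    try:
        payload = json.loads(chunk)
    except json.JSONDecodeError:
        return None  # ignore keep-alive lines or malformed data
    if not isinstance(payload, dict):
        return None
    choices = payload.get("choices") or []
    if not choices:
        return None  # e.g. the {"error": ...} event emitted on failure
    delta = choices[0].get("delta") or {}
    return delta.get("content")


# Sample chunks in the assumed streaming format
chunks = [
    '{"choices": [{"delta": {"role": "assistant"}}]}',
    '{"choices": [{"delta": {"content": "Hel"}}]}',
    '{"choices": [{"delta": {"content": "lo"}}]}',
    '{"choices": [{"delta": {}, "finish_reason": "stop"}]}',
]
text = "".join(t for t in map(extract_delta_text, chunks) if t)
print(text)  # Hello
```

Chunks without a `content` field (the initial role announcement and the final stop marker) simply contribute nothing to the assembled text.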

Step 6: Tool System Design

app/tools/base.py:

from abc import ABC, abstractmethod
from typing import Any, Dict, List

from pydantic import BaseModel, Field

from app.core.logging import log


class ToolParameter(BaseModel):
    """Tool parameter definition."""

    name: str = Field(..., description="Parameter name")
    type: str = Field(..., description="Parameter type")
    description: str = Field(..., description="Parameter description")
    required: bool = Field(True, description="Whether the parameter is required")


class BaseTool(ABC):
    """Tool base class."""

    def __init__(self):
        self.name = self.__class__.__name__.lower().replace("tool", "")
        self.description = "A general purpose tool"
        self.parameters: List[ToolParameter] = []

    @abstractmethod
    async def execute(self, **kwargs) -> Any:
        """Execute the tool."""
        pass

    def get_schema(self) -> Dict[str, Any]:
        """Return the tool definition."""
        return {
            "name": self.name,
            "description": self.description,
            # model_dump() replaces the deprecated .dict() in Pydantic v2
            "parameters": [param.model_dump() for param in self.parameters],
        }


class CalculatorTool(BaseTool):
    """Calculator tool."""

    def __init__(self):
        super().__init__()
        self.name = "calculator"
        self.description = "Perform mathematical calculations"
        self.parameters = [
            ToolParameter(
                name="expression",
                type="string",
                description="Mathematical expression to evaluate",
                required=True,
            )
        ]

    async def execute(self, expression: str) -> float:
        """Evaluate the expression."""
        try:
            # Only allow digits, arithmetic operators, parentheses, and spaces
            allowed_chars = set("0123456789+-*/.() ")
            if not all(c in allowed_chars for c in expression):
                raise ValueError("Invalid characters in expression")

            # Note: production code needs a safer approach than eval();
            # the character filter still permits costly inputs such as 9**9**9
            result = eval(expression)
            log.info(f"Calculator tool executed: {expression} = {result}")
            return result
        except Exception as e:
            log.error(f"Calculator error: {str(e)}")
            raise ValueError(f"Calculation failed: {str(e)}")
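
The calculator's own comment notes that `eval()` is not suitable for production. One common alternative is to parse the expression with the standard-library `ast` module and evaluate only an explicit set of node types. The sketch below is an illustrative replacement, not the original author's implementation. The function name `safe_eval` is an assumption, and exponentiation is deliberately left out.

```python
import ast
import operator

# Operators the evaluator accepts; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_eval(expression: str) -> float:
    """Evaluate +, -, *, / and parentheses without calling eval()."""

    def _eval(node: ast.AST) -> float:
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        # Accept plain int/float literals only (bool, str, etc. are rejected)
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError("Unsupported expression")

    try:
        tree = ast.parse(expression, mode="eval")
    except SyntaxError as exc:
        raise ValueError("Invalid expression") from exc
    return _eval(tree)


print(safe_eval("2 * (3 + 4) - 1"))  # 13
```

Because names, calls, and the `**` operator never match an allowed node type, inputs such as `__import__('os')` or `9**9**9` raise `ValueError` instead of being executed. Division by zero still raises `ZeroDivisionError`, which the surrounding `except Exception` in `CalculatorTool.execute` would convert into a calculation error.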

app/tools/registry.py:

from typing import Any, Dict, List, Optional

from app.core.logging import log
from app.tools.base import BaseTool


class ToolRegistry:
    """Tool registry."""

    def __init__(self):
        self._tools: Dict[str, BaseTool] = {}

    def register_tool(self, tool: BaseTool):
        """Register a tool."""
        self._tools[tool.name] = tool
        log.info(f"Tool registered: {tool.name}")

    def get_tool(self, name: str) -> Optional[BaseTool]:
        """Look up a tool by name."""
        return self._tools.get(name)

    def list_tools(self) -> List[Dict]:
        """List the schemas of all registered tools."""
        return [tool.get_schema() for tool in self._tools.values()]

    async def execute_tool(self, name: str, **kwargs) -> Any:
        """Execute a tool by name."""
        tool = self.get_tool(name)
        if not tool:
            raise ValueError(f"Tool not found: {name}")
        return await tool.execute(**kwargs)


# Global tool registry instance
tool_registry = ToolRegistry()


# Register the default tools
def register_default_tools():
    """Register the default tools."""
    from app.tools.base import CalculatorTool

    calculator = CalculatorTool()
    tool_registry.register_tool(calculator)


# Register tools at import time
register_default_tools()
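
`execute_tool` passes `**kwargs` straight to the tool, so a missing argument only surfaces as a `TypeError` from Python. The schema returned by `get_schema()` already records which parameters are required, so it can be checked up front. A minimal sketch follows; the helper name `missing_parameters` is hypothetical, and the schema literal mirrors what `CalculatorTool().get_schema()` produces.

```python
from typing import Any, Dict, List


def missing_parameters(schema: Dict[str, Any], arguments: Dict[str, Any]) -> List[str]:
    """Return names of required parameters that are absent from `arguments`."""
    return [
        param["name"]
        for param in schema.get("parameters", [])
        if param.get("required", True) and param["name"] not in arguments
    ]


# Schema in the shape produced by CalculatorTool().get_schema()
calculator_schema = {
    "name": "calculator",
    "description": "Perform mathematical calculations",
    "parameters": [
        {
            "name": "expression",
            "type": "string",
            "description": "Mathematical expression to evaluate",
            "required": True,
        }
    ],
}

print(missing_parameters(calculator_schema, {}))                       # ['expression']
print(missing_parameters(calculator_schema, {"expression": "1 + 1"}))  # []
```

A registry could call such a check before `tool.execute(**kwargs)` and raise a `ValueError` naming the missing parameters, which gives API clients a clearer message than a Python signature error.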

Step 7: Authentication and Authorization

app/core/security.py:

from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext

from app.core.config import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a password against its hash."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash a password."""
    return pwd_context.hash(password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create an access token."""
    to_encode = data.copy()
    if expires_delta is None:
        # Fall back to the configured lifetime instead of a hard-coded value
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return encoded_jwt


def verify_token(token: str) -> Optional[dict]:
    """Verify a token and return its payload, or None if it is invalid."""
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        return payload
    except JWTError:
        return None
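
To make it clearer what an HS256 token actually contains, the sketch below builds and checks one using only the standard library. It is a teaching aid under stated assumptions, not a replacement for python-jose. The function names `sign_hs256` and `verify_hs256` are hypothetical, and only the `exp` claim is validated.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as used in JWTs."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    """HMAC-SHA256 over 'header.payload' with the shared secret."""
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Return header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_signature(signing_input, secret))}"


def verify_hs256(token: str, secret: str) -> Optional[dict]:
    """Return the claims if the signature matches and the token is unexpired."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = _signature(f"{header_b64}.{payload_b64}", secret)
        # Constant-time comparison avoids leaking how many bytes matched
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        payload = json.loads(_b64url_decode(payload_b64))
    except ValueError:
        return None  # wrong number of parts, bad base64, or bad JSON
    if not isinstance(payload, dict):
        return None
    exp = payload.get("exp")
    if isinstance(exp, (int, float)) and exp < time.time():
        return None  # expired
    return payload
```

The payload is only encoded, not encrypted: anyone holding the token can read the claims, and the secret is what prevents tampering. That is why a placeholder `SECRET_KEY` defeats the whole scheme.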

app/api/dependencies.py:

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from app.core.logging import log
from app.core.security import verify_token
from app.services.chat_service import ChatService
from app.services.llm_service import LLMService, OpenAIService

security = HTTPBearer()


async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Return the current user's token payload."""
    token = credentials.credentials
    payload = verify_token(token)
    if payload is None:
        log.warning("Invalid authentication token")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Additional user validation logic can be added here
    return payload


def get_llm_service() -> LLMService:
    """Return an LLM service instance."""
    return OpenAIService()


def get_chat_service(llm_service: LLMService = Depends(get_llm_service)) -> ChatService:
    """Return a chat service instance."""
    return ChatService(llm_service)

Docker Configuration

Dockerfile:

FROM python:3.11-slim

WORKDIR /app

# Install system dependencies (curl is needed by the container health check)
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency file
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expose the port
EXPOSE 8000

# Start command
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

docker-compose.yml:
version: '3.8'

services:
  ai-agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DEBUG=true
      - DATABASE_URL=postgresql://user:pass@db:5432/ai_agent
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
    volumes:
      - ./logs:/app/logs

  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=ai_agent
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:


This scaffold provides:
• ✅ Complete project structure
• ✅ Configuration management
• ✅ Data models
• ✅ API routes
• ✅ Service layer
• ✅ Tool system
• ✅ Authentication
• ✅ Logging
• ✅ Docker configuration

Step 8: Database Models and Configuration Enhancements

app/models/database.py:
```python
import uuid
from datetime import datetime

from sqlalchemy import (
    JSON,
    Boolean,
    Column,
    DateTime,
    ForeignKey,
    Integer,
    String,
    Text,
    create_engine,
)
# declarative_base lives in sqlalchemy.orm as of SQLAlchemy 2.0
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

from app.core.config import settings

Base = declarative_base()


def generate_uuid():
    return str(uuid.uuid4())


class User(Base):
    """User model."""

    __tablename__ = "users"

    id = Column(String, primary_key=True, default=generate_uuid)
    username = Column(String(50), unique=True, index=True, nullable=False)
    email = Column(String(100), unique=True, index=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    is_superuser = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    conversations = relationship("Conversation", back_populates="user")
    api_keys = relationship("APIKey", back_populates="user")


class Conversation(Base):
    """Conversation model."""

    __tablename__ = "conversations"

    id = Column(String, primary_key=True, default=generate_uuid)
    user_id = Column(String, ForeignKey("users.id"), nullable=False)
    title = Column(String(200), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    message_count = Column(Integer, default=0)

    # Relationships
    user = relationship("User", back_populates="conversations")
    messages = relationship("Message", back_populates="conversation")


class Message(Base):
    """Message model."""

    __tablename__ = "messages"

    id = Column(String, primary_key=True, default=generate_uuid)
    conversation_id = Column(String, ForeignKey("conversations.id"), nullable=False)
    role = Column(String(20), nullable=False)  # user, assistant, system, tool
    content = Column(Text, nullable=False)
    tool_calls = Column(JSON, nullable=True)
    tool_results = Column(JSON, nullable=True)
    token_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    conversation = relationship("Conversation", back_populates="messages")


class APIKey(Base):
    """API key model."""

    __tablename__ = "api_keys"

    id = Column(String, primary_key=True, default=generate_uuid)
    user_id = Column(String, ForeignKey("users.id"), nullable=False)
    name = Column(String(100), nullable=False)
    key_hash = Column(String(255), nullable=False, unique=True, index=True)
    is_active = Column(Boolean, default=True)
    last_used = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    expires_at = Column(DateTime, nullable=True)

    # Relationships
    user = relationship("User", back_populates="api_keys")


class ToolCallLog(Base):
    """Tool call log."""

    __tablename__ = "tool_call_logs"

    id = Column(String, primary_key=True, default=generate_uuid)
    user_id = Column(String, ForeignKey("users.id"), nullable=False)
    tool_name = Column(String(100), nullable=False)
    parameters = Column(JSON, nullable=True)
    result = Column(Text, nullable=True)
    success = Column(Boolean, default=True)
    error_message = Column(Text, nullable=True)
    duration_ms = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)


# Database engine and session factory
engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


def get_db():
    """Yield a database session and close it afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def create_tables():
    """Create the database tables."""
    Base.metadata.create_all(bind=engine)
```

app/api/endpoints/admin.py:

from typing import List

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session

from app.api.dependencies import get_current_user
from app.models.database import Conversation, get_db
from app.models.schemas import Conversation as ConversationSchema

router = APIRouter()


@router.get("/users/{user_id}/conversations", response_model=List[ConversationSchema])
async def get_user_conversations(
    user_id: str,
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    """List a user's conversations."""
    # Permission check: users may only read their own conversations unless they are superusers
    if current_user.get("user_id") != user_id and not current_user.get("is_superuser", False):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions",
        )

    conversations = (
        db.query(Conversation)
        .filter(Conversation.user_id == user_id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return conversations


@router.get("/metrics/tool-usage")
async def get_tool_usage_metrics(
    days: int = 7,
    db: Session = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    """Get tool usage statistics."""
    if not current_user.get("is_superuser", False):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required",
        )

    # TODO: implement the tool usage statistics query
    return {"message": "Tool usage metrics endpoint"}


@router.delete("/conversations/{conversation_id}")
async def admin_delete_conversation(
    conversation_id: str,
    db: Session = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    """Delete a conversation as an administrator."""
    if not current_user.get("is_superuser", False):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required",
        )

    # TODO: implement admin conversation deletion
    return {"message": f"Conversation {conversation_id} deleted by admin"}

Step 9: Monitoring and Observability

app/core/monitoring.py:

import time

from fastapi import Request, Response
from prometheus_client import (
    CONTENT_TYPE_LATEST,
    REGISTRY,
    Counter,
    Gauge,
    Histogram,
    generate_latest,
)

# Metric definitions
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP Requests",
    ["method", "endpoint", "status_code"],
)

REQUEST_DURATION = Histogram(
    "http_request_duration_seconds",
    "HTTP request duration in seconds",
    ["method", "endpoint"],
)

ACTIVE_REQUESTS = Gauge(
    "http_requests_active",
    "Active HTTP requests",
)

LLM_CALL_COUNT = Counter(
    "llm_calls_total",
    "Total LLM API calls",
    ["provider", "model", "status"],
)

TOOL_CALL_COUNT = Counter(
    "tool_calls_total",
    "Total tool calls",
    ["tool_name", "status"],
)


class PrometheusMiddleware:
    """Prometheus monitoring middleware (plain ASGI)."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            return await self.app(scope, receive, send)

        start_time = time.time()
        method = scope["method"]
        path = self._get_path(scope["path"])

        ACTIVE_REQUESTS.inc()

        async def send_wrapper(message):
            # Record metrics when the response headers are sent
            if message["type"] == "http.response.start":
                status_code = message["status"]
                REQUEST_COUNT.labels(
                    method=method,
                    endpoint=path,
                    status_code=status_code,
                ).inc()
                REQUEST_DURATION.labels(
                    method=method,
                    endpoint=path,
                ).observe(time.time() - start_time)
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        finally:
            ACTIVE_REQUESTS.dec()

    def _get_path(self, path: str) -> str:
        """Normalize the path for use as a metric label."""
        if path.startswith("/api/v1"):
            # Keep only the first segment after the API version as the endpoint
            parts = path.split("/")
            if len(parts) > 3:
                return f"/api/v1/{parts[3]}"
        return path


def metrics_endpoint(request: Request) -> Response:
    """Serve the Prometheus metrics."""
    return Response(generate_latest(REGISTRY), media_type=CONTENT_TYPE_LATEST)


def record_llm_call(provider: str, model: str, success: bool = True):
    """Record an LLM call."""
    status = "success" if success else "error"
    LLM_CALL_COUNT.labels(provider=provider, model=model, status=status).inc()


def record_tool_call(tool_name: str, success: bool = True):
    """Record a tool call."""
    status = "success" if success else "error"
    TOOL_CALL_COUNT.labels(tool_name=tool_name, status=status).inc()
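
The path normalization in the middleware exists to keep the `endpoint` label from growing without bound: every distinct conversation ID would otherwise create its own time series. The standalone copy below (named `normalize_path` here purely for illustration) shows what the rule does to a few representative paths.

```python
def normalize_path(path: str) -> str:
    """Collapse an API path to its first segment after /api/v1."""
    if path.startswith("/api/v1"):
        parts = path.split("/")  # "/api/v1/chat" -> ["", "api", "v1", "chat"]
        if len(parts) > 3:
            return f"/api/v1/{parts[3]}"
    return path


for raw in (
    "/api/v1/chat",
    "/api/v1/conversations/conv_123",
    "/api/v1/admin/users/42/conversations",
    "/health",
):
    print(raw, "->", normalize_path(raw))
# /api/v1/chat -> /api/v1/chat
# /api/v1/conversations/conv_123 -> /api/v1/conversations
# /api/v1/admin/users/42/conversations -> /api/v1/admin
# /health -> /health
```

Two consequences are worth noting. All admin routes share a single `/api/v1/admin` label, and paths outside `/api/v1` are passed through unchanged, so requests to arbitrary non-existent URLs can still add new label values.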

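Add monitoring to main.py:

from fastapi import Request

from app.core.monitoring import PrometheusMiddleware, metrics_endpoint

# Add the monitoring middleware (in the middleware configuration section)
app.add_middleware(PrometheusMiddleware)


# Add the metrics endpoint (in the routes section)
@app.get("/metrics")
async def metrics(request: Request):
    # Call the helper; returning the function object itself would not produce a response
    return metrics_endpoint(request)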

Step 10: Caching and Performance Optimization

app/core/cache.py:

import pickle
from typing import Any, Optional

# The asyncio client is used so cache calls do not block the event loop
import redis.asyncio as redis

from app.core.config import settings
from app.core.logging import log


class CacheManager:
    """Redis cache manager."""

    def __init__(self):
        self.redis_client = redis.from_url(settings.REDIS_URL, decode_responses=False)
        self.default_ttl = 3600  # default TTL: 1 hour

    async def get(self, key: str) -> Optional[Any]:
        """Get a cached value."""
        try:
            data = await self.redis_client.get(key)
            if data:
                # Caution: only unpickle data from a Redis instance you trust
                return pickle.loads(data)
            return None
        except Exception as e:
            log.error(f"Cache get error for key {key}: {e}")
            return None

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Set a cached value."""
        try:
            serialized_value = pickle.dumps(value)
            expire_time = ttl if ttl is not None else self.default_ttl
            await self.redis_client.setex(key, expire_time, serialized_value)
            return True
        except Exception as e:
            log.error(f"Cache set error for key {key}: {e}")
            return False

    async def delete(self, key: str) -> bool:
        """Delete a cached value."""
        try:
            result = await self.redis_client.delete(key)
            return result > 0
        except Exception as e:
            log.error(f"Cache delete error for key {key}: {e}")
            return False

    async def get_or_set(self, key: str, default_func, ttl: Optional[int] = None) -> Any:
        """Return the cached value, computing and storing it on a miss."""
        cached = await self.get(key)
        if cached is not None:
            return cached

        value = default_func()
        await self.set(key, value, ttl)
        return value

    async def clear_pattern(self, pattern: str) -> int:
        """Delete all keys matching the pattern."""
        try:
            keys = await self.redis_client.keys(pattern)
            if keys:
                return await self.redis_client.delete(*keys)
            return 0
        except Exception as e:
            log.error(f"Cache clear pattern error: {e}")
            return 0


# Global cache instance
cache = CacheManager()
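
Code that depends on the cache is easier to unit test when Redis can be swapped for an in-process stand-in. The class below is a minimal sketch of such a test double. `InMemoryCache` and its injectable `clock` argument are assumptions made for this example; it mirrors only the `get`, `set`, and `delete` methods of `CacheManager`.

```python
import asyncio
import time
from typing import Any, Callable, Dict, Optional, Tuple


class InMemoryCache:
    """Test double exposing the same async get/set/delete surface as CacheManager."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock  # injectable so tests can control time
        self._items: Dict[str, Tuple[float, Any]] = {}

    async def get(self, key: str) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._items[key]  # lazily evict expired entries
            return None
        return value

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        lifetime = 3600 if ttl is None else ttl  # same default TTL as CacheManager
        self._items[key] = (self._clock() + lifetime, value)
        return True

    async def delete(self, key: str) -> bool:
        return self._items.pop(key, None) is not None


async def demo() -> None:
    now = [0.0]
    cache = InMemoryCache(clock=lambda: now[0])
    await cache.set("greeting", "hello", ttl=10)
    print(await cache.get("greeting"))  # hello
    now[0] = 11.0  # advance the fake clock past the TTL
    print(await cache.get("greeting"))  # None


asyncio.run(demo())
```

Because the clock is injected, expiry can be tested deterministically without sleeping.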

Step 11: Rate Limiting

app/core/rate_limiting.py:

import time

from fastapi import HTTPException, status

from app.core.cache import cache
from app.core.logging import log


class RateLimiter:
    """Fixed-window rate limiter."""

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute

    async def check_rate_limit(self, identifier: str, cost: int = 1) -> bool:
        """Check the rate limit; return False if the request would exceed it."""
        cache_key = f"rate_limit:{identifier}"

        # Load the current window data
        window_data = await cache.get(cache_key) or {
            "count": 0,
            "window_start": time.time(),
        }

        current_time = time.time()
        window_duration = 60  # 60-second window

        # Reset the window if it has expired
        if current_time - window_data["window_start"] > window_duration:
            window_data = {
                "count": 0,
                "window_start": current_time,
            }

        # Check whether the limit would be exceeded
        if window_data["count"] + cost > self.requests_per_minute:
            log.warning(f"Rate limit exceeded for {identifier}")
            return False

        # Update the counter
        # (get-then-set is not atomic, so concurrent requests may be undercounted)
        window_data["count"] += cost
        await cache.set(cache_key, window_data, window_duration)

        return True

    async def get_remaining_requests(self, identifier: str) -> int:
        """Return the number of requests remaining in the current window."""
        cache_key = f"rate_limit:{identifier}"
        window_data = await cache.get(cache_key) or {
            "count": 0,
            "window_start": time.time(),
        }
        return max(0, self.requests_per_minute - window_data["count"])


def rate_limit_middleware(requests_per_minute: int = 60):
    """Rate limiting middleware factory."""
    limiter = RateLimiter(requests_per_minute)

    async def middleware(identifier: str, cost: int = 1):
        if not await limiter.check_rate_limit(identifier, cost):
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Rate limit exceeded. Try again in 60 seconds.",
            )
        return True

    return middleware


# Default rate limiter
default_limiter = RateLimiter()
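
The 429 response always tells the client to wait 60 seconds, even when the current window is almost over. Since the limiter already stores `window_start`, the remaining time can be computed and returned in a `Retry-After` header. A small sketch of that arithmetic follows; the helper names `seconds_until_reset` and `retry_after_headers` are hypothetical.

```python
import math
from typing import Dict

WINDOW_SECONDS = 60  # same window length as RateLimiter.check_rate_limit


def seconds_until_reset(window_start: float, now: float, window: int = WINDOW_SECONDS) -> int:
    """Whole seconds until the fixed window that began at `window_start` expires."""
    return max(0, math.ceil(window_start + window - now))


def retry_after_headers(window_start: float, now: float) -> Dict[str, str]:
    """Build a Retry-After header for a 429 response."""
    return {"Retry-After": str(seconds_until_reset(window_start, now))}


print(seconds_until_reset(window_start=100.0, now=130.5))  # 30
print(retry_after_headers(window_start=100.0, now=159.2))  # {'Retry-After': '1'}
```

The resulting dictionary could be passed as the `headers` argument of the `HTTPException` raised by the middleware factory, in the same way the authentication dependency passes `WWW-Authenticate`.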

Step 12: Testing Framework

tests/conftest.py:

import asyncio
import os

# Enable DEBUG before the app is imported: with DEBUG disabled,
# TrustedHostMiddleware rejects the test client's default host
# ("testserver") with HTTP 400.
os.environ.setdefault("DEBUG", "true")

import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app.main import app
from app.models.database import Base, get_db

# Test database configuration
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)

TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


@pytest.fixture(scope="function")
def db_session():
    """Create a test database session."""
    Base.metadata.create_all(bind=engine)
    session = TestingSessionLocal()
    try:
        yield session
    finally:
        session.close()
        Base.metadata.drop_all(bind=engine)


@pytest.fixture(scope="function")
def client(db_session):
    """Create a test client that uses the test database session."""

    def override_get_db():
        try:
            yield db_session
        finally:
            pass

    app.dependency_overrides[get_db] = override_get_db
    with TestClient(app) as test_client:
        yield test_client
    app.dependency_overrides.clear()


@pytest.fixture(scope="session")
def event_loop():
    """Create an event loop for the test session."""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

tests/test_chat.py:

import pytest


def test_health_check(client):
    """Test the health check endpoint."""
    response = client.get("/health")
    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "healthy"


def test_chat_endpoint(client):
    """Test the chat endpoint."""
    # Note: authentication must be set up or bypassed first
    request_data = {
        "message": "Hello, test message",
        "stream": False,
    }
    response = client.post("/api/v1/chat", json=request_data)
    # Because of authentication, this may return 401 or 403
    assert response.status_code in [200, 401, 403]


def test_chat_streaming(client):
    """Test streaming chat."""
    request_data = {
        "message": "Hello, streaming test",
        "stream": True,
    }
    response = client.post("/api/v1/chat", json=request_data)
    assert response.status_code in [200, 401, 403]


@pytest.mark.asyncio
async def test_tool_registry():
    """Test the tool registry."""
    from app.tools.base import CalculatorTool
    from app.tools.registry import tool_registry

    # Register a tool
    calculator = CalculatorTool()
    tool_registry.register_tool(calculator)

    # Look the tool up again
    tool = tool_registry.get_tool("calculator")
    assert tool is not None
    assert tool.name == "calculator"


def test_metrics_endpoint(client):
    """Test the metrics endpoint."""
    response = client.get("/metrics")
    assert response.status_code == 200
    assert "http_requests_total" in response.text

Step 13: Deployment Configuration Enhancements

docker-compose.prod.yml:

version: '3.8'

services:
  ai-agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DEBUG=false
      - DATABASE_URL=postgresql://user:pass@db:5432/ai_agent
      - REDIS_URL=redis://redis:6379
      - SECRET_KEY=your-production-secret-key
    depends_on:
      - db
      - redis
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=ai_agent
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./scripts/init-db.sql:/docker-entrypoint-initdb.d/init.sql
    restart: unless-stopped
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d ai_agent"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - ai-agent
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:

nginx/nginx.conf:
events {
    worker_connections 1024;
}

http {
    upstream ai_agent {
        server ai-agent:8000;
    }

    # Rate limiting (limit_req_zone is only valid in the http context)
    limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

    server {
        listen 80;
        server_name your-domain.com;
        return 301 https://$server_name$request_uri;
    }

    server {
        listen 443 ssl http2;
        server_name your-domain.com;

        ssl_certificate /etc/nginx/ssl/cert.pem;
        ssl_certificate_key /etc/nginx/ssl/key.pem;

        # Security headers
        add_header X-Frame-Options DENY;
        add_header X-Content-Type-Options nosniff;
        add_header X-XSS-Protection "1; mode=block";

        location / {
            limit_req zone=api burst=20 nodelay;
            proxy_pass http://ai_agent;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        location /metrics {
            # Metrics are for internal access only
            allow 127.0.0.1;
            deny all;
            proxy_pass http://ai_agent;
        }

        location /health {
            proxy_pass http://ai_agent;
            access_log off;
        }
    }
}


Step 14: Environment configuration and scripts

.env.example:
```bash
# Application settings
DEBUG=false
APP_NAME="AI Agent Service"
VERSION=1.0.0

# Security settings
SECRET_KEY=your-super-secret-key-here-change-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30

# Database settings
DATABASE_URL=postgresql://user:pass@localhost:5432/ai_agent
REDIS_URL=redis://localhost:6379

# LLM settings
OPENAI_API_KEY=your-openai-api-key
OPENAI_BASE_URL=https://api.openai.com/v1
MODEL_NAME=gpt-3.5-turbo

# Rate limiting
RATE_LIMIT_PER_MINUTE=60
```
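
Because the example files ship with placeholder secrets, it can help to refuse to start when one of them is still in place. The sketch below shows one possible startup check. `find_config_problems` is a hypothetical helper, and the 32-character minimum for `SECRET_KEY` is an assumption chosen for illustration, not a requirement from the scaffold.

```python
# Placeholder values that appear in the example configuration files.
PLACEHOLDER_SECRETS = {
    "your-secret-key-here",
    "your-super-secret-key-here-change-in-production",
    "your-production-secret-key",
}
REQUIRED_KEYS = ("SECRET_KEY", "DATABASE_URL", "REDIS_URL")
MIN_SECRET_LENGTH = 32  # assumed threshold, adjust to your own policy


def find_config_problems(env: dict) -> list:
    """Return human-readable problems found in a mapping of settings."""
    problems = []
    for key in REQUIRED_KEYS:
        if not env.get(key):
            problems.append(f"{key} is not set")
    secret = env.get("SECRET_KEY", "")
    if secret in PLACEHOLDER_SECRETS:
        problems.append("SECRET_KEY still uses a placeholder value")
    elif secret and len(secret) < MIN_SECRET_LENGTH:
        problems.append("SECRET_KEY is shorter than the assumed minimum length")
    return problems


example_env = {
    "SECRET_KEY": "your-super-secret-key-here-change-in-production",
    "DATABASE_URL": "postgresql://user:pass@localhost:5432/ai_agent",
}
for problem in find_config_problems(example_env):
    print(problem)
```

At startup, the same function could be called with `dict(os.environ)` and the process aborted when the returned list is non-empty and `DEBUG` is false.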

scripts/start.sh:

#!/bin/bash
# AI Agent service startup script
set -e

echo "🚀 Starting AI Agent Service..."

# Check environment variables
if [ -z "$DATABASE_URL" ]; then
    echo "❌ DATABASE_URL is not set"
    exit 1
fi

if [ -z "$OPENAI_API_KEY" ]; then
    echo "⚠️  OPENAI_API_KEY is not set, some features may not work"
fi

# Wait for the database to be ready
# (assumes DATABASE_URL has the form scheme://user:pass@host:port/dbname)
echo "⏳ Waiting for database..."
DB_HOST=$(echo "$DATABASE_URL" | sed 's/.*@//' | cut -d: -f1)
DB_PORT=$(echo "$DATABASE_URL" | sed 's/.*://' | cut -d/ -f1)
while ! nc -z "$DB_HOST" "$DB_PORT"; do
    sleep 1
done
echo "✅ Database is ready"

# Run database migrations
echo "📦 Running database migrations..."
python -c "
from app.models.database import create_tables
create_tables()
print('Database tables created/verified')
"

# Start the service
echo "🎯 Starting Uvicorn server..."
exec uvicorn app.main:app \
    --host 0.0.0.0 \
    --port 8000 \
    --workers 4 \
    --log-level info
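
The `sed`/`cut` pipeline in start.sh only extracts the host and port correctly when the URL contains an explicit port, and the default `DATABASE_URL` in app/core/config.py has none. A more tolerant approach is to parse the URL with the standard library. The sketch below is one way to do it; `db_host_port` and `wait_for_port` are hypothetical helper names, and 5432 is assumed as the fallback port because the URL targets PostgreSQL.

```python
import socket
import time
from urllib.parse import urlsplit


def db_host_port(database_url: str, default_port: int = 5432):
    """Extract (host, port) from a database URL, falling back to default_port."""
    parts = urlsplit(database_url)
    if not parts.hostname:
        raise ValueError("database URL has no host")
    return parts.hostname, parts.port or default_port


def wait_for_port(host: str, port: int, timeout: float = 30.0,
                  interval: float = 1.0) -> bool:
    """Poll until a TCP connection succeeds or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


print(db_host_port("postgresql://user:pass@localhost/ai_agent"))
print(db_host_port("postgresql://user:pass@db:5432/ai_agent"))
```

A startup script could call `wait_for_port(*db_host_port(os.environ["DATABASE_URL"]))` and exit with an error when it returns False, which also avoids waiting forever if the database never comes up.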

scripts/deploy.sh:

#!/bin/bash
# Deployment script
set -e

ENV=${1:-staging}
IMAGE_TAG="ai-agent-service:latest"

echo "🚀 Deploying AI Agent Service to $ENV"

# Build the Docker image
echo "📦 Building Docker image..."
docker build -t "$IMAGE_TAG" .

# Pick the compose file for the target environment
if [ "$ENV" = "production" ]; then
    COMPOSE_FILE="docker-compose.prod.yml"
else
    COMPOSE_FILE="docker-compose.yml"
fi

# Deploy the services
echo "🎯 Deploying services..."
docker-compose -f "$COMPOSE_FILE" down
docker-compose -f "$COMPOSE_FILE" up -d

echo "✅ Deployment completed!"
echo "📊 Health check: curl http://localhost/health"
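
The deploy script ends by printing a `curl` hint instead of verifying that the service came up. A post-deploy check could poll the health endpoint a few times before declaring success. The sketch below shows only the retry loop. `wait_until_healthy` is a hypothetical helper, and the probe is passed in as a function so the loop can be exercised without a running service.

```python
import time
from typing import Callable


def wait_until_healthy(probe: Callable[[], bool], attempts: int = 10,
                       delay: float = 3.0,
                       sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call `probe` up to `attempts` times, sleeping `delay` seconds between tries."""
    for attempt in range(1, attempts + 1):
        if probe():
            return True
        if attempt < attempts:  # no need to sleep after the final attempt
            sleep(delay)
    return False


# Demonstration with a fake probe that succeeds on its third call.
outcomes = iter([False, False, True])
waits = []
ok = wait_until_healthy(lambda: next(outcomes), attempts=5, delay=3.0,
                        sleep=waits.append)
print(ok, waits)
```

In a real deployment the probe would issue an HTTP request to the health URL, and a False result would be a reasonable trigger for a rollback or a non-zero exit code.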

Step 15: Complete usage guide

README.md:

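# AI Agent Microservice Scaffold

An enterprise-grade AI Agent microservice framework built on FastAPI.

## Features

- 🚀 High-performance asynchronous architecture
- 🔐 JWT authentication and RBAC authorization
- 🛠️ Extensible tool system
- 📊 Built-in monitoring and metrics
- 🐳 Containerized deployment with Docker
- 📝 Structured logging
- 🧪 Test suite based on pytest
- ⚡ Rate limiting and caching
- 🔒 Security best practices

## Quick start

### Environment setup

1. Copy the environment configuration:
```bash
cp .env.example .env
```
2. Edit the .env file to configure the database and API keys.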

### Development environment

    # Install dependencies
    pip install -r requirements.txt

    # Start the development server
    uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

### Docker deployment

    # Development
    docker-compose up -d

    # Production
    docker-compose -f docker-compose.prod.yml up -d

### API documentation

After starting the service, visit:

  • Swagger UI: http://localhost:8000/docs
  • ReDoc: http://localhost:8000/redoc
  • Metrics: http://localhost:8000/metrics

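## Project structure

    app/
    ├── api/           # API routes and endpoints
    ├── core/          # Core configuration and utilities
    ├── models/        # Data models
    ├── services/      # Business logic
    ├── tools/         # Tool system
    └── utils/         # Helper functions

## Development guide

### Adding a new tool

1. Create a new tool class in `app/tools/`
2. Inherit from the `BaseTool` base class
3. Register the tool in `app/tools/registry.py`

### Adding a new API endpoint

1. Create a new router file in `app/api/endpoints/`
2. Register the router in `app/main.py`

## Monitoring and operations

The service exposes the following monitoring metrics:
- HTTP request statistics
- LLM call monitoring
- Tool usage statistics
- Performance metrics

## License

MIT License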
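
The README's steps for adding a new tool can be illustrated with a self-contained sketch. The `BaseTool` and `ToolRegistry` classes below are stand-ins written for this example, not the scaffold's own classes, which may differ. Only the `register_tool`, `get_tool` and `name` identifiers mirror what the test suite uses. The `run` method and `WordCountTool` are assumptions made for illustration.

```python
from abc import ABC, abstractmethod


class BaseTool(ABC):
    """Stand-in for the scaffold's BaseTool; the real class may differ."""

    name: str = ""
    description: str = ""

    @abstractmethod
    def run(self, **kwargs):
        """Execute the tool (hypothetical method name)."""


class ToolRegistry:
    """Stand-in registry that maps tool names to tool instances."""

    def __init__(self):
        self._tools = {}

    def register_tool(self, tool: BaseTool) -> None:
        if not tool.name:
            raise ValueError("tool must define a non-empty name")
        self._tools[tool.name] = tool

    def get_tool(self, name: str):
        return self._tools.get(name)  # None when the tool is unknown


# Steps 1 and 2: create a tool class that inherits from BaseTool.
class WordCountTool(BaseTool):
    name = "word_count"
    description = "Count whitespace-separated words in a text"

    def run(self, text: str = "", **kwargs) -> int:
        return len(text.split())


# Step 3: register the tool so it can be looked up by name.
tool_registry = ToolRegistry()
tool_registry.register_tool(WordCountTool())
tool = tool_registry.get_tool("word_count")
print(tool.run(text="hello agent world"))
```

Looking tools up by name keeps the chat service decoupled from concrete tool classes: it only needs the name an LLM response asks for.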

Complete startup flow

  1. Prepare the environment:
# Clone the project
git clone <your-repo>
cd ai-agent-service

# Set up the environment
cp .env.example .env
# Edit the .env file with your settings

  2. Install and run:
# Option 1: Docker (recommended)
docker-compose up -d

# Option 2: local development
pip install -r requirements.txt
uvicorn app.main:app --reload

  3. Verify the deployment:
curl http://localhost:8000/health
curl http://localhost:8000/docs

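This scaffold provides the core components of an enterprise-grade AI Agent microservice: API routing, authentication, a tool system, monitoring, and deployment configuration. You can customize and extend it to fit your own requirements.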
