基于FastAPI框架的日志模块设计
以下是一个基于FastAPI框架设计的日志模块,结合SQLite数据库实现增删改查功能的完整实现方案:
1. 项目结构
your_project/
├── app/
│ ├── logs/ # 日志模块目录
│ │ ├── models.py # 数据库模型定义
│ │ ├── schemas.py # Pydantic数据验证模型
│ │ ├── crud.py # 数据库操作逻辑
│ │ └── routes.py # API路由定义
│ ├── database.py # 数据库连接配置
│ └── main.py # FastAPI主入口
├── logs.db # SQLite数据库文件
└── requirements.txt
2. 核心代码实现
2.1 数据库配置 (app/database.py)
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# SQLite database file, given as a path relative to the working directory.
SQLALCHEMY_DATABASE_URL = "sqlite:///./logs.db"

# check_same_thread=False is handed to the SQLite driver so a connection may
# be used from a thread other than the one that created it.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
)

# Session factory bound to the engine; commits and flushes are explicit.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class that the ORM models inherit from.
Base = declarative_base()


def get_db():
    """Yield a database session and close it once the caller is done.

    Written as a generator so it can be used with FastAPI's ``Depends``;
    the ``finally`` clause closes the session even if the consumer raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
2.2 数据模型 (app/logs/models.py)
from sqlalchemy import Column, Integer, String, DateTime
from datetime import datetime
# models.py lives in app/logs/ while database.py lives in app/, and the app is
# started as "app.main:app", so the import must be package-relative (a bare
# "from database import ..." is not resolvable from inside the package).
from ..database import Base


class Log(Base):
    """ORM model for one row of the ``logs`` table."""

    __tablename__ = "logs"

    id = Column(Integer, primary_key=True, index=True)
    # Log level (INFO, WARNING, ERROR); indexed.
    level = Column(String(10), index=True)
    message = Column(String(500))
    # Filled with datetime.utcnow() when no value is supplied.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Log source; defaults to "application".
    source = Column(String(50), default="application")
2.3 Pydantic模型 (app/logs/schemas.py)
from pydantic import BaseModel
from datetime import datetime


# Request body accepted when creating or updating a log entry.
# (Plain comments are used instead of docstrings so the generated schema
# description stays unchanged.)
class LogCreate(BaseModel):
    level: str
    message: str
    source: str = "application"


# Response shape: the submitted fields plus the stored id and timestamp.
class LogResponse(LogCreate):
    id: int
    timestamp: datetime

    class Config:
        # Allow the response to be built directly from ORM objects.
        orm_mode = True
2.4 CRUD操作 (app/logs/crud.py)
from sqlalchemy.orm import Session
from . import models, schemas


def create_log(db: Session, log: schemas.LogCreate):
    """Insert a new log row and return it after refreshing it from the database."""
    db_log = models.Log(**log.dict())
    db.add(db_log)
    db.commit()
    db.refresh(db_log)
    return db_log


def get_logs(db: Session, skip: int = 0, limit: int = 100):
    """Return at most ``limit`` logs after skipping the first ``skip`` rows.

    Rows are ordered by id: OFFSET/LIMIT pagination without an ORDER BY gives
    no guarantee that consecutive pages are stable or non-overlapping.
    """
    return (
        db.query(models.Log)
        .order_by(models.Log.id)
        .offset(skip)
        .limit(limit)
        .all()
    )


def get_log_by_id(db: Session, log_id: int):
    """Return the log with the given id, or ``None`` when there is none."""
    return db.query(models.Log).filter(models.Log.id == log_id).first()


def update_log(db: Session, log_id: int, log: schemas.LogCreate):
    """Overwrite the fields of an existing log with the values in ``log``.

    Returns the updated row, or ``None`` when no log has that id.
    """
    db_log = get_log_by_id(db, log_id)
    if db_log is None:
        return None
    for key, value in log.dict().items():
        setattr(db_log, key, value)
    db.commit()
    db.refresh(db_log)
    return db_log


def delete_log(db: Session, log_id: int):
    """Delete the log with the given id.

    Returns ``True`` when a row was deleted and ``False`` when no log has
    that id.
    """
    db_log = get_log_by_id(db, log_id)
    if db_log is None:
        return False
    db.delete(db_log)
    db.commit()
    return True
2.5 API路由 (app/logs/routes.py)
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from ..database import get_db
from . import schemas, crud

router = APIRouter(prefix="/logs", tags=["logs"])

# Handlers are documented with comments rather than docstrings so that the
# generated OpenAPI descriptions stay exactly as they were.


# POST /logs/ -- store a new log entry and return the saved row.
@router.post("/", response_model=schemas.LogResponse)
def create_log_entry(log: schemas.LogCreate, db: Session = Depends(get_db)):
    return crud.create_log(db, log)


# GET /logs/ -- list entries, skipping `skip` rows and returning at most `limit`.
@router.get("/", response_model=list[schemas.LogResponse])
def read_logs(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    return crud.get_logs(db, skip=skip, limit=limit)


# GET /logs/{log_id} -- fetch a single entry; 404 when it does not exist.
@router.get("/{log_id}", response_model=schemas.LogResponse)
def read_log(log_id: int, db: Session = Depends(get_db)):
    found = crud.get_log_by_id(db, log_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Log not found")


# PUT /logs/{log_id} -- replace an entry's fields; 404 when it does not exist.
@router.put("/{log_id}", response_model=schemas.LogResponse)
def update_log_entry(log_id: int, log: schemas.LogCreate, db: Session = Depends(get_db)):
    changed = crud.update_log(db, log_id, log)
    if changed:
        return changed
    raise HTTPException(status_code=404, detail="Log not found")


# DELETE /logs/{log_id} -- remove an entry; 404 when it does not exist.
@router.delete("/{log_id}")
def delete_log_entry(log_id: int, db: Session = Depends(get_db)):
    if crud.delete_log(db, log_id):
        return {"message": "Log deleted successfully"}
    raise HTTPException(status_code=404, detail="Log not found")
2.6 主入口 (app/main.py)
from fastapi import FastAPI
from .database import engine
from .logs.models import Log
from .logs.routes import router as logs_router

# Create the tables registered on the model metadata when the module is imported.
Log.metadata.create_all(bind=engine)

app = FastAPI()
# Mount the /logs endpoints.
app.include_router(logs_router)


# GET / -- fixed banner identifying the service.
@app.get("/")
def root():
    return {"message": "Logging System API"}
3. 使用方式
3.1 启动服务
uvicorn app.main:app --reload
3.2 API测试示例
# Create a log entry
curl -X POST "http://localhost:8000/logs/" \
  -H "Content-Type: application/json" \
  -d '{"level": "ERROR", "message": "Database connection failed"}'

# List all log entries
curl "http://localhost:8000/logs/"

# Update the log entry with id 1
curl -X PUT "http://localhost:8000/logs/1" \
  -H "Content-Type: application/json" \
  -d '{"level": "WARNING", "message": "Connection timeout"}'

# Delete the log entry with id 1
curl -X DELETE "http://localhost:8000/logs/1"
4. 高级功能扩展建议
- 日志过滤:
# Add to crud.py
def filter_logs(db: Session, level: str = None, source: str = None):
    """Return all logs matching the given level and/or source.

    A condition is applied only for arguments that are truthy; when neither
    is given, every log is returned.
    """
    criteria = []
    if level:
        criteria.append(models.Log.level == level)
    if source:
        criteria.append(models.Log.source == source)
    return db.query(models.Log).filter(*criteria).all()
- 自动记录请求日志:
# Add this middleware to main.py. It needs the imports below in addition to
# what main.py already imports; move them to the top of main.py.
import time

from fastapi import Request
from fastapi.concurrency import run_in_threadpool

from .database import SessionLocal
from .logs import crud, schemas


def _write_request_log(entry: schemas.LogCreate) -> None:
    """Persist one request log entry using a short-lived session."""
    with SessionLocal() as db:
        crud.create_log(db, entry)


@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log method, URL, status code and duration of every HTTP request.

    The entry is written after the downstream handler has produced its
    response, and the response is returned unchanged.
    """
    # perf_counter is a monotonic clock, so the measured duration cannot be
    # skewed by wall-clock adjustments.
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = (time.perf_counter() - start_time) * 1000  # milliseconds

    entry = schemas.LogCreate(
        level="INFO",
        message=(
            f"{request.method} {request.url} - {response.status_code}"
            f" ({process_time:.2f}ms)"
        ),
        source="http",
    )
    # The SQLAlchemy session used here is synchronous; run the write in the
    # thread pool so it does not block the event loop.
    await run_in_threadpool(_write_request_log, entry)
    return response
- 日志分页查询:
# Improved GET handler for routes.py; it replaces the skip/limit version.
from fastapi import Query  # move next to the other fastapi imports in routes.py


# GET /logs/?page=N&per_page=M -- page-based listing.
# page starts at 1; without the lower bound, page <= 0 would produce a
# negative OFFSET. per_page is capped so one request cannot dump the table.
# Values outside these bounds are rejected by FastAPI's request validation.
@router.get("/", response_model=list[schemas.LogResponse])
def read_logs(
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    skip = (page - 1) * per_page
    return crud.get_logs(db, skip=skip, limit=per_page)
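该设计实现了完整的日志管理功能(增删改查)。路由函数以同步 def 定义,FastAPI 会将其放入线程池中执行,因此同步的 SQLAlchemy 调用不会阻塞事件循环。通过SQLAlchemy ORM层,可以较容易地切换到其他数据库(如PostgreSQL/MySQL):修改 SQLALCHEMY_DATABASE_URL,并去掉 SQLite 专用的 connect_args={"check_same_thread": False} 即可。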