当前位置: 首页 > news >正文

基于FastAPI框架的日志模块设计

以下是一个基于FastAPI框架设计的日志模块,结合SQLite数据库实现增删改查功能的完整实现方案:


1. 项目结构

your_project/
├── app/
│   ├── logs/               # 日志模块目录
│   │   ├── models.py       # 数据库模型定义
│   │   ├── schemas.py      # Pydantic数据验证模型
│   │   ├── crud.py         # 数据库操作逻辑
│   │   └── routes.py       # API路由定义
│   ├── database.py        # 数据库连接配置
│   └── main.py            # FastAPI主入口
├── logs.db                 # SQLite数据库文件
└── requirements.txt

2. 核心代码实现

2.1 数据库配置 (app/database.py)
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmakerSQLALCHEMY_DATABASE_URL = "sqlite:///./logs.db"engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)Base = declarative_base()def get_db():db = SessionLocal()try:yield dbfinally:db.close()
2.2 数据模型 (app/logs/models.py)
from sqlalchemy import Column, Integer, String, DateTime
from datetime import datetime
from database import Baseclass Log(Base):__tablename__ = "logs"id = Column(Integer, primary_key=True, index=True)level = Column(String(10), index=True)  # 日志级别 (INFO, WARNING, ERROR)message = Column(String(500))timestamp = Column(DateTime, default=datetime.utcnow)source = Column(String(50), default="application")  # 日志来源
2.3 Pydantic模型 (app/logs/schemas.py)
from pydantic import BaseModel
from datetime import datetimeclass LogCreate(BaseModel):level: strmessage: strsource: str = "application"class LogResponse(LogCreate):id: inttimestamp: datetimeclass Config:orm_mode = True
2.4 CRUD操作 (app/logs/crud.py)
from sqlalchemy.orm import Session
from . import models, schemasdef create_log(db: Session, log: schemas.LogCreate):db_log = models.Log(**log.dict())db.add(db_log)db.commit()db.refresh(db_log)return db_logdef get_logs(db: Session, skip: int = 0, limit: int = 100):return db.query(models.Log).offset(skip).limit(limit).all()def get_log_by_id(db: Session, log_id: int):return db.query(models.Log).filter(models.Log.id == log_id).first()def update_log(db: Session, log_id: int, log: schemas.LogCreate):db_log = db.query(models.Log).filter(models.Log.id == log_id).first()if db_log:for key, value in log.dict().items():setattr(db_log, key, value)db.commit()db.refresh(db_log)return db_logdef delete_log(db: Session, log_id: int):db_log = db.query(models.Log).filter(models.Log.id == log_id).first()if db_log:db.delete(db_log)db.commit()return Truereturn False
2.5 API路由 (app/logs/routes.py)
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from ..database import get_db
from . import schemas, crudrouter = APIRouter(prefix="/logs", tags=["logs"])@router.post("/", response_model=schemas.LogResponse)
def create_log_entry(log: schemas.LogCreate, db: Session = Depends(get_db)):return crud.create_log(db, log)@router.get("/", response_model=list[schemas.LogResponse])
def read_logs(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):return crud.get_logs(db, skip=skip, limit=limit)@router.get("/{log_id}", response_model=schemas.LogResponse)
def read_log(log_id: int, db: Session = Depends(get_db)):db_log = crud.get_log_by_id(db, log_id)if not db_log:raise HTTPException(status_code=404, detail="Log not found")return db_log@router.put("/{log_id}", response_model=schemas.LogResponse)
def update_log_entry(log_id: int, log: schemas.LogCreate, db: Session = Depends(get_db)):updated_log = crud.update_log(db, log_id, log)if not updated_log:raise HTTPException(status_code=404, detail="Log not found")return updated_log@router.delete("/{log_id}")
def delete_log_entry(log_id: int, db: Session = Depends(get_db)):success = crud.delete_log(db, log_id)if not success:raise HTTPException(status_code=404, detail="Log not found")return {"message": "Log deleted successfully"}
2.6 主入口 (app/main.py)
from fastapi import FastAPI
from .database import engine
from .logs.models import Log
from .logs.routes import router as logs_routerLog.metadata.create_all(bind=engine)app = FastAPI()
app.include_router(logs_router)@app.get("/")
def root():return {"message": "Logging System API"}

3. 使用方式

3.1 启动服务
uvicorn app.main:app --reload
3.2 API测试示例
# 创建日志
curl -X POST "http://localhost:8000/logs/" \
-H "Content-Type: application/json" \
-d '{"level": "ERROR", "message": "Database connection failed"}'# 查询所有日志
curl "http://localhost:8000/logs/"# 更新日志
curl -X PUT "http://localhost:8000/logs/1" \
-H "Content-Type: application/json" \
-d '{"level": "WARNING", "message": "Connection timeout"}'# 删除日志
curl -X DELETE "http://localhost:8000/logs/1"

4. 高级功能扩展建议

  1. 日志过滤
# 在crud.py中添加
def filter_logs(db: Session, level: str = None, source: str = None):query = db.query(models.Log)if level:query = query.filter(models.Log.level == level)if source:query = query.filter(models.Log.source == source)return query.all()
  1. 自动记录请求日志
# 在main.py中添加中间件
@app.middleware("http")
async def log_requests(request: Request, call_next):start_time = time.time()response = await call_next(request)process_time = (time.time() - start_time) * 1000log_data = {"level": "INFO","message": f"{request.method} {request.url} - {response.status_code}","source": "http"}# 异步写入日志(需配置数据库会话)with SessionLocal() as db:crud.create_log(db, schemas.LogCreate(**log_data))return response
  1. 日志分页查询
# 在routes.py中改进GET方法
@router.get("/", response_model=list[schemas.LogResponse])
def read_logs(page: int = 1,per_page: int = 20,db: Session = Depends(get_db)
):skip = (page - 1) * per_pagereturn crud.get_logs(db, skip=skip, limit=per_page)

该设计实现了完整的日志管理功能,同时保持了FastAPI的异步特性优势。通过SQLAlchemy ORM层,可以轻松切换其他数据库(如PostgreSQL/MySQL)。

相关文章:

  • STM32入门教程——GPIO输出
  • 信息瓶颈理论(Information Bottleneck Theory)中的“最小化信息”是否意味着“最大化抽象能力”?
  • 【超详细教程】安卓模拟器如何添加本地文件?音乐/照片/视频一键导入!
  • Spring的异步
  • Linux系统管理与编程19:自动部署dns
  • 激光雷达点云畸变消除:MCU vs CPU 方案详解
  • 动态类加载方式引入第三方资源jar包
  • 转运机器人可以绕障吗?
  • 前苹果首席设计官回顾了其在苹果的设计生涯、公司文化、标志性产品的背后故事
  • dockerfile: PaddleOCR hubserving api 服务
  • 物联网驱动的共享充电站系统:智能充电的实现原理与技术解析!
  • 【NextPilot日志移植】日志写入流程
  • 智能SQL优化工具集成:从概念到实践
  • 二进制中1的个数
  • JWT的介绍与在Fastapi框架中的应用
  • OpenCV 的 CUDA 模块中用于将一个多通道 GpuMat 图像拆分成多个单通道图像的函数split()
  • OSI 7层模型
  • cURL:通过URL传输数据的命令行工具库介绍
  • 51单片机引脚功能概述
  • QT5.14安装以及新建基础项目
  • 摩根士丹利:对冲基金已加码,八成投资者有意近期增配中国
  • 加强战略矿产出口全链条管控工作部署会召开
  • 2025年度十大IP!IP SH荣膺文化综合类TOP10
  • 教育部基础教育教指委:稳步推进中小学人工智能通识教育
  • 听企业聊感受,《外企聊营商》5月13日起推出
  • 礼来公布头对头研究详细结果:替尔泊肽在所有减重目标中均优于司美格鲁肽