当前位置: 首页 > news >正文

FastAPI × SQLAlchemy 2.0 Async:从“能跑”到“可压测”的完整工程实践

一句话总结
SQLAlchemy 2.0 AsyncIO 模式,把 FastAPI 的并发优势兑现成 真正的数据库吞吐;再叠上连接池、事务、迁移、测试四件套,直接上线不踩坑


1. 为什么要“异步 ORM”?

场景同步 SQLAlchemy异步 SQLAlchemy
100 个并发上传开 100 线程 → 100 个连接 → DB 被打爆单线程 20 连接即可跑满 CPU
请求等待 I/O线程上下文切换 8 ms协程切换 0.3 ms
代码风格到处 run_in_threadpool原生 await 一路到底

一句话:同步模式把 FastAPI 的异步事件循环拖回解放前


2. 最小可运行版本(MVP)

安装依赖

pip install "fastapi[all]" \"sqlalchemy[asyncio]>=2.0" \asyncpg alembic pydantic[email]

数据库以 PostgreSQL 为例,MySQL 换成 asyncmy 即可。

项目骨架

app/├─ api/│   └─ user.py├─ core/│   ├─ db.py│   └─ config.py├─ models/│   └─ user.py├─ schemas/│   └─ user.py└─ main.py

3. 核心代码:Session 生命周期一条龙

app/core/config.py

from pydantic import BaseSettingsclass Settings(BaseSettings):database_url: str = "postgresql+asyncpg://user:pass@localhost:5432/demo"pool_size: int = 20max_overflow: int = 0echo_sql: bool = Falseclass Config:env_file = ".env"settings = Settings()

app/core/db.py

from sqlalchemy.ext.asyncio import (AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
)class AsyncDatabaseSession:def __init__(self, url: str, *, pool_size: int = 20, max_overflow: int = 0, echo: bool = False):self.engine: AsyncEngine = create_async_engine(url,pool_size=pool_size,max_overflow=max_overflow,echo=echo,pool_pre_ping=True,          # 心跳保活)self.session_factory = async_sessionmaker(self.engine,expire_on_commit=False,      # 防止懒加载异常class_=AsyncSession,)async def close(self):await self.engine.dispose()db = AsyncDatabaseSession(settings.database_url,pool_size=settings.pool_size,echo=settings.echo_sql,
)

main.py

from fastapi import FastAPI
from app.core.db import db
from app.api import userapp = FastAPI(title="Async SQLAlchemy Demo")app.include_router(user.router)@app.on_event("startup")
async def startup():# 可选:建表# from app.models import Base# async with db.engine.begin() as conn:#     await conn.run_sync(Base.metadata.create_all)pass@app.on_event("shutdown")
async def shutdown():await db.close()

4. 依赖注入:每次请求一个 Session,自动回滚

app/core/deps.py

from typing import AsyncGenerator
from app.core.db import db
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Dependsasync def get_session() -> AsyncGenerator[AsyncSession, None]:async with db.session_factory() as session:try:yield sessionexcept Exception:await session.rollback()raisefinally:await session.close()

yield + rollback 保证请求级事务;抛异常自动回滚,正常则 commit。


5. Model / Schema / CRUD 一条龙

app/models/user.py

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnclass Base(DeclarativeBase):passclass User(Base):__tablename__ = "users"id: Mapped[int] = mapped_column(primary_key=True, index=True)email: Mapped[str] = mapped_column(String(320), unique=True, index=True)full_name: Mapped[str | None]

app/schemas/user.py

from pydantic import BaseModel, EmailStrclass UserCreate(BaseModel):email: EmailStrfull_name: str | None = Noneclass UserRead(BaseModel):id: intemail: EmailStrfull_name: str | Noneclass Config:orm_mode = True

app/api/user.py

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models import User
from app.schemas import UserCreate, UserRead
from app.core.deps import get_sessionrouter = APIRouter(prefix="/users", tags=["users"])@router.post("", response_model=UserRead)
async def create_user(payload: UserCreate, session: AsyncSession = Depends(get_session)):user = User(**payload.dict())session.add(user)await session.flush()          # 获取 idawait session.commit()await session.refresh(user)return user@router.get("/{uid}", response_model=UserRead)
async def read_user(uid: int, session: AsyncSession = Depends(get_session)):user = await session.get(User, uid)if not user:raise HTTPException(404, "User not found")return user

6. 迁移:Alembic 同样能异步

初始化

alembic init -t async migrations

修改 alembic.ini 中的 sqlalchemy.urlpostgresql+asyncpg://...

migrations/env.py

from app.core.config import settings
from app.models import Base
target_metadata = Base.metadatadef do_run_migrations(connection):context.configure(connection=connection, target_metadata=target_metadata)with context.begin_transaction():context.run_migrations()async def run_async_migrations():from sqlalchemy.ext.asyncio import AsyncEngineconnectable = AsyncEngine(create_async_engine(settings.database_url))async with connectable.connect() as connection:await connection.run_sync(do_run_migrations)await connectable.dispose()

生成 / 升级

alembic revision --autogenerate -m "init"
alembic upgrade head

7. 测试:pytest-asyncio + 异步数据库事务

tests/conftest.py

import pytest
from httpx import AsyncClient
from app.main import app
from app.core.db import db as db_instance
from sqlalchemy.pool import StaticPool
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine@pytest.fixture(scope="session")
async def engine() -> AsyncEngine:# 内存 SQLite 也可以异步,但 PostgreSQL 更真实engine = create_async_engine("postgresql+asyncpg://test:test@localhost:5432/test",poolclass=StaticPool,)yield engineawait engine.dispose()@pytest.fixture
async def session(engine: AsyncEngine):conn = await engine.begin()sess = db_instance.session_factory(bind=conn)yield sessawait sess.close()await conn.rollback()await conn.close()@pytest.fixture
async def client() -> AsyncGenerator[AsyncClient, None]:async with AsyncClient(app=app, base_url="http://test") as c:yield c

tests/test_user.py

import pytest
from sqlalchemy import select
from app.models import User@pytest.mark.asyncio
async def test_create_user(client, session):res = await client.post("/users", json={"email": "a@b.com", "full_name": "abc"})assert res.status_code == 201data = res.json()assert data["email"] == "a@b.com"user = await session.get(User, data["id"])assert user is not None

8. 性能调优 checklist

参数建议值说明
pool_sizeCPU 核心 × 220 并发已能压到 10k RPS
max_overflow0防止突发连接打爆 DB
pool_pre_ping=True必须网络闪断后自动重连
expire_on_commit=False必须否则 commit 后属性失效
echo=False生产关闭减少序列化开销

9. 常见错误速查表

异常原因解法
greenlet_spawn has not been called用了同步引擎create_async_engine
DetachedInstanceError会话关闭后访问属性expire_on_commit=False + await session.refresh()
InterfaceError: connection already closed协程间复用 Session一个请求一个 Session,禁止全局单例
ImportError: asyncmyMySQL 驱动未装pip install asyncmy

10. 结语

FastAPI 的异步生态里,数据库是最后一道闸门
用上 SQLAlchemy 2.0 AsyncIO 之后,I/O 等待不再是瓶颈,压测曲线直接多一个量级。
把本文的 db.py + deps.py 复制走,10 分钟就能让老项目原地起飞。Happy async coding!

http://www.dtcms.com/a/593772.html

相关文章:

  • 伪装图像生成之——GAN与Diffusion
  • 分布式系统中的CAP理论和BASE理论
  • 做网站建设的怎么赢利网站建设多少费用
  • Python字典--第1关:元组使用:这份菜单能修改吗?
  • 【S2ANet】Align Deep Features for Oriented Object Detection 译读笔记
  • 二维数组及经典案例
  • 【VMware Workstation】虚拟机网络配置流程+MobaXterm连接步骤
  • 西安微网站建设wordpress视频预览插件
  • CodeBuddy + GLM-4.6:儿童诗词宝典全栈开发实战
  • PSU过程11.2.0.4.250415
  • Nanopb基本概念
  • 微网站开发平台 知乎东家乐装修公司简介
  • 基于交替方向乘子法(ADMM)的RPCA MATLAB实现
  • redis删除一个键用del还是unlink
  • 用vue.js做网站百度区域代理
  • 好人一生平安网站哪个好抖音代运营培训
  • 前端基础面试题(Css,Html,Js,Ts)
  • 使用c#强大的SourceGenerator现对象的深克隆
  • 企业移动网站建设网站文件夹命名规则
  • 【动态链接库】一、VS下基本制作与使用
  • 百度网站排名规则长春百度快速优化
  • xpert AI工作流工具本地部署
  • SP30N06NK 30V N沟道MOSFET技术解析与应用指南
  • 深圳建站公司推荐国内平台有哪些
  • 使用DFSDM模拟看门狗做过流保护以及封波应用 LAT1612
  • 远程传输大文件的软件有哪些?
  • 北京建设官方网站渠道网络大厦
  • 鸿蒙 Next 如何使用 AVRecorder 从0到1实现视频录制功能(ArkTS)
  • 动态背景网站北京网站设计制作费用
  • LSTM模型做分类任务2(PyTorch实现)