FastAPI × SQLAlchemy 2.0 Async:从“能跑”到“可压测”的完整工程实践
一句话总结
用 SQLAlchemy 2.0 AsyncIO 模式,把 FastAPI 的并发优势兑现成 真正的数据库吞吐;再叠上连接池、事务、迁移、测试四件套,直接上线不踩坑。
1. 为什么要“异步 ORM”?
| 场景 | 同步 SQLAlchemy | 异步 SQLAlchemy |
|---|---|---|
| 100 个并发上传 | 开 100 线程 → 100 个连接 → DB 被打爆 | 单线程 20 连接即可跑满 CPU |
| 请求等待 I/O | 线程上下文切换 8 ms | 协程切换 0.3 ms |
| 代码风格 | 到处 run_in_threadpool | 原生 await 一路到底 |
一句话:同步模式把 FastAPI 的异步事件循环拖回解放前。
2. 最小可运行版本(MVP)
安装依赖
pip install "fastapi[all]" \"sqlalchemy[asyncio]>=2.0" \asyncpg alembic pydantic[email]
数据库以 PostgreSQL 为例,MySQL 换成
asyncmy即可。
项目骨架
app/├─ api/│ └─ user.py├─ core/│ ├─ db.py│ └─ config.py├─ models/│ └─ user.py├─ schemas/│ └─ user.py└─ main.py
3. 核心代码:Session 生命周期一条龙
app/core/config.py
from pydantic import BaseSettingsclass Settings(BaseSettings):database_url: str = "postgresql+asyncpg://user:pass@localhost:5432/demo"pool_size: int = 20max_overflow: int = 0echo_sql: bool = Falseclass Config:env_file = ".env"settings = Settings()
app/core/db.py
from sqlalchemy.ext.asyncio import (AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
)class AsyncDatabaseSession:def __init__(self, url: str, *, pool_size: int = 20, max_overflow: int = 0, echo: bool = False):self.engine: AsyncEngine = create_async_engine(url,pool_size=pool_size,max_overflow=max_overflow,echo=echo,pool_pre_ping=True, # 心跳保活)self.session_factory = async_sessionmaker(self.engine,expire_on_commit=False, # 防止懒加载异常class_=AsyncSession,)async def close(self):await self.engine.dispose()db = AsyncDatabaseSession(settings.database_url,pool_size=settings.pool_size,echo=settings.echo_sql,
)
main.py
from fastapi import FastAPI
from app.core.db import db
from app.api import userapp = FastAPI(title="Async SQLAlchemy Demo")app.include_router(user.router)@app.on_event("startup")
async def startup():# 可选:建表# from app.models import Base# async with db.engine.begin() as conn:# await conn.run_sync(Base.metadata.create_all)pass@app.on_event("shutdown")
async def shutdown():await db.close()
4. 依赖注入:每次请求一个 Session,自动回滚
app/core/deps.py
from typing import AsyncGenerator
from app.core.db import db
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Dependsasync def get_session() -> AsyncGenerator[AsyncSession, None]:async with db.session_factory() as session:try:yield sessionexcept Exception:await session.rollback()raisefinally:await session.close()
用
yield+rollback保证请求级事务;抛异常自动回滚,正常则 commit。
5. Model / Schema / CRUD 一条龙
app/models/user.py
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnclass Base(DeclarativeBase):passclass User(Base):__tablename__ = "users"id: Mapped[int] = mapped_column(primary_key=True, index=True)email: Mapped[str] = mapped_column(String(320), unique=True, index=True)full_name: Mapped[str | None]
app/schemas/user.py
from pydantic import BaseModel, EmailStrclass UserCreate(BaseModel):email: EmailStrfull_name: str | None = Noneclass UserRead(BaseModel):id: intemail: EmailStrfull_name: str | Noneclass Config:orm_mode = True
app/api/user.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models import User
from app.schemas import UserCreate, UserRead
from app.core.deps import get_sessionrouter = APIRouter(prefix="/users", tags=["users"])@router.post("", response_model=UserRead)
async def create_user(payload: UserCreate, session: AsyncSession = Depends(get_session)):user = User(**payload.dict())session.add(user)await session.flush() # 获取 idawait session.commit()await session.refresh(user)return user@router.get("/{uid}", response_model=UserRead)
async def read_user(uid: int, session: AsyncSession = Depends(get_session)):user = await session.get(User, uid)if not user:raise HTTPException(404, "User not found")return user
6. 迁移:Alembic 同样能异步
初始化
alembic init -t async migrations
修改 alembic.ini 中的 sqlalchemy.url 为 postgresql+asyncpg://...
migrations/env.py
from app.core.config import settings
from app.models import Base
target_metadata = Base.metadatadef do_run_migrations(connection):context.configure(connection=connection, target_metadata=target_metadata)with context.begin_transaction():context.run_migrations()async def run_async_migrations():from sqlalchemy.ext.asyncio import AsyncEngineconnectable = AsyncEngine(create_async_engine(settings.database_url))async with connectable.connect() as connection:await connection.run_sync(do_run_migrations)await connectable.dispose()
生成 / 升级
alembic revision --autogenerate -m "init"
alembic upgrade head
7. 测试:pytest-asyncio + 异步数据库事务
tests/conftest.py
import pytest
from httpx import AsyncClient
from app.main import app
from app.core.db import db as db_instance
from sqlalchemy.pool import StaticPool
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine@pytest.fixture(scope="session")
async def engine() -> AsyncEngine:# 内存 SQLite 也可以异步,但 PostgreSQL 更真实engine = create_async_engine("postgresql+asyncpg://test:test@localhost:5432/test",poolclass=StaticPool,)yield engineawait engine.dispose()@pytest.fixture
async def session(engine: AsyncEngine):conn = await engine.begin()sess = db_instance.session_factory(bind=conn)yield sessawait sess.close()await conn.rollback()await conn.close()@pytest.fixture
async def client() -> AsyncGenerator[AsyncClient, None]:async with AsyncClient(app=app, base_url="http://test") as c:yield c
tests/test_user.py
import pytest
from sqlalchemy import select
from app.models import User@pytest.mark.asyncio
async def test_create_user(client, session):res = await client.post("/users", json={"email": "a@b.com", "full_name": "abc"})assert res.status_code == 201data = res.json()assert data["email"] == "a@b.com"user = await session.get(User, data["id"])assert user is not None
8. 性能调优 checklist
| 参数 | 建议值 | 说明 |
|---|---|---|
pool_size | CPU 核心 × 2 | 20 并发已能压到 10k RPS |
max_overflow | 0 | 防止突发连接打爆 DB |
pool_pre_ping=True | 必须 | 网络闪断后自动重连 |
expire_on_commit=False | 必须 | 否则 commit 后属性失效 |
echo=False | 生产关闭 | 减少序列化开销 |
9. 常见错误速查表
| 异常 | 原因 | 解法 |
|---|---|---|
greenlet_spawn has not been called | 用了同步引擎 | create_async_engine |
DetachedInstanceError | 会话关闭后访问属性 | expire_on_commit=False + await session.refresh() |
InterfaceError: connection already closed | 协程间复用 Session | 一个请求一个 Session,禁止全局单例 |
ImportError: asyncmy | MySQL 驱动未装 | pip install asyncmy |
10. 结语
FastAPI 的异步生态里,数据库是最后一道闸门。
用上 SQLAlchemy 2.0 AsyncIO 之后,I/O 等待不再是瓶颈,压测曲线直接多一个量级。
把本文的 db.py + deps.py 复制走,10 分钟就能让老项目原地起飞。Happy async coding!
