ContextVars 在 FastAPI 中的使用
ContextVars 在 FastAPI 依赖项 Depends 中使用:只在异步依赖项中使用 ContextVars
相关讨论和实践:
- https://github.com/fastapi/fastapi/discussions/13382
- https://github.com/fastapi/fastapi/discussions/8628
- https://github.com/fastapi/fastapi/discussions/9006
FastAPI 是一个异步优先框架,其假定同步依赖项中包含阻塞代码(否则为什么不使用异步),会将同步依赖项放到线程池执行(异步依赖项是在当前事件循环执行),以避免阻塞当前事件循环。这样对同步依赖项和异步依赖项不同的处理方式导致依赖项中使用 ContextVars 时,异步依赖项中 ContextVars 符合预期逻辑,而同步依赖项由于依赖在额外线程池生成,ContextVars 不符合预期。
验证代码:
import asyncio
import time
from contextvars import ContextVar
from random import randint
from typing import Annotated, Any
import uvicorn
from fastapi import Depends, FastAPI
app = FastAPI()
test_var: ContextVar[str | None] = ContextVar("test_var", default=None)
def sf() -> tuple[str | None, str]:
"""
Sync fixture to set `test_var`.
"""
old_value = test_var.get()
new_value = f"sf-{randint(0, 100):03d}"
test_var.set(new_value)
return old_value, new_value
async def af() -> tuple[str | None, str]:
"""
Async fixture to set `test_var`.
"""
old_value = test_var.get()
new_value = f"af-{randint(0, 100):03d}"
test_var.set(new_value)
return old_value, new_value
@app.get("/sr-{oid}")
def get_sr(oid: int) -> dict[str, Any]:
"""
Synchronous request with test_var.
"""
old_value = test_var.get()
test_var.set(f"sr-{oid:03d}")
sleep = randint(1, 5)
time.sleep(sleep)
return {
"input": f"{oid:03d}",
"old": old_value,
"new": test_var.get(),
"sleep": sleep,
}
@app.get("/ar-{oid}")
async def get_ar(oid: int) -> dict[str, Any]:
"""
Asynchronous request with explicit test_var.
"""
old_value = test_var.get()
test_var.set(f"ar-{oid:03d}")
sleep = randint(1, 5)
await asyncio.sleep(sleep)
return {
"input": f"{oid:03d}",
"old": old_value,
"new": test_var.get(),
"sleep": sleep,
}
@app.get("/sr/sf")
def get_sr_sf(
old_new: Annotated[tuple[str | None, None], Depends(sf)],
) -> dict[str, Any]:
"""
Synchronous request with sync fixture.
"""
old, new = old_new
sleep = randint(1, 5)
time.sleep(sleep)
return {
"old": old,
"new": new,
"test_var": test_var.get(),
"sleep": sleep,
}
@app.get("/sr/af")
def get_sr_af(
old_new: Annotated[tuple[str | None, None], Depends(af)],
) -> dict[str, Any]:
"""
Synchronous request with async fixture.
"""
old, new = old_new
sleep = randint(1, 5)
time.sleep(sleep)
return {
"old": old,
"new": new,
"test_var": test_var.get(),
"sleep": sleep,
}
@app.get("/ar/sf")
async def get_ar_sf(
old_new: Annotated[tuple[str | None, None], Depends(sf)],
) -> dict[str, Any]:
"""
Asynchronous request with sync fixture.
"""
old, new = old_new
sleep = randint(1, 5)
await asyncio.sleep(sleep)
return {
"old": old,
"new": new,
"test_var": test_var.get(),
"sleep": sleep,
}
@app.get("/ar/af")
async def get_ar_af(
old_new: Annotated[tuple[str | None, None], Depends(af)],
) -> dict[str, Any]:
"""
Asynchronous request with async fixture.
"""
old, new = old_new
sleep = randint(1, 5)
await asyncio.sleep(sleep)
return {
"old": old,
"new": new,
"test_var": test_var.get(),
"sleep": sleep,
}
if __name__ == "__main__":
test_var.set("init")
uvicorn.run(app, host="localhost", port=8000)
数据库连接使用 ContextVars 而不是 SQLAlchemy async_scoped_session
在 FastAPI 中通常会使用 Depends 将数据库连接注入 endpoint,但是若 endpoint 函数调用的某个函数需要 endpoint 中的这个数据库连接,朴素的方法是通过函数参数传递,这样就导致一个问题,只要被调用方需要调用方的数据库连接,就必须增加一个数据库连接的函数入参,此时可以使用 ContextVars 和 SQLAlchemy async_scoped_session 优化这一点。
https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#asyncio-scoped-session
SQLAlchemy generally does not recommend the “scoped” pattern for new development as it relies upon mutable global state that must also be explicitly torn down when work within the thread or task is complete. Particularly when using asyncio, it’s likely a better idea to pass the AsyncSession directly to the awaitable functions that need it.
SQLAlchemy 不推荐新项目使用 scoped 方式获取数据库连接。故使用 ContextVars。
题外
FastAPI 中依赖注入问题
FastAPI 原生支持的 Depends 只能在 Web endpoint 函数中使用,但其余场景(Worker、CLI、Testing)无法使用该依赖注入方式。
相关讨论和实践:
- https://github.com/fastapi/fastapi/discussions/7720
fastapi-injectable:
https://j-sui.com/2024/10/26/use-fastapi-depends-outside-fastapi-routes/
https://github.com/JasperSui/fastapi-injectable
依赖注入框架
https://github.com/sfermigier/awesome-dependency-injection-in-python
异步测试
https://anyio.readthedocs.io/en/stable/testing.html
同步与异步函数互转
https://juejin.cn/post/7091839981953482789
另外一个使用案例
https://blog.csdn.net/qq_36815042/article/details/129308934