Fastapi服务在高并发情况下大量超时问题排查
前言
实习生写的服务在压测的时候出现大量超时失败,他第一想法就是要扩大容器节点,我说软件能解决的问题,千万别堆硬件,否则老板迟早被你霍霍完蛋。通过排查,找到了代码中的问题:
在一个异步方法中调用了一个同步方法,这就导致一个容器服务一次只能支持一个并发。
我们直接用代码进行演示。
代码演示
1. 服务代码
import time
import asyncio
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpoolapp = FastAPI()# 模拟一个耗时的同步函数(比如:处理大文件、调用老系统API、复杂计算)
def slow_sync_task(task_id: int):print(f"Task {task_id} started (blocking for 3 seconds)...")time.sleep(3) # 这是阻塞操作!会卡住整个事件循环print(f"Task {task_id} finished.")return f"Result from task {task_id}"# ❌ 错误方式:在 async 接口中直接调用同步方法
@app.get("/bad")
async def bad_endpoint(task_id: int = 1):# 直接调用同步阻塞函数result = slow_sync_task(task_id)return {"method": "bad", "result": result}# ✅ 正确方式1:使用 FastAPI 的 run_in_threadpool
@app.get("/good1")
async def good_endpoint_v1(task_id: int = 1):# 将同步函数放到线程池中执行,不阻塞事件循环result = await run_in_threadpool(slow_sync_task, task_id)return {"method": "good1", "result": result}# ✅ 正确方式2:使用 asyncio.to_thread (Python 3.9+ 推荐)
@app.get("/good2")
async def good_endpoint_v2(task_id: int = 1):# 同样是非阻塞的,更现代的写法result = await asyncio.to_thread(slow_sync_task, task_id)return {"method": "good2", "result": result}# ✅ 额外对比:真正的异步非阻塞(比如模拟网络请求)
async def fake_async_io_task(task_id: int):print(f"Async Task {task_id} started (waiting 3 seconds asynchronously)...")await asyncio.sleep(3) # 这是真正的异步等待,不会阻塞print(f"Async Task {task_id} finished.")return f"Async result from task {task_id}"@app.get("/perfect")
async def perfect_endpoint(task_id: int = 1):result = await fake_async_io_task(task_id)return {"method": "perfect", "result": result}if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)
2. 测试代码
import asyncio
import httpx
import timeasync def request_task(url, task_id):async with httpx.AsyncClient(timeout=10) as client:print(f"发起请求 {task_id} 到 {url}")start = time.time()response = await client.get(url, params={"task_id": task_id})end = time.time()print(f"✅ 请求 {task_id} 完成,耗时: {end - start:.2f} 秒,结果: {response.json()}")return end - startasync def main():# 测试 BAD 接口(预期:串行,总时间 ~6秒)print("=== 测试 BAD 接口(阻塞) ===")start_time = time.time()tasks = [request_task("http://127.0.0.1:8000/bad", 1),request_task("http://127.0.0.1:8000/bad", 2),]durations = await asyncio.gather(*tasks)total_time = time.time() - start_timeprint(f"🔴 BAD 接口:两个请求总耗时: {total_time:.2f} 秒\n")# 等待一下,避免端口占用await asyncio.sleep(1)# 测试 GOOD 接口(预期:并行,总时间 ~3秒)print("=== 测试 GOOD1 接口(非阻塞) ===")start_time = time.time()tasks = [request_task("http://127.0.0.1:8000/good1", 1),request_task("http://127.0.0.1:8000/good1", 2),]durations = await asyncio.gather(*tasks)total_time = time.time() - start_timeprint(f"🟢 GOOD1 接口:两个请求总耗时: {total_time:.2f} 秒\n")if __name__ == "__main__":asyncio.run(main())
结果:
=== 测试 BAD 接口(阻塞) ===
发起请求 1 到 http://127.0.0.1:8000/bad
发起请求 2 到 http://127.0.0.1:8000/bad
✅ 请求 1 完成,耗时: 3.01 秒,结果: ...
✅ 请求 2 完成,耗时: 3.00 秒,结果: ...
🔴 BAD 接口:两个请求总耗时: 6.02 秒=== 测试 GOOD1 接口(非阻塞) ===
发起请求 1 到 http://127.0.0.1:8000/good1
发起请求 2 到 http://127.0.0.1:8000/good1
✅ 请求 1 完成,耗时: 3.01 秒,结果: ...
✅ 请求 2 完成,耗时: 3.02 秒,结果: ...
🟢 GOOD1 接口:两个请求总耗时: 3.03 秒
3. 总结和建议
- 诊断问题:首先确认哪个同步方法是瓶颈。
- 优先选择异步库:如果是 I/O 操作(网络、数据库),首选方案二是使用对应的异步库(如 httpx, asyncpg)。这是性能最好、最符合异步哲学的方式。
- 通用解决方案:对于无法避免的同步代码(无论是 I/O 还是 CPU,但特别是 I/O),使用 run_in_threadpool 或 asyncio.to_thread。这是 FastAPI 官方推荐的处理遗留同步代码的方法。
- CPU 密集型特殊处理:对于明确的 CPU 密集型任务,考虑 ProcessPoolExecutor。
- 避免混合:绝对不要在 async 路由中直接调用 time.sleep()、requests.get() 等阻塞函数。
通过以上修改,你的 FastAPI 接口就能恢复其高并发处理能力了。
run_in_threadpool vs asyncio.to_thread
run_in_threadpool 和 asyncio.to_thread 的最终效果几乎完全相同——都是将一个同步阻塞函数放到线程中执行,避免阻塞事件循环。但它们在来源、实现和使用场景上有一些关键区别。
1. 区别
| 特性 | run_in_threadpool (FastAPI) | asyncio.to_thread (Python 内置) |
|---|---|---|
| 来源 | FastAPI 框架提供 | Python 3.9+ 标准库内置 (asyncio) |
| 底层 | 基于 concurrent.futures.ThreadPoolExecutor | 基于 loop.run_in_executor |
| Python 版本要求 | 任何支持 FastAPI 的版本 | Python 3.9+ |
| 是否需要额外依赖 | 需要 fastapi | 不需要,标准库 |
| 控制粒度 | 使用 FastAPI 全局线程池 | 可以更灵活地控制(理论上) |
| 推荐程度 | FastAPI 场景下兼容性好 | Python 3.9+ 推荐使用 |
2. 建议
- 如果你在写新的 FastAPI 项目,并且使用 Python 3.9+,请优先使用 asyncio.to_thread。
- 如果你需要兼容旧版本 Python,或者已经在用 FastAPI 的生态,run_in_threadpool 依然是一个安全可靠的选择。
可以把 asyncio.to_thread 看作是 run_in_threadpool 的“官方标准版”。
Python异步和Java异步
这里是实习生提出的一个问题:
Java里面就没有协程的概念,一说异步,大家都认为的开启另一个线程,Python的异步怎么又指的是协程?
🟩 Java 的“异步” = 多线程 + 线程池
在 Java 世界里,由于历史原因和 JVM 的设计,没有原生的协程支持(虽然 Project Loom 在尝试改变这一点,但尚未普及)。所以:
- new Thread(() -> {…}).start()
- ExecutorService 线程池
- CompletableFuture.supplyAsync(…)
这些是 Java 实现“异步”的标准方式。
// Java 示例:异步执行任务
ExecutorService executor = Executors.newFixedThreadPool(10);Future<String> future = executor.submit(() -> {// 耗时操作(如调用外部 API)Thread.sleep(3000);return "Result";
});// 主线程可以继续做别的事
System.out.println("Task submitted, not blocked.");// 稍后获取结果
String result = future.get(); // 如果还没完成,这里会阻塞
✅ Java 开发者的理解:“异步 = 开个线程去做,不阻塞主线程” —— 完全正确!
🟥 Python 的“异步” = 协程 + 事件循环
Python 的 async/await 是从 JavaScript、C# 等语言借鉴的协程模型,而不是传统的多线程模型。
import asyncioasync def slow_task():print("Task started...")await asyncio.sleep(3) # 模拟耗时 I/O,不阻塞事件循环print("Task finished.")return "Result"async def main():print("Submitting task...")# 创建任务,但不立即等待task = asyncio.create_task(slow_task())print("Doing other work...") # 这里可以处理其他协程result = await task # 等待结果return result# 运行事件循环
asyncio.run(main())
在这个例子中:
- 整个过程可能只在一个线程中运行。
- await asyncio.sleep(3) 不是阻塞线程,而是告诉事件循环:“我可以等 3 秒,你先去处理别的协程。”
- 事件循环在等待期间可以调度成千上万个其他协程。
💡 为什么 Python 要搞这么一套“反直觉”的东西?
因为 I/O 密集型场景的性能瓶颈不在 CPU,而在等待。
- 假设你有一个 Web 服务,每个请求要查询数据库(等待 100ms)。
- 用 Java 多线程:每处理一个请求就开一个线程,1000 个并发就需要 1000 个线程,内存和上下文切换开销巨大。
- 用 Python asyncio:一个线程就可以同时管理 1000 个“等待数据库返回”的协程。当一个协程在等 DB 时,事件循环立刻切到下一个有事干的协程。
🚀 结果:Python 的单线程 asyncio 在 I/O 密集型场景下,并发能力常常超过 Java 的多线程模型。
你可以把 Python 的 asyncio 看作是一个超级高效的单线程任务调度器,它通过“协作”而不是“抢占”来实现并发。理解了这一点,你就跨越了最大的思维鸿沟。
