FastAPI(未结束)
异步处理
概念
并发:通过任务切换,让多个任务 “看起来” 同时执行(宏观上并行,微观上串行)
并行:多个任务真正同时执行(宏观和微观上都并行)
同步:任务按顺序执行,前一个任务完成后,才开始执行下一个任务,中间会等待。
异步:任务无需等待前一个任务完成,发起任务后立即继续执行后续操作,任务结果通过回调、事件通知等方式处理。
进程【】 (视频转码、科学计算)
线程【】 (IO任务、网络请求、磁盘读写)
协程【asyncio】 (在一个线程中运行,性能比线程好)
实现方式:
WSGI:同步。通过多进程+多线程的方式来实现并发的。
ASGI:异步。通过多进程+主线程(不需要多线程)+协程的方式来实现并发的。
协程异步
async :定义协程(coroutine) 的关键字
await:暂停当前协程的执行,等待另一个协程 / 任务完成,期间事件循环可以调度其他任务(实现非阻塞)
asyncio.run(main()):事件循环调度
import asyncio
import time# 定义协程函数(异步任务)
async def async_task(name: str, delay: int):print(f"任务 {name} 开始,将等待 {delay} 秒")# 模拟耗时I/O操作(非阻塞等待)await asyncio.sleep(delay) # 注意:必须用asyncio.sleep,而非time.sleepprint(f"任务 {name} 完成")return f"{name} 结果"# 主协程(程序入口)
async def main():# 方式1:串行执行(按顺序等待每个任务)print("=== 串行执行 ===")start = time.time()await async_task("A", 1)await async_task("B", 2)print(f"串行耗时: {time.time() - start:.2f}秒\n")# 方式2:并行执行(同时运行多个任务)print("=== 并行执行 ===")start = time.time()# 创建任务(将协程封装为可调度的任务)task1 = asyncio.create_task(async_task("C", 1))task2 = asyncio.create_task(async_task("D", 2))# 等待所有任务完成并获取结果result1 = await task1result2 = await task2print(f"任务结果: {result1}, {result2}")print(f"并行耗时: {time.time() - start:.2f}秒")# 运行事件循环
if __name__ == "__main__":asyncio.run(main()) # Python 3.7+ 新增,自动管理事件循环
第一个FastAPI程序
代码
from fastapi import FastAPI
from typing import Optional
import uvicornapp = FastAPI()@app.get("/")
async def aa():print("aaaaaaa")return {"message":"Hello World"}@app.get("/item/{item_id}")
async def bb(item_id : int, q : Optional[str] = None)-> dict:print("bbbbbbbb00")return {"message":"Hello World"}if __name__ == "__main__":uvicorn.run(app, host = "0.0.0.0", port = 8080)
库介绍
【typing】库
typing 是 Python 标准库中用于类型提示(Type Hints) 的模块
用于在代码中显式声明变量【a : int】、函数参数和返回值【()->str】的类型。
【uvicorn】库
uvicorn 是一个基于 ASGI(Asynchronous Server Gateway Interface) 的高性能异步 Web 服务器,
专为运行异步 Python Web 应用(如 FastAPI、Starlette 等)设计。
参数(typing )
类型提示
定义:参数名:数据类型
基本用法
# 基本语法
username: str = "admin"# 返回值类型
def aa(old: int)->int:return oldfrom typing import List, Dict, Tuple, Set, Optional, Any, Union, Literal
# list
unList1: List[str] = ["a", "b", "c"]
unList2: List[str|int] = [1, 2, "3", "4"]# Dict
unDict: Dict[str, str|int] = {"name":"zhangsan", "age":18}# Tuple
unTuple: Tuple[str, int, str] = ("a", 1, "b")# Set
unSet: Set[str] = {"a", "b", "c"}
联合类型:
# Union
# 可以选择多种类型
xxUnion: Union[str, int] = 1# Optional
# 可以设置默认值,如果没有发送值,则使用默认值
xxOptional: Optional[str] = None
数据校验(Pydantic)
是一个用于数据验证和设置管理的 Python 库,基于类型注解,广泛用于数据解析、验证和序列化
from pydantic import BaseModel,PositiveIntclass User(BaseModel):id: intname: str = "zhangsan" # 默认值phone: stridcard: PositiveInt # PositiveInt 正整数
请求数据
路径参数(Path)
位置:嵌入在 URL 路径中,通常用于标识资源的唯一标识(如 ID、名称等)。
特点:是 URL 的一部分,不可省略,格式一般为 /{参数名}。
from fastapi import FastAPI, Path
app =FastAPI()
@app.get("/aa/{name}")
async def aa(name: str = Path(max_length=10, description='用户名'))-> str:return {"message": {name}}
注意:1、由于路径操作是按顺序依次运行的,因此,一定要在 /users/{user_id} 之前声明 /users/me
查询参数(Query)
位置:跟在 URL 后面,以 ? 开头,多个参数用 & 分隔。
特点:可选参数,常用于过滤、分页、排序等场景。
from fastapi import FastAPI, Path, Query
async def bb(category: str, page: int = Query(default=1, description="页码"), limit: int = Query(default=10, description="每页数量", ge=5)):return {"category": category, "page": page, "limit": limit}
请求体参数(Body, Field)
from pydantic import BaseModel, Field
from fastapi import FastAPI, Path, Query, Body
class User(BaseModel):name: str = Field(default=None, title="The description of the item", max_length=300)age: intemail: str@app.put("/cc/{id}")
async def cc(id: int, user: User):return {"user_id": id, "user": user.model_dump()}
model_dump():转字典
Cookie 参数
from fastapi import FastAPI, Path, Query, Body, Cookie
位置:在请求的 Cookie 中(存储在客户端的小型数据)。
特点:常用于身份验证、会话管理等。
from fastapi import FastAPI, Path, Query, Body, Cookie
from fastapi.responses import JSONResponse # 处理响应
@app.get("/dd")
async def dd(username: str = Cookie()):print(username)return username@app.get("/ee")
async def ee(username: str = Cookie()):response = JSONResponse(content={"message": "set cookie"})response.set_cookie('token', value='abcdefg')return response
请求头参数
位置:在 HTTP 请求头中(键值对形式)。
特点:常用于传递元数据(如认证信息、内容类型、语言等)。
from fastapi import FastAPI, Path, Query, Body, Cookie, Header
@app.get("/ff")
async def ff(host: str = Header()):return host
Annotated
在 Python 的 typing 模块中,Annotated 是一个特殊的类型提示工具,用于为类型添加额外的元数据(metadata),同时保留原始的类型信息。
语法:Annotated[type, metadata1, metadata2, ...]
- 第一个参数 type 是原始的类型注解(如 int、str 等)
- 后续参数是附加的元数据,可以是任意类型(字符串、数字、对象等)
文件上传
from fastapi import FastAPI, File, UploadFileapp = FastAPI()@app.post("/files/")
async def create_file(file: bytes = File()):return {"file_size": len(file)}@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile):return {"filename": file.filename}
依赖注入
预处理(Before)
from fastapi import Depends, FastAPI, HTTPExceptionapp = FastAPI()# 依赖项函数
def common_parameters(q: str = None, skip: int = 0, limit: int = 100):return {"q": q, "skip": skip, "limit": limit}# 路由操作函数
@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):return commons
后处理(After)
from fastapi import Depends, FastAPI, HTTPExceptionapp = FastAPI()# 依赖项函数
def common_parameters(q: str = None, skip: int = 0, limit: int = 100):return {"q": q, "skip": skip, "limit": limit}# 路由操作函数
@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):return commons# 后处理函数
async def after_request():# 这里可以执行一些后处理逻辑,比如记录日志pass# 后处理依赖项
@app.get("/items/", response_model=dict)
async def read_items_after(request: dict = Depends(after_request)):return {"message": "Items returned successfully"}
异常处理(HTTPException)
from fastapi import FastAPI, HTTPException
app = FastAPI()
items = {"foo": "The Foo Wrestlers"}@app.get("/items/{item_id}")
async def read_item(item_id: str):if item_id not in items:raise HTTPException(status_code=404, detail="Item not found")return {"item": items[item_id]}
模块化(APIRouter)
APIRouter 是一个用于组织和拆分路由的工具,能帮助你将大型应用的路由按功能模块拆分,使代码结构更清晰、可维护性更强。
模块(test1,test2)
from fastapi import APIRouter, Header, HTTPException, Depends# 登录验证
async def login_required(token: str = Header()):if token != 'xxx':raise HTTPException(status_code=401, detail='Token is invalid')router = APIRouter(prefix="/older", tags=["olders"], dependencies=[Depends(login_required)])@router.get("/aa")
async def aa(token: str = Header()):print(token)return token
from fastapi import APIRouter, Header, HTTPException, Dependsrouter = APIRouter(prefix="/user", tags=["User"])@router.get("/aa/{name}")
async def aa(name: str):print(name)return name
主程序(main)
from fastapi import FastAPI
from mymodel import test1, test2
import uvicorn
app = FastAPI()
app.include_router(test1.router)
app.include_router(test2.router)if __name__ == "__main__":uvicorn.run(app, host="localhost", port=8888)
数据连接(pymysql)
import pymysql
from pymysql.cursors import DictCursorclass MySQLUtil:def __init__(self, host="localhost", user="root", password="", database="", port=3306):self.host = hostself.user = userself.password = passwordself.database = databaseself.port = portself.conn = Noneself.cursor = Nonedef connect(self):"""建立数据库连接"""try:self.conn = pymysql.connect(host=self.host,user=self.user,password=self.password,database=self.database,port=self.port,charset="utf8mb4",cursorclass=DictCursor # 默认返回字典)self.cursor = self.conn.cursor()return Trueexcept Exception as e:print(f"连接失败:{e}")return Falsedef query(self, sql, params=None):"""执行查询,返回结果列表"""if not self.conn:if not self.connect():return Nonetry:self.cursor.execute(sql, params or ())return self.cursor.fetchall()except Exception as e:print(f"查询失败:{e}")return Nonedef execute(self, sql, params=None):"""执行增删改,返回影响行数"""if not self.conn:if not self.connect():return -1try:rows = self.cursor.execute(sql, params or ())self.conn.commit()return rowsexcept Exception as e:self.conn.rollback()print(f"执行失败:{e}")return -1def close(self):"""关闭连接"""if self.cursor:self.cursor.close()if self.conn:self.conn.close()# 使用示例
if __name__ == "__main__":db = MySQLUtil(host="localhost",user="root",password="your_password",database="test_db")# 查询users = db.query("SELECT * FROM users WHERE age > %s", (18,))print("查询结果:", users)# 新增rows = db.execute("INSERT INTO users (name, age) VALUES (%s, %s)", ("David", 22))print("新增影响行数:", rows)# 关闭连接db.close()