Fast API 中的用户认证:深入理解 JWT(JSON Web Tokens)
在当今高度互联的软件生态系统中,API(应用程序编程接口)扮演着至关重要的角色,它们是不同服务和应用之间进行通信的桥梁。然而,随着API的广泛应用,确保数据和用户身份的安全变得至关重要。本文将深入探讨API认证的核心概念,特别是如何在FastAPI中利用JWT(JSON Web Tokens)结合SQLModel,实现安全、高效的用户认证。
1. API 认证的重要性及认证类型
API认证是验证请求来源身份的过程,它确保只有经过授权的用户或系统才能访问受保护的资源,并执行特定操作。设想一个社交媒体应用,用户需要登录才能创建帖子、删除帖子或投票。如果没有适当的认证机制,任何人都可能冒充他人或执行未经授权的操作,这将导致严重的安全漏洞。
API认证主要有两种主流方式:
-
Session-based Authentication (基于会话的认证)
- 这种方式在传统的Web应用中非常常见。当用户登录时,服务器会创建一个“会话”(Session),并在其后端服务器或API上存储该会话的信息,以跟踪用户是否已登录。服务器通常会向客户端(如浏览器)发送一个包含Session ID的Cookie。客户端在后续的每个请求中都会携带这个Cookie,服务器通过查找Session ID来验证用户身份。
- 优点: 相对简单,服务器端可以轻松管理用户状态(如登录/登出)。
- 缺点:
- 有状态性: 服务器需要维护每个用户的会话状态,这在分布式系统或微服务架构中会增加复杂性,因为会话数据可能需要跨多个服务器共享。
- 可扩展性: 增加了后端服务器的负担,难以水平扩展。
- CSRF攻击: 容易受到跨站请求伪造(CSRF)攻击。
-
Token-based Authentication (基于令牌的认证)
- Token-based认证,特别是JWT,是API认证的现代趋势。与Session-based认证不同,它是一种“无状态”(Stateless)认证机制。这意味着后端API服务器本身不存储任何关于用户登录状态的信息。相反,当用户成功登录后,API会生成一个令牌(Token),并将其发送给客户端。客户端负责存储这个令牌(通常在本地存储或Cookie中),并在后续的每个需要认证的请求中,将这个令牌包含在请求头(如
Authorization
头)中发送给API。API收到请求后,会验证令牌的有效性,如果有效,则处理请求。 - 优点:
- 无状态性: API服务器无需维护会话状态,大大提高了可扩展性和性能。
- 跨域: 更容易实现跨域(Cross-Origin)认证。
- 安全性: 通过数字签名保证令牌的完整性,防止篡改。
- 移动友好: 更适合移动应用程序。
- Token-based认证,特别是JWT,是API认证的现代趋势。与Session-based认证不同,它是一种“无状态”(Stateless)认证机制。这意味着后端API服务器本身不存储任何关于用户登录状态的信息。相反,当用户成功登录后,API会生成一个令牌(Token),并将其发送给客户端。客户端负责存储这个令牌(通常在本地存储或Cookie中),并在后续的每个需要认证的请求中,将这个令牌包含在请求头(如
2. JWT(JSON Web Tokens)机制深度解析
JWT不仅仅是一个随机字符串,它是一个紧凑且自包含的令牌,用于在各方之间安全地传输信息。它由三部分组成,用点号(.
)分隔:Header(头部)、Payload(载荷)和Signature(签名)。
2.1 JWT 的结构
-
Header (头部)
Header通常包含两部分信息:令牌的类型(typ
),通常是"JWT";以及所使用的签名算法(alg
),例如HS256
或RS256
。
示例:{"alg": "HS256","typ": "JWT" }
这部分信息经过Base64URL编码后,构成JWT的第一部分。
-
Payload (载荷)
Payload包含令牌的“声明”(Claims),即关于实体(通常是用户)和其他数据的陈述。这些声明可以是注册的(如iss
发行者、exp
过期时间),也可以是公共的或私有的。
重要提示:JWT的Payload是经过Base64URL编码的,但它不是加密的! 这意味着任何获取到JWT的人都可以解码并查看其中的内容。 因此,Payload中绝不应包含任何敏感信息,例如用户的原始密码或银行卡号。通常,Payload会包含用户ID(user_id
)、用户角色(role
)等非敏感信息,这些信息在API处理请求时可能需要用到。
示例:{"user_id": 123,"role": "user","exp": 1678886400 // 过期时间戳 (示例) }
这部分信息经过Base64URL编码后,构成JWT的第二部分。
-
Signature (签名)
Signature是JWT最重要的部分,它用于验证令牌的完整性,确保令牌在传输过程中未被篡改。签名是通过将Header和Payload的Base64URL编码字符串,与一个只有服务器知道的“秘密密钥”(Secret Key)一起,使用Header中指定的算法(如HS256)进行哈希(或HMAC)计算而生成的。
签名计算公式大致如下:Signature = HASH_Algorithm(Base64URL(Header) + "." + Base64URL(Payload), Secret_Key)
这个签名构成JWT的第三部分。客户端不需要知道Secret Key,因为它只用于服务器验证令牌的有效性。
2.2 JWT 的工作原理 (无状态认证)
JWT实现无状态认证的流程如下:
- 用户登录: 客户端向API的
/login
端点发送用户的凭据(例如电子邮件和密码)。 - 验证凭据: API接收凭据后,会查询数据库验证用户的身份。如果凭据正确,API会使用一个秘密密钥生成一个JWT。
- 返回令牌: API将生成的JWT(以及令牌类型,如
Bearer
)作为响应的一部分返回给客户端。 - 客户端存储令牌: 客户端接收到JWT后,将其存储在本地(例如,浏览器端的localStorage或SessionStorage,或移动应用的内存中)。
- 访问受保护资源: 在后续需要认证的请求中,客户端会将此JWT放置在HTTP请求的
Authorization
头中(格式通常为Bearer <token>
)发送给API。 - API验证令牌: API接收到请求后,会从请求头中提取JWT。它会使用相同的秘密密钥和签名算法,重新计算JWT的签名,并与接收到的签名进行比较。如果签名匹配,并且令牌未过期,API就认为该令牌是有效的,并提取Payload中的信息(如
user_id
)来识别用户身份。 - 处理请求: 验证通过后,API会处理请求并返回相应数据。
由于JWT包含了验证所需的所有信息(通过签名),API服务器无需在自己的数据库中存储任何会话信息,从而实现了真正的无状态。
2.3 JWT 的安全性考虑
尽管JWT提供了强大的认证能力,但理解其安全特性至关重要:
- JWT 未加密: 如前所述,JWT的Payload是可读的,即使经过Base64URL编码,也并非加密。因此,永远不要在Payload中放置敏感信息(如用户密码、敏感个人数据)。
- 签名保证完整性: 签名机制的主要目的是确保令牌的完整性(Integrity),而非机密性。它能防止令牌在传输过程中被篡改。如果攻击者试图更改Payload中的任何数据(例如,将用户ID从123改为456),由于他们不知道秘密密钥,将无法生成一个有效的签名,API在验证时会立即发现令牌被篡改并拒绝请求。
- 秘密密钥的保密性: 秘密密钥是JWT认证安全的核心。它必须严格保密,只存储在服务器端,绝不能暴露给客户端或存储在版本控制系统中。一旦秘密密钥泄露,攻击者就可以伪造有效的JWT,从而完全绕过认证机制。
- 过期时间: JWT应设置合理的过期时间(
exp
声明)。这可以限制令牌的有效性,即使令牌被盗,其有效时间也有限。一旦令牌过期,API将拒绝该令牌,用户需要重新登录获取新的令牌。
3. 密码哈希:使用 passlib
和 bcrypt
在用户注册时,绝不能将用户的原始密码以纯文本形式存储在数据库中。FastAPI生态推荐使用passlib
库和bcrypt
哈希算法来实现密码的安全存储。
-
安装依赖:
pip install "passlib[bcrypt]" # 或者分开安装: # pip install passlib bcrypt
(通常
pip install fastapi[all]
会包含bcrypt
) -
配置
CryptContext
:
在您的utils.py
(或其他工具模块) 中:# app/utils.py from passlib.context import CryptContext# 定义密码上下文,指定使用bcrypt算法 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def hash_password(password: str): # 建议函数名更明确return pwd_context.hash(password)def verify_password(plain_password: str, hashed_password: str):return pwd_context.verify(plain_password, hashed_password)
hash_password()
用于生成密码哈希,verify_password()
用于验证明文密码与哈希是否匹配,这是一个单向过程。
4. FastAPI与SQLModel:用户认证实战
现在,我们将这些概念整合到FastAPI应用中,使用SQLModel作为ORM来实现用户注册和登录。
4.1 用户注册(创建用户记录与哈希密码)
用户注册流程:接收用户凭据 -> 哈希密码 -> 存储到数据库。
-
定义Pydantic API Schema (
schemas.py
):
用于验证请求体和塑造响应。# app/schemas.py from pydantic import BaseModel, EmailStr from datetime import datetime from typing import Optionalclass UserCreate(BaseModel):email: EmailStrpassword: strclass UserOut(BaseModel): # 用于API响应,不包含密码id: intemail: EmailStrcreated_at: datetime # SQLModel会自动处理datetime对象class Config:from_attributes = True # 允许从ORM/SQLModel对象属性填充
-
定义SQLModel数据库模型 (
models.py
):# app/models.py from typing import Optional from sqlmodel import SQLModel, Field from datetime import datetime from sqlalchemy import text # 用于服务器端默认值class User(SQLModel, table=True):__tablename__ = "users" # 显式指定表名,好习惯id: Optional[int] = Field(default=None, primary_key=True)email: str = Field(unique=True, index=True, nullable=False) # 邮箱唯一且建立索引password: str = Field(nullable=False) # 存储哈希后的密码created_at: datetime = Field(default_factory=datetime.utcnow, # Pydantic层面默认值sa_column_kwargs={"server_default": text("now()")}, # 数据库层面默认值nullable=False)
【注】:
default_factory=datetime.utcnow
是Pydantic层面的默认值,而sa_column_kwargs={"server_default": text("now()")}
是数据库服务器端的默认值。两者可以并存,通常数据库默认值更可靠。 -
实现用户注册路由 (
routers/users.py
):# app/routers/users.py from fastapi import APIRouter, Depends, status, HTTPException from sqlmodel import Session # 使用SQLModel的Session# 假设 get_db 依赖已在 database.py from ..database import get_db from .. import models, schemas, utils # utils 包含密码哈希函数router = APIRouter(prefix="/users",tags=["Users"] )@router.post("/", status_code=status.HTTP_201_CREATED, response_model=schemas.UserOut) def register_user(user_payload: schemas.UserCreate, db: Session = Depends(get_db)):# 1. 检查用户是否已存在 (可选但推荐)# existing_user = db.exec(select(models.User).where(models.User.email == user_payload.email)).first()# if existing_user:# raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")# 2. 密码哈希hashed_pwd = utils.hash_password(user_payload.password) # 使用工具函数# 3. 创建新的用户记录 (SQLModel实例)user_payload.password = hashed_pwddb_user = models.User(**user_data_for_db)db.add(db_user)db.commit()db.refresh(db_user) # 刷新以获取数据库生成的id和created_atreturn db_user # FastAPI会用schemas.UserOut进行响应塑形
此路由在创建用户前哈希密码。若Pydantic验证失败(如邮件格式错误),会自动返回422。成功后,
schemas.UserOut
确保密码不被返回。
4.2 用户登录(验证凭据并生成JWT)
用户登录流程:验证凭据 -> 凭据正确 -> 生成JWT -> 返回JWT。
-
定义Pydantic API Schema (
schemas.py
):# app/schemas.py (继续添加) # ... (UserCreate, UserOut 已定义) ...class Token(BaseModel): # 用于包装JWT响应access_token: strtoken_type: strclass TokenData(BaseModel): # 用于JWT Payload内部结构定义 (可选,但有助于类型安全)id: Optional[int] = None # JWT Payload中的用户ID
【注】
TokenData
中的id
类型与models.User.id
(通常是int
)保持一致更佳。 -
配置JWT生成与验证逻辑 (
oauth2.py
):# app/oauth2.py from jose import JWTError, jwt from datetime import datetime, timedelta, timezone # 确保导入timezone from typing import Optional from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlmodel import Session # 使用SQLModel的Sessionfrom . import schemas, models # schemas.TokenData, models.User from .database import get_db # 假设get_db返回SQLModel Session from .core.config import settings # 假设配置在 core.configoauth2_scheme = OAuth2PasswordBearer(tokenUrl="login") # "login" 是登录端点的相对路径SECRET_KEY = settings.SECRET_KEY # 从配置中获取 ALGORITHM = settings.ALGORITHM ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTESdef create_access_token(data: dict) -> str:to_encode = data.copy()# 使用 timezone.utc 确保是时区感知的UTC时间expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)to_encode.update({"exp": expire.timestamp()}) # JWT标准通常用Unix时间戳# 或 to_encode.update({"exp": expire}) # python-jose也能处理datetime对象encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)return encoded_jwtdef verify_access_token(token: str, credentials_exception: HTTPException) -> schemas.TokenData:try:payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])user_id: Optional[int] = payload.get("user_id") # 假设payload中存储的是user_idif user_id is None:raise credentials_exception# 将str类型的id(如果payload存的是str)转为intreturn schemas.TokenData(id=int(user_id))except JWTError: # 包括过期、签名错误等raise credentials_exceptionexcept ValueError: # 处理int转换失败raise credentials_exceptiondef get_current_active_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db) ) -> models.User: # 返回SQLModel的User实例credentials_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate": "Bearer"},)token_data = verify_access_token(token, credentials_exception)# 使用SQLModel的 session.get() 通过主键获取用户user = db.get(models.User, token_data.id)if user is None:raise credentials_exception# 你可以在这里添加用户是否激活的检查 (if not user.is_active: ...)return user
-
实现用户登录路由 (
routers/auth.py
):# app/routers/auth.py from fastapi import APIRouter, Depends, status, HTTPException from fastapi.security import OAuth2PasswordRequestForm from sqlmodel import Session, select # 导入selectfrom .. import schemas, models, utils, oauth2 from ..database import get_dbrouter = APIRouter(tags=["Authentication"])@router.post("/login", response_model=schemas.Token) def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db) ):# OAuth2PasswordRequestForm 将email存储在username字段statement = select(models.User).where(models.User.email == form_data.username)user = db.exec(statement).first()if not user or not utils.verify_password(form_data.password, user.password):raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, # 用401更合适detail="Incorrect email or password",headers={"WWW-Authenticate": "Bearer"},)access_token = oauth2.create_access_token(data={"user_id": user.id})return {"access_token": access_token, "token_type": "bearer"}
-
在
main.py
中引入路由:# app/main.py from fastapi import FastAPI # 假设你的路由组织在 app.routers 包下 from .routers import posts_router, users_router, auth_router # 使用更明确的导入名 from .database import create_db_and_tables # 假设SQLModel表创建函数app = FastAPI()@app.on_event("startup") def on_startup():create_db_and_tables() # 创建SQLModel定义的表app.include_router(posts_router.router) # 假设路由实例名为 router app.include_router(users_router.router) app.include_router(auth_router.router)@app.get("/") def read_root():return {"message": "Welcome to my API!"}
4.3 保护 API 端点
任何需要用户登录才能访问的API端点,只需在路径操作函数中添加oauth2.get_current_active_user
作为依赖项。
# app/routers/posts_router.py (假设帖子路由文件)
from fastapi import APIRouter, Depends, status, HTTPException, Response
from sqlmodel import Session, select
from typing import Listfrom .. import models, schemas, oauth2
from ..database import get_dbrouter = APIRouter(prefix="/posts",tags=["Posts"]
)@router.post("/", status_code=status.HTTP_201_CREATED, response_model=schemas.Post) # 假设schemas.Post是响应模型
def create_new_post(post_payload: schemas.PostCreate,db: Session = Depends(get_db),current_user: models.User = Depends(oauth2.get_current_active_user) # 注入当前用户
):# current_user 现在是经过认证的 models.User SQLModel实例# 你需要确保 models.Post 有 owner_id 字段来关联用户post_data_for_db = post_payload.model_dump()# 假设 models.Post 有 owner_id 字段# post_data_for_db["owner_id"] = current_user.idnew_post = models.Post(**post_data_for_db)db.add(new_post)db.commit()db.refresh(new_post)return new_post@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_existing_post(post_id: int,db: Session = Depends(get_db),current_user: models.User = Depends(oauth2.get_current_active_user)
):db_post = db.get(models.Post, post_id) # 使用SQLModel的get方法if not db_post:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,detail=f"Post with id: {post_id} does not exist")# 权限检查:确保当前用户是帖子的所有者# if db_post.owner_id != current_user.id: # 假设 Post 模型有 owner_id# raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,# detail="Not authorized to perform requested action")db.delete(db_post)db.commit()return Response(status_code=status.HTTP_204_NO_CONTENT)
总结
FastAPI结合SQLModel,为构建安全的API提供了一套现代且高效的工具。通过深入理解JWT的无状态工作原理、其三部分结构(Header、Payload、Signature)以及其并非加密而是签名以保证数据完整性的特性,开发者可以有效地利用它实现API认证。结合passlib
和bcrypt
进行密码哈希存储,以及FastAPI强大的依赖注入系统和SQLModel简洁的ORM操作,我们可以构建出安全、高效、可扩展的用户认证系统。这对于任何需要确保用户数据和操作安全的API项目都至关重要。