FastAPI入门:安全性
安全性
FastAPI 提供了多种工具,可帮助你以标准的方式轻松、快速地处理安全性,而无需研究和学习所有的安全规范
相关概念
OAuth2:OAuth2是一个规范,它定义了几种处理身份认证和授权的方法。它是一个相当广泛的规范,涵盖了一些复杂的使用场景。它包括了使用「第三方」进行身份认证的方法。
OpenID Connect:OpenID Connect 是另一个基于 OAuth2 的规范。它只是扩展了 OAuth2,并明确了一些在 OAuth2 中相对模糊的内容,以尝试使其更具互操作性
OpenAPI:OpenAPI(以前称为 Swagger)是用于构建 API 的开放规范(现已成为 Linux Foundation 的一部分)。FastAPI 基于 OpenAPI。这就是使多个自动交互式文档界面,代码生成等成为可能的原因。OpenAPI 有一种定义多个安全「方案」的方法。通过使用它们,你可以利用所有这些基于标准的工具,包括这些交互式文档系统。
OpenAPI 定义了以下安全方案:
- apiKey:一个特定于应用程序的密钥,可以来自:
- 查询参数。
- 请求头。
- cookie。
- http:标准的 HTTP 身份认证系统,包括:
- bearer: 一个值为 Bearer 加令牌字符串的 Authorization 请求头。这是从 OAuth2 继承的。
- HTTP Basic 认证方式。
HTTP Digest,等等。
- oauth2:所有的 OAuth2 处理安全性的方式(称为「流程」)。 *以下几种流程适合构建 OAuth 2.0 身份认证的提供者(例如 Google,Facebook,Twitter,GitHub 等): * implicit * clientCredentials * authorizationCode
- 但是有一个特定的「流程」可以完美地用于直接在同一应用程序中处理身份认证:
- password:接下来的几章将介绍它的示例。
- 但是有一个特定的「流程」可以完美地用于直接在同一应用程序中处理身份认证:
- openIdConnect:提供了一种定义如何自动发现 OAuth2 身份认证数据的方法。
- 此自动发现机制是 OpenID Connect 规范中定义的内容。
FastAPI 在 fastapi.security 模块中为每个安全方案提供了几种工具,这些工具简化了这些安全机制的使用方法。
安全第一步
假设后端 API 在某个域。
前端在另一个域,或(移动应用中)在同一个域的不同路径下。
并且,前端要使用后端的 username 与 password 验证用户身份。
from typing import Annotatedfrom fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearerapp = FastAPI()oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")@app.get("/items/")
async def read_items(token: Annotated[str, Depends(oauth2_scheme)]):return {"token": token}
运行后,点击 Authorize 按钮,弹出授权表单,输入 username 与 password 及其它可选字段
下面,我们来看一下简化的运行流程:
- 用户在前端输入 username 与password,并点击回车
- (用户浏览器中运行的)前端把 username 与password 发送至 API 中指定的 URL(使用 tokenUrl=“token” 声明)
- API 检查 username 与password,并用令牌(Token) 响应(暂未实现此功能):
- 令牌只是用于验证用户的字符串
- 一般来说,令牌会在一段时间后过期
- 过时后,用户要再次登录
- 这样一来,就算令牌被人窃取,风险也较低。因为它与永久密钥不同,在绝大多数情况下不会长期有效
- 前端临时将令牌存储在某个位置
- 用户点击前端,前往前端应用的其它部件
- 前端需要从 API 中提取更多数据:
- 为指定的端点(Endpoint)进行身份验证
- 因此,用 API 验证身份时,要发送值为 Bearer + 令牌的请求头 Authorization
- 假如令牌为 foobar,Authorization 请求头就是: Bearer foobar
FastAPI 的 OAuth2PasswordBearer
创建 OAuth2PasswordBearer 的类实例时,要传递 tokenUrl 参数。该参数包含客户端(用户浏览器中运行的前端) 的 URL,用于发送 username 与 password,并获取令牌
在上面的代码中,tokenUrl=“token” 指向的是暂未创建的相对 URL token。这个相对 URL 相当于 ./token。
因为使用的是相对 URL,如果 API 位于 https://example.com/,则指向 https://example.com/token。但如果 API 位于 https://example.com/api/v1/,它指向的就是https://example.com/api/v1/token。
使用相对 URL 非常重要,可以确保应用在遇到使用代理这样的高级用例时,也能正常运行
oauth2_scheme 变量是 OAuth2PasswordBearer 的实例,也是可调用项。
以如下方式调用:
oauth2_scheme(some, parameters)
因此,Depends 可以调用 oauth2_scheme 变量
FastAPI 校验请求中的 Authorization 请求头,核对请求头的值是不是由 Bearer + 令牌组成, 并返回令牌字符串(str)。如果没有找到 Authorization 请求头,或请求头的值不是 Bearer + 令牌。FastAPI 直接返回 401 错误状态码(UNAUTHORIZED)
获取当前用户
创建用户模型
class User(BaseModel):username: stremail: Union[str, None] = Nonefull_name: Union[str, None] = Nonedisabled: Union[bool, None] = None
创建 get_current_user 依赖项
get_current_user 使用 oauth2_scheme 作为依赖项。
与之前直接在路径操作中的做法相同,新的 get_current_user 依赖项从子依赖项 oauth2_scheme 中接收 str 类型的 token
get_current_user 使用创建的(伪)工具函数,该函数接收 str 类型的令牌,并返回 Pydantic 的 User 模型
def fake_decode_token(token):return User(username=token + "fakedecoded", email="john@example.com", full_name="John Doe") async def get_current_user(token: str = Depends(oath2_scheme)):user = fake_decode_token(token)return user
注入当前用户
@app.get("/user/me")
async def read_users_me(current_user: User = Depends(get_current_user)):return current_user
OAuth2 实现简单的 Password 和 Bearer 验证
获取 username 和 password
OAuth2 规范要求使用密码流时,客户端或用户必须以表单数据形式发送 username 和 password 字段。
并且,这两个字段必须命名为 username 和 password ,不能使用 user-name 或 email 等其它名称
Scope(作用域)
OAuth2 还支持客户端发送scope表单字段。scope 是 OAuth2 中的一个重要概念,用于定义访问权限的范围。它指定了令牌可以访问哪些资源或执行哪些操作
获取 username 和 password 的代码
使用 FastAPI 工具获取用户名与密码: 导入 OAuth2PasswordRequestForm,然后,在 /token 路径操作 中,用 Depends 把该类作为依赖项
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
OAuth2PasswordRequestForm 是用以下几项内容声明表单请求体的类依赖项:
- username
- password
- 可选的 scope 字段,由多个空格分隔的字符串组成的长字符串
- 可选的 grant_type
- 可选的 client_id(本例未使用)
- 可选的 client_secret(本例未使用)
使用表单数据
现在,即可使用表单字段 username,从(伪)数据库中获取用户数据。
如果不存在指定用户,则返回错误消息,提示用户名或密码错误
fake_users_db = {"johndoe": {"username": "johndoe","full_name": "John Doe","email": "johndoe@example.com","hashed_password": "fakehashedsecret","disabled": False,},"alice": {"username": "alice","full_name": "Alice Wonderson","email": "alice@example.com","hashed_password": "fakehashedsecret2","disabled": True,},
}@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):user_dict = fake_users_db.get(form_data.username)if not user_dict:raise HTTPException(status_code=400, detail="Incorrect username or password")
校验密码
接下来,首先将数据放入 Pydantic 的 UserInDB 模型。如果密码不匹配,则返回与上面相同的错误
from typing import Unionfrom fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModelapp = FastAPI()oath2_scheme = OAuth2PasswordBearer(tokenUrl= "token")fake_users_db = {"johndoe": {"username": "johndoe","full_name": "John Doe","email": "johndoe@example.com","hashed_password": "fakehashedsecret","disabled": False,},"alice": {"username": "alice","full_name": "Alice Wonderson","email": "alice@example.com","hashed_password": "fakehashedsecret2","disabled": True,},
}class User(BaseModel):username: stremail: Union[str, None] = Nonefull_name: Union[str, None] = Nonedisabled: Union[bool, None] = Noneclass UserInDB(User):hashed_password: strdef fake_decode_token(token):user = get_user(fake_users_db, token)return user async def get_current_user(token: str = Depends(oath2_scheme)):user = fake_decode_token(token)return userdef fake_hash_password(password: str):return "fakehashed" + password@app.get("/user/me")
async def read_users_me(current_user: User = Depends(get_current_user)):return current_user@app.get("/")
async def read_root():return {"Hello": "World"}@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):user_dict = fake_users_db.get(form_data.username)if not user_dict:raise HTTPException(status_code=400, detail="Incorrect username or password")user = UserInDB(**user_dict)hashed_password = fake_hash_password(form_data.password)if not hashed_password == user.hashed_password:raise HTTPException(status_code=400, detail="Incorrect username or password")
返回 Token
token 端点的响应必须是 JSON 对象。
响应返回的内容应该包含 token_type。本例中用的是BearerToken,因此, Token 类型应为bearer
返回内容还应包含 access_token 字段,它是包含权限 Token 的字符串
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):user_dict = fake_users_db.get(form_data.username)if not user_dict:raise HTTPException(status_code=400, detail="Incorrect username or password")user = UserInDB(**user_dict)hashed_password = fake_hash_password(form_data.password)if not hashed_password == user.hashed_password:raise HTTPException(status_code=400, detail="Incorrect username or password")return {"access_token": user.username, "token_type": "bearer"}
更新依赖项
接下来,更新依赖项。
使之仅在当前用户为激活状态时,才能获取 current_user
为此,要再创建一个依赖项 get_current_active_user,此依赖项以 get_current_user 依赖项为基础。
如果用户不存在,或状态为未激活,这两个依赖项都会返回 HTTP 错误。
async def get_current_user(token: str = Depends(oath2_scheme)):user = fake_decode_token(token)if not user:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid authentication credentials",headers={"WWW-Authenticate": "Bearer"},)return userasync def get_current_active_user(current_user: User = Depends(get_current_user)):if current_user.disabled:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,detail="Inactive user",)return current_user@app.get("/user/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):return current_user
打开docs文档
点击Authorize按钮。使用以下凭证:
用户名:johndoe
密码:secret
使用 /users/me 路径的 GET 操作。
可以提取如下当前用户数据:
点击小锁图标,注销后,再执行同样的操作,则会得到 HTTP 401 错误
测试未激活用户,输入以下信息,进行身份验证:
用户名:alice
密码:secret2
然后,执行 /users/me 路径的 GET 操作。
OAuth2 实现简单的 Password 和 Bearer 验证
JWT 即JSON 网络令牌(JSON Web Tokens)。
JWT 是一种将 JSON 对象编码为没有空格,且难以理解的长字符串的标准
安装PyJWT
pip install pyjwt
密码哈希
哈希是指把特定内容(本例中为密码)转换为乱码形式的字节序列(其实就是字符串)。
每次传入完全相同的内容时(比如,完全相同的密码),返回的都是完全相同的乱码。
但这个乱码无法转换回传入的密码。
安装 passlib
pip install passlib[bcrypt]
密码哈希与校验
创建三个工具函数,其中一个函数用于哈希用户的密码。
第一个函数用于校验接收的密码是否匹配存储的哈希值。
第三个函数用于身份验证,并返回用户。
from passlib.context import CryptContextpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
这句代码创建了一个密码加密上下文,用于安全地处理密码的哈希和验证。指定使用的哈希算法为 bcrypt(bcrypt 是目前最安全的密码哈希算法之一)deprecated="auto"自动处理过时算法的升级
from pydantic import BaseModelfrom passlib.context import CryptContextfake_users_db = {"johndoe": {"username": "johndoe","full_name": "John Doe","email": "johndoe@example.com","hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled": False,}
}class User(BaseModel):username: stremail: str | None = Nonefull_name: str | None = Nonedisabled: bool | None = Noneclass UserInDB(User):hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def verify_password(plain_password, hashed_password):return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password):return pwd_context.hash(password)def get_user(db, username: str):if username in db:user_dict = db[username]return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str):user = get_user(fake_db, username)if not user:return Falseif not verify_password(password, user.hashed_password):return Falsereturn user
处理JWT令牌
创建用于 JWT 令牌签名的随机密钥。
使用以下命令,生成安全的随机密钥:
openssl rand -hex 32
然后,把生成的密钥复制到变量SECRET_KEY
创建指定 JWT 令牌签名算法的变量 ALGORITHM,本例中的值为 “HS256”
创建设置令牌过期时间的变量。
定义令牌端点响应的 Pydantic 模型。
创建生成新的访问令牌的工具函数。
from pydantic import BaseModel
from datetime import timedelta, timezone, datetime
import jwtfrom passlib.context import CryptContextSECRET_KEY = "cda798e74bb33984844b8ed76a28fa4a004ef98ba4e4ea6bedb92bc9800ffaa7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30class Token(BaseModel):access_token: strtoken_type: strfake_users_db = {"johndoe": {"username": "johndoe","full_name": "John Doe","email": "johndoe@example.com","hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled": False,}
}class User(BaseModel):username: stremail: str | None = Nonefull_name: str | None = Nonedisabled: bool | None = Noneclass UserInDB(User):hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def verify_password(plain_password, hashed_password):return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password):return pwd_context.hash(password)def get_user(db, username: str):if username in db:user_dict = db[username]return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str):user = get_user(fake_db, username)if not user:return Falseif not verify_password(password, user.hashed_password):return Falsereturn userdef create_access_token(data: dict, expires_delta: timedelta | None = None):to_encode = data.copy()if expires_delta:expire = datetime.now(timezone.utc) + expires_deltaelse:expire = datetime.now(timezone.utc) + timedelta(minutes=15)to_encode.update({"exp": expire})encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)return encode_jwt
更新依赖项
更新 get_current_user 以接收与之前相同的令牌,但这里用的是 JWT 令牌。
解码并校验接收到的令牌,然后,返回当前用户。
如果令牌无效,则直接返回 HTTP 错误。
from pydantic import BaseModel
from datetime import timedelta, timezone, datetime
import jwt
from jwt.exceptions import InvalidTokenError
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearerfrom passlib.context import CryptContextSECRET_KEY = "cda798e74bb33984844b8ed76a28fa4a004ef98ba4e4ea6bedb92bc9800ffaa7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30class Token(BaseModel):access_token: strtoken_type: strclass TokenData(BaseModel):username: str | None = Nonefake_users_db = {"johndoe": {"username": "johndoe","full_name": "John Doe","email": "johndoe@example.com","hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled": False,}
}oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")class User(BaseModel):username: stremail: str | None = Nonefull_name: str | None = Nonedisabled: bool | None = Noneclass UserInDB(User):hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def verify_password(plain_password, hashed_password):return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password):return pwd_context.hash(password)def get_user(db, username: str):if username in db:user_dict = db[username]return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str):user = get_user(fake_db, username)if not user:return Falseif not verify_password(password, user.hashed_password):return Falsereturn userdef create_access_token(data: dict, expires_delta: timedelta | None = None):to_encode = data.copy()if expires_delta:expire = datetime.now(timezone.utc) + expires_deltaelse:expire = datetime.now(timezone.utc) + timedelta(minutes=15)to_encode.update({"exp": expire})encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)return encode_jwtasync def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):credentials_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate": "Bearer"},)try:payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])username = payload.get("sub")if username is None:raise credentials_exceptiontoken_data = TokenData(username=username)except InvalidTokenError:raise credentials_exceptionuser = get_user(fake_users_db, username=token_data.username)if user in None:raise credentials_exceptionreturn user
更新/token路径操作
用令牌过期时间创建 timedelta 对象。
创建并返回真正的 JWT 访问令牌。
@app.post("/token")
async def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:user = authenticate_user(fake_users_db, form_data.username, form_data.password)if not user:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate": "Bearer"},)access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires)return Token(access_token=access_token, token_type="bearer")
JWT 规范还包括 sub 键,值是令牌的主题
完整代码:
from pydantic import BaseModel
from datetime import timedelta, timezone, datetime
import jwt
from jwt.exceptions import InvalidTokenError
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom passlib.context import CryptContextSECRET_KEY = "cda798e74bb33984844b8ed76a28fa4a004ef98ba4e4ea6bedb92bc9800ffaa7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30class Token(BaseModel):access_token: strtoken_type: strclass TokenData(BaseModel):username: str | None = Nonefake_users_db = {"johndoe": {"username": "johndoe","full_name": "John Doe","email": "johndoe@example.com","hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled": False,}
}oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")class User(BaseModel):username: stremail: str | None = Nonefull_name: str | None = Nonedisabled: bool | None = Noneclass UserInDB(User):hashed_password: strapp = FastAPI()pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def verify_password(plain_password, hashed_password):return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password):return pwd_context.hash(password)def get_user(db, username: str):if username in db:user_dict = db[username]return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str):user = get_user(fake_db, username)if not user:return Falseif not verify_password(password, user.hashed_password):return Falsereturn userdef create_access_token(data: dict, expires_delta: timedelta | None = None):to_encode = data.copy()if expires_delta:expire = datetime.now(timezone.utc) + expires_deltaelse:expire = datetime.now(timezone.utc) + timedelta(minutes=15)to_encode.update({"exp": expire})encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)return encode_jwtasync def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):credentials_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate": "Bearer"},)try:payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])username = payload.get("sub")if username is None:raise credentials_exceptiontoken_data = TokenData(username=username)except InvalidTokenError:raise credentials_exceptionuser = get_user(fake_users_db, username=token_data.username)if user is None:raise credentials_exceptionreturn user@app.post("/token")
async def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:user = authenticate_user(fake_users_db, form_data.username, form_data.password)if not user:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate": "Bearer"},)access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires)return Token(access_token=access_token, token_type="bearer")async def get_current_active_user(current_user: Annotated[User, Depends(get_current_user)]
):if current_user.disabled:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,detail="Inactive user",)return current_user@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: Annotated[User, Depends(get_current_active_user)]
):return current_user@app.get("/users/me/items/")
async def read_own_items(current_user: Annotated[User, Depends(get_current_active_user)]
):return [{"item_id": "Foo", "owner": current_user.username}]
运行代码,访问docs文档
使用如下凭证:
用户名: johndoe 密码: secret
调用 /users/me/ 端点,收到下面的响应: