做网站的技术关键免费关键词排名优化
前言
在现代Web应用开发中,用户认证系统是必不可少的功能。本文将带你使用FastAPI框架构建一个完整的用户认证系统,包含注册、登录、信息更新和删除等功能。我们将采用JWT(JSON Web Token)进行身份验证,并使用SQLite作为数据库存储用户信息。
一、项目概述
1.1 功能列表
- 用户注册
- 用户登录(获取JWT令牌)
- 用户信息更新
- 用户删除
- 用户信息查询(单个/全部)
1.2 技术栈
- FastAPI:高性能Python Web框架
- SQLAlchemy:Python SQL工具包和ORM
- JWT:JSON Web Token身份验证
- Passlib:密码哈希处理
- SQLite:轻量级数据库
二、环境准备
2.1 安装依赖
pip install fastapi uvicorn sqlalchemy passlib python-jose[cryptography]
2.2 项目结构
fastapi-auth/
├── main.py # 主应用文件
└── Login.db # SQLite数据库文件(运行后自动生成)
三、核心代码解析
3.1 配置与初始化
from fastapi import FastAPI, HTTPException, Depends, status
from pydantic import BaseModel, ConfigDict
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm# OAuth2配置
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")# 密码加密配置
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")# JWT配置
SECRET_KEY = "ninglegedongdong" # 生产环境应使用更安全的密钥
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30# 创建FastAPI应用
app = FastAPI()# 数据库配置
my_database = 'sqlite:///Login.db'
engine = create_engine(my_database, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
Base = declarative_base()
3.2 数据模型定义
数据库模型
class User(Base):__tablename__ = 'login'id = Column(Integer, primary_key=True, index=True)username = Column(String, unique=True, index=True)hashed_password = Column(String)
Pydantic模型(用于请求/响应验证)
class UserCreate(BaseModel):username: strpassword: strclass UserResponse(BaseModel):id: intusername: strmodel_config = ConfigDict(from_attributes=True)class UserUpdate(BaseModel):username: strpassword: strclass Token(BaseModel):access_token: strtoken_type: strclass TokenData(BaseModel):username: Optional[str] = None
3.3 工具函数
def get_password_hash(password: str):"""生成密码哈希"""return pwd_context.hash(password)def verify_password(plain_password: str, hashed_password: str):"""验证密码"""return pwd_context.verify(plain_password, hashed_password)def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):"""创建JWT令牌"""to_encode = data.copy()expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))to_encode.update({"exp": expire})return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)async def get_current_user(token: str = Depends(oauth2_scheme)):"""获取当前用户(依赖项)"""credentials_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="身份凭证验证失败",headers={"WWW-Authenticate": "Bearer"},)try:payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])username: str = payload.get("sub")if not username:raise credentials_exceptiontoken_data = TokenData(username=username)except JWTError:raise credentials_exceptiondb = SessionLocal()try:user = db.query(User).filter(User.username == token_data.username).first()if user is None:raise credentials_exceptionreturn userfinally:db.close()
3.4 路由处理
用户注册
@app.post('/register', response_model=UserResponse)
def create_user(user: UserCreate):db = SessionLocal()try:# 检查用户名是否已存在existing_user = db.query(User).filter(User.username == user.username).first()if existing_user:raise HTTPException(status_code=400, detail="用户名已存在")# 创建新用户hashed_password = get_password_hash(user.password)new_user = User(username=user.username, hashed_password=hashed_password)db.add(new_user)db.commit()db.refresh(new_user)return new_userfinally:db.close()
用户登录(获取JWT令牌)
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):db = SessionLocal()try:user = db.query(User).filter(User.username == form_data.username).first()if not user or not verify_password(form_data.password, user.hashed_password):raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="用户名或密码错误",headers={"WWW-Authenticate": "Bearer"},)access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token = create_access_token(data={"sub": user.username},expires_delta=access_token_expires)return {"access_token": access_token, "token_type": "bearer"}finally:db.close()
用户信息更新
@app.put('/login/{username}', response_model=UserResponse)
def update_user(username: str,user_update: UserUpdate,current_user: User = Depends(get_current_user)
):db = SessionLocal()try:existing_user = db.query(User).filter(User.username == username).first()if not existing_user:raise HTTPException(status_code=404, detail="用户不存在")existing_user.username = user_update.usernameexisting_user.hashed_password = get_password_hash(user_update.password)db.commit()db.refresh(existing_user)return existing_userexcept Exception as e:db.rollback()raise HTTPException(status_code=500, detail=str(e))finally:db.close()
用户删除
@app.delete('/login/{username}', response_model=dict)
def delete_user(username: str,current_user: User = Depends(get_current_user)
):db = SessionLocal()try:existing_user = db.query(User).filter(User.username == username).first()if not existing_user:raise HTTPException(status_code=404, detail="用户不存在")db.delete(existing_user)db.commit()return {'message': f'用户 {username} 删除成功'}finally:db.close()
用户查询
# 查询单个用户
@app.get('/login/{username}', response_model=UserResponse)
def get_user_name(username: str, current_user: User = Depends(get_current_user)):db = SessionLocal()try:user = db.query(User).filter(User.username == username).first()if not user:raise HTTPException(status_code=404, detail="用户不存在")return userfinally:db.close()# 查询所有用户
@app.get('/login', response_model=list[UserResponse])
def get_all_users(current_user: User = Depends(get_current_user)):db = SessionLocal()try:return db.query(User).all()finally:db.close()
3.5 启动应用
if __name__ == '__main__':import uvicornuvicorn.run("main:app", host='127.0.0.1', port=8000, reload=True)
四、API测试指南
4.1 启动服务
python main.py
服务启动后,访问 http://127.0.0.1:8000/docs
可查看Swagger UI界面。
4.2 测试流程
1.注册用户:
- 方法:POST
/register
- 请求体
{"username": "testuser","password": "testpass"
}
2.登录获取令牌:
- 方法:POST
/token
- 表单数据:
- username: testuser
- password: testpass
3.访问受保护端点:
- 在请求头中添加:
Authorization: Bearer <your_token>
4.更新用户信息:
- 方法:PUT
/login/testuser
- 请求体:
{"username": "newusername","password": "newpassword"
}
5.查询用户信息:
- 方法:GET
/login/testuser
或 GET/login
6.删除用户:
- 方法:DELETE
/login/testuser
五、安全注意事项
-
生产环境配置:
- 使用环境变量存储敏感信息(如SECRET_KEY)
- 使用HTTPS加密通信
- 设置更强的密码哈希算法参数
-
JWT安全:
- 设置合理的令牌过期时间
- 考虑实现令牌刷新机制
- 避免在令牌中存储敏感信息
-
数据库安全:
- 定期备份数据库
- 使用更安全的数据库如PostgreSQL或MySQL
- 实施数据库连接池
六、总结
本文详细介绍了如何使用FastAPI构建一个完整的用户认证系统,涵盖了从数据库模型设计到API实现的各个方面。通过这个项目,你可以学习到:
- FastAPI的基本使用和路由定义
- SQLAlchemy ORM的配置和操作
- JWT身份验证的实现
- 密码安全处理的最佳实践
- RESTful API的设计原则
这个项目可以作为你开发更复杂Web应用的基础模板,根据实际需求进行扩展和优化。希望这篇指南对你的学习有所帮助!