当前位置: 首页 > news >正文

FastAPI + SQLAlchemy (异步版)连接数据库时,对数据进行加密

简介:此部分内容为,在FastAPI + SQLAlchemy (异步版)连接数据库时,需要对保存在数据库中的API_KEY 以及用户密码进行加密时所著:

一、AES-GCM对称加密步骤

1.特点 :

  • 单一密钥:加密和解密使用相同的密钥(对称加密),需通过安全方式(如环境变量、密钥管理系统)存储和传输密钥。
  • 密钥长度:支持 128、192、256 位密钥(推荐 256 位以获得最高安全性)。

2.加解密步骤: 

1.生成密钥:

密钥一般存在环境变量中,使用安全的随机数生成器(如 Python 的 os.urandom):

generate_aes_gcm_key.py:

import os
import base64# 1. 生成 AES-256 密钥
def generate_aes_gcm_key(key_size: int = 32) -> bytes:"""生成指定长度的 AES-GCM 密钥(字节串)。:param key_size: 16(AES-128)、24(AES-192)、32(AES-256):return: 密钥(bytes)"""if key_size not in (16, 24, 32):raise ValueError("Key size must be 16, 24, or 32 bytes")return os.urandom(key_size)# 2. 将密钥转换为 Base64 字符串(便于存储到环境变量)
def b64encode_generated_key(generated_key: bytes) ->str:key_base64 = base64.b64encode(generated_key).decode()# print("Base64 编码的密钥:", key_base64)return key_base64# 只运行一次,确保全流程中密钥统一
if __name__ == "__main__":generated_key = generate_aes_gcm_key()key_base64 = b64encode_generated_key(generated_key)print("编码后的密钥:",key_base64) # 需要手动保存到环境变量中(.env)

2.将generate_aes_gcm_key.py中生成的密钥 AES_KEY 手动复制添加到 .env文件中:

3.在 config.py 文件中加载环境变量

from pydantic_settings import BaseSettings
from dotenv import load_dotenv
import os# 加载环境变量(仅本地开发)
load_dotenv()class DifySetting(BaseSettings):MYSQL_HOST: strMYSQL_PORT: int = 3306MYSQL_USER: strMYSQL_PASSWORD: strMYSQL_NAME: strAPP_ENV: str = "dev"AES_KEY: str  # AES 密钥(Base64 编码)AES_GCM_NONCE_SIZE: int = 12  # 注意,环境变量中存在的值,在DifySetting这个类中也必须包含class Config:env_file =".env"env_file_encoding = "utf-8"# 全局 AES-GCM 实例
dify_settings = DifySetting()

4.编写 aes_gcm_security.py 加密、解密函数:

import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from app.core.config import dify_settings  # 假设这是你的配置模块(包含 AES_KEY)
import os
from cryptography.exceptions import InvalidTag# 1. 初始化 AESGCM 对象
def load_aes_key() -> AESGCM:key_bytes = base64.b64decode(dify_settings.AES_KEY)  # Base64 → bytesreturn AESGCM(key_bytes)  # 创建 AESGCM 实例aesgcm = load_aes_key()# 2. 加密函数 - 返回单个组合字符串
def encrypt_aes_gcm_combined(plaintext: str) -> str:"""使用 AES-GCM 加密明文字符串,返回组合字符串(IV+密文的Base64编码):param plaintext: 明文(字符串):return: Base64编码的字符串(前16字符为IV,后面为密文)"""plaintext_bytes = plaintext.encode("utf-8")iv = os.urandom(12)  # 生成12字节随机IVciphertext = aesgcm.encrypt(iv, plaintext_bytes, None)# 拼接IV和密文后整体进行Base64编码combined = iv + ciphertextreturn base64.b64encode(combined).decode("utf-8")# 3. 解密函数 - 从组合字符串解密
def decrypt_aes_gcm_combined(combined_base64: str) -> str:"""从组合字符串解密出原始明文:param combined_base64: Base64编码的组合字符串(IV+密文):return: 明文(字符串)"""try:combined = base64.b64decode(combined_base64)iv = combined[:12]  # 前12字节为IVciphertext = combined[12:]  # 剩余部分为密文plaintext_bytes = aesgcm.decrypt(iv, ciphertext, None)return plaintext_bytes.decode("utf-8")except InvalidTag:raise ValueError("解密失败:认证标签无效(密钥或数据损坏)")except Exception as e:raise ValueError(f"解密失败:{str(e)}")

 5. 在编写 FastAPI 时调用加密、解密函数;

from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from datetime import datetime
from sqlalchemy import select, exc
from app.database.database import db_dependency
from app.models.dify_models_ORM import Agent
# 导入优化后的加密函数
from app.core.aes_gcm_security import encrypt_aes_gcm_combined, decrypt_aes_gcm_combineddify_router = APIRouter()# 定义 Pydantic 模型
class AgentResponse(BaseModel):id: intagent_name: stragent_describe: stragent_url: stragent_Content_Type: stragent_api_key: str  # 返回解密后的API Keyuser: strcreated_at: datetimeclass Config:from_attributes = Trueclass CreateAgentRequest(BaseModel):agent_name: stragent_describe: strurl: strapi_key: str  # 接收明文API Keycontent_type: str = "application/json"user: str# 创建 Agent - 使用优化后的加密方法
@dify_router.post("/dify_agents", status_code=status.HTTP_201_CREATED)
async def create_agent(request: CreateAgentRequest, db: db_dependency):try:# 使用新的组合加密方法combined_ciphertext = encrypt_aes_gcm_combined(request.api_key)db_agent = Agent(agent_name=request.agent_name,agent_describe=request.agent_describe,agent_url=request.url,agent_api_key=combined_ciphertext,  # 存储组合密文agent_Content_Type=request.content_type,user=request.user,created_at=datetime.utcnow())db.add(db_agent)await db.commit()await db.refresh(db_agent)return {"agent_id": db_agent.id}except exc.IntegrityError:await db.rollback()raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="API Key 已存在")except Exception as e:await db.rollback()raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 查询单个 Agent - 使用优化后的解密方法
@dify_router.get("/dify_agents/{agent_id}", status_code=status.HTTP_200_OK)
async def read_agent(agent_id: int, db: db_dependency):try:result = await db.execute(select(Agent).where(Agent.id == agent_id))agent = result.scalars().first()if not agent:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")# 解密逻辑 - 使用新的组合解密方法try:decrypted_key = decrypt_aes_gcm_combined(agent.agent_api_key)# 创建代理对象副本,避免直接修改ORM对象,即返回为此副本中数据agent_data = {"id": agent.id,"agent_name": agent.agent_name,"agent_describe": agent.agent_describe,"agent_url": agent.agent_url,"agent_Content_Type": agent.agent_Content_Type,"agent_api_key": decrypted_key,  # 使用解密后的密钥"user": agent.user,"created_at": agent.created_at}return AgentResponse(**agent_data)except Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,detail=f"解密失败: {str(e)}")except Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 查询所有 Agents - 不返回敏感API Key
@dify_router.get("/dify_agents", status_code=status.HTTP_200_OK)
async def read_agents(db: db_dependency):try:result = await db.execute(select(Agent))agents = result.scalars().all()# 返回不包含敏感API Key的数据safe_agents = []for agent in agents:safe_agents.append({"id": agent.id,"agent_name": agent.agent_name,"agent_describe": agent.agent_describe,"agent_url": agent.agent_url,"agent_Content_Type": agent.agent_Content_Type,"user": agent.user,"created_at": agent.created_at})return safe_agentsexcept Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 删除 Agent - 保持不变
@dify_router.delete("/dify_agents/{agent_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_agent(agent_id: int, db: db_dependency):try:result = await db.execute(select(Agent).where(Agent.id == agent_id))agent = result.scalar()if not agent:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")await db.delete(agent)await db.commit()except Exception as e:await db.rollback()raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))return None

二、哈希算法加密用户密码:

1.特点:

  • 密码哈希算法是​​单向的​​(不可逆),专门为存储密码(等无需还原原来明文信息所设计);
  • 抗暴力破解​​:通过加盐(Salt)和多次迭代(Work Factor)增加计算成本;

2.加密步骤: 

http://www.dtcms.com/a/274695.html

相关文章:

  • C++(STL源码刨析/List)
  • [Meetily后端框架] Whisper转录服务器 | 后端服务管理脚本
  • 如何从0开始构建自己的第一个AI应用?(Prompt工程、Agent自定义、Tuning)
  • MyBatis:SQL与Java的智能桥梁
  • Ant Design ProTable组件深度解析
  • CUDA —— 2.3、cuda静态全局变量__device__使用介绍(附:完整代码)
  • 系统思考:多元胜过能力
  • 计算机网络第三章(5)——数据链路层《广域网》
  • 解锁形状与空间的奥秘:微分几何与流形一瞥-AI云计算拓展核心内容
  • 【C++篇】二叉树进阶(上篇):二叉搜索树
  • 云蝠智能 VoiceAgent重构企业呼入场景服务范式
  • Ubuntu20.04运行openmvg和openmvs实现三维重建(未成功,仅供参考)
  • PyTorch笔记5----------Autograd、nn库
  • 《棒球规则介绍》领队和主教练谁说了算·棒球1号位
  • sqli-labs靶场通关笔记:第1-4关 联合注入
  • ros topic和service的使用
  • 深入浅出Redis:一文掌握Redis底层数据结构与实现原理
  • Java Stream流介绍及使用指南
  • GIC控制器 (三)
  • 猿人学js逆向比赛第一届第十八题
  • 【一起来学AI大模型】微调技术:LoRA(Low-Rank Adaptation) 的实战应用
  • Linux kernel regcache_cache_only()函数详解
  • pytest中mark的使用
  • SpringCloud之Feign
  • 深入探讨大模型的记忆机制及其前沿技术
  • 数据结构与算法——从递归入手一维动态规划【2】
  • 极端高温下的智慧出行:危险检测与救援
  • AI介入电商内容生产,会颠覆品牌运营吗?
  • 打破内网壁垒,轻松实现安防视频的云端汇聚与P2P超低延迟播放
  • 史上最详细Java并发多线程(面试必备,一篇足矣)