当前位置: 首页 > news >正文

用户登出、修改密码或重置密码后,token的删除(flask)

项目已经实现了用户登录,并在登录之后,接口返回token,在后续的访问中,将token放入请求头里

用户登出、修改密码或重置密码后,需要删除携带的token,用户重新登录

实现token删除有两种方法:
一是写入黑名单文件,后续验证时读取文件里是否有这个token
二是用redis来实现

写黑名单文件的方式

import os

# 写入黑名单文件
def save_blacklist(token):
	# 文件目录为函数所在的文件的父级目录(token_blacklist.txt与该文件在同一目录下)
    basedir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(basedir, "token_blacklist.txt")
    timestamp = int(time.time())
    with open(file_path, "a") as file:
        file.write(f"{token}\n")

# 读取黑名单文件
def load_blacklist(token):
    basedir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(basedir, "token_blacklist.txt")
    try:
        with open(file_path, "r") as file:
            lines = file.read().splitlines()
            # 提取token
            lines = [line.strip() for line in lines]
            return token in blacklist
    except FileNotFoundError:
        return False

# 验证token
def token_verify():
    token = request.headers.get("token")
    if not token:
        return error_response("token缺失!", 401)
    if load_blacklist(token):
        return error_response("请重新登录!", 401)
    try:
    	# 这里我用的加密算法是RS256,需要提前生成密钥对
        jwt.decode(token, public_key, algorithms=["RS256"])
        # 也可以试试SH256算法,对称加密算法,安全性低一点
        # jwt.decode(token, "123456", algorithms=["SH256"])
        return True
    except InvalidSignatureError:
        return error_response("token不合法!", 401)
    except ExpiredSignatureError:
        return error_response("token过期!", 401)
    except Exception as e:
        # 捕获其他异常
        return logger.error(f"Token 验证失败: {e}")

# 验证token功能封装成装饰器
def login_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        result = token_verify()
        # 如果 token_verify 返回的是 Response 对象
        if isinstance(result, Response):  
            return result
        return f(*args, **kwargs)
    return decorated

如果怕文件会越来越大,可以加一个定时任务,每天固定时间去清理过期的token,当然这需要在存token的时候也同时把时间戳一起存上。

# 定期清理token,需要存token的格式为token 时间戳
#     file.write(f"{token} {timestamp}\n")
# 获取token
#     blacklist = [line.split()[0] for line in lines if line.strip()]

# 清理三天前的token
def clean_old_tokens():
    basedir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(basedir, "token_blacklist.txt")
    # 获取当前时间
    current_time = time.time()
    # 三天前的时间戳
    three_days_ago = current_time - 3 * 24 * 60 * 60

    if not os.path.exists(file_path):
        # 文件不存在,无需清理
        return

    with open(file_path, "r") as file:
        lines = file.readlines()

    valid_tokens = []
    for line in lines:
        token, timestamp = line.strip().split()
        timestamp = float(timestamp)
        if timestamp >= three_days_ago:
            valid_tokens.append((token, timestamp))

    # 将有效的token写回文件
    with open(file_path, "w") as file:
        for token, timestamp in valid_tokens:
            file.write(f"{token} {timestamp}\n")

    print(f"已清理 {len(lines) - len(valid_tokens)} 个过期 token。")

在项目启动文件:

import schedule
# 每天凌晨0点执行清理任务
schedule.every().day.at("00:00").do(clean_old_tokens)
if __name__ == "__main__":
    tz_app.run(host=tz_app.config['HOST'],
                port=tz_app.config['PORT'],
                debug=tz_app.config['DEBUG'])
    while True:
        schedule.run_pending()
        time.sleep(1)

用文件存储的方式还有一点不好,就是在高并发场景下的性能较低。

使用redis

使用redis会比用文件的方式好很多,不需要担心文件太大,redis的存储形式是键值对的形式,并且会自动清理过期的数据。读写速度快,适合高频访问的场景。只需要对redis进行维护。

import redis
redis_client = redis.Redis(host='192.168.190.110', port=6379, db=0, password='123456')

# token写入黑名单
def save_blacklist(token):
	# 这里设置token过期时间,到期redis会自动将token删除
    expires_in = 60 * 60 * 24 * 3
    try:
        redis_client.setex(token, expires_in, 1)
        logger.error("原token移入黑名单")
        return True
    except ConnectionError:
        logger.error("redis连接失败")
        return False
    except Exception as e:
        logger.error("token移入黑名单失败:", e)
        return False
        
# 验证token
def token_verify():
    token = request.headers.get("token")
    if not token:
        return error_response("token缺失!", 401)
    # if load_blacklist(token):
    if redis_client.exists(token):
        return error_response("请重新登录!", 401)
    try:
        key = current_app.config.get("SECRET_KEY")
        jwt.decode(token, public_key, algorithms=["RS256"])
        return True
    except InvalidSignatureError:
        return error_response("token不合法!", 401)
    except ExpiredSignatureError:
        return error_response("token过期!", 401)
    except Exception as e:
        # 捕获其他异常
        return logger.error(f"Token 验证失败: {e}")


from functools import wraps
from flask import Response

def login_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        result = token_verify()
        if isinstance(result, Response):  # 如果 token_verify 返回的是 Response 对象
            return result
        return f(*args, **kwargs)
    return decorated

用户登出等接口调用上面的函数就行
执行之后可以检查一下token是否已经存在黑名单里

import redis
redis_client = redis.Redis(host='192.168.190.110', port=6379, db=0, password='123456')
if redis_client.exists("token123"):
    print("Token 在黑名单中")
else:
    print("Token 不在黑名单中")

在这里插入图片描述

相关文章:

  • 【AI News | 20250317】每日AI进展
  • ai本地化 部署常用Ollama软件
  • 【算法百题】专题六_模拟
  • 为什么需要强化学习?它解决了什么问题?
  • SwanLab邮件通知插件:训练完成收到邮件,掌握训练进度更及时
  • SQL Server性能优化实战
  • 人工智能实现电脑任务自动化的开源软件
  • 矩阵的逆的实际意义及牛顿法中的作用
  • debian11安装MongoDB
  • 【Agent】OpenManus-Flow-PlanningFlow设计分析
  • AI开发新纪元:MGX多智能体协作平台深度解析
  • 推理大模型的后训练增强技术-从系统1到系统2:大语言模型推理能力的综述
  • 牛客周赛85 DEF Java
  • 深度学习【迭代梯度下降法求解线性回归】
  • 在 macOS Sequoia 15.2 中启用「三指拖动」并实现快速复制的完整指南 ✨
  • 深度学习-简介
  • 学生选课管理系统数据库设计报告
  • Git下载安装(保姆教程)
  • torcharrow gflags版本问题
  • 动作捕捉手套如何让虚拟现实人机交互 “触手可及”?
  • A股低开高走全线上涨:军工股再度领涨,两市成交12934亿元
  • “救护车”转运病人半路加价,从宝鸡到西安往返都要多收钱
  • 特色业务多点开花,苏州银行擦亮金融为民底色
  • 金正恩视察重要军工企业要求推进武力强化变革
  • 央行:5月8日起7天期逆回购操作利率由此前的1.50%调整为1.40%
  • 上海黄浦区拟73.2654亿元协议出让余庆里7宗组合地块