用户登出、修改密码或重置密码后,token的删除(flask)
项目已经实现了用户登录,并在登录之后,接口返回token,在后续的访问中,将token放入请求头里
用户登出、修改密码或重置密码后,需要删除携带的token,用户重新登录
实现token删除有两种方法:
一是写入黑名单文件,后续验证时读取文件里是否有这个token
二是用redis来实现
写黑名单文件的方式
import os
# 写入黑名单文件
def save_blacklist(token):
# 文件目录为函数所在的文件的父级目录(token_blacklist.txt与该文件在同一目录下)
basedir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(basedir, "token_blacklist.txt")
timestamp = int(time.time())
with open(file_path, "a") as file:
file.write(f"{token}\n")
# 读取黑名单文件
def load_blacklist(token):
basedir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(basedir, "token_blacklist.txt")
try:
with open(file_path, "r") as file:
lines = file.read().splitlines()
# 提取token
lines = [line.strip() for line in lines]
return token in blacklist
except FileNotFoundError:
return False
# 验证token
def token_verify():
token = request.headers.get("token")
if not token:
return error_response("token缺失!", 401)
if load_blacklist(token):
return error_response("请重新登录!", 401)
try:
# 这里我用的加密算法是RS256,需要提前生成密钥对
jwt.decode(token, public_key, algorithms=["RS256"])
# 也可以试试SH256算法,对称加密算法,安全性低一点
# jwt.decode(token, "123456", algorithms=["SH256"])
return True
except InvalidSignatureError:
return error_response("token不合法!", 401)
except ExpiredSignatureError:
return error_response("token过期!", 401)
except Exception as e:
# 捕获其他异常
return logger.error(f"Token 验证失败: {e}")
# 验证token功能封装成装饰器
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
result = token_verify()
# 如果 token_verify 返回的是 Response 对象
if isinstance(result, Response):
return result
return f(*args, **kwargs)
return decorated
如果怕文件会越来越大,可以加一个定时任务,每天固定时间去清理过期的token,当然这需要在存token的时候也同时把时间戳一起存上。
# 定期清理token,需要存token的格式为token 时间戳
# file.write(f"{token} {timestamp}\n")
# 获取token
# blacklist = [line.split()[0] for line in lines if line.strip()]
# 清理三天前的token
def clean_old_tokens():
basedir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(basedir, "token_blacklist.txt")
# 获取当前时间
current_time = time.time()
# 三天前的时间戳
three_days_ago = current_time - 3 * 24 * 60 * 60
if not os.path.exists(file_path):
# 文件不存在,无需清理
return
with open(file_path, "r") as file:
lines = file.readlines()
valid_tokens = []
for line in lines:
token, timestamp = line.strip().split()
timestamp = float(timestamp)
if timestamp >= three_days_ago:
valid_tokens.append((token, timestamp))
# 将有效的token写回文件
with open(file_path, "w") as file:
for token, timestamp in valid_tokens:
file.write(f"{token} {timestamp}\n")
print(f"已清理 {len(lines) - len(valid_tokens)} 个过期 token。")
在项目启动文件:
import schedule
# 每天凌晨0点执行清理任务
schedule.every().day.at("00:00").do(clean_old_tokens)
if __name__ == "__main__":
tz_app.run(host=tz_app.config['HOST'],
port=tz_app.config['PORT'],
debug=tz_app.config['DEBUG'])
while True:
schedule.run_pending()
time.sleep(1)
用文件存储的方式还有一点不好,就是在高并发场景下的性能较低。
使用redis
使用redis会比用文件的方式好很多,不需要担心文件太大,redis的存储形式是键值对的形式,并且会自动清理过期的数据。读写速度快,适合高频访问的场景。只需要对redis进行维护。
import redis
redis_client = redis.Redis(host='192.168.190.110', port=6379, db=0, password='123456')
# token写入黑名单
def save_blacklist(token):
# 这里设置token过期时间,到期redis会自动将token删除
expires_in = 60 * 60 * 24 * 3
try:
redis_client.setex(token, expires_in, 1)
logger.error("原token移入黑名单")
return True
except ConnectionError:
logger.error("redis连接失败")
return False
except Exception as e:
logger.error("token移入黑名单失败:", e)
return False
# 验证token
def token_verify():
token = request.headers.get("token")
if not token:
return error_response("token缺失!", 401)
# if load_blacklist(token):
if redis_client.exists(token):
return error_response("请重新登录!", 401)
try:
key = current_app.config.get("SECRET_KEY")
jwt.decode(token, public_key, algorithms=["RS256"])
return True
except InvalidSignatureError:
return error_response("token不合法!", 401)
except ExpiredSignatureError:
return error_response("token过期!", 401)
except Exception as e:
# 捕获其他异常
return logger.error(f"Token 验证失败: {e}")
from functools import wraps
from flask import Response
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
result = token_verify()
if isinstance(result, Response): # 如果 token_verify 返回的是 Response 对象
return result
return f(*args, **kwargs)
return decorated
用户登出等接口调用上面的函数就行
执行之后可以检查一下token是否已经存在黑名单里
import redis
redis_client = redis.Redis(host='192.168.190.110', port=6379, db=0, password='123456')
if redis_client.exists("token123"):
print("Token 在黑名单中")
else:
print("Token 不在黑名单中")