Flask项目中CSRF Token实现的解决方案
文章目录
- Flask项目中CSRF Token实现的解决方案
- 1. 使用 Flask-WTF 扩展(推荐)
- 安装和基础配置
- 基础实现
- 模板中使用
- 2. 使用 Flask-SeaSurf 扩展
- 安装
- 实现
- 前端使用
- 3. 手动实现CSRF保护
- 完整的自定义实现
- 4. 基于JWT的CSRF保护
- 实现方案
- 5. 双重提交Cookie模式
- 实现方案
- 6. 基于Redis的分布式CSRF保护
- 实现方案
- 实践建议
- 1. 安全性配置
- 2. AJAX请求处理
- 3. 豁免特定路由
- 总结对比
Flask项目中CSRF Token实现的解决方案
Flask作为轻量级Web框架,提供了多种CSRF保护实现方式。以下是主要的几种解决方案:
1. 使用 Flask-WTF 扩展(推荐)
安装和基础配置
pip install flask-wtf
基础实现
from flask import Flask, render_template, request, jsonify
from flask_wtf.csrf import CSRFProtect, generate_csrf, CSRFError
from wtforms import Form, StringField, validatorsapp = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
app.config['WTF_CSRF_SECRET_KEY'] = 'csrf-secret-key' # 可选,单独设置CSRF密钥# 初始化CSRF保护
csrf = CSRFProtect(app)class MyForm(Form):name = StringField('Name', [validators.Length(min=4, max=25)])email = StringField('Email', [validators.Email()])@app.route('/form', methods=['GET', 'POST'])
def form_page():form = MyForm(request.form)if request.method == 'POST' and form.validate():# 处理表单数据return f"Hello {form.name.data}!"return render_template('form.html', form=form)# 处理CSRF错误
@app.errorhandler(CSRFError)
def handle_csrf_error(e):return jsonify(error=str(e.description)), 400# 提供获取CSRF Token的API端点
@app.route('/csrf-token', methods=['GET'])
def get_csrf_token():return jsonify(csrf_token=generate_csrf())
模板中使用
<!-- templates/form.html -->
<form method="POST">{{ form.csrf_token }}<p>{{ form.name.label }}<br>{{ form.name(size=32) }}{% if form.name.errors %}<ul class="errors">{% for error in form.name.errors %}<li>{{ error }}</li>{% endfor %}</ul>{% endif %}</p><p>{{ form.email.label }}<br>{{ form.email(size=32) }}</p><p><input type="submit" value="Submit"></p>
</form>
2. 使用 Flask-SeaSurf 扩展
安装
pip install flask-seasurf
实现
from flask import Flask, render_template, request, make_response
from flask_seasurf import SeaSurfapp = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'# 初始化SeaSurf
csrf = SeaSurf(app)@app.route('/')
def index():return render_template('index.html')@app.route('/submit', methods=['POST'])
@csrf.exempt # 如果需要豁免某个路由
def submit():name = request.form.get('name')return f"Hello {name}!"# 手动设置CSRF Token到Cookie
@app.after_request
def set_csrf_cookie(response):if request.method == 'GET':response.set_cookie('csrftoken', csrf._get_token())return response
前端使用
<form method="POST" action="/submit"><input type="hidden" name="_csrf_token" value="{{ session['_csrf_token'] }}"><input type="text" name="name" required><button type="submit">Submit</button>
</form><!-- 或者从Cookie中获取 -->
<script>
function getCookie(name) {let value = "; " + document.cookie;let parts = value.split("; " + name + "=");if (parts.length === 2) return parts.pop().split(";").shift();
}// AJAX请求示例
fetch('/submit', {method: 'POST',headers: {'Content-Type': 'application/json','X-CSRFToken': getCookie('csrftoken')},body: JSON.stringify({name: 'John'})
});
</script>
3. 手动实现CSRF保护
完整的自定义实现
import secrets
from flask import Flask, request, session, render_template, jsonify, make_response
from functools import wrapsapp = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['CSRF_TOKEN_EXPIRES'] = 3600 # 1小时def generate_csrf_token():"""生成CSRF Token"""if '_csrf_token' not in session:session['_csrf_token'] = secrets.token_urlsafe(32)session['_csrf_token_created'] = datetime.now().timestamp()return session['_csrf_token']def validate_csrf_token():"""验证CSRF Token"""token = session.get('_csrf_token')created_time = session.get('_csrf_token_created', 0)current_time = datetime.now().timestamp()# 检查Token是否过期if current_time - created_time > app.config['CSRF_TOKEN_EXPIRES']:session.pop('_csrf_token', None)session.pop('_csrf_token_created', None)return False# 获取提交的Tokensubmitted_token = (request.form.get('csrf_token') or request.headers.get('X-CSRF-Token'))if not token or not submitted_token:return Falsereturn secrets.compare_digest(token, submitted_token)def csrf_protect(f):"""CSRF保护装饰器"""@wraps(f)def decorated_function(*args, **kwargs):if request.method in ['POST', 'PUT', 'PATCH', 'DELETE']:if not validate_csrf_token():return jsonify({'error': 'CSRF token validation failed'}), 403return f(*args, **kwargs)return decorated_function@app.route('/')
def index():csrf_token = generate_csrf_token()response = make_response(render_template('index.html', csrf_token=csrf_token))return response@app.route('/submit', methods=['POST'])
@csrf_protect
def submit():name = request.form.get('name')return f"Hello {name}!"# 提供CSRF Token的API端点
@app.route('/api/csrf-token')
def get_csrf_token():return jsonify({'csrf_token': generate_csrf_token()})
4. 基于JWT的CSRF保护
实现方案
import jwt
import datetime
from flask import Flask, request, jsonify, make_responseapp = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['JWT_SECRET'] = 'jwt-secret-key'def create_csrf_token():"""创建JWT格式的CSRF Token"""payload = {'type': 'csrf','exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1),'iat': datetime.datetime.utcnow()}return jwt.encode(payload, app.config['JWT_SECRET'], algorithm='HS256')def verify_csrf_token(token):"""验证JWT CSRF Token"""try:payload = jwt.decode(token, app.config['JWT_SECRET'], algorithms=['HS256'])return payload.get('type') == 'csrf'except jwt.ExpiredSignatureError:return Falseexcept jwt.InvalidTokenError:return False@app.route('/')
def index():csrf_token = create_csrf_token()response = make_response(render_template('index.html'))response.set_cookie('csrf_token', csrf_token, httponly=False, samesite='Strict')return response@app.route('/submit', methods=['POST'])
def submit():csrf_token = request.headers.get('X-CSRF-Token') or request.form.get('csrf_token')if not verify_csrf_token(csrf_token):return jsonify({'error': 'Invalid CSRF token'}), 403# 处理业务逻辑return jsonify({'message': 'Success'})
5. 双重提交Cookie模式
实现方案
from flask import Flask, request, session, make_response
import secretsapp = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'def set_csrf_cookies():"""设置CSRF相关的Cookie"""csrf_token = secrets.token_urlsafe(32)response = make_response()response.set_cookie('csrf_token', csrf_token, httponly=False, samesite='Strict')# 存储期望的Token在session中session['expected_csrf_token'] = csrf_tokenreturn response, csrf_tokendef validate_double_submit_cookie():"""验证双重提交Cookie"""cookie_token = request.cookies.get('csrf_token')submitted_token = (request.form.get('csrf_token') or request.headers.get('X-CSRF-Token'))expected_token = session.get('expected_csrf_token')# 验证Cookie中的Token与提交的Token一致,且与期望的Token匹配return (cookie_token and submitted_token and secrets.compare_digest(cookie_token, submitted_token) andsecrets.compare_digest(cookie_token, expected_token))@app.route('/')
def index():response, csrf_token = set_csrf_cookies()response.set_data(render_template('index.html', csrf_token=csrf_token))return response@app.route('/submit', methods=['POST'])
def submit():if not validate_double_submit_cookie():return jsonify({'error': 'CSRF validation failed'}), 403return jsonify({'message': 'Success'})
6. 基于Redis的分布式CSRF保护
实现方案
import redis
import secrets
from flask import Flask, request, session, jsonify
from datetime import datetime, timedeltaapp = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['REDIS_URL'] = 'redis://localhost:6379/0'# 初始化Redis连接
redis_client = redis.from_url(app.config['REDIS_URL'])def generate_csrf_token(user_id=None):"""生成并存储CSRF Token"""token = secrets.token_urlsafe(32)key = f"csrf:{user_id or session.sid}:{token}"# 存储Token,设置过期时间redis_client.setex(key, 3600, 'valid') # 1小时过期return tokendef validate_csrf_token(token, user_id=None):"""验证CSRF Token"""if not token:return Falsekey = f"csrf:{user_id or session.sid}:{token}"# 检查Token是否存在且有效if not redis_client.exists(key):return False# 删除已使用的Token(一次性使用)redis_client.delete(key)return Truedef csrf_required(f):"""CSRF保护装饰器"""@wraps(f)def decorated_function(*args, **kwargs):if request.method in ['POST', 'PUT', 'PATCH', 'DELETE']:token = request.headers.get('X-CSRF-Token') or request.form.get('csrf_token')if not validate_csrf_token(token):return jsonify({'error': 'CSRF token validation failed'}), 403return f(*args, **kwargs)return decorated_function@app.route('/api/csrf-token')
def get_csrf_token():token = generate_csrf_token()return jsonify({'csrf_token': token})@app.route('/api/protected', methods=['POST'])
@csrf_required
def protected_endpoint():return jsonify({'message': 'CSRF protected endpoint accessed'})
实践建议
1. 安全性配置
# 生产环境配置
app.config.update(SESSION_COOKIE_HTTPONLY=True,SESSION_COOKIE_SECURE=True, # 仅HTTPSSESSION_COOKIE_SAMESITE='Lax',WTF_CSRF_SSL_STRICT=False, # 如果使用子域名需要设置为FalseWTF_CSRF_TIME_LIMIT=3600 # Token有效期
)
2. AJAX请求处理
// 前端AJAX请求示例
function getCSRFToken() {return document.querySelector('meta[name="csrf-token"]').content;
}// 设置全局AJAX头
$.ajaxSetup({beforeSend: function(xhr, settings) {if (!/^(GET|HEAD|OPTIONS|TRACE)$/i.test(settings.type)) {xhr.setRequestHeader("X-CSRFToken", getCSRFToken());}}
});
3. 豁免特定路由
# Flask-WTF豁免
csrf.exempt(api_blueprint)# 装饰器豁免
@app.route('/webhook', methods=['POST'])
@csrf.exempt
def webhook_handler():# 处理webhook,不需要CSRF保护pass
总结对比
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
Flask-WTF | 集成度高,文档完善 | 依赖WTForms | 传统表单应用 |
Flask-SeaSurf | 轻量,专注于CSRF | 功能相对简单 | 需要简单CSRF保护的应用 |
手动实现 | 完全可控,灵活 | 需要自行处理安全细节 | 特殊需求或学习目的 |
JWT方案 | 无状态,适合API | Token较大 | 前后端分离项目 |
双重Cookie | 简单有效 | 需要JavaScript支持 | 现代Web应用 |
Redis方案 | 分布式支持,可撤销 | 依赖Redis | 分布式系统 |
推荐选择:
- 对于传统Flask应用:使用 Flask-WTF
- 对于API或前后端分离:使用 JWT方案 或 双重Cookie模式
- 对于分布式系统:使用 Redis方案
- 对于简单项目:使用 Flask-SeaSurf
选择方案时应根据项目具体需求、团队熟悉度和安全要求来决定。