Flask(六) 数据库操作SQLAlchemy
文章目录
- 一、准备工作
- 二、最小化可运行示例
- 三、数据库基本操作(增删改查)
- 1. 插入数据(增)
- 2. 查询数据(查)
- 3. 更新数据(改)
- 4. 删除数据(删)
- 四、其他有用方法
- 五、常用字段类型
- 六、初始化数据库脚本(推荐)
- sqlalchemy 实例
- 基本使用
- 常见方法速查
- 多表查询(JOIN)
- 原始 SQL 语句(可选)
- 示例:分页 + 排序
- 推荐:使用 Flask-SQLAlchemy 提供的简写风格
- 完整的 Flask-SQLAlchemy 配置示例(含连接池/超时/调试配置)
- ✅ 1. config.py 配置文件
- ✅ 2. app.py 应用初始化
- ✅ 3. models.py 数据模型定义
- ✅ 4. init_db.py 数据库初始化脚本
- 使用 PyMySQL 连接 MySQL,app.py 文件代码:
- 使用 SQLAlchemy:定义模型,配置数据库,执行基本的 CRUD 操作。
- 创建和管理数据库:使用 db.create_all() 创建表。
- CRUD 操作:添加、读取、更新和删除记录。
- 查询操作:执行基本和复杂查询,包括排序和分页。
- Flask-Migrate:使用 Flask-Migrate 管理数据库迁移。
- 执行原始 SQL:使用原始 SQL 语句进行数据库操作
一、准备工作
pip install flask flask_sqlalchemy
二、最小化可运行示例
from flask import Flask
from flask_sqlalchemy import SQLAlchemyapp = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///mydb.sqlite3'
#或者
#app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://user:password@localhost/db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = Falsedb = SQLAlchemy(app)# 定义数据模型(表)
class User(db.Model):id = db.Column(db.Integer, primary_key=True)username = db.Column(db.String(80), unique=True, nullable=False)age = db.Column(db.Integer)# 创建表
with app.app_context():db.create_all()# 启动应用
@app.route('/')
def index():return '数据库已初始化'
部分 | 含义 |
---|---|
'sqlite:///mydatabase.db' | 使用 SQLite 数据库,数据库文件保存在当前目录 |
SQLAlchemy(app) | 绑定 Flask 应用,初始化 ORM 引擎 |
SQLALCHEMY_TRACK_MODIFICATIONS | 关闭事件监听,提升性能,防止警告 |
三、数据库基本操作(增删改查)
1. 插入数据(增)
with app.app_context():user = User(username='alice', age=20)db.session.add(user)db.session.commit()
db.session.add(user)
:将新用户对象添加到会话中。
db.session.commit()
:提交事务,将更改保存到数据库。
2. 查询数据(查)
# 查询所有
User.query.all()# 查询第一条
User.query.first()# 条件查询
User.query.filter_by(username='alice').first()# 更复杂的筛选
User.query.filter(User.age >= 18).all()
User.query.all()
:查询所有用户记录。
3. 更新数据(改)
with app.app_context():user = User.query.filter_by(username='alice').first()if user:user.age = 25db.session.commit()
@app.route('/update_user/<int:user_id>')
def update_user(user_id):user = User.query.get(user_id)if user:user.username = 'new_username'db.session.commit()return 'User updated!'return 'User not found!'
User.query.get(user_id)
:通过主键查询单个用户记录。
更新字段值并提交事务。
4. 删除数据(删)
with app.app_context():user = User.query.filter_by(username='alice').first()if user:db.session.delete(user)db.session.commit()
db.session.delete(user)
:删除用户记录,并提交事务。
四、其他有用方法
方法 / 类 | 功能 |
---|---|
db.Model | 所有模型基类 |
db.Column | 定义字段类型 |
db.session.add(obj) | 添加记录 |
db.session.commit() | 提交更改 |
User.query.filter(...) | ORM 查询 |
db.create_all() | 创建所有表 |
db.drop_all() | 删除所有表 |
五、常用字段类型
类型 | 含义 |
---|---|
db.String(n) | 字符串(最长 n) |
db.Integer | 整数 |
db.Boolean | 布尔值 |
db.DateTime | 时间戳 |
db.Float | 浮点数 |
六、初始化数据库脚本(推荐)
可以加一个 Python 脚本 init_db.py,自动初始化数据库:
from app import db
from app import app # 确保 Flask app 存在with app.app_context():db.create_all()print("数据库已初始化!")
db.create_all()
:创建所有在当前上下文中定义的模型对应的表。
sqlalchemy 实例
基本使用
假设你有模型:
class User(db.Model):__tablename__ = 'users'id = db.Column(db.Integer, primary_key=True)username = db.Column(db.String(80), unique=True)
✅
__tablename__ = "users"
在 SQLAlchemy 中,这是在 ORM 模型类中定义 对应的数据库表名。
✅ 不加__tablename__
会怎样?
SQLAlchemy 默认使用类名的小写作为表名,如 User → user
但不推荐依赖默认行为 —— 明确指定更安全、更清晰
模型是数据库表的 Python 类,每个模型类代表数据库中的一张表。
✅ 推荐命名风格
模型类名 表名 建议写法 User
users
__tablename__ = "users"
Order
orders
__tablename__ = "orders"
UserOrder
user_orders
__tablename__ = "user_orders"
你可以:
# 查询所有用户
users = db.session.query(User).all()# 查询用户名为 'root' 的用户
user = db.session.query(User).filter_by(username='root').first()# 查询 ID 大于 10 的用户
users = db.session.query(User).filter(User.id > 10).all()
常见方法速查
方法 | 说明 |
---|---|
query(Model) | 创建查询对象 |
.filter(condition) | 过滤条件,支持表达式 |
.filter_by(field=value) | 简写形式,等同于 == 比较 |
.first() | 获取第一条记录或 None |
.all() | 获取所有结果(列表) |
.count() | 获取记录数量 |
.limit(n) | 限制返回数量 |
.offset(n) | 跳过前 n 行记录 |
.order_by(Model.field) | 排序(可加 .desc() ) |
.one() / .one_or_none() | 获取一条记录(不唯一会报错) |
.distinct() | 去重 |
多表查询(JOIN)
db.session.query(User, Post).join(Post, User.id == Post.user_id).all()
from sqlalchemy import or_users = User.query.filter(or_(User.username == 'john_doe', User.email == 'john@example.com')).all()
or_()
:用于执行复杂的查询条件。
原始 SQL 语句(可选)
from sqlalchemy import text
db.session.execute(text("SELECT * FROM users WHERE id = :id"), {"id": 1}).fetchall()
db.session.execute()
:执行原始 SQL 查询。
示例:分页 + 排序
users = db.session.query(User) \.filter(User.username.like('%admin%')) \.order_by(User.id.desc()) \.offset(0).limit(10) \.all()
order_by()
:按指定字段排序。
paginate()
:分页查询。
推荐:使用 Flask-SQLAlchemy 提供的简写风格
User.query.filter_by(username='root').first()
User.query.filter(User.id > 10).all()
完整的 Flask-SQLAlchemy 配置示例(含连接池/超时/调试配置)
推荐项目结构(简化)
your_project/
├── app.py
├── config.py ← 配置集中管理
├── models.py ← 数据模型
└── init_db.py ← 初始化数据库
✅ 1. config.py 配置文件
import osclass Config:# ✅ 数据库连接(选你要的数据库)# SQLite(开发测试用)SQLALCHEMY_DATABASE_URI = 'sqlite:///mydb.sqlite3'# MySQL 示例# SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost:3306/mydb?charset=utf8mb4'# PostgreSQL 示例# SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://user:password@localhost/mydb'# ✅ SQLAlchemy 设置SQLALCHEMY_TRACK_MODIFICATIONS = False# ✅ 调试设置DEBUG = True# ✅ 数据库连接池设置SQLALCHEMY_ENGINE_OPTIONS = {"pool_size": 10, # 最大连接数"max_overflow": 5, # 超出 pool_size 后允许的连接数"pool_timeout": 30, # 获取连接最大等待时间(秒)"pool_recycle": 1800, # 自动重连时间(秒)防止 MySQL 8 小时断开"echo": True, # 输出执行 SQL(调试用)}# ✅ 密钥(如启用 Session)SECRET_KEY = os.environ.get('SECRET_KEY', 'dev-key')
数据库连接池说明(MySQL/PostgreSQL 推荐)
参数 | 含义 |
---|---|
pool_size | 最大连接数 |
max_overflow | 超出最大连接数后的额外连接数 |
pool_timeout | 获取连接等待时长(秒) |
pool_recycle | 多久重置连接(避免断线),MySQL 默认 8 小时(28800 秒)断连接,推荐 pool_recycle = 1800。 |
echo | 是否打印 SQL(调试用) |
✅ 2. app.py 应用初始化
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from config import Configdb = SQLAlchemy()def create_app():app = Flask(__name__)app.config.from_object(Config)db.init_app(app)# 注册路由/蓝图@app.route('/')def index():return 'Hello Flask + SQLAlchemy'return app
✅ 3. models.py 数据模型定义
from app import dbclass User(db.Model):__tablename__ = 'users'id = db.Column(db.Integer, primary_key=True)username = db.Column(db.String(64), unique=True, nullable=False)age = db.Column(db.Integer)
✅ 4. init_db.py 数据库初始化脚本
from app import create_app, db
from models import *app = create_app()with app.app_context():db.create_all()print("✅ 数据库已初始化")
运行方式:
python init_db.py
使用 PyMySQL 连接 MySQL,app.py 文件代码:
from flask import Flask, request, jsonify
import pymysqlapp = Flask(__name__)def get_db_connection():connection = pymysql.connect(host='localhost',user='username',password='password',database='dbname',cursorclass=pymysql.cursors.DictCursor)return connection@app.route('/add_user', methods=['POST'])
def add_user():data = request.jsonname = data['name']email = data['email']connection = get_db_connection()with connection.cursor() as cursor:cursor.execute('INSERT INTO user (username, email) VALUES (%s, %s)', (name, email))connection.commit()connection.close()return 'User added!'@app.route('/get_users')
def get_users():connection = get_db_connection()with connection.cursor() as cursor:cursor.execute('SELECT * FROM user')users = cursor.fetchall()connection.close()return jsonify(users)if __name__ == '__main__':app.run(debug=True)