Flask ORM 查询详解:Model.query vs db.session.query vs db.session.execute
Flask ORM 查询详解:Model.query vs db.session.query vs db.session.execute
在 Flask 中使用 SQLAlchemy ORM 进行数据库查询时,主要有三种方式:Model.query
、db.session.query
和 db.session.execute
。下面我将详细解释每种方法的使用方式、结果获取和结果类型。
1. Model.query (推荐用于简单查询)
这是最简洁的查询方式,直接通过模型类进行查询。
基本用法
from models import User # 假设有 User 模型# 查询所有用户
all_users = User.query.all()# 根据ID查询单个用户
user = User.query.get(1)# 带过滤条件的查询
admin_users = User.query.filter_by(role='admin').all()
结果获取方法
方法 | 返回类型 | 描述 | 示例 |
---|---|---|---|
all() | list[Model] | 返回所有结果的列表 | users = User.query.all() |
first() | Model 或 None | 返回第一个结果 | user = User.query.first() |
get(ident) | Model 或 None | 根据主键查询 | user = User.query.get(1) |
one() | Model | 必须有且只有一个结果,否则抛异常 | user = User.query.filter_by(email='admin@ex.com').one() |
one_or_none() | Model 或 None | 有结果返回结果,无结果返回None | user = User.query.filter_by(email='notexist@ex.com').one_or_none() |
count() | int | 返回结果数量 | count = User.query.count() |
paginate() | Pagination 对象 | 分页查询 | page = User.query.paginate(page=1, per_page=20) |
结果处理示例
# 获取所有用户列表
users = User.query.all()
print(type(users)) # <class 'list'>
print(type(users[0])) # <class 'models.User'># 遍历用户
for user in users:print(user.username, user.email)# 分页查询
page = User.query.paginate(page=1, per_page=10)
print(page.items) # 当前页的用户列表
print(page.total) # 总记录数
2. db.session.query (推荐用于复杂查询)
这是更灵活的查询方式,特别适合需要查询多个表或特定字段的场景。
基本用法
from models import User, Post
from sqlalchemy.orm import aliased# 查询所有用户
all_users = db.session.query(User).all()# 查询特定字段
user_emails = db.session.query(User.email).all()# 多表查询
results = db.session.query(User, Post).join(Post, User.id == Post.user_id).all()# 使用别名
ua = aliased(User, name='ua')
admin_users = db.session.query(ua).filter(ua.role == 'admin').all()
结果获取方法
方法 | 返回类型 | 描述 |
---|---|---|
all() | list[Row] 或 list[tuple] | 返回所有结果 |
first() | Row 或 tuple 或 None | 返回第一个结果 |
one() | Row 或 tuple | 必须有且只有一个结果 |
one_or_none() | Row 或 tuple 或 None | 最多一个结果 |
scalar() | 标量值 | 返回第一行第一列的值 |
count() | int | 返回结果数量 |
结果类型处理
# 查询整个模型对象
users = db.session.query(User).all()
# 类型: list[User]
for user in users:print(user.username)# 查询特定字段 - 返回元组
emails = db.session.query(User.email).all()
# 类型: list[Row] (行为类似元组)
for row in emails:print(row[0]) # 索引访问print(row.email) # 属性访问# 查询多个字段
user_data = db.session.query(User.id, User.username).all()
# 类型: list[Row]
for row in user_data:print(f"ID: {row.id}, Username: {row.username}")# 多表查询 - 返回元组
user_posts = db.session.query(User, Post).join(Post).all()
for user, post in user_posts:print(f"{user.username} wrote: {post.title}")# 使用标量值
email_count = db.session.query(func.count(User.email)).scalar()
print(f"Total emails: {email_count}")
3. db.session.execute (用于原生SQL查询)
当需要执行原生SQL时使用此方法,返回结果为 ResultProxy
对象。
基本用法
# 执行原生SQL查询
result = db.session.execute("SELECT * FROM users WHERE role = :role", {'role': 'admin'})# 使用text构造SQL
from sqlalchemy import text
sql = text("SELECT * FROM users WHERE created_at > :start_date")
result = db.session.execute(sql, {'start_date': '2023-01-01'})
结果获取方法
方法 | 返回类型 | 描述 |
---|---|---|
fetchall() | list[RowProxy] | 返回所有结果 |
fetchone() | RowProxy 或 None | 返回一行结果 |
fetchmany(size) | list[RowProxy] | 返回指定数量的结果 |
scalar() | 标量值 | 返回第一行第一列的值 |
keys() | list[str] | 返回列名列表 |
结果类型处理
# 执行查询
result = db.session.execute("SELECT id, username, email FROM users")# 获取列名
print(result.keys()) # ['id', 'username', 'email']# 获取所有行
rows = result.fetchall()
# 类型: list[RowProxy]
for row in rows:# 索引访问print(row[0], row[1], row[2])# 列名访问print(row['id'], row['username'], row['email'])# 转换为字典row_dict = dict(row)print(row_dict)# 获取单行
row = result.fetchone()
if row:print(row.username)# 遍历结果集(适用于大量数据)
result = db.session.execute("SELECT * FROM large_table")
for row in result:process_row(row) # 逐行处理,减少内存占用
三种方法对比
特性 | Model.query | db.session.query | db.session.execute |
---|---|---|---|
易用性 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ |
灵活性 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
返回类型 | 模型对象 | Row对象/元组 | RowProxy对象 |
ORM特性支持 | 完整 | 完整 | 有限 |
原生SQL支持 | 不支持 | 部分支持 | 完整支持 |
性能 | 高 | 高 | 最高 |
适用场景 | 简单CRUD操作 | 复杂查询、连接查询 | 高性能需求、复杂SQL |
最佳实践建议
-
优先使用 Model.query
- 适合大多数简单查询场景
- 代码简洁易读
- 直接返回模型对象
-
复杂查询使用 db.session.query
- 需要多表连接时
- 需要选择特定字段时
- 需要使用SQL函数时
from sqlalchemy import func# 每月新增用户统计 monthly_stats = db.session.query(func.date_trunc('month', User.created_at).label('month'),func.count(User.id).label('count') ).group_by('month').all()
-
只在必要时使用 db.session.execute
- 执行复杂原生SQL
- 需要最高性能时
- 执行数据库特定功能
# 使用数据库特定函数(PostgreSQL示例) result = db.session.execute("""SELECT id, username, json_build_object('id', id, 'name', username) AS user_json FROM users """)
-
结果转换技巧
# 将Row对象转换为字典 def row_to_dict(row):return {column: getattr(row, column) for column in row._fields}# 将查询结果直接转为字典列表 users = db.session.query(User.id, User.username).all() user_dicts = [dict(zip(u._fields, u)) for u in users]# 使用ScalarResult获取单一列的值列表 emails = db.session.query(User.email).scalars().all()
-
性能优化
# 使用yield_per处理大数据集 large_query = User.query.yield_per(100) for user in large_query:process_user(user)# 只选择需要的字段 # 而不是 select * db.session.query(User.id, User.name)
事务管理
所有查询都应该在事务上下文中执行:
@app.route('/users')
def get_users():try:# 查询操作users = User.query.all()# 修改操作new_user = User(name="John")db.session.add(new_user)db.session.commit()return jsonify([u.serialize() for u in users])except Exception as e:db.session.rollback()return jsonify({"error": str(e)}), 500
总结
- 简单查询:优先使用
Model.query
,直接返回模型对象列表 - 复杂查询:使用
db.session.query
,灵活处理多表关联和字段选择 - 原生SQL:使用
db.session.execute
,处理特殊需求 - 结果处理:
- 模型对象:直接访问属性
- Row/RowProxy 对象:索引或列名访问
- 标量值:使用
scalar()
或scalars()
- 性能关键:使用
yield_per
处理大数据,只选择必要字段
根据具体需求选择最合适的查询方式,并在开发过程中注意结果类型的处理,可以大大提高 Flask 应用的数据库操作效率和代码可维护性。