flask 框架的ORM 学习及应用
目录
🔮 Flask ORM简介
📦 安装与配置
🗄️ 定义模型
💎 常见数据库操作(CRUD)
🔍 进阶查询技巧
⚠️ 重要提示
🔗 关联查询与预加载
🔍 复杂过滤与查询
📊 聚合查询与分组统计
🧩 子查询运用
📂 组合查询
⚡ 查询性能优化
💎 总结
🔄 数据库迁移管理
🔄 事务管理与回滚
🎯 高级查询技巧
分页查询
窗口函数(高级分析)
🏗️ 复杂关系模式
多态关联
自引用关系
🔍 高级过滤技巧
数组/JSON字段查询
全文搜索
⚡ 性能优化进阶
查询缓存
批量操作优化
🎭 混合属性和混合表达式
🔐 软删除模式
📊 数据库事件监听
🌐 多数据库支持
🎨 自定义查询方法
🎯 scalar() 的核心作用
基本语法
📚 常见使用场景
1. 聚合函数查询(最常用)
2. 获取单个字段值
3. 存在性检查
🔄 scalar() vs first() vs one()
对比示例
🛠️ 实际应用示例
统计数据分析
业务逻辑中的使用
⚠️ 注意事项
1. 空结果处理
2. 多列查询的问题
3. 性能优化
💎 总结
内容来源于 deepseek, 仅记录学习使用!
flask 框架的ORM 学习及应用,提供常见的基本操作
Flask框架本身不直接包含ORM功能,但可以通过扩展(如Flask-SQLAlchemy)来集成。下面我将为你梳理Flask ORM的基本概念、安装配置、模型定义以及最常用的数据库操作。
🔮 Flask ORM简介
ORM(对象关系映射)允许你使用Python类和对象来操作数据库,而无需直接编写SQL
。在Flask中,这主要通过Flask-SQLAlchemy扩展实现
,它基于SQLAlchemy库,简化了在Flask应用中的数据库操作。
📦 安装与配置
-
安装必要的库
使用pip安装Flask-SQLAlchemy以及对应数据库的驱动,例如连接MySQL时常用pymysql
pip install flask-sqlalchemy pymysql
配置数据库连接
在Flask应用中进行配置,设置数据库的连接字符串
from flask import Flask
from flask_sqlalchemy import SQLAlchemyapp = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://用户名:密码@主机:端口/数据库名'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 关闭事件系统以节省资源
db = SQLAlchemy(app)
🗄️ 定义模型
模型是一个继承自db.Model的Python类,它对应数据库中的一张表。以下是一个用户模型的示例:
class User(db.Model):__tablename__ = 'user' # 指定数据库中表名id = db.Column(db.Integer, primary_key=True) # 主键username = db.Column(db.String(80), unique=True, nullable=False)email = db.Column(db.String(120), unique=True, nullable=False)def __repr__(self):return f'<User {self.username}>'
常用的列类型包括db.Integer、db.String、db.Text、db.Boolean、db.DateTime等。列选项包括primary_key、unique、nullable、default等。
定义好模型后,可以使用db.create_all()创建所有表,但在生产环境中更推荐使用Flask-Migrate进行数据库迁移来管理表结构的变更。
💎 常见数据库操作(CRUD)
以下操作是ORM的核心,均通过db.session进行会话管理,最后需要db.session.commit()提交事务。
| 操作 | 方法 | 代码示例 |
|---|---|---|
| 增加(C) | db.session.add() | new_user = User(username='Alice', email='alice@example.com')db.session.add(new_user)db.session.commit() |
| 查询(R) | Model.query.方法() |
user = User.query.get(1) # 按主键查询 |
| 更新(U) | 直接赋值后提交 | user = User.query.get(1)user.username = 'Bob'db.session.commit() |
| 删除(D) | db.session.delete() | user = User.query.get(1)db.session.delete(user)db.session.commit() |
🔍 进阶查询技巧
除了基本的查询,ORM还支持更复杂的查询操作:
-
过滤:使用
filter进行更灵活的查询,支持比较运算符。
# 查找年龄大于等于18的用户
users = User.query.filter(User.age >= 18).all()
模糊查询:使用like进行模糊匹配。
# 查找用户名中包含'john'的用户
users = User.query.filter(User.username.like('%john%')).all()
逻辑运算:使用and_(), or_(), not_()进行组合条件查询。
-
排序:使用
order_by()进行排序。
# 按用户名升序排列
users = User.query.order_by(User.username).all()
# 按年龄降序排列
users = User.query.order_by(db.desc(User.age)).all()
⚠️ 重要提示
-
会话提交:所有对数据库的修改(增、删、改)操作,都必须通过
db.session.commit()提交后才会真正生效。
-
关系映射:ORM支持定义表之间的关系(一对一、一对多、多对多),通过
db.relationship和db.ForeignKey实现。例如,一个用户拥有多篇文章的场景,可以在用户模型中定义articles = db.relationship('Article', backref='author'),同时在文章模型中定义user_id = db.Column(db.Integer, db.ForeignKey('user.id'))。 -
项目结构:对于稍大的项目,建议将配置(config)、模型(models)和扩展(如db对象)分拆到不同文件中,以避免循环导入问题。
在Flask应用中,尤其是使用SQLAlchemy ORM时,掌握进阶的查询技巧能显著提升开发效率和应用性能。以下是一些更完整的进阶查询技巧,并用表格汇总了核心方法供参考:
| 技巧类别 | 核心方法/概念 | 主要用途 |
|---|---|---|
| 关联查询 | joinedload(), subqueryload() | 避免N+1查询问题,预加载关联数据 |
| 复杂过滤 | filter() 与表达式 | 构建灵活、复杂的查询条件 |
| 聚合与分组 | func.count(), func.sum(), group_by() | 执行数据统计和汇总计算 |
| 子查询 | subquery(), Query 对象作为子句 | 将复杂查询分解为多个步骤 |
| 组合查询 | union(), union_all() | 合并多个查询的结果集 |
| 性能优化 | 索引,批量操作,只查所需字段 | 提升查询速度和应用响应能力 |
🔗 关联查询与预加载
处理表间关系时,避免N+1查询问题是关键。N+1问题是指:当获取一个对象列表(1次查询),然后循环访问每个对象的关联属性时(N次查询),会导致大量数据库查询。SQLAlchemy提供了预加载策略来解决:
from sqlalchemy.orm import joinedload, subqueryload# 使用joinedload一次性加载关联对象(适用于一对一、多对一)
users = db.session.query(User).options(joinedload(User.profile)).all()# 使用subqueryload(适用于一对多、多对多)
blogs = db.session.query(Blog).options(subqueryload(Blog.posts)).all()
通过预加载,关联数据会在最初的主查询中或通过一个额外的查询一次性加载完毕。
🔍 复杂过滤与查询
基础的 filter_by() 功能有限,filter() 方法配合SQLAlchemy的列表达式能构建更复杂的查询条件:
# 模糊查询与多条件组合
from sqlalchemy import and_, or_# 查找标题包含"Flask"且阅读量超过100的文章
posts = Post.query.filter(and_(Post.title.like('%Flask%'),Post.view_count > 100)
).all()# 查找由指定用户发表,或状态为已发布的文章
posts = Post.query.filter(or_(Post.author_id == 1,Post.status == 'published')
).all()
📊 聚合查询与分组统计
需要对数据进行统计和汇总时,可以使用SQLAlchemy的 func 对象进行聚合操作:
from sqlalchemy import func# 计算所有文章的平均阅读量
avg_views = db.session.query(func.avg(Post.view_count)).scalar()# 统计每个分类下的文章数量
counts = db.session.query(Post.category_id,func.count(Post.id).label('post_count')
).group_by(Post.category_id).all()
🧩 子查询运用
对于复杂的查询逻辑,可以将其分解为多个步骤,使用子查询:
# 首先,构建一个子查询,找出每个用户的文章数量
subq = db.session.query(Post.user_id,func.count('*').label('post_count')
).group_by(Post.user_id).subquery()# 然后,在主查询中关联这个子查询,找出文章数量大于5的用户
users_with_many_posts = db.session.query(User).join(subq, User.id == subq.c.user_id
).filter(subq.c.post_count > 5).all()
📂 组合查询
使用 union 或 union_all 可以将多个查询的结果集合并:
# 查询所有发布了的文章和被删除的文章
published_posts = Post.query.filter_by(status='published')
deleted_posts = Post.query.filter_by(status='deleted')
result = published_posts.union_all(deleted_posts).all()
⚡ 查询性能优化
除了查询技巧,性能优化也至关重要:
-
善用索引:为经常用于查询条件、排序或连接的字段创建索引。在模型字段定义中使用
index=True:
class User(db.Model):username = db.Column(db.String(80), index=True) # 为username字段创建索引
批量操作:当需要插入或更新大量数据时,使用批量操作(如 bulk_insert_mappings)可以减少数据库的往返次数,显著提高效率。
-
只查询需要的字段:使用
with_entities()或load_only()指定只获取必要的列,减少数据传输量:
# 只查询用户的ID和用户名
users = User.query.with_entities(User.id, User.username).all()
💎 总结
熟练掌握这些Flask ORM的进阶查询技巧,不仅能让你更高效地获取所需数据,还能更好地优化应用性能。核心在于:合理运用关联查询预加载、灵活构建复杂查询条件、善用聚合与子查询分解复杂逻辑,并始终关注查询性能。
🔄 数据库迁移管理
使用Flask-Migrate进行数据库版本控制:
pip install flask-migrate
from flask_migrate import Migratemigrate = Migrate(app, db)# 命令行操作
# flask db init # 初始化迁移环境
# flask db migrate -m "initial migration" # 生成迁移脚本
# flask db upgrade # 应用迁移
# flask db downgrade # 回滚迁移
🔄 事务管理与回滚
try:user = User(username='john', email='john@example.com')db.session.add(user)profile = Profile(bio='Developer', user_id=user.id)db.session.add(profile)db.session.commit() # 所有操作要么全部成功,要么全部失败
except Exception as e:db.session.rollback() # 发生错误时回滚print(f"操作失败: {e}")
🎯 高级查询技巧
分页查询
# 基本分页
page = request.args.get('page', 1, type=int)
per_page = 20
pagination = User.query.paginate(page=page, per_page=per_page, error_out=False
)users = pagination.items
total_pages = pagination.pages
current_page = pagination.page
窗口函数(高级分析)
from sqlalchemy import over, func# 为每个用户的文章按阅读量排名
subq = db.session.query(Post.id,Post.title,Post.view_count,func.rank().over(order_by=Post.view_count.desc(),partition_by=Post.user_id).label('rank')
).subquery()ranked_posts = db.session.query(subq).filter(subq.c.rank <= 3).all()
🏗️ 复杂关系模式
多态关联
class Comment(db.Model):id = db.Column(db.Integer, primary_key=True)content = db.Column(db.Text)# 多态关联字段commentable_type = db.Column(db.String(50)) # 'post' 或 'article'commentable_id = db.Column(db.Integer)@propertydef commentable(self):if self.commentable_type == 'post':return Post.query.get(self.commentable_id)elif self.commentable_type == 'article':return Article.query.get(self.commentable_id)
自引用关系
class Category(db.Model):id = db.Column(db.Integer, primary_key=True)name = db.Column(db.String(50))parent_id = db.Column(db.Integer, db.ForeignKey('category.id'))# 自引用关系children = db.relationship('Category', backref=db.backref('parent', remote_side=[id]),lazy='dynamic')
🔍 高级过滤技巧
数组/JSON字段查询
# 假设Post.tags是JSON字段存储标签数组
posts_with_python = Post.query.filter(Post.tags.contains(['python'])
).all()# JSON字段特定路径查询
users_with_city = User.query.filter(User.meta['address']['city'].astext == 'Beijing'
).all()
全文搜索
# 使用数据库的全文搜索功能
posts = Post.query.filter(db.func.to_tsvector('english', Post.content).match('flask tutorial')
).all()
⚡ 性能优化进阶
查询缓存
from flask_caching import Cachecache = Cache(config={'CACHE_TYPE': 'SimpleCache'})@cache.memoize(timeout=50)
def get_user_stats(user_id):return db.session.query(func.count(Post.id),func.avg(Post.view_count)).filter(Post.user_id == user_id).first()
批量操作优化
# 批量插入
users = [User(username=f'user{i}') for i in range(1000)]
db.session.bulk_save_objects(users)
db.session.commit()# 批量更新
db.session.query(Post).filter(Post.views < 10).update({'status': 'inactive'}, synchronize_session=False
)
db.session.commit()
🎭 混合属性和混合表达式
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_methodclass User(db.Model):first_name = db.Column(db.String(50))last_name = db.Column(db.String(50))@hybrid_propertydef full_name(self):return f"{self.first_name} {self.last_name}"@full_name.expressiondef full_name(cls):return db.func.concat(cls.first_name, ' ', cls.last_name)@hybrid_methoddef is_older_than(self, age):return db.func.age(self.birth_date) > age@is_older_than.expressiondef is_older_than(cls, age):return db.func.age(cls.birth_date) > age# 使用混合属性进行查询
users = User.query.filter(User.full_name == 'John Doe').all()
🔐 软删除模式
class SoftDeleteMixin:is_deleted = db.Column(db.Boolean, default=False)deleted_at = db.Column(db.DateTime)def soft_delete(self):self.is_deleted = Trueself.deleted_at = db.func.now()db.session.commit()class Article(db.Model, SoftDeleteMixin):id = db.Column(db.Integer, primary_key=True)title = db.Column(db.String(200))# 重写查询基类,自动过滤已删除的记录query_class = SoftDeleteQueryclass SoftDeleteQuery(db.Query):def __new__(cls, *args, **kwargs):obj = super().__new__(cls)with_removed = kwargs.pop('with_removed', False)if len(args) > 0:super(SoftDeleteQuery, obj).__init__(*args, **kwargs)if not with_removed:return obj.filter_by(is_deleted=False)return obj
📊 数据库事件监听
from sqlalchemy import event@event.listens_for(User, 'before_insert')
def before_user_insert(mapper, connection, target):target.created_at = db.func.now()@event.listens_for(Post, 'after_insert')
def after_post_insert(mapper, connection, target):# 更新用户文章计数等操作pass
🌐 多数据库支持
class User(db.Model):__bind_key__ = 'users_db' # 指定使用的数据库id = db.Column(db.Integer, primary_key=True)username = db.Column(db.String(80))# 配置多个数据库
app.config['SQLALCHEMY_BINDS'] = {'users_db': 'sqlite:///users.db','posts_db': 'sqlite:///posts.db'
}
🎨 自定义查询方法
class PostQuery(db.Query):def published(self):return self.filter(Post.status == 'published')def by_author(self, author_id):return self.filter(Post.author_id == author_id)def popular(self, min_views=100):return self.filter(Post.view_count >= min_views)class Post(db.Model):query_class = PostQuery# ... 字段定义# 使用自定义查询
popular_posts = Post.query.published().popular(500).all()
scalar() 是 SQLAlchemy 中一个非常实用的方法,主要用于获取查询结果的单个值。让我详细解释它的作用和使用场景:
🎯 scalar() 的核心作用
scalar() 方法会执行查询并返回结果集的第一行的第一列,如果查询没有结果则返回 None。
基本语法
result = db.session.query(SomeColumn).scalar()
📚 常见使用场景
1. 聚合函数查询(最常用)
from sqlalchemy import func# 获取用户总数
user_count = db.session.query(func.count(User.id)).scalar()
print(user_count) # 输出: 42 (直接是整数)# 对比不使用 scalar()
result = db.session.query(func.count(User.id)).first()
print(result) # 输出: (42,) (元组)
print(result[0]) # 输出: 42 (需要索引访问)
2. 获取单个字段值
# 获取特定用户的邮箱
email = db.session.query(User.email).filter(User.id == 1).scalar()
print(email) # 输出: 'john@example.com'# 对比常规方式
user = User.query.filter(User.id == 1).first()
email = user.email if user else None
3. 存在性检查
# 检查用户名是否存在
exists = db.session.query(db.session.query(User).filter(User.username == 'alice').exists()
).scalar()
print(exists) # 输出: True 或 False
🔄 scalar() vs first() vs one()
| 方法 | 返回值 | 空结果时 | 多行时 | 适用场景 |
|---|---|---|---|---|
scalar() | 单个值 | 返回 None | 返回第一行的第一列 | 获取聚合结果、单个字段值 |
first() | 模型实例或元组 | 返回 None | 返回第一行 | 获取完整的第一条记录 |
one() | 模型实例或元组 | 抛出异常 | 抛出异常 | 确保有且只有一条记录 |
对比示例
# 查询用户平均年龄# 使用 scalar() - 推荐
avg_age = db.session.query(func.avg(User.age)).scalar()
print(avg_age) # 输出: 25.6 (浮点数)# 使用 first()
result = db.session.query(func.avg(User.age)).first()
print(result) # 输出: (25.6,) (元组)
avg_age = result[0] if result else None# 使用 one() - 不推荐用于聚合查询
result = db.session.query(func.avg(User.age)).one()
print(result) # 输出: (25.6,) (元组)
🛠️ 实际应用示例
统计数据分析
class UserStats:@classmethoddef get_dashboard_stats(cls):return {'total_users': db.session.query(func.count(User.id)).scalar(),'avg_age': db.session.query(func.avg(User.age)).scalar(),'max_age': db.session.query(func.max(User.age)).scalar(),'active_users': db.session.query(func.count(User.id)).filter(User.is_active == True).scalar()}stats = UserStats.get_dashboard_stats()
print(stats)
# 输出: {'total_users': 150, 'avg_age': 28.5, 'max_age': 65, 'active_users': 120}
业务逻辑中的使用
def can_create_post(user_id):"""检查用户是否可以创建新帖子"""# 获取用户当前帖子数量post_count = db.session.query(func.count(Post.id))\.filter(Post.user_id == user_id)\.scalar() or 0# 获取用户等级限制max_posts = db.session.query(UserLevel.max_posts)\.join(User, User.level_id == UserLevel.id)\.filter(User.id == user_id)\.scalar() or 10return post_count < max_posts
⚠️ 注意事项
1. 空结果处理
# scalar() 在无结果时返回 None
result = db.session.query(User.email).filter(User.id == 999).scalar()
print(result) # 输出: None# 提供默认值
result = db.session.query(User.email).filter(User.id == 999).scalar() or '未知'
2. 多列查询的问题
# 错误用法 - 查询多列时 scalar() 仍只返回第一列
result = db.session.query(User.username, User.email).filter(User.id == 1).scalar()
print(result) # 只返回 username,丢失了 email# 正确做法 - 单列查询或用 first()
user_data = db.session.query(User.username, User.email).filter(User.id == 1).first()
if user_data:username, email = user_data
3. 性能优化
# 使用 scalar() 只获取需要的字段,避免加载整个模型
user_id = db.session.query(User.id).filter(User.username == 'john').scalar()# 比这种方式更高效(如果只需要ID)
user = User.query.filter(User.username == 'john').first()
user_id = user.id if user else None
💎 总结
scalar() 的主要优势:
-
简洁性:直接返回值而非元组或对象
-
可读性:代码意图更明确
-
效率:避免不必要的对象实例化
最佳实践:
-
在聚合查询(count、sum、avg等)中优先使用
scalar() -
获取单个字段值时使用
scalar() -
需要完整对象信息时使用
first() -
确保结果唯一时使用
one()
掌握了 scalar() 的用法,能让你的数据库查询代码更加简洁和高效!
