异步数据库基本代码实现
数据库的查询、过滤和查询总数 total
1.构建查询
2.添加过滤条件 使用 filter
3.查询总数
# 获取过滤后的总条数(关键修正:用 scalar() 提取整数)total_query = select(func.count()).select_from(query.subquery())total_result = await db_async.execute(total_query) # 执行计数查询,得到 Result 对象total = total_result.scalar() # 从 Result 中提取整数(计数结果)
base_query.subquery()
:将base_query
转换为子查询(subquery),相当于 SQL 中FROM
后的子查询语句(如FROM (SELECT ...)
)。select(func.count()).select_from(...)
:构建一个新的查询,作用是统计子查询(即base_query
的结果)的总记录数,等价于 SQL 语句:SELECT COUNT(*) FROM (base_query对应的子查询)
。
使用
base_query.subquery()
本质上是将base_query
的完整逻辑(过滤、关联、分组等)封装成一个 “临时结果集”,然后基于这个结果集计数。这样做的好处是:
- 完全复用
base_query
的逻辑,避免重复代码;- 确保计数结果与
base_query
的实际查询结果严格一致,无论base_query
多复杂。
4.分页
query = query.order_by(UserModel.User.id).offset(skip).limit(limit)result = await db_async.execute(query)users = result.scalars().all()
构建查询 -> 执行任务 -> 获取结果
写个测试函数进行测试:
函数返回的参数