SQLAlchemy 2.0 查询使用指南
SQLAlchemy 2.0 查询使用指南
1. 环境设置
首先,需要安装 SQLAlchemy 2.0 版本。假设你使用的是 SQLite 数据库,可以通过以下命令安装 SQLAlchemy:
pip install sqlalchemy
接着,我们创建数据库连接并初始化会话:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker# 创建数据库引擎
engine = create_engine('sqlite:///example.db', echo=True)# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
2. 定义数据模型
SQLAlchemy 使用 ORM (对象关系映射) 使得 Python 对象和数据库表之间实现映射。以下是定义 User 和 Address 表的模型示例:
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationshipBase = declarative_base()class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String)age = Column(Integer)# 一对多关系,用户可以有多个地址addresses = relationship('Address', back_populates='user', lazy='select')class Address(Base):__tablename__ = 'addresses'id = Column(Integer, primary_key=True)user_id = Column(Integer, ForeignKey('users.id'))email = Column(String)# 反向关系user = relationship('User', back_populates='addresses')
在这个模型中,User 表和 Address 表通过 user_id 建立了外键关联,用户可以有多个地址。
3. 基本查询操作
SQLAlchemy 2.0 提供了一个更简洁和一致的查询接口。以下是一些常见的查询操作。
3.1 查询所有记录
from sqlalchemy import selectstmt = select(User)
result = session.execute(stmt).scalars().all()for user in result:print(user.name, user.age)
SQL 原生查询:
SELECT * FROM users;
3.2 查询特定字段
查询 name 和 age 字段:
stmt = select(User.name, User.age)
result = session.execute(stmt).all()for name, age in result:print(name, age)
SQL 原生查询:
SELECT name, age FROM users;
3.3 查询带条件的记录
查询年龄大于 30 的用户:
stmt = select(User).where(User.age > 30)
result = session.execute(stmt).scalars().all()for user in result:print(user.name, user.age)
SQL 原生查询:
SELECT * FROM users WHERE age > 30;
3.4 排序查询
根据年龄降序排列查询用户:
stmt = select(User).order_by(User.age.desc())
result = session.execute(stmt).scalars().all()for user in result:print(user.name, user.age)
SQL 原生查询:
SELECT * FROM users ORDER BY age DESC;
3.5 限制查询结果
限制返回的记录数为 5:
stmt = select(User).limit(5)
result = session.execute(stmt).scalars().all()for user in result:print(user.name, user.age)
SQL 原生查询:
SELECT * FROM users LIMIT 5;
4. 关联查询(Join)
4.1 懒查询 (Lazy Loading)
懒查询是 SQLAlchemy 默认的加载方式,只有在访问关联属性时才会执行查询。它通过延迟加载来避免不必要的查询,但是可能导致 “N+1 查询问题”。
# 查询用户,并访问用户的地址
user = session.query(User).first() # 执行查询,不查询 addresses
print(user.name) # 输出用户名称addresses = user.addresses # 当访问 addresses 时,SQLAlchemy 会查询 addresses 表
for address in addresses:print(address.email)
SQL 原生查询:
SELECT * FROM users WHERE id = 1;
-- 查询用户的地址
SELECT * FROM addresses WHERE user_id = 1;
4.2 预加载(Eager Loading)
预加载是通过 JOIN 或子查询一次性加载所有相关数据。常见的预加载方法有 joinedload、subqueryload 和 selectinload。
4.2.1 joinedload
joinedload 会通过 JOIN 一次性加载所有相关数据,适用于关联数据量少的情况。
from sqlalchemy.orm import joinedloadstmt = select(User).options(joinedload(User.addresses))
result = session.execute(stmt).scalars().all()for user in result:print(user.name)for address in user.addresses:print(address.email)
SQL 原生查询:
SELECT * FROM users
JOIN addresses ON users.id = addresses.user_id;
4.2.2 subqueryload
subqueryload 使用子查询来加载关联数据,适用于关联数据较多的情况。
from sqlalchemy.orm import subqueryloadstmt = select(User).options(subqueryload(User.addresses))
result = session.execute(stmt).scalars().all()for user in result:print(user.name)for address in user.addresses:print(address.email)
SQL 原生查询:
SELECT * FROM users;
-- 查询 addresses 表的所有数据
SELECT * FROM addresses WHERE user_id IN (1, 2, 3, ...);
4.2.3 selectinload
selectinload 适用于多对一和一对多的关系,能够通过一次查询批量加载所有关联数据。
from sqlalchemy.orm import selectinloadstmt = select(User).options(selectinload(User.addresses))
result = session.execute(stmt).scalars().all()for user in result:print(user.name)for address in user.addresses:print(address.email)
SQL 原生查询:
SELECT * FROM users;
-- 查询 addresses 表
SELECT * FROM addresses WHERE user_id IN (1, 2, 3, ...);
4.3 延迟加载与预加载的比较
特性 | 懒查询 (Lazy Loading) | 预加载 (Eager Loading) |
---|---|---|
数据加载方式 | 延迟加载,直到访问关联属性时才查询 | 一次性加载所有关联数据 |
查询次数 | 可能多次查询(N+1 查询问题) | 一次性查询,通常只发出少量查询 |
性能 | 对于小数据量高效 | 对于复杂查询避免 N+1 问题,适用于大量关联数据 |
常用方法 | 无 | joinedload、subqueryload、selectinload |
5. 高级查询
5.1 聚合查询
SQLAlchemy 支持聚合函数,如 count、sum、avg 等。以下是计算用户数量的例子:
from sqlalchemy import funcstmt = select(func.count(User.id))
result = session.execute(stmt).scalar()
print(f"User Count: {result}")
SQL 原生查询:
SELECT COUNT(id) FROM users;
5.2 分组查询
通过 group_by 方法进行分组查询:
stmt = select(User.age, func.count(User.id)).group_by(User.age)
result = session.execute(stmt).all()for age, count in result:print(f"Age: {age}, Count: {count}")
SQL 原生查询:
SELECT age, COUNT(id) FROM users GROUP BY age;
5.3 子查询
通过子查询来执行更复杂的查询:
subquery = select(func.max(User.age)).scalar_subquery()
stmt = select(User).where(User.age == subquery)
result = session.execute(stmt).scalars().all()for user in result:print(user.name, user.age)
SQL 原生查询:
SELECT * FROM users WHERE age = (SELECT MAX(age) FROM users);
6. 事务管理
SQLAlchemy 2.0 自动管理事务,但你也可以显式地管理事务。例如:
from sqlalchemy.exc import SQLAlchemyErrortry:new_user = User(name="David", age=28)session.add(new_user)session.commit() # 提交事务
except SQLAlchemyError:session.rollback() # 回滚事务print("事务失败,已回滚")
7. 总结
SQLAlchemy 2.0 提供了强大的 ORM 功能和灵活的查询接口。合理选择懒查询和预加载策略可以有效避免性能问题,特别是对于关系复杂的数据模型,预加载能帮助避免 N+1 查询问题。通过使用 select()、joinedload、subqueryload 等方法,我们可以优化查询性能,提高数据操作的效率。
当然,以下是更详细的 SQLAlchemy 2.0 查询使用指南,包括懒查询(Lazy Loading)、预加载(Eager Loading)以及异步 session 使用示例。我们将通过详细的案例来解释这些概念,帮助团队成员全面理解 SQLAlchemy 2.0 的使用方式。
SQLAlchemy 2.0 查询使用指南
1. 环境设置
首先,确保安装了 SQLAlchemy 2.0:
pip install sqlalchemy
对于异步支持,确保你安装了 asyncpg(适用于 PostgreSQL)或 aiomysql(适用于 MySQL)等异步数据库驱动。
pip install sqlalchemy[asyncio] asyncpg
1.1 创建数据库引擎和会话
数据库连接使用 create_engine() 来创建数据库引擎,会话使用 sessionmaker() 来创建。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker# 创建数据库引擎
engine = create_engine('sqlite+aiosqlite:///example.db', echo=True, future=True)# 创建异步会话
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine# 异步引擎
engine = create_async_engine('sqlite+aiosqlite:///example.db', echo=True, future=True)# 异步会话
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
2. 定义数据模型
SQLAlchemy ORM 通过 declarative_base() 定义数据库模型。这里我们定义 User 和 Address 模型。
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationshipBase = declarative_base()class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String)age = Column(Integer)# 一对多关系addresses = relationship('Address', back_populates='user', lazy='select')class Address(Base):__tablename__ = 'addresses'id = Column(Integer, primary_key=True)user_id = Column(Integer, ForeignKey('users.id'))email = Column(String)user = relationship('User', back_populates='addresses')
2.1 解释模型
在这个模型中,User 表有一个一对多关系与 Address 表关联,User 表的 addresses 属性代表用户的多个地址,而 Address 表的 user 属性反向关联用户。
3. 基本查询操作
SQLAlchemy 2.0 提供了更加直观的查询接口。以下是一些常见的查询操作。
3.1 查询所有记录
查询 User 表中的所有用户数据:
from sqlalchemy import select# 创建查询语句
stmt = select(User)# 执行查询
with session.begin():result = session.execute(stmt).scalars().all()# 输出结果
for user in result:print(user.name, user.age)
原生 SQL 查询:
SELECT * FROM users;
3.2 查询特定字段
查询 name 和 age 字段的用户数据:
stmt = select(User.name, User.age)with session.begin():result = session.execute(stmt).all()for name, age in result:print(name, age)
原生 SQL 查询:
SELECT name, age FROM users;
3.3 查询带条件的记录
查询年龄大于 30 的用户:
stmt = select(User).where(User.age > 30)with session.begin():result = session.execute(stmt).scalars().all()for user in result:print(user.name, user.age)
原生 SQL 查询:
SELECT * FROM users WHERE age > 30;
3.4 排序查询
按年龄降序排列查询:
stmt = select(User).order_by(User.age.desc())with session.begin():result = session.execute(stmt).scalars().all()for user in result:print(user.name, user.age)
原生 SQL 查询:
SELECT * FROM users ORDER BY age DESC;
3.5 限制查询结果
限制返回前 5 条记录:
stmt = select(User).limit(5)with session.begin():result = session.execute(stmt).scalars().all()for user in result:print(user.name, user.age)
原生 SQL 查询:
SELECT * FROM users LIMIT 5;
4. 关联查询(Join)
SQLAlchemy 提供了强大的关联查询功能。我们可以使用 joinedload、subqueryload 等方法来优化查询。
4.1 懒查询 (Lazy Loading)
懒查询是 SQLAlchemy 默认的加载方式。只有当你访问某个关联属性时,SQLAlchemy 会延迟加载数据。
user = session.query(User).first() # 执行查询,不查询 addresses
print(user.name) # 输出用户名称
addresses = user.addresses # 此时会查询 addresses 表
for address in addresses:print(address.email)
原生 SQL 查询:
SELECT * FROM users WHERE id = 1;
-- 查询地址
SELECT * FROM addresses WHERE user_id = 1;
4.2 预加载(Eager Loading)
为了避免 N+1 查询问题,可以使用预加载。以下是几种常用的预加载方式。
4.2.1 joinedload
joinedload 使用 SQL JOIN 一次性加载所有关联数据,适用于数据量较小的场景。
from sqlalchemy.orm import joinedloadstmt = select(User).options(joinedload(User.addresses))with session.begin():result = session.execute(stmt).scalars().all()for user in result:print(user.name)for address in user.addresses:print(address.email)
原生 SQL 查询:
SELECT * FROM users
JOIN addresses ON users.id = addresses.user_id;
4.2.2 subqueryload
subqueryload 使用子查询加载关联数据,适用于关联数据较多的情况。
from sqlalchemy.orm import subqueryloadstmt = select(User).options(subqueryload(User.addresses))with session.begin():result = session.execute(stmt).scalars().all()for user in result:print(user.name)for address in user.addresses:print(address.email)
原生 SQL 查询:
SELECT * FROM users;
-- 查询 addresses 表的所有数据
SELECT * FROM addresses WHERE user_id IN (1, 2, 3, ...);
4.2.3 selectinload
selectinload 通过批量查询一次性加载关联数据,适用于多对一和一对多的关系。
from sqlalchemy.orm import selectinloadstmt = select(User).options(selectinload(User.addresses))with session.begin():result = session.execute(stmt).scalars().all()for user in result:print(user.name)for address in user.addresses:print(address.email)
原生 SQL 查询:
SELECT * FROM users;
-- 查询 addresses 表
SELECT * FROM addresses WHERE user_id IN (1, 2, 3, ...);
4.3 延迟加载与预加载的比较
特性 | 懒查询 (Lazy Loading) | 预加载 (Eager Loading) |
---|---|---|
数据加载方式 | 延迟加载,直到访问关联属性时才查询 | 一次性加载所有关联数据 |
查询次数 | 可能多次查询(N+1 查询问题) | 一次性查询,通常只发出少量查询 |
性能 | 对于小数据量高效 | 对于复杂查询避免 N+1 问题,适用于大量关联数据 |
常用方法 | 无 | joinedload、subqueryload、selectinload |
5. 高级查询
5.1 聚合查询
SQLAlchemy 支持聚合函数,如 count、sum、avg 等。以下是计算用户数量的例子:
from sqlalchemy import funcstmt = select(func.count(User.id))with session.begin():result = session.execute(stmt).scalar()print(f"User Count: {result}")
SQL 原生查询:
SELECT COUNT(id) FROM users;
5.2 分组查询
通过 group_by 方法进行分组查询:
stmt = select(User.age, func.count(User.id)).group_by(User.age)with session.begin():result = session.execute(stmt).all()for age, count in result:print(f"Age: {age}, Count: {count}")
SQL 原生查询:
SELECT age, COUNT(id) FROM users GROUP BY age;
5.3 子查询
通过子查询来执行更复杂的查询:
subquery = select(func.max(User.age)).scalar_subquery()stmt = select(User).where(User.age == subquery)with session.begin():result = session.execute(stmt).scalars().all()for user in result:print(user.name, user.age)
SQL 原生查询:
SELECT * FROM users WHERE age = (SELECT MAX(age) FROM users);
6. 异步查询使用示例
SQLAlchemy 支持异步查询,适用于需要处理高并发操作的场景。以下是使用异步 session 进行查询的示例。
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.future import select# 创建异步引擎和会话
engine = create_async_engine('sqlite+aiosqlite:///example.db', echo=True, future=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)async def async_query():async with async_session() as session:stmt = select(User)result = await session.execute(stmt)users = result.scalars().all()for user in users:print(user.name)# 执行异步查询
import asyncio
asyncio.run(async_query())
在异步查询中,我们使用 await 来执行查询,确保数据库操作不会阻塞主线程。
7. 总结
SQLAlchemy 2.0 提供了灵活且高效的查询功能,支持懒查询、预加载和异步操作。通过合理使用懒查询和预加载,可以有效地避免 N+1 查询问题,提高应用性能。在异步操作方面,SQLAlchemy 的异步会话使得我们可以在高并发环境下高效地进行数据库查询。