【ORM】-1-SQLAlchemy介绍,以及原生sql应用
文章目录
- 1. SQLAlchemy概述
- 2. 安装与基本配置
- 3. 核心概念
- 3.1 引擎(Engine)
- 3.2 会话(Session)
- 4. 声明式基类与模型定义
- 4.1 基础模型定义
- 4.2 创建表
- 5. 基本CRUD操作
- 5.1 创建(Create)
- 5.2 查询(Read)
- 5.3 更新(Update)
- 5.4 删除(Delete)
- 6. 高级查询技巧
- 6.1 连接查询
- 6.2 聚合函数
- 6.3 子查询
- 7. 关系管理
- 7.1 一对多关系
- 7.2 多对多关系
- 8. 高级特性
- 8.1 混合属性(Hybrid Attributes)
- 8.2 事件监听
- 8.3 数据库迁移(Alembic)
- 9. 性能优化
- 9.1 延迟加载与急切加载
- 9.2 批量操作
- 10. 事务管理
- 10.1 基本事务
- 10.2 嵌套事务
- 11. 最佳实践
- 11.1 会话管理
- 11.2 配置优化
- 12. 常见问题与解决方案
- 12.1 延迟加载问题
- 12.2 并发更新
- 13. SQLAlchemy中手写SQL的场景
- 13.1 使用`text()`构造文本SQL
- 13.2 使用`Table`对象的`insert`、`update`、`delete`方法(Core方式)
- 13.3 使用ORM的批量操作(如`bulk_insert_mappings`)进行高效批量插入
- 13.4 存储过程和函数调用
- 总结
1. SQLAlchemy概述
SQLAlchemy是一个Python的SQL工具包和对象关系映射(ORM)框架,它提供了完整的企业级持久化模式,专为高效和高性能的数据库访问而设计。SQLAlchemy的核心思想是:SQL数据库的行为不像对象集合,而对象集合的行为又不像表和数据行。因此,它提供了一个ORM层来将对象映射到数据库表,同时也可以直接使用SQL表达式语言(不依赖ORM)进行数据库操作。
SQLAlchemy的主要组件包括:
- 核心:包括SQL表达式语言、数据库连接池、事务管理等。
- ORM:建立在核心之上,提供了一种将Python类映射到数据库表的方式。
SQLAlchemy是Python中最流行的ORM(对象关系映射)框架之一,它提供了:
- ORM:高级的对象关系映射
- Core:底层的SQL表达式语言
- Engine:数据库连接和方言处理
2. 安装与基本配置
pip install sqlalchemy
3. 核心概念
3.1 引擎(Engine)
from sqlalchemy import create_engine# 创建数据库引擎
engine = create_engine('sqlite:///example.db', echo=True)
# 其他数据库示例:
# PostgreSQL: postgresql://user:password@localhost/mydatabase
# MySQL: mysql+pymysql://user:password@localhost/mydatabase
3.2 会话(Session)
from sqlalchemy.orm import sessionmakerSession = sessionmaker(bind=engine)
session = Session()
4. 声明式基类与模型定义
4.1 基础模型定义
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationshipBase = declarative_base()class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(50))fullname = Column(String(50))nickname = Column(String(50))# 一对多关系addresses = relationship("Address", back_populates="user")def __repr__(self):return f"<User(name='{self.name}', fullname='{self.fullname}')>"class Address(Base):__tablename__ = 'addresses'id = Column(Integer, primary_key=True)email_address = Column(String(50), nullable=False)user_id = Column(Integer, ForeignKey('users.id'))# 多对一关系user = relationship("User", back_populates="addresses")
4.2 创建表
Base.metadata.create_all(engine)
5. 基本CRUD操作
5.1 创建(Create)
# 创建新用户
new_user = User(name='john', fullname='John Doe', nickname='johnd')
session.add(new_user)
session.commit()# 批量添加
session.add_all([User(name='alice', fullname='Alice Smith'),User(name='bob', fullname='Bob Johnson')
])
session.commit()
5.2 查询(Read)
# 查询所有用户
users = session.query(User).all()# 条件查询
john = session.query(User).filter_by(name='john').first()
johns = session.query(User).filter(User.name.like('%john%')).all()# 复杂查询
from sqlalchemy import and_, or_
result = session.query(User).filter(or_(User.name == 'john',User.name == 'alice')
).order_by(User.id.desc()).limit(10).all()
5.3 更新(Update)
# 修改对象属性
user = session.query(User).filter_by(name='john').first()
user.nickname = 'johnny'
session.commit()# 批量更新
session.query(User).filter(User.name == 'john').update({'nickname': 'johnny'}, synchronize_session=False
)
session.commit()
5.4 删除(Delete)
# 删除对象
user = session.query(User).filter_by(name='john').first()
session.delete(user)
session.commit()# 批量删除
session.query(User).filter(User.name == 'john').delete()
session.commit()
6. 高级查询技巧
6.1 连接查询
# 内连接
result = session.query(User, Address).filter(User.id == Address.user_id).all()# 使用join
result = session.query(User).join(Address).filter(Address.email_address == 'john@example.com').all()
6.2 聚合函数
from sqlalchemy import func# 计数
count = session.query(func.count(User.id)).scalar()# 分组统计
result = session.query(User.name, func.count(Address.id)).join(Address).group_by(User.name).all()
6.3 子查询
from sqlalchemy.sql import labelsubq = session.query(Address.user_id, func.count('*').label('address_count')).group_by(Address.user_id).subquery()result = session.query(User.name, subq.c.address_count).join(subq, User.id == subq.c.user_id).all()
7. 关系管理
7.1 一对多关系
# 添加关联对象
user = User(name='test')
user.addresses = [Address(email_address='test1@example.com'),Address(email_address='test2@example.com')
]
session.add(user)
session.commit()# 通过关系查询
user = session.query(User).filter_by(name='test').first()
addresses = user.addresses # 自动加载关联的地址
7.2 多对多关系
# 定义关联表
association_table = Table('association', Base.metadata,Column('left_id', Integer, ForeignKey('left.id')),Column('right_id', Integer, ForeignKey('right.id'))
)class Parent(Base):__tablename__ = 'left'id = Column(Integer, primary_key=True)children = relationship("Child", secondary=association_table)class Child(Base):__tablename__ = 'right'id = Column(Integer, primary_key=True)
8. 高级特性
8.1 混合属性(Hybrid Attributes)
from sqlalchemy.ext.hybrid import hybrid_propertyclass User(Base):# ... 其他字段 ...@hybrid_propertydef full_name(self):return f"{self.first_name} {self.last_name}"@full_name.expressiondef full_name(cls):return func.concat(cls.first_name, ' ', cls.last_name)
8.2 事件监听
from sqlalchemy import event@event.listens_for(User, 'before_insert')
def before_insert_listener(mapper, connection, target):target.created_at = datetime.now()@event.listens_for(Session, 'before_commit')
def before_commit_listener(session):# 在提交前执行操作pass
8.3 数据库迁移(Alembic)
# 安装Alembic
pip install alembic# 初始化
alembic init alembic# 创建迁移脚本
alembic revision --autogenerate -m "添加新字段"# 执行迁移
alembic upgrade head
9. 性能优化
9.1 延迟加载与急切加载
# 延迟加载(默认)
user = session.query(User).first()
addresses = user.addresses # 此时才执行查询# 急切加载
from sqlalchemy.orm import joinedload, subqueryload# 使用joinedload
user = session.query(User).options(joinedload(User.addresses)).first()# 使用subqueryload
users = session.query(User).options(subqueryload(User.addresses)).all()
9.2 批量操作
# 批量插入
session.bulk_save_objects([User(name=f'user{i}') for i in range(1000)
])# 批量更新
session.bulk_update_mappings(User, [{'id': 1, 'name': 'new_name1'},{'id': 2, 'name': 'new_name2'}
])
10. 事务管理
10.1 基本事务
try:user = User(name='test')session.add(user)session.commit()
except Exception as e:session.rollback()print(f"事务失败: {e}")
10.2 嵌套事务
with session.begin_nested():# 嵌套事务中的操作session.add(User(name='nested'))# 如果出现异常,只回滚嵌套事务
11. 最佳实践
11.1 会话管理
# 使用上下文管理器管理会话
from contextlib import contextmanager@contextmanager
def session_scope():"""提供事务范围的会话"""session = Session()try:yield sessionsession.commit()except Exception:session.rollback()raisefinally:session.close()# 使用示例
with session_scope() as session:user = User(name='test')session.add(user)
11.2 配置优化
# 生产环境配置
engine = create_engine('postgresql://user:pass@localhost/dbname',pool_size=10,max_overflow=20,pool_recycle=3600,echo_pool=True
)
12. 常见问题与解决方案
12.1 延迟加载问题
# 在会话关闭后访问关系属性会导致错误
user = session.query(User).first()
session.close()
# addresses = user.addresses # 这会报错# 解决方案:使用急切加载或在会话内访问
12.2 并发更新
from sqlalchemy.orm import with_for_update# 使用行级锁
user = session.query(User).with_for_update().filter_by(id=1).first()
user.name = 'new_name'
session.commit()
13. SQLAlchemy中手写SQL的场景
尽管SQLAlchemy提供了强大的ORM功能,但在某些场景下,我们仍然需要手写SQL语句。以下是一些常见的情况和原因:
- 复杂查询:当遇到非常复杂的查询,涉及多个表连接、子查询、窗口函数等,ORM可能无法轻松表达,或者生成的SQL效率不高。
- 数据库特定功能:例如,使用某些数据库特有的函数或特性(如PostGIS、JSON操作等),ORM可能不支持或支持不够好。
- 性能优化:在性能敏感的场合,可能需要对SQL进行精细优化,而ORM生成的SQL可能不够优化。
- 批量操作:对于大批量的插入、更新操作,使用ORM可能效率较低,而手写SQL可以使用批量操作提高性能。
- 存储过程:调用存储过程或执行复杂的数据库操作时,可能需要手写SQL。
- 报表查询:复杂的报表查询通常涉及多个聚合、分组和计算,手写SQL可能更直观和高效。
- 习惯:有些开发者更熟悉SQL,或者项目中原有SQL语句已经经过优化和测试,直接使用手写SQL可能更稳妥。
在SQLAlchemy中,我们可以使用以下方式执行手写SQL:
13.1 使用text()
构造文本SQL
from sqlalchemy import text# 执行简单查询
result = session.execute(text("SELECT * FROM users WHERE name = :name"), {"name": "john"})
for row in result:print(row)# 执行更新操作
session.execute(text("UPDATE users SET name = :name WHERE id = :id"), {"name": "johnny", "id": 1})
session.commit()
13.2 使用Table
对象的insert
、update
、delete
方法(Core方式)
from sqlalchemy import Table, MetaDatametadata = MetaData()
users_table = Table('users', metadata, autoload_with=engine)# 插入数据
stmt = users_table.insert().values(name='john', fullname='John Doe')
session.execute(stmt)
session.commit()
13.3 使用ORM的批量操作(如bulk_insert_mappings
)进行高效批量插入
# 批量插入
session.bulk_insert_mappings(User, [{'name': 'john'}, {'name': 'alice'}])
session.commit()
# 大批量更新(比ORM更高效)
bulk_update_sql = """
UPDATE products
SET price = price * 1.1,updated_at = NOW()
WHERE category_id = :category_id AND stock_quantity > 0
"""session.execute(text(bulk_update_sql), {'category_id': 5})
session.commit()
13.4 存储过程和函数调用
# 调用存储过程
stored_proc_sql = "CALL calculate_user_statistics(:user_id)"
session.execute(text(stored_proc_sql), {'user_id': 123})# 调用数据库函数
function_sql = "SELECT calculate_distance(:lat1, :lon1, :lat2, :lon2)"
result = session.execute(text(function_sql), {'lat1': 40.7128, 'lon1': -74.0060,'lat2': 34.0522, 'lon2': -118.2437
})
总结
在SQLAlchemy中手写SQL是合理的,关键在于:
- 识别合适的场景:复杂查询、性能关键操作、数据库特定功能
- 确保安全性:始终使用参数化查询,避免SQL注入
- 保持代码可维护性:合理组织SQL代码,添加适当注释
- 平衡ORM和原生SQL:利用各自的优势,不要极端偏向某一边
- 团队共识:建立团队规范,确保代码风格一致