Python SQLAlchemy:告别原生 SQL,用 ORM 优雅操作数据库
在Python中操作数据库时,你是否遇到过这些问题?写原生SQL容易出现语法错误,手动拼接参数有注入风险,切换MySQL/PostgreSQL时要改大量SQL语句,代码里SQL和业务逻辑混在一起难以维护……
如果你有这些困扰,**SQLAlchemy** 就是解决方案。它是Python最流行的ORM(对象关系映射)框架,能将数据库表映射为Python类,用面向对象的方式操作数据库,彻底告别繁琐的原生SQL,同时支持跨数据库、事务管理、复杂查询等核心需求。
今天这篇博客,我们从「入门到实战」,用10+示例带你掌握SQLAlchemy的核心用法,让数据库操作变得优雅又高效。
一、什么是SQLAlchemy?先搞懂核心定位
SQLAlchemy并非直接操作数据库,而是通过「ORM层」和「核心层」两层结构实现数据库交互:
ORM层:将数据库表映射为Python类(如
User
类对应users
表),通过类实例的创建/修改/删除实现数据增删改查,适合大多数业务场景。核心层:支持编写半原生SQL(通过SQL表达式),兼顾灵活性和安全性,适合复杂查询场景。
它的核心优势:
跨数据库兼容:一套代码无缝切换MySQL、PostgreSQL、SQLite等,无需修改操作逻辑。
防SQL注入:自动处理参数绑定,避免手动拼接SQL的安全风险。
代码解耦:ORM将数据模型与业务逻辑分离,代码可读性和可维护性大幅提升。
功能全面:支持事务、关联查询、索引、分页等企业级需求。
二、环境准备:5分钟搭好运行环境
首先需要安装SQLAlchemy和对应的数据库驱动(不同数据库驱动不同),以最常用的MySQL为例。
1. 安装依赖
# 安装SQLAlchemy核心库
pip install sqlalchemy# 安装数据库驱动(根据你的数据库选择)
# MySQL驱动:pymysql
pip install pymysql
# PostgreSQL驱动:psycopg2-binary
# pip install psycopg2-binary
# SQLite无需额外驱动(Python内置)
2. 数据库连接URL格式
SQLAlchemy通过「连接URL」识别数据库类型和连接信息,不同数据库的URL格式略有差异:
数据库 | URL格式 | 示例 |
MySQL | mysql+pymysql://用户名:密码@主机:端口/数据库名?参数 | mysql+pymysql://root:123456@localhost:3306/test_db?charset=utf8mb4 |
PostgreSQL | postgresql+psycopg2://用户名:密码@主机:端口/数据库名 | postgresql+psycopg2://postgres:123456@localhost:5432/test_db |
SQLite | sqlite:///数据库文件路径(相对/绝对) | sqlite:///test.db(当前目录下的test.db) |
注意:MySQL连接需添加charset=utf8mb4
参数,避免中文乱码。
三、核心概念:3个基础组件打通ORM流程
SQLAlchemy的核心流程是「创建连接→定义模型→通过会话操作数据」,对应的3个关键组件是:Engine
(连接池)、Model
(数据模型)、Session
(会话)。
我们先通过一个「用户表(users)」的示例,理解这3个组件的用法。
1. 组件1:Engine(数据库连接池)
Engine
是SQLAlchemy与数据库的「连接入口」,负责管理数据库连接池(避免频繁创建/关闭连接,提升性能)。**一个项目通常只需要创建一个Engine实例**。
from sqlalchemy import create_engine# 1. 定义MySQL连接URL(替换为你的数据库信息)
DB_URL = "mysql+pymysql://root:123456@localhost:3306/test_db?charset=utf8mb4"# 2. 创建Engine(关键参数说明)
engine = create_engine(DB_URL,echo=False, # 若为True,会打印SQL执行日志(调试时有用,生产环境关闭)pool_size=5, # 连接池默认大小max_overflow=10, # 连接池最大溢出数量(临时超过pool_size的连接数)pool_recycle=3600 # 连接超时时间(秒),避免长时间闲置连接被数据库关闭
)# 验证连接(可选,首次使用时触发连接)
with engine.connect() as conn:result = conn.execute("SELECT 1")print(result.scalar()) # 输出1表示连接成功
2. 组件2:Model(数据模型,映射数据库表)
Model
是ORM的核心,通过Python类映射数据库表——类名对应表名,类属性对应表字段。
需要先创建「基类」,所有数据模型都继承这个基类,基类会自动管理表的元数据(如字段、索引)。
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from datetime import datetime# 1. 创建基类(所有模型的父类)
Base = declarative_base()# 2. 定义User模型(映射users表)
class User(Base):# 必须指定表名(数据库中实际的表名)__tablename__ = "users"# 定义字段(Column参数对应数据库字段属性)id = Column(Integer, primary_key=True, autoincrement=True, comment="用户ID")username = Column(String(50), nullable=False, unique=True, comment="用户名")email = Column(String(100), nullable=False, unique=True, comment="邮箱")password = Column(String(100), nullable=False, comment="加密后的密码")is_active = Column(Boolean, default=True, comment="是否激活")created_at = Column(DateTime, default=datetime.now, comment="创建时间")updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment="更新时间")# 可选:定义__repr__方法,打印实例时更友好def __repr__(self):return f"<User(id={self.id}, username={self.username}, email={self.email})>"# 3. 创建数据库表(首次运行时执行,若表已存在则不重复创建)
Base.metadata.create_all(bind=engine)
关键参数说明:
primary_key=True
:设为主键(唯一标识一条数据)。autoincrement=True
:自增(适合ID字段)。nullable=False
:不允许为空(必填字段)。unique=True
:字段值唯一(如用户名、邮箱不允许重复)。default
:默认值(如is_active
默认True,created_at
默认当前时间)。onupdate
:更新时自动触发(如updated_at
更新时自动设为当前时间)。
3. 组件3:Session(会话,操作数据的入口)
Session
相当于数据库的「游标」,所有数据增删改查都通过Session完成。它的核心作用是:管理数据对象的生命周期,批量处理操作,确保事务一致性。
通常用「工厂函数」创建Session,确保每次操作使用独立的会话(避免线程安全问题):
from sqlalchemy.orm import sessionmaker# 1. 创建Session工厂(绑定Engine)
SessionLocal = sessionmaker(bind=engine,autoflush=False, # 关闭自动刷新(避免未提交的修改被自动同步到数据库)autocommit=False # 关闭自动提交(需手动调用commit()提交事务)
)# 2. 获取Session实例(推荐用上下文管理器,自动关闭会话)
def get_db():db = SessionLocal()try:yield db # 提供Session给外部使用finally:db.close() # 无论是否报错,都关闭会话(释放连接)# 使用示例
if __name__ == "__main__":db = next(get_db()) # 获取Sessionprint(type(db)) # <class 'sqlalchemy.orm.session.Session'>db.close() # 手动关闭(实际项目中用上下文管理器无需手动关)
四、基础CRUD:用ORM优雅操作数据
掌握了Engine、Model、Session后,就能轻松实现增删改查。以下示例基于上面定义的User
模型和get_db()
函数。
1. 新增数据(Create)
支持单个数据新增和批量新增,需调用session.add()
(单个)/session.add_all()
(批量),再用session.commit()
提交事务。
from datetime import datetime# 1. 单个数据新增
def create_user(db, username: str, email: str, password: str):# 创建User实例(相当于构造一条数据)new_user = User(username=username,email=email,password=password, # 实际项目中需加密(如用bcrypt),此处简化is_active=True,created_at=datetime.now())db.add(new_user) # 将实例添加到Sessiondb.commit() # 提交事务(同步到数据库)db.refresh(new_user) # 刷新实例,获取数据库自动生成的字段(如id、created_at)return new_user# 调用示例
db = next(get_db())
user1 = create_user(db, "zhangsan", "zhangsan@example.com", "123456")
print(user1) # <User(id=1, username=zhangsan, email=zhangsan@example.com)>
db.close()# 2. 批量新增(效率更高,减少数据库交互次数)
def batch_create_users(db, user_list: list):# user_list格式:[{"username": "...", "email": "...", "password": "..."}]users = [User(**user) for user in user_list] # 批量创建实例db.add_all(users) # 批量添加到Sessiondb.commit()for user in users:db.refresh(user)return users# 调用示例
db = next(get_db())
user_list = [{"username": "lisi", "email": "lisi@example.com", "password": "654321"},{"username": "wangwu", "email": "wangwu@example.com", "password": "abcdef"}
]
batch_users = batch_create_users(db, user_list)
print(batch_users) # 输出两个User实例
db.close()
2. 查询数据(Read)
SQLAlchemy提供丰富的查询方法,支持条件筛选、排序、分页、关联查询等,核心是session.query()
(或db.query()
)。
def query_users(db):# 1. 查询所有数据(返回列表)all_users = db.query(User).all()print("所有用户:", all_users)# 2. 查询单个数据(按主键,不存在则返回None)user_by_id = db.query(User).get(1) # get()仅支持主键查询print("按ID查询:", user_by_id)# 3. 条件筛选(filter():支持复杂表达式,参数是类属性+运算符)# 查找用户名是"zhangsan"的用户user_by_name = db.query(User).filter(User.username == "zhangsan").first() # first()返回第一条print("按用户名查询:", user_by_name)# 查找邮箱包含"example.com"且激活的用户active_users = db.query(User).filter(User.email.endswith("example.com"), # 模糊查询:邮箱以example.com结尾User.is_active == True).all()print("激活且邮箱符合条件的用户:", active_users)# 4. 条件筛选(filter_by():关键字参数,仅支持简单等值判断)user_by_email = db.query(User).filter_by(email="lisi@example.com").first()print("按邮箱查询(filter_by):", user_by_email)# 5. 排序(order_by(),desc()降序,asc()升序)users_ordered = db.query(User).order_by(User.created_at.desc()).all() # 按创建时间降序print("按创建时间降序:", users_ordered)# 6. 分页(limit()限制数量,offset()偏移量)page1_users = db.query(User).limit(2).offset(0).all() # 第1页,每页2条page2_users = db.query(User).limit(2).offset(2).all() # 第2页print("第1页用户:", page1_users)print("第2页用户:", page2_users)# 7. 统计数量(count())active_user_count = db.query(User).filter(User.is_active == True).count()print("激活用户数量:", active_user_count)return all_users# 调用示例
db = next(get_db())
query_users(db)
db.close()
关键方法对比:
filter()
:支持复杂条件(如User.age > 18
、User.name.like("%张%")
),参数是「类属性+表达式」。filter_by()
:仅支持等值判断,参数是「关键字参数」(如username="zhangsan"
),语法更简洁。first()
:返回查询结果的第一条数据(无结果返回None)。all()
:返回所有结果(列表)。get()
:仅按主键查询(效率高,无结果返回None)。
3. 更新数据(Update)
有两种更新方式:「实例赋值更新」(适合单个数据)和「批量更新」(适合多条数据)。
def update_user(db):# 1. 实例赋值更新(单个数据)user = db.query(User).get(1) # 先查询出要更新的实例if user:user.username = "zhangsan_new" # 直接修改属性user.email = "zhangsan_new@example.com"db.commit() # 提交事务db.refresh(user) # 刷新实例,获取最新数据print("更新后的数据:", user)# 2. 批量更新(无需先查询,直接更新符合条件的数据)# 将所有用户名包含"new"的用户设为非激活update_count = db.query(User).filter(User.username.contains("new")).update({User.is_active: False}, synchronize_session=False)db.commit()print("批量更新的用户数量:", update_count) # 返回更新的行数# 调用示例
db = next(get_db())
update_user(db)
db.close()
4. 删除数据(Delete)
支持单个删除和批量删除,需调用session.delete()
(单个)或query.delete()
(批量),再提交事务。
def delete_user(db):# 1. 单个删除(先查询,再删除)user = db.query(User).get(3) # 要删除的用户ID=3if user:db.delete(user) # 删除实例db.commit()print(f"删除用户:{user}")# 2. 批量删除(删除符合条件的数据)# 删除非激活的用户delete_count = db.query(User).filter(User.is_active == False).delete(synchronize_session=False)db.commit()print("批量删除的用户数量:", delete_count)# 调用示例
db = next(get_db())
delete_user(db)
db.close()
五、进阶用法:应对复杂业务场景
除了基础CRUD,SQLAlchemy还支持关联查询、原生SQL、事务管理等进阶功能,满足中大型项目需求。
1. 关联查询(一对多关系)
实际项目中表之间常有关联(如「用户-地址」一对多:一个用户可有多条地址),SQLAlchemy通过relationship
和ForeignKey
实现关联。
步骤1:定义关联模型
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship# 地址模型(与User是一对多关系)
class Address(Base):__tablename__ = "addresses"id = Column(Integer, primary_key=True, autoincrement=True)user_id = Column(Integer, ForeignKey("users.id"), nullable=False, comment="关联的用户ID") # 外键:关联users表的idcity = Column(String(50), nullable=False, comment="城市")street = Column(String(100), nullable=False, comment="街道")created_at = Column(DateTime, default=datetime.now)# 定义关联:通过Address.user获取对应的User实例user = relationship("User", back_populates="addresses") # back_populates与User的addresses对应# 修改User模型,添加关联字段
class User(Base):__tablename__ = "users"# ... 其他字段不变 ...# 定义关联:通过User.addresses获取该用户的所有Address实例addresses = relationship("Address", back_populates="user", cascade="all, delete-orphan") # cascade:删除用户时自动删除关联的地址
步骤2:关联查询示例
def query_user_with_addresses(db):# 1. 通过用户查地址(一对多)user = db.query(User).get(1)if user:print(f"用户{user.username}的地址:", user.addresses) # 直接通过user.addresses获取所有地址# 2. 通过地址查用户(多对一)address = db.query(Address).get(1)if address:print(f"地址{address.id}所属用户:", address.user) # 直接通过address.user获取用户# 3. 关联查询(JOIN):查询有北京地址的用户users_in_beijing = db.query(User).join(Address).filter(Address.city == "北京").all()print("有北京地址的用户:", users_in_beijing)# 调用示例
db = next(get_db())
# 先给用户添加地址
user = db.query(User).get(1)
new_address = Address(user_id=user.id, city="北京", street="朝阳路1号")
db.add(new_address)
db.commit()# 关联查询
query_user_with_addresses(db)
db.close()
关键参数说明:
ForeignKey("users.id")
:定义外键,关联users
表的id
字段(确保数据完整性)。relationship
:定义模型间的关联关系,实现「通过一个模型获取另一个模型的数据」。cascade="all, delete-orphan"
:级联操作,删除用户时自动删除关联的地址(避免孤儿数据)。
2. 原生SQL:复杂查询的补充方案
虽然ORM强大,但复杂查询(如多表JOIN、子查询)用原生SQL更直观。SQLAlchemy支持通过text()
函数执行原生SQL,同时保持参数绑定的安全性。
from sqlalchemy import textdef run_raw_sql(db):# 1. 执行原生查询(参数绑定,避免SQL注入)# 查询用户名包含"zhang"的用户sql = text("SELECT * FROM users WHERE username LIKE :name")result = db.execute(sql, {"name": "%zhang%"}).fetchall() # 参数通过字典传递print("原生SQL查询结果:", result)# 2. 执行原生更新sql_update = text("UPDATE users SET is_active = :active WHERE id = :id")db.execute(sql_update, {"active": True, "id": 1})db.commit()print("原生SQL更新成功")# 3. 执行原生删除sql_delete = text("DELETE FROM addresses WHERE city = :city")delete_count = db.execute(sql_delete, {"city": "上海"}).rowcountdb.commit()print(f"原生SQL删除的地址数量:{delete_count}")# 调用示例
db = next(get_db())
run_raw_sql(db)
db.close()
注意:参数必须通过execute()
的第二个参数传递(如{"name": "%zhang%"}
),不可手动拼接SQL字符串,否则有注入风险。
3. 事务管理:确保数据一致性
事务的核心是「ACID原则」:原子性(要么全成,要么全败)、一致性、隔离性、持久性。SQLAlchemy通过Session.commit()
和Session.rollback()
实现事务管理。
def transaction_demo(db):try:# 步骤1:创建用户new_user = User(username="transaction", email="transaction@example.com", password="123456")db.add(new_user)# 步骤2:给用户添加地址(模拟业务流程)new_address = Address(user_id=new_user.id, city="广州", street="天河路1号")db.add(new_address)# 提交事务(所有操作生效)db.commit()print("事务提交成功:用户和地址都创建完成")except Exception as e:# 若发生异常,回滚事务(所有操作撤销)db.rollback()print(f"事务回滚:{str(e)}")# 调用示例
db = next(get_db())
transaction_demo(db)
db.close()
场景说明:如果创建用户成功,但添加地址失败(如字段为空),rollback()
会撤销「创建用户」的操作,避免数据库中出现「有用户但无地址」的不一致数据。
六、实战:完整用户管理系统示例
将前面的知识点整合,实现一个完整的「用户管理系统」,包含模型定义、CRUD封装、关联操作和事务处理:
# 完整示例:user_management.py
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime# 1. 配置数据库连接
DB_URL = "mysql+pymysql://root:123456@localhost:3306/test_db?charset=utf8mb4"
engine = create_engine(DB_URL, echo=False)# 2. 创建基类
Base = declarative_base()# 3. 定义模型(User + Address)
class User(Base):__tablename__ = "users"id = Column(Integer, primary_key=True, autoincrement=True)username = Column(String(50), nullable=False, unique=True)email = Column(String(100), nullable=False, unique=True)password = Column(String(100), nullable=False)is_active = Column(Boolean, default=True)created_at = Column(DateTime, default=datetime.now)updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)# 关联地址addresses = relationship("Address", back_populates="user", cascade="all, delete-orphan")def __repr__(self):return f"<User(id={self.id}, username={self.username})>"class Address(Base):__tablename__ = "addresses"id = Column(Integer, primary_key=True, autoincrement=True)user_id = Column(Integer, ForeignKey("users.id"), nullable=False)city = Column(String(50), nullable=False)street = Column(String(100), nullable=False)created_at = Column(DateTime, default=datetime.now)# 关联用户user = relationship("User", back_populates="addresses")def __repr__(self):return f"<Address(id={self.id}, city={self.city}, street={self.street})>"# 4. 创建表
Base.metadata.create_all(bind=engine)# 5. 创建Session
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)def get_db():db = SessionLocal()try:yield dbfinally:db.close()# 6. 业务逻辑封装
def create_user_with_address(db, username, email, password, address_info):"""创建用户并添加地址(事务)"""try:user = User(username=username, email=email, password=password)db.add(user)db.flush() # 刷新Session,获取user.id(无需commit)# 添加地址address = Address(user_id=user.id,city=address_info["city"],street=address_info["street"])db.add(address)db.commit()db.refresh(user)return userexcept Exception as e:db.rollback()raise edef get_user_detail(db, user_id):"""获取用户详情(含地址)"""user = db.query(User).get(user_id)if not user:raise ValueError(f"用户ID {user_id} 不存在")return {"id": user.id,"username": user.username,"email": user.email,"is_active": user.is_active,"created_at": user.created_at,"addresses": [{"city": addr.city, "street": addr.street} for addr in user.addresses]}# 7. 实际调用
if __name__ == "__main__":db = next(get_db())try:# 创建用户并添加地址user = create_user_with_address(db,username="demo_user",email="demo@example.com",password="demo123",address_info={"city": "深圳", "street": "南山路100号"})print("创建用户成功:", user)# 获取用户详情user_detail = get_user_detail(db, user.id)print("用户详情:", user_detail)except Exception as e:print(f"操作失败:{str(e)}")finally:db.close()
七、总结:为什么推荐SQLAlchemy?
代码更优雅:用面向对象替代原生SQL,业务逻辑与数据操作分离,可读性大幅提升。
跨数据库兼容:一套代码支持MySQL、PostgreSQL、SQLite等,切换数据库无需修改核心逻辑。
安全性更高:自动处理参数绑定,避免SQL注入,无需手动拼接SQL。
功能更全面:支持事务、关联查询、索引、分页等企业级需求,开箱即用。
如果你正在开发中大型Python项目,或者厌倦了原生SQL的繁琐,SQLAlchemy绝对值得一试——它会让你的数据库操作效率提升一个档次!
最后:注意事项
数据库驱动:不同数据库需安装对应的驱动(如MySQL用pymysql,PostgreSQL用psycopg2)。
Session管理:用
get_db()
这类工厂函数创建Session,避免线程安全问题,用完及时关闭。事务谨慎:涉及多步操作时,务必用
try-except
包裹,异常时调用rollback()
。性能优化:批量操作用
add_all()
/update()
/delete()
,减少数据库交互次数;复杂查询优先用ORM,实在不行再用原生SQL。