当前位置: 首页 > news >正文

Python SQLAlchemy:告别原生 SQL,用 ORM 优雅操作数据库

在Python中操作数据库时,你是否遇到过这些问题?写原生SQL容易出现语法错误,手动拼接参数有注入风险,切换MySQL/PostgreSQL时要改大量SQL语句,代码里SQL和业务逻辑混在一起难以维护……

如果你有这些困扰,**SQLAlchemy** 就是解决方案。它是Python最流行的ORM(对象关系映射)框架,能将数据库表映射为Python类,用面向对象的方式操作数据库,彻底告别繁琐的原生SQL,同时支持跨数据库、事务管理、复杂查询等核心需求。

今天这篇博客,我们从「入门到实战」,用10+示例带你掌握SQLAlchemy的核心用法,让数据库操作变得优雅又高效。

一、什么是SQLAlchemy?先搞懂核心定位

SQLAlchemy并非直接操作数据库,而是通过「ORM层」和「核心层」两层结构实现数据库交互:

  • ORM层:将数据库表映射为Python类(如User类对应users表),通过类实例的创建/修改/删除实现数据增删改查,适合大多数业务场景。

  • 核心层:支持编写半原生SQL(通过SQL表达式),兼顾灵活性和安全性,适合复杂查询场景。

它的核心优势:

  1. 跨数据库兼容:一套代码无缝切换MySQL、PostgreSQL、SQLite等,无需修改操作逻辑。

  2. 防SQL注入:自动处理参数绑定,避免手动拼接SQL的安全风险。

  3. 代码解耦:ORM将数据模型与业务逻辑分离,代码可读性和可维护性大幅提升。

  4. 功能全面:支持事务、关联查询、索引、分页等企业级需求。

二、环境准备:5分钟搭好运行环境

首先需要安装SQLAlchemy和对应的数据库驱动(不同数据库驱动不同),以最常用的MySQL为例。

1. 安装依赖

# 安装SQLAlchemy核心库
pip install sqlalchemy# 安装数据库驱动(根据你的数据库选择)
# MySQL驱动:pymysql
pip install pymysql
# PostgreSQL驱动:psycopg2-binary
# pip install psycopg2-binary
# SQLite无需额外驱动(Python内置)

2. 数据库连接URL格式

SQLAlchemy通过「连接URL」识别数据库类型和连接信息,不同数据库的URL格式略有差异:

数据库

URL格式

示例

MySQL

mysql+pymysql://用户名:密码@主机:端口/数据库名?参数

mysql+pymysql://root:123456@localhost:3306/test_db?charset=utf8mb4

PostgreSQL

postgresql+psycopg2://用户名:密码@主机:端口/数据库名

postgresql+psycopg2://postgres:123456@localhost:5432/test_db

SQLite

sqlite:///数据库文件路径(相对/绝对)

sqlite:///test.db(当前目录下的test.db)

注意:MySQL连接需添加charset=utf8mb4参数,避免中文乱码。

三、核心概念:3个基础组件打通ORM流程

SQLAlchemy的核心流程是「创建连接→定义模型→通过会话操作数据」,对应的3个关键组件是:Engine(连接池)、Model(数据模型)、Session(会话)。

我们先通过一个「用户表(users)」的示例,理解这3个组件的用法。

1. 组件1:Engine(数据库连接池)

Engine是SQLAlchemy与数据库的「连接入口」,负责管理数据库连接池(避免频繁创建/关闭连接,提升性能)。**一个项目通常只需要创建一个Engine实例**。

from sqlalchemy import create_engine# 1. 定义MySQL连接URL(替换为你的数据库信息)
DB_URL = "mysql+pymysql://root:123456@localhost:3306/test_db?charset=utf8mb4"# 2. 创建Engine(关键参数说明)
engine = create_engine(DB_URL,echo=False,  # 若为True,会打印SQL执行日志(调试时有用,生产环境关闭)pool_size=5,  # 连接池默认大小max_overflow=10,  # 连接池最大溢出数量(临时超过pool_size的连接数)pool_recycle=3600  # 连接超时时间(秒),避免长时间闲置连接被数据库关闭
)# 验证连接(可选,首次使用时触发连接)
with engine.connect() as conn:result = conn.execute("SELECT 1")print(result.scalar())  # 输出1表示连接成功

2. 组件2:Model(数据模型,映射数据库表)

Model是ORM的核心,通过Python类映射数据库表——类名对应表名,类属性对应表字段。

需要先创建「基类」,所有数据模型都继承这个基类,基类会自动管理表的元数据(如字段、索引)。

from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from datetime import datetime# 1. 创建基类(所有模型的父类)
Base = declarative_base()# 2. 定义User模型(映射users表)
class User(Base):# 必须指定表名(数据库中实际的表名)__tablename__ = "users"# 定义字段(Column参数对应数据库字段属性)id = Column(Integer, primary_key=True, autoincrement=True, comment="用户ID")username = Column(String(50), nullable=False, unique=True, comment="用户名")email = Column(String(100), nullable=False, unique=True, comment="邮箱")password = Column(String(100), nullable=False, comment="加密后的密码")is_active = Column(Boolean, default=True, comment="是否激活")created_at = Column(DateTime, default=datetime.now, comment="创建时间")updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment="更新时间")# 可选:定义__repr__方法,打印实例时更友好def __repr__(self):return f"<User(id={self.id}, username={self.username}, email={self.email})>"# 3. 创建数据库表(首次运行时执行,若表已存在则不重复创建)
Base.metadata.create_all(bind=engine)

关键参数说明

  • primary_key=True:设为主键(唯一标识一条数据)。

  • autoincrement=True:自增(适合ID字段)。

  • nullable=False:不允许为空(必填字段)。

  • unique=True:字段值唯一(如用户名、邮箱不允许重复)。

  • default:默认值(如is_active默认True,created_at默认当前时间)。

  • onupdate:更新时自动触发(如updated_at更新时自动设为当前时间)。

3. 组件3:Session(会话,操作数据的入口)

Session相当于数据库的「游标」,所有数据增删改查都通过Session完成。它的核心作用是:管理数据对象的生命周期,批量处理操作,确保事务一致性。

通常用「工厂函数」创建Session,确保每次操作使用独立的会话(避免线程安全问题):

from sqlalchemy.orm import sessionmaker# 1. 创建Session工厂(绑定Engine)
SessionLocal = sessionmaker(bind=engine,autoflush=False,  # 关闭自动刷新(避免未提交的修改被自动同步到数据库)autocommit=False   # 关闭自动提交(需手动调用commit()提交事务)
)# 2. 获取Session实例(推荐用上下文管理器,自动关闭会话)
def get_db():db = SessionLocal()try:yield db  # 提供Session给外部使用finally:db.close()  # 无论是否报错,都关闭会话(释放连接)# 使用示例
if __name__ == "__main__":db = next(get_db())  # 获取Sessionprint(type(db))  # <class 'sqlalchemy.orm.session.Session'>db.close()  # 手动关闭(实际项目中用上下文管理器无需手动关)

四、基础CRUD:用ORM优雅操作数据

掌握了Engine、Model、Session后,就能轻松实现增删改查。以下示例基于上面定义的User模型和get_db()函数。

1. 新增数据(Create)

支持单个数据新增和批量新增,需调用session.add()(单个)/session.add_all()(批量),再用session.commit()提交事务。

from datetime import datetime# 1. 单个数据新增
def create_user(db, username: str, email: str, password: str):# 创建User实例(相当于构造一条数据)new_user = User(username=username,email=email,password=password,  # 实际项目中需加密(如用bcrypt),此处简化is_active=True,created_at=datetime.now())db.add(new_user)  # 将实例添加到Sessiondb.commit()       # 提交事务(同步到数据库)db.refresh(new_user)  # 刷新实例,获取数据库自动生成的字段(如id、created_at)return new_user# 调用示例
db = next(get_db())
user1 = create_user(db, "zhangsan", "zhangsan@example.com", "123456")
print(user1)  # <User(id=1, username=zhangsan, email=zhangsan@example.com)>
db.close()# 2. 批量新增(效率更高,减少数据库交互次数)
def batch_create_users(db, user_list: list):# user_list格式:[{"username": "...", "email": "...", "password": "..."}]users = [User(**user) for user in user_list]  # 批量创建实例db.add_all(users)  # 批量添加到Sessiondb.commit()for user in users:db.refresh(user)return users# 调用示例
db = next(get_db())
user_list = [{"username": "lisi", "email": "lisi@example.com", "password": "654321"},{"username": "wangwu", "email": "wangwu@example.com", "password": "abcdef"}
]
batch_users = batch_create_users(db, user_list)
print(batch_users)  # 输出两个User实例
db.close()

2. 查询数据(Read)

SQLAlchemy提供丰富的查询方法,支持条件筛选、排序、分页、关联查询等,核心是session.query()(或db.query())。

def query_users(db):# 1. 查询所有数据(返回列表)all_users = db.query(User).all()print("所有用户:", all_users)# 2. 查询单个数据(按主键,不存在则返回None)user_by_id = db.query(User).get(1)  # get()仅支持主键查询print("按ID查询:", user_by_id)# 3. 条件筛选(filter():支持复杂表达式,参数是类属性+运算符)# 查找用户名是"zhangsan"的用户user_by_name = db.query(User).filter(User.username == "zhangsan").first()  # first()返回第一条print("按用户名查询:", user_by_name)# 查找邮箱包含"example.com"且激活的用户active_users = db.query(User).filter(User.email.endswith("example.com"),  # 模糊查询:邮箱以example.com结尾User.is_active == True).all()print("激活且邮箱符合条件的用户:", active_users)# 4. 条件筛选(filter_by():关键字参数,仅支持简单等值判断)user_by_email = db.query(User).filter_by(email="lisi@example.com").first()print("按邮箱查询(filter_by):", user_by_email)# 5. 排序(order_by(),desc()降序,asc()升序)users_ordered = db.query(User).order_by(User.created_at.desc()).all()  # 按创建时间降序print("按创建时间降序:", users_ordered)# 6. 分页(limit()限制数量,offset()偏移量)page1_users = db.query(User).limit(2).offset(0).all()  # 第1页,每页2条page2_users = db.query(User).limit(2).offset(2).all()  # 第2页print("第1页用户:", page1_users)print("第2页用户:", page2_users)# 7. 统计数量(count())active_user_count = db.query(User).filter(User.is_active == True).count()print("激活用户数量:", active_user_count)return all_users# 调用示例
db = next(get_db())
query_users(db)
db.close()

关键方法对比

  • filter():支持复杂条件(如User.age > 18User.name.like("%张%")),参数是「类属性+表达式」。

  • filter_by():仅支持等值判断,参数是「关键字参数」(如username="zhangsan"),语法更简洁。

  • first():返回查询结果的第一条数据(无结果返回None)。

  • all():返回所有结果(列表)。

  • get():仅按主键查询(效率高,无结果返回None)。

3. 更新数据(Update)

有两种更新方式:「实例赋值更新」(适合单个数据)和「批量更新」(适合多条数据)。

def update_user(db):# 1. 实例赋值更新(单个数据)user = db.query(User).get(1)  # 先查询出要更新的实例if user:user.username = "zhangsan_new"  # 直接修改属性user.email = "zhangsan_new@example.com"db.commit()  # 提交事务db.refresh(user)  # 刷新实例,获取最新数据print("更新后的数据:", user)# 2. 批量更新(无需先查询,直接更新符合条件的数据)# 将所有用户名包含"new"的用户设为非激活update_count = db.query(User).filter(User.username.contains("new")).update({User.is_active: False}, synchronize_session=False)db.commit()print("批量更新的用户数量:", update_count)  # 返回更新的行数# 调用示例
db = next(get_db())
update_user(db)
db.close()

4. 删除数据(Delete)

支持单个删除和批量删除,需调用session.delete()(单个)或query.delete()(批量),再提交事务。

def delete_user(db):# 1. 单个删除(先查询,再删除)user = db.query(User).get(3)  # 要删除的用户ID=3if user:db.delete(user)  # 删除实例db.commit()print(f"删除用户:{user}")# 2. 批量删除(删除符合条件的数据)# 删除非激活的用户delete_count = db.query(User).filter(User.is_active == False).delete(synchronize_session=False)db.commit()print("批量删除的用户数量:", delete_count)# 调用示例
db = next(get_db())
delete_user(db)
db.close()

五、进阶用法:应对复杂业务场景

除了基础CRUD,SQLAlchemy还支持关联查询、原生SQL、事务管理等进阶功能,满足中大型项目需求。

1. 关联查询(一对多关系)

实际项目中表之间常有关联(如「用户-地址」一对多:一个用户可有多条地址),SQLAlchemy通过relationshipForeignKey实现关联。

步骤1:定义关联模型
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship# 地址模型(与User是一对多关系)
class Address(Base):__tablename__ = "addresses"id = Column(Integer, primary_key=True, autoincrement=True)user_id = Column(Integer, ForeignKey("users.id"), nullable=False, comment="关联的用户ID")  # 外键:关联users表的idcity = Column(String(50), nullable=False, comment="城市")street = Column(String(100), nullable=False, comment="街道")created_at = Column(DateTime, default=datetime.now)# 定义关联:通过Address.user获取对应的User实例user = relationship("User", back_populates="addresses")  # back_populates与User的addresses对应# 修改User模型,添加关联字段
class User(Base):__tablename__ = "users"# ... 其他字段不变 ...# 定义关联:通过User.addresses获取该用户的所有Address实例addresses = relationship("Address", back_populates="user", cascade="all, delete-orphan")  # cascade:删除用户时自动删除关联的地址

步骤2:关联查询示例
def query_user_with_addresses(db):# 1. 通过用户查地址(一对多)user = db.query(User).get(1)if user:print(f"用户{user.username}的地址:", user.addresses)  # 直接通过user.addresses获取所有地址# 2. 通过地址查用户(多对一)address = db.query(Address).get(1)if address:print(f"地址{address.id}所属用户:", address.user)  # 直接通过address.user获取用户# 3. 关联查询(JOIN):查询有北京地址的用户users_in_beijing = db.query(User).join(Address).filter(Address.city == "北京").all()print("有北京地址的用户:", users_in_beijing)# 调用示例
db = next(get_db())
# 先给用户添加地址
user = db.query(User).get(1)
new_address = Address(user_id=user.id, city="北京", street="朝阳路1号")
db.add(new_address)
db.commit()# 关联查询
query_user_with_addresses(db)
db.close()

关键参数说明

  • ForeignKey("users.id"):定义外键,关联users表的id字段(确保数据完整性)。

  • relationship:定义模型间的关联关系,实现「通过一个模型获取另一个模型的数据」。

  • cascade="all, delete-orphan":级联操作,删除用户时自动删除关联的地址(避免孤儿数据)。

2. 原生SQL:复杂查询的补充方案

虽然ORM强大,但复杂查询(如多表JOIN、子查询)用原生SQL更直观。SQLAlchemy支持通过text()函数执行原生SQL,同时保持参数绑定的安全性。

from sqlalchemy import textdef run_raw_sql(db):# 1. 执行原生查询(参数绑定,避免SQL注入)# 查询用户名包含"zhang"的用户sql = text("SELECT * FROM users WHERE username LIKE :name")result = db.execute(sql, {"name": "%zhang%"}).fetchall()  # 参数通过字典传递print("原生SQL查询结果:", result)# 2. 执行原生更新sql_update = text("UPDATE users SET is_active = :active WHERE id = :id")db.execute(sql_update, {"active": True, "id": 1})db.commit()print("原生SQL更新成功")# 3. 执行原生删除sql_delete = text("DELETE FROM addresses WHERE city = :city")delete_count = db.execute(sql_delete, {"city": "上海"}).rowcountdb.commit()print(f"原生SQL删除的地址数量:{delete_count}")# 调用示例
db = next(get_db())
run_raw_sql(db)
db.close()

注意:参数必须通过execute()的第二个参数传递(如{"name": "%zhang%"}),不可手动拼接SQL字符串,否则有注入风险。

3. 事务管理:确保数据一致性

事务的核心是「ACID原则」:原子性(要么全成,要么全败)、一致性、隔离性、持久性。SQLAlchemy通过Session.commit()Session.rollback()实现事务管理。

def transaction_demo(db):try:# 步骤1:创建用户new_user = User(username="transaction", email="transaction@example.com", password="123456")db.add(new_user)# 步骤2:给用户添加地址(模拟业务流程)new_address = Address(user_id=new_user.id, city="广州", street="天河路1号")db.add(new_address)# 提交事务(所有操作生效)db.commit()print("事务提交成功:用户和地址都创建完成")except Exception as e:# 若发生异常,回滚事务(所有操作撤销)db.rollback()print(f"事务回滚:{str(e)}")# 调用示例
db = next(get_db())
transaction_demo(db)
db.close()

场景说明:如果创建用户成功,但添加地址失败(如字段为空),rollback()会撤销「创建用户」的操作,避免数据库中出现「有用户但无地址」的不一致数据。

六、实战:完整用户管理系统示例

将前面的知识点整合,实现一个完整的「用户管理系统」,包含模型定义、CRUD封装、关联操作和事务处理:

# 完整示例:user_management.py
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime# 1. 配置数据库连接
DB_URL = "mysql+pymysql://root:123456@localhost:3306/test_db?charset=utf8mb4"
engine = create_engine(DB_URL, echo=False)# 2. 创建基类
Base = declarative_base()# 3. 定义模型(User + Address)
class User(Base):__tablename__ = "users"id = Column(Integer, primary_key=True, autoincrement=True)username = Column(String(50), nullable=False, unique=True)email = Column(String(100), nullable=False, unique=True)password = Column(String(100), nullable=False)is_active = Column(Boolean, default=True)created_at = Column(DateTime, default=datetime.now)updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)# 关联地址addresses = relationship("Address", back_populates="user", cascade="all, delete-orphan")def __repr__(self):return f"<User(id={self.id}, username={self.username})>"class Address(Base):__tablename__ = "addresses"id = Column(Integer, primary_key=True, autoincrement=True)user_id = Column(Integer, ForeignKey("users.id"), nullable=False)city = Column(String(50), nullable=False)street = Column(String(100), nullable=False)created_at = Column(DateTime, default=datetime.now)# 关联用户user = relationship("User", back_populates="addresses")def __repr__(self):return f"<Address(id={self.id}, city={self.city}, street={self.street})>"# 4. 创建表
Base.metadata.create_all(bind=engine)# 5. 创建Session
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)def get_db():db = SessionLocal()try:yield dbfinally:db.close()# 6. 业务逻辑封装
def create_user_with_address(db, username, email, password, address_info):"""创建用户并添加地址(事务)"""try:user = User(username=username, email=email, password=password)db.add(user)db.flush()  # 刷新Session,获取user.id(无需commit)# 添加地址address = Address(user_id=user.id,city=address_info["city"],street=address_info["street"])db.add(address)db.commit()db.refresh(user)return userexcept Exception as e:db.rollback()raise edef get_user_detail(db, user_id):"""获取用户详情(含地址)"""user = db.query(User).get(user_id)if not user:raise ValueError(f"用户ID {user_id} 不存在")return {"id": user.id,"username": user.username,"email": user.email,"is_active": user.is_active,"created_at": user.created_at,"addresses": [{"city": addr.city, "street": addr.street} for addr in user.addresses]}# 7. 实际调用
if __name__ == "__main__":db = next(get_db())try:# 创建用户并添加地址user = create_user_with_address(db,username="demo_user",email="demo@example.com",password="demo123",address_info={"city": "深圳", "street": "南山路100号"})print("创建用户成功:", user)# 获取用户详情user_detail = get_user_detail(db, user.id)print("用户详情:", user_detail)except Exception as e:print(f"操作失败:{str(e)}")finally:db.close()

七、总结:为什么推荐SQLAlchemy?

  1. 代码更优雅:用面向对象替代原生SQL,业务逻辑与数据操作分离,可读性大幅提升。

  2. 跨数据库兼容:一套代码支持MySQL、PostgreSQL、SQLite等,切换数据库无需修改核心逻辑。

  3. 安全性更高:自动处理参数绑定,避免SQL注入,无需手动拼接SQL。

  4. 功能更全面:支持事务、关联查询、索引、分页等企业级需求,开箱即用。

如果你正在开发中大型Python项目,或者厌倦了原生SQL的繁琐,SQLAlchemy绝对值得一试——它会让你的数据库操作效率提升一个档次!

最后:注意事项

  1. 数据库驱动:不同数据库需安装对应的驱动(如MySQL用pymysql,PostgreSQL用psycopg2)。

  2. Session管理:用get_db()这类工厂函数创建Session,避免线程安全问题,用完及时关闭。

  3. 事务谨慎:涉及多步操作时,务必用try-except包裹,异常时调用rollback()

  4. 性能优化:批量操作用add_all()/update()/delete(),减少数据库交互次数;复杂查询优先用ORM,实在不行再用原生SQL。

http://www.dtcms.com/a/490358.html

相关文章:

  • 鸿蒙Harmony实战开发教学(No.5)-TextInput组件基础到进阶篇
  • 【Qt】8.信号和槽_自定义信号和槽​
  • WPF——动画
  • 医院做网站怎么做wordpress还能用
  • YOLO系列目标检测算法全面解析
  • 目标检测全解析:从基础概念到深度学习实战技术
  • 基于深度学习计算机视觉的风格迁移技术原理与经典实现解析
  • Redis Key设计与Value存储
  • Pytest+requests进行接口自动化测试8.0(Allure进阶 + 文件上传接口 + 单接口多用例)
  • Kubernetes全景解读:从云原生基石到卓越实践
  • 【JUnit实战3_02】第二章:探索 JUnit 的核心功能(一)
  • 计算机视觉(opencv)——实时颜色检测
  • 宣传网站怎么做网站制作洋网络
  • 网站排名优化化快排优化网站服务器搭建的步骤
  • 本地用docling实现pdf转markdown操作笔记
  • iOS 26 APP 性能测试实战攻略:多工具组合辅助方案
  • 《Linux运维总结:基于X86_64+ARM64架构CPU使用docker-compose一键离线部署consul 1.21.5容器版集群》
  • wordpress 购物东莞网站优化方法有哪些
  • 接线盒工程量-图形识别高效运算
  • 后厨手套穿戴检测保障食品安全 手套佩戴检测 未戴手套检测 未佩戴手套实时报警 高危行业手套佩戴实时监控
  • 原位PL光谱测试教学(实操版)
  • 技术报告:高仿真虚构内容对主流大模型的现实感幻觉测试
  • 大模型提示词简介
  • R语言术语(2)
  • 广州网站建设推广谷歌官网首页
  • 【Python】基于Tkinter库实现文件夹拖拽与选择功能
  • Spring Boot 官方文档精解:构建与依赖管理
  • ONLYOFFICE 桌面编辑器9.1版本已发布:PDF密文功能和全新注释、工作表公式优化及文件恢复便捷化等
  • 重视网站阵地建设广州市天河区建设局官方网站
  • QPS、TPS、RPS 详解