Python 数据库编程
一、数据库连接基础
 1. 标准流程
 import database_module  # 如mysql.connector, sqlite3等
# 1. 建立连接
 connection = database_module.connect(
     host="localhost",
     user="username",
     password="password",
     database="dbname"
 )
# 2. 创建游标
 cursor = connection.cursor()
# 3. 执行SQL
 cursor.execute("SELECT * FROM users")
# 4. 获取结果
 results = cursor.fetchall()
# 5. 关闭资源
 cursor.close()
 connection.close()
 2. 上下文管理器(自动关闭连接)
 
 with database_module.connect(...) as connection:
     with connection.cursor() as cursor:
         cursor.execute("INSERT INTO users VALUES (?, ?, ?)", (1, "Alice", 25))
         connection.commit()  # 提交事务
 二、主流数据库连接方案
 1. SQLite(内置,适合小型应用)
 
 import sqlite3
# 连接(自动创建数据库文件)
 conn = sqlite3.connect("example.db")
# 创建表
 conn.execute("""
 CREATE TABLE IF NOT EXISTS users (
     id INTEGER PRIMARY KEY,
     name TEXT,
     age INTEGER
 )
 """)
# 插入数据
 conn.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("Bob", 30))
 conn.commit()
# 查询数据
 for row in conn.execute("SELECT * FROM users"):
     print(row)
 2. MySQL(使用 mysql-connector-python)
 
 import mysql.connector
conn = mysql.connector.connect(
     host="localhost",
     user="root",
     password="password",
     database="test"
 )
cursor = conn.cursor()
 cursor.execute("SELECT VERSION()")
 print(f"MySQL版本: {cursor.fetchone()}")
 3. PostgreSQL(使用 psycopg2)
 python
 运行
 import psycopg2
conn = psycopg2.connect(
     host="localhost",
     database="mydb",
     user="user",
     password="pass",
     port=5432
 )
with conn.cursor() as cursor:
     cursor.execute("SELECT * FROM products LIMIT 5")
     print(cursor.fetchall())
 4. SQL Server(使用 pyodbc)
 
 import pyodbc
conn = pyodbc.connect(
     "DRIVER={SQL Server};"
     "SERVER=localhost;"
     "DATABASE=mydb;"
     "UID=user;"
     "PWD=password"
 )
cursor = conn.cursor()
 cursor.execute("SELECT @@VERSION")
 print(cursor.fetchone())
 三、高级操作技术
 1. 参数化查询(防止 SQL 注入)
 
 # SQLite/PostgreSQL风格(?占位符)
 cursor.execute("SELECT * FROM users WHERE age > ?", (18,))
# MySQL风格(%s占位符)
 cursor.execute("INSERT INTO users (name) VALUES (%s)", ("Charlie",))
# SQL Server风格(:name命名参数)
 cursor.execute("UPDATE users SET age=:age WHERE id=:id", {"age": 30, "id": 1})
 2. 批量操作
 
 # 批量插入
 data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
 cursor.executemany("INSERT INTO users (name, age) VALUES (?, ?)", data)
# 批量更新(使用字典参数)
 update_data = [
     {"id": 1, "age": 26},
     {"id": 2, "age": 31}
 ]
 cursor.executemany("UPDATE users SET age=:age WHERE id=:id", update_data)
 3. 事务处理
 
 try:
     with conn:  # 自动提交/回滚
         cursor.execute("BEGIN TRANSACTION")
         cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
         cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
 except Exception as e:
     print(f"事务失败: {e}")
 四、ORM 框架(对象关系映射)
 1. SQLAlchemy(通用 ORM)
 
 from sqlalchemy import create_engine, Column, Integer, String
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
# 创建引擎
 engine = create_engine("sqlite:///example.db")
# 定义模型
 Base = declarative_base()
 class User(Base):
     __tablename__ = "users"
     id = Column(Integer, primary_key=True)
     name = Column(String)
     age = Column(Integer)
# 创建表
 Base.metadata.create_all(engine)
# 创建会话
 Session = sessionmaker(bind=engine)
 session = Session()
# 添加数据
 new_user = User(name="David", age=40)
 session.add(new_user)
 session.commit()
# 查询数据
 users = session.query(User).filter(User.age > 30).all()
 print([u.name for u in users])  # 输出: ['David']
 2. Django ORM(Django 框架内置)
 python
 运行
 # models.py
 from django.db import models
class Product(models.Model):
     name = models.CharField(max_length=100)
     price = models.DecimalField(max_digits=10, decimal_places=2)
     created_at = models.DateTimeField(auto_now_add=True)
# 使用示例
 products = Product.objects.filter(price__gt=100).order_by("-created_at")
 五、数据库连接池
 1. 使用 SQLAlchemy 连接池
 python
 运行
 from sqlalchemy import create_engine
engine = create_engine(
     "mysql+pymysql://user:pass@host/db",
     pool_size=5,             # 连接池大小
     max_overflow=10,         # 最大溢出连接数
     pool_timeout=30,         # 连接超时时间
     pool_recycle=3600        # 连接回收时间
 )
# 从连接池获取连接
 with engine.connect() as conn:
     result = conn.execute("SELECT * FROM users")
 2. 自定义连接池(以 MySQL 为例)
 python
 运行
 from mysql.connector import pooling
pool = pooling.MySQLConnectionPool(
     pool_name="mypool",
     pool_size=5,
     host="localhost",
     user="user",
     password="pass",
     database="test"
 )
# 从池中获取连接
 connection = pool.get_connection()
 cursor = connection.cursor()
 六、性能优化技巧
 批量操作替代循环:
 python
 运行
 # 低效:
 for row in data:
     cursor.execute("INSERT INTO table VALUES (%s)", (row,))
# 高效:
 cursor.executemany("INSERT INTO table VALUES (%s)", data)
连接复用:
 python
 运行
 # 避免频繁创建连接
 # 使用单例模式或连接池管理连接
索引优化:
 python
 运行
 # 确保查询字段有索引
 cursor.execute("CREATE INDEX idx_age ON users (age)")
分页查询:
 python
 运行
 page_size = 100
 offset = 0
 while True:
     cursor.execute("SELECT * FROM large_table LIMIT %s OFFSET %s", (page_size, offset))
     results = cursor.fetchall()
     if not results:
         break
     # 处理结果
     offset += page_size
七、错误处理最佳实践
 python
 运行
 import database_module
try:
     conn = database_module.connect(...)
     with conn.cursor() as cursor:
         cursor.execute("SELECT * FROM non_existent_table")  # 故意错误
 except database_module.Error as e:
     if e.errno == 1146:  # 表不存在
         print("表不存在,请检查SQL")
     elif e.errno == 2006:  # 连接丢失
         print("数据库连接已断开")
     else:
         print(f"未知错误: {e}")
 finally:
     if conn and conn.is_connected():
         conn.close()
 八、安全注意事项
 密码管理:
 python
 运行
 # 不要硬编码密码
 import os
 password = os.environ.get("DB_PASSWORD")
输入验证:
 python
 运行
 # 对用户输入进行验证
 if not isinstance(age, int):
     raise ValueError("年龄必须为整数")
最小权限原则:
 sql
 -- 数据库层面:为应用创建仅拥有必要权限的用户
 GRANT SELECT, INSERT ON db.table TO 'app_user'@'localhost';
