Python面试题及详细答案150道(126-135) -- 数据库交互篇
《前后端面试题
》专栏集合了前后端各个知识模块的面试题,包括html,javascript,css,vue,react,java,Openlayers,leaflet,cesium,mapboxGL,threejs,nodejs,mangoDB,SQL,Linux… 。
文章目录
- 一、本文面试题目录
- 126. Python中如何连接MySQL数据库?(如`pymysql`库)
- 127. 什么是ORM?Python中常用的ORM框架有哪些?(如SQLAlchemy、Django ORM)
- 128. 如何使用`sqlite3`模块操作SQLite数据库?
- 129. 什么是数据库事务?如何保证事务的ACID特性?
- 130. 如何防止SQL注入攻击?
- 131. 连接数据库时,为什么需要使用连接池?
- 132. 如何执行批量插入操作以提高效率?
- 133. MongoDB的Python驱动是什么?如何连接和操作MongoDB?
- 134. 什么是游标(Cursor)?在数据库操作中的作用?
- 135. 如何处理数据库连接异常?
- 二、150道Python面试题目录列表
一、本文面试题目录
126. Python中如何连接MySQL数据库?(如pymysql
库)
原理说明:
MySQL是一种关系型数据库,Python需通过数据库驱动(如pymysql
)与MySQL服务器建立连接,发送SQL语句并接收执行结果。pymysql
是Python中常用的MySQL驱动,遵循DB API 2.0规范,支持连接管理、SQL执行、结果处理等功能。
示例代码:
import pymysql
from pymysql.cursors import DictCursor# 1. 建立数据库连接
conn = pymysql.connect(host='localhost', # 数据库主机地址port=3306, # 端口(默认3306)user='root', # 用户名password='password', # 密码database='test_db', # 数据库名charset='utf8mb4', # 字符集cursorclass=DictCursor # 游标返回字典格式(默认元组)
)try:# 2. 创建游标对象(用于执行SQL)with conn.cursor() as cursor:# 3. 执行SQL查询sql = "SELECT * FROM users WHERE age > %s"cursor.execute(sql, (18,)) # 使用参数化查询防止SQL注入# 4. 获取结果results = cursor.fetchall() # 获取所有结果for row in results:print(f"ID: {row['id']}, Name: {row['name']}")# 5. 执行插入操作(需提交事务)insert_sql = "INSERT INTO users (name, age) VALUES (%s, %s)"cursor.execute(insert_sql, ("Alice", 25))conn.commit() # 提交事务(插入/更新/删除需显式提交)except pymysql.MySQLError as e:print(f"数据库错误:{e}")conn.rollback() # 出错时回滚事务
finally:# 6. 关闭连接conn.close()
关键步骤:
- 通过
pymysql.connect()
建立连接,需指定数据库地址、认证信息等。 - 使用游标(
cursor
)执行SQL语句,DictCursor
可返回字典格式结果(字段名作为键)。 - 对写操作(插入/更新/删除)需调用
conn.commit()
提交事务,出错时用rollback()
回滚。 - 操作完成后必须关闭连接,避免资源泄漏。
127. 什么是ORM?Python中常用的ORM框架有哪些?(如SQLAlchemy、Django ORM)
原理说明:
ORM(Object-Relational Mapping,对象关系映射)是一种编程技术,将数据库中的表结构映射为Python中的类,将表记录映射为类的实例,使开发者可通过操作对象而非编写SQL语句来实现数据库交互。ORM简化了数据库操作,屏蔽了不同数据库的语法差异。
常用ORM框架:
-
SQLAlchemy
- 功能全面的第三方ORM框架,支持多种数据库(MySQL、PostgreSQL、SQLite等),可通过“声明式基类”定义数据模型。
- 示例:
from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker# 初始化引擎(连接数据库) engine = create_engine('mysql+pymysql://root:password@localhost/test_db') Base = declarative_base() # 声明式基类# 定义数据模型(映射到users表) class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(50))age = Column(Integer)# 创建表(若不存在) Base.metadata.create_all(engine)# 创建会话(类似游标) Session = sessionmaker(bind=engine) session = Session()# 插入数据(操作对象) new_user = User(name="Bob", age=30) session.add(new_user) session.commit()# 查询数据 users = session.query(User).filter(User.age > 20).all() for user in users:print(f"ID: {user.id}, Name: {user.name}")session.close()
-
Django ORM
- Django框架内置的ORM,与Django无缝集成,适合快速开发Web应用。
- 示例(定义在Django的
models.py
中):from django.db import modelsclass User(models.Model):name = models.CharField(max_length=50)age = models.IntegerField()# 使用ORM(无需手动连接数据库,配置在settings.py中) # 插入 User.objects.create(name="Charlie", age=28) # 查询 adults = User.objects.filter(age__gt=18) # age > 18
优势:
- 无需编写SQL,降低学习成本和出错概率。
- 代码更易维护,数据库迁移(如表结构修改)更方便。
- 支持多种数据库,切换数据库时无需修改业务逻辑。
128. 如何使用sqlite3
模块操作SQLite数据库?
原理说明:
SQLite是一种嵌入式数据库(无需独立服务器),数据存储在单一文件中,适合小型应用或本地数据存储。Python标准库中的sqlite3
模块提供了操作SQLite的接口,用法与其他数据库驱动类似。
示例代码:
import sqlite3# 1. 连接数据库(文件不存在则自动创建)
conn = sqlite3.connect('mydb.db') # 数据库文件路径# 2. 创建游标
cursor = conn.cursor()# 3. 创建表
cursor.execute('''CREATE TABLE IF NOT EXISTS books (id INTEGER PRIMARY KEY AUTOINCREMENT,title TEXT NOT NULL,author TEXT NOT NULL,publish_year INTEGER)
''')# 4. 插入数据
books = [("Python编程", "张三", 2020),("SQL基础", "李四", 2019)
]
cursor.executemany('INSERT INTO books (title, author, publish_year) VALUES (?, ?, ?)', books)
conn.commit() # 提交事务# 5. 查询数据
cursor.execute('SELECT * FROM books WHERE publish_year > 2019')
rows = cursor.fetchall() # 返回元组列表
for row in rows:print(f"ID: {row[0]}, 书名: {row[1]}, 作者: {row[2]}")# 6. 更新数据
cursor.execute('UPDATE books SET publish_year = ? WHERE title = ?', (2021, "Python编程"))
conn.commit()# 7. 关闭连接
cursor.close()
conn.close()
特点:
- 无需安装额外驱动(
sqlite3
是标准库)。 - 连接时直接指定数据库文件路径(
connect('file.db')
),内存数据库可使用connect(':memory:')
。 - SQL参数占位符使用
?
(不同于pymysql
的%s
)。 - 适合开发测试、本地存储、小型应用,不适合高并发场景。
129. 什么是数据库事务?如何保证事务的ACID特性?
原理说明:
数据库事务(Transaction)是一组不可分割的SQL操作,要么全部执行成功,要么全部失败(如转账时“扣款”和“收款”必须同时完成)。事务通过ACID特性保证数据一致性:
- 原子性(Atomicity):事务中的操作要么全成,要么全败,无中间状态。
- 一致性(Consistency):事务执行前后,数据库从一个有效状态转换到另一个有效状态(如转账总金额不变)。
- 隔离性(Isolation):多个事务并发执行时,彼此互不干扰,结果等同于串行执行。
- 持久性(Durability):事务提交后,修改永久保存,即使系统崩溃也不丢失。
如何保证ACID:
- 原子性与一致性:通过日志(如undo log)实现,失败时回滚未完成的操作。
- 隔离性:通过锁机制和多版本控制(MVCC)实现,避免并发冲突(如脏读、不可重复读)。
- 持久性:通过redo log实现,事务提交后将修改写入磁盘。
示例(Python中使用事务):
import pymysqlconn = pymysql.connect(host='localhost', user='root', password='password', database='bank')
try:with conn.cursor() as cursor:# 开启事务(默认自动开启)# 转账操作:A扣100,B加100cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE name = 'A'")cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE name = 'B'")# 模拟异常(如余额不足)# if some_error:# raise Exception("转账失败")conn.commit() # 提交事务(所有操作生效)print("转账成功")
except Exception as e:conn.rollback() # 回滚事务(所有操作撤销)print(f"事务回滚:{e}")
finally:conn.close()
关键操作:
- 事务开始:连接建立后默认开启自动提交(
autocommit=True
),需关闭自动提交(conn.autocommit(False)
)以手动控制事务。 - 提交:
conn.commit()
使所有操作永久生效。 - 回滚:
conn.rollback()
在出错时撤销所有未提交的操作。
130. 如何防止SQL注入攻击?
原理说明:
SQL注入是攻击者通过构造恶意SQL片段插入输入参数,篡改原有SQL逻辑的攻击方式(如通过输入' OR '1'='1
绕过登录验证)。防止SQL注入的核心是避免将用户输入直接拼接进SQL语句,而应使用参数化查询。
防御方法及示例:
-
使用参数化查询(推荐)
通过占位符传递参数,数据库驱动会自动处理特殊字符,避免SQL注入。import pymysqldef get_user_safe(username):conn = pymysql.connect(host='localhost', database='test')try:with conn.cursor() as cursor:# 安全:使用%s作为占位符,参数单独传递sql = "SELECT * FROM users WHERE username = %s"cursor.execute(sql, (username,)) # 参数作为元组传入return cursor.fetchone()finally:conn.close()# 即使输入恶意字符串,也会被当作普通参数处理 get_user_safe("' OR '1'='1") # 不会返回所有用户
-
避免直接拼接SQL字符串(危险)
禁止将用户输入直接拼接到SQL中,以下代码存在注入风险:def get_user_unsafe(username):# 危险:直接拼接用户输入,可能被注入sql = f"SELECT * FROM users WHERE username = '{username}'"# 若username为"' OR '1'='1",SQL变为:# SELECT * FROM users WHERE username = '' OR '1'='1'(返回所有用户)
-
使用ORM框架
ORM自动实现参数化查询,进一步降低注入风险:# SQLAlchemy示例 from sqlalchemy.orm import sessionmaker from mymodels import Usersession = Session() # ORM自动处理参数,无注入风险 user = session.query(User).filter(User.username == username).first()
-
输入验证与过滤
对用户输入进行类型检查和格式验证(如只允许字母数字),但不能替代参数化查询:import redef validate_username(username):# 只允许字母、数字和下划线if not re.match(r'^[a-zA-Z0-9_]+$', username):raise ValueError("无效用户名")
总结:参数化查询是防止SQL注入的最有效手段,配合ORM和输入验证可进一步提升安全性。
131. 连接数据库时,为什么需要使用连接池?
原理说明:
数据库连接是稀缺资源,创建和关闭连接需消耗CPU、内存和网络资源(如TCP握手、认证)。连接池通过预先创建一定数量的连接并复用它们,避免频繁创建/销毁连接的开销,提高系统性能和稳定性。
连接池的优势:
- 减少资源消耗:复用现有连接,避免重复创建连接的开销(尤其在高并发场景)。
- 控制连接数量:防止并发连接过多导致数据库负载过高或连接失败。
- 提高响应速度:连接已预先创建,请求时无需等待连接建立。
Python中使用连接池(以DBUtils
为例):
from DBUtils.PooledDB import PooledDB
import pymysql# 配置连接池
pool = PooledDB(creator=pymysql, # 数据库驱动maxconnections=10, # 最大连接数mincached=2, # 初始化时保持的空闲连接数maxcached=5, # 最大空闲连接数host='localhost',user='root',password='password',database='test_db'
)# 从连接池获取连接
def query_data(sql):conn = pool.connection() # 获取连接(非新建)try:with conn.cursor() as cursor:cursor.execute(sql)return cursor.fetchall()finally:conn.close() # 归还连接到池(非真正关闭)# 并发场景下复用连接
for _ in range(20):query_data("SELECT 1") # 连接池自动管理连接分配与回收
适用场景:
- 高并发应用(如Web服务),频繁需要数据库连接。
- 数据库服务器对并发连接数有限制的场景。
注意:连接池大小需合理配置(通常设为CPU核心数的2-4倍),过大可能导致数据库压力过大,过小则无法充分利用资源。
132. 如何执行批量插入操作以提高效率?
原理说明:
单条插入(INSERT
)操作需多次与数据库交互(网络往返、事务提交),效率低下。批量插入通过一次SQL请求插入多条记录,减少交互次数,显著提升插入大量数据的效率。
实现方法及示例:
-
使用
executemany()
方法
大多数数据库驱动支持executemany()
,一次性执行多条相同结构的SQL语句。import pymysqlconn = pymysql.connect(host='localhost', database='test', user='root', password='password') try:with conn.cursor() as cursor:# 批量插入数据(1000条记录)data = [("user" + str(i), 20 + i % 30) for i in range(1000)]sql = "INSERT INTO users (name, age) VALUES (%s, %s)"# 批量执行(一次网络请求)cursor.executemany(sql, data)conn.commit()print(f"插入{len(data)}条记录成功") finally:conn.close()
-
拼接多行
INSERT
语句
部分数据库支持INSERT INTO ... VALUES (...), (...), (...)
语法,进一步减少SQL解析开销。# 适用于MySQL/SQLite等支持多行VALUES的数据库 def batch_insert(conn, data):with conn.cursor() as cursor:# 生成VALUES部分:(%s,%s), (%s,%s), ...placeholders = ", ".join(["(%s, %s)"] * len(data))sql = f"INSERT INTO users (name, age) VALUES {placeholders}"# 扁平化数据列表([(a,b), (c,d)] → [a,b,c,d])flat_data = [item for row in data for item in row]cursor.execute(sql, flat_data)conn.commit()
-
ORM框架的批量插入
SQLAlchemy和Django ORM提供批量插入API,内部优化执行效率:# SQLAlchemy示例 from sqlalchemy.orm import sessionmaker from mymodels import Usersession = Session() # 批量创建对象 users = [User(name=f"user{i}", age=20 + i % 30) for i in range(1000)] session.bulk_save_objects(users) # 批量插入 session.commit()
性能对比:
- 单条插入1000条记录:需1000次网络交互,耗时较长。
- 批量插入:1次网络交互,耗时通常为单条插入的1/10~1/50。
注意:批量插入的数据量不宜过大(如单次不超过1万条),避免SQL语句过长或占用过多内存。
133. MongoDB的Python驱动是什么?如何连接和操作MongoDB?
原理说明:
MongoDB是一种文档型NoSQL数据库,数据以BSON(类JSON)格式存储。Python中官方推荐的MongoDB驱动是pymongo
,用于连接MongoDB并执行CRUD(创建、读取、更新、删除)操作。
安装与基本操作示例:
-
安装pymongo
pip install pymongo
-
连接MongoDB并操作
from pymongo import MongoClient from pymongo.errors import PyMongoError# 1. 连接MongoDB(本地默认端口27017) client = MongoClient('mongodb://localhost:27017/')# 2. 获取数据库(不存在则自动创建) db = client['mydatabase']# 3. 获取集合(类似关系型数据库的表) collection = db['users']try:# 4. 插入文档(类似记录)# 插入单条user1 = {"name": "Alice", "age": 25, "hobbies": ["reading", "hiking"]}result = collection.insert_one(user1)print(f"插入ID:{result.inserted_id}")# 插入多条users = [{"name": "Bob", "age": 30, "hobbies": ["gaming"]},{"name": "Charlie", "age": 35}]result = collection.insert_many(users)print(f"插入多条ID:{result.inserted_ids}")# 5. 查询文档# 查询单条alice = collection.find_one({"name": "Alice"})print(f"查询Alice:{alice}")# 查询多条(年龄>28)adults = collection.find({"age": {"$gt": 28}}) # $gt表示大于for user in adults:print(f"成年用户:{user['name']},年龄:{user['age']}")# 6. 更新文档collection.update_one({"name": "Bob"},{"$set": {"age": 31}} # $set表示更新字段)# 7. 删除文档collection.delete_one({"name": "Charlie"})except PyMongoError as e:print(f"MongoDB操作错误:{e}")
关键概念:
- 文档(Document):类似JSON的键值对数据(如
{"name": "Alice"}
)。 - 集合(Collection):文档的集合,无需预先定义结构。
- 查询操作符:如
$gt
(大于)、$in
(包含)、$set
(更新)等,用于复杂查询和更新。
适用场景:
适合存储非结构化/半结构化数据(如日志、用户行为)、需要快速迭代的场景,不适合强事务性需求(如金融交易)。
134. 什么是游标(Cursor)?在数据库操作中的作用?
原理说明:
游标(Cursor)是数据库操作中用于遍历查询结果的指针或迭代器。当执行查询语句(如SELECT
)时,数据库返回的结果可能包含大量记录,游标允许客户端逐条访问这些记录,而非一次性加载全部数据到内存。
作用:
- 分批处理大数据集:避免一次性加载大量数据导致内存溢出。
- 节省资源:仅在需要时获取下一条记录,减少网络传输和内存占用。
- 支持逐行操作:便于对每条记录进行处理(如过滤、转换)后再处理下一条。
示例(关系型数据库与MongoDB中的游标):
-
关系型数据库(如MySQL)中的游标
import pymysqlconn = pymysql.connect(host='localhost', database='test') with conn.cursor() as cursor:# 执行查询,返回游标cursor.execute("SELECT * FROM large_table")# 逐行获取(内存友好)while True:row = cursor.fetchone() # 获取下一条记录if not row:break # 遍历结束# 处理单条记录print(f"处理:{row}") conn.close()
-
MongoDB中的游标
from pymongo import MongoClientclient = MongoClient() collection = client['mydb']['large_collection']# find()返回游标对象 cursor = collection.find()# 迭代游标(逐行获取) for doc in cursor:print(f"处理文档:{doc['_id']}")
游标特性:
- 惰性加载:游标创建时不立即获取所有数据,仅在调用
fetchone()
、next()
或迭代时才请求数据。 - 一次性使用:游标遍历结束后无法重置,需重新执行查询获取新游标。
- 可配置:部分数据库支持设置游标批次大小(如
cursor.arraysize = 100
),控制每次从服务器获取的记录数。
总结:游标是处理大数据集的核心工具,通过按需加载数据,平衡了查询效率和内存消耗。
135. 如何处理数据库连接异常?
原理说明:
数据库连接过程中可能出现多种异常(如网络中断、认证失败、数据库宕机),若不妥善处理,可能导致程序崩溃或资源泄漏。处理连接异常需捕获特定错误、释放资源,并实现重试机制(可选)。
处理方法及示例:
-
捕获特定异常类型
不同数据库驱动定义了特定异常类(如pymysql.MySQLError
、sqlite3.Error
),需针对性捕获。import pymysql from pymysql import OperationalError, ProgrammingErrordef safe_query(sql):conn = Nonetry:# 尝试连接conn = pymysql.connect(host='localhost',user='root',password='password',database='test',connect_timeout=5 # 连接超时时间(秒))with conn.cursor() as cursor:cursor.execute(sql)return cursor.fetchall()except OperationalError as e:# 处理连接相关错误(如网络问题、数据库未启动)print(f"连接错误:{e}")except ProgrammingError as e:# 处理SQL语法错误print(f"SQL错误:{e}")except Exception as e:# 捕获其他未知错误print(f"未知错误:{e}")finally:# 确保连接关闭(无论是否出错)if conn:try:conn.close()except Exception as e:print(f"关闭连接失败:{e}")return None
-
实现重试机制
对临时性错误(如网络波动),可通过重试提高成功率。import timedef query_with_retry(sql, max_retries=3, delay=2):for retry in range(max_retries):try:# 调用上面的safe_query函数result = safe_query(sql)if result is not None:return resultexcept OperationalError:# 仅对连接错误重试if retry < max_retries - 1:print(f"重试第{retry+1}次...")time.sleep(delay) # 等待后重试print(f"超过最大重试次数({max_retries}次)")return None
-
使用连接池的异常处理
连接池可自动检测无效连接并创建新连接,减少手动处理成本:from DBUtils.PooledDB import PooledDB import pymysql# 配置连接池(自动处理无效连接) pool = PooledDB(creator=pymysql,maxconnections=10,ping=1 # 每次获取连接时检查连接有效性 )def pool_query(sql):try:conn = pool.connection()with conn.cursor() as cursor:cursor.execute(sql)return cursor.fetchall()except Exception as e:print(f"查询错误:{e}")finally:if 'conn' in locals():conn.close() # 归还到池,池会处理无效连接
最佳实践:
- 明确捕获特定异常(避免使用
except Exception
捕获所有错误)。 - 确保
finally
块中关闭连接,防止资源泄漏。 - 对临时性错误实现有限次数的重试(避免无限循环)。
- 记录详细错误日志(而非仅打印),便于排查问题。
二、150道Python面试题目录列表
文章序号 | Python面试题150道 |
---|---|
1 | Python面试题及详细答案150道(01-15) |
2 | Python面试题及详细答案150道(16-30) |
3 | Python面试题及详细答案150道(31-40) |
4 | Python面试题及详细答案150道(41-55) |
5 | Python面试题及详细答案150道(56-70) |
6 | Python面试题及详细答案150道(71-80) |
7 | Python面试题及详细答案150道(81-90) |
8 | Python面试题及详细答案150道(91-100) |
9 | Python面试题及详细答案150道(101-115) |
10 | Python面试题及详细答案150道(116-125) |
11 | Python面试题及详细答案150道(126-135) |
12 | Python面试题及详细答案150道(136-150) |