四、Python 脚本常用模块(续)
4.3 Python 脚本与数据库交互
在 Python 脚本开发中,数据库是实现数据持久化存储、批量数据处理、多脚本数据共享的核心工具。无论是日常办公中的数据统计(如员工信息管理)、运维场景下的日志存储(如服务器运行状态记录),还是数据分析中的数据预处理(如用户行为数据清洗),都需要通过数据库实现高效的数据管理。
Python 对数据库的支持极为完善:一方面,标准库内置了sqlite3模块,可直接操作轻量级 SQLite 数据库,无需额外安装;另一方面,针对 MySQL、PostgreSQL 等主流关系型数据库,以及 MongoDB 等非关系型数据库,都有成熟的第三方库(如pymysql、psycopg2、pymongo),能满足不同场景下的需求。
本节将从数据库基础认知出发,分场景讲解 Python 操作各类数据库的核心知识,每个模块均包含「环境准备 - 连接配置 - 核心操作 - 实战示例 - 常见问题」,确保你能快速将数据库能力融入 Python 脚本开发。
4.3.1 数据库基础认知
在学习 Python 操作数据库前,需先明确数据库的核心分类与适用场景,避免开发时选错工具。
4.3.1.1 数据库的核心分类
数据库主要分为关系型数据库(RDBMS) 和非关系型数据库(NoSQL) 两大类,二者在数据结构、存储方式、适用场景上差异显著:
类型 | 核心特点 | 代表产品 | 适用场景 | 数据存储形式 |
关系型数据库 | 基于关系模型(二维表),支持 SQL 查询,事务 ACID 特性 | MySQL、PostgreSQL、SQLite | 数据结构固定(如用户表、订单表)、需事务保证(如支付) | 表(行 + 列) |
非关系型数据库 | 无固定 schema,灵活存储,查询效率高 | MongoDB、Redis | 非结构化数据(如日志、JSON)、高频读写(如缓存) | 文档(JSON)、键值对 |
对 Python 脚本而言:
- 若需轻量级本地存储(如脚本运行日志、单机数据统计),优先选SQLite(无需服务,文件式存储);
- 若需多脚本共享数据(如团队协作的业务数据),优先选MySQL/PostgreSQL(支持网络访问,事务可靠);
- 若需存储非结构化数据(如爬虫获取的 HTML 内容、用户行为日志),优先选MongoDB(文档式存储,灵活适配动态字段)。
4.3.1.2 核心概念辨析
无论哪种数据库,Python 脚本操作时都需理解以下基础概念:
- 连接(Connection):脚本与数据库的通信通道,所有操作需建立在连接基础上。使用后需关闭,避免资源泄漏;
- 游标(Cursor):关系型数据库中执行 SQL 语句的工具,通过游标执行查询、获取结果;
- 事务(Transaction):一组不可分割的操作(如 “扣款 + 下单”),需满足 ACID 特性(原子性、一致性、隔离性、持久性),避免数据异常;
- CRUD:数据库核心操作的缩写,即创建(Create)、读取(Read)、更新(Update)、删除(Delete),是脚本与数据库交互的核心场景。
4.3.2 标准库 sqlite3:轻量级本地数据库
sqlite3是 Python 标准库内置模块,无需额外安装,可直接操作 SQLite 数据库。SQLite 是文件式数据库(一个数据库对应一个.db文件),无需部署服务器,适合单机脚本、轻量级数据存储场景(如脚本配置存储、本地日志记录)。
4.3.2.1 环境准备
无需安装任何软件,Python 3.x 默认自带sqlite3模块,直接导入即可使用。
4.3.2.2 核心操作:从连接到 CRUD
步骤 1:建立数据库连接
通过sqlite3.connect()创建连接,参数为数据库文件路径(若文件不存在,会自动在指定路径创建):
import sqlite3
from sqlite3 import Error
def create_sqlite_connection(db_file):
"""
创建SQLite数据库连接
Args:
db_file (str): 数据库文件路径(如"script_log.db")
Returns:
sqlite3.Connection: 数据库连接对象,失败则返回None
"""
conn = None
try:
# 建立连接,check_same_thread=False允许跨线程使用(脚本开发常用)
conn = sqlite3.connect(db_file, check_same_thread=False)
print(f"SQLite连接成功(版本:{sqlite3.version})")
# 创建游标对象(用于执行SQL)
cursor = conn.cursor()
return conn, cursor
except Error as e:
print(f"SQLite连接失败:{e}")
return None, None
# 调用函数创建连接(数据库文件将保存在当前工作目录)
conn, cursor = create_sqlite_connection("script_log.db")
步骤 2:创建表(Create Table)
通过游标执行CREATE TABLE语句,定义表结构(如创建 “脚本运行日志表”,记录时间、脚本名、状态、日志内容):
def create_log_table(cursor):
"""创建脚本运行日志表"""
try:
# SQL语句:定义表名(script_logs)和字段(id为主键自增)
create_table_sql = '''
CREATE TABLE IF NOT EXISTS script_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_time TEXT NOT NULL, -- 运行时间(字符串格式,如"2025-08-26 16:30:00")
script_name TEXT NOT NULL, -- 脚本文件名(如"excel_updater.py")
status TEXT NOT NULL, -- 运行状态("success"或"failed")
log_content TEXT -- 日志内容(如错误信息)
);
'''
# 执行SQL语句
cursor.execute(create_table_sql)
print("脚本运行日志表创建成功(若不存在)")
except Error as e:
print(f"创建表失败:{e}")
# 调用函数创建表
if cursor:
create_log_table(cursor)
步骤 3:核心操作(CRUD)
SQLite 的 CRUD 操作均通过游标执行 SQL 语句实现,需注意参数化查询(避免 SQL 注入,禁止字符串拼接 SQL)。
3.1 插入数据(Create)
通过cursor.execute()执行INSERT语句,用?作为占位符传递参数:
from datetime import datetime
def insert_log(cursor, conn, script_name, status, log_content=""):
"""
插入一条脚本运行日志
Args:
cursor: 游标对象
conn: 数据库连接对象(用于提交事务)
script_name (str): 脚本文件名
status (str): 运行状态("success"或"failed")
log_content (str): 日志内容(可选)
"""
try:
# 1. 定义插入SQL(?为参数占位符)
insert_sql = '''
INSERT INTO script_logs (run_time, script_name, status, log_content)
VALUES (?, ?, ?, ?);
'''
# 2. 准备参数(与占位符顺序一致)
run_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
params = (run_time, script_name, status, log_content)
# 3. 执行SQL
cursor.execute(insert_sql, params)
# 4. 提交事务(SQLite默认开启事务,需手动提交才会写入数据库)
conn.commit()
print(f"日志插入成功(ID:{cursor.lastrowid})") # lastrowid获取插入数据的主键ID
except Error as e:
# 若出错,回滚事务(避免数据部分插入)
conn.rollback()
print(f"日志插入失败:{e}")
# 调用函数插入2条日志(1条成功,1条失败)
if conn and cursor:
insert_log(cursor, conn, "excel_updater.py", "success", "Excel数据更新完成,共13条记录")
insert_log(cursor, conn, "api_fetcher.py", "failed", "请求超时,错误信息:Connection timed out")
3.2 查询数据(Read)
通过cursor.execute()执行SELECT语句,用cursor.fetchone()(获取一条)、cursor.fetchall()(获取所有)、cursor.fetchmany(n)(获取 n 条)获取结果:
def query_logs(cursor, status=None, script_name=None):
"""
查询脚本运行日志
Args:
cursor: 游标对象
status (str): 筛选状态(可选,如"failed")
script_name (str): 筛选脚本名(可选,如"excel_updater.py")
Returns:
list: 查询结果列表(每条记录为元组,对应字段顺序)
"""
try:
# 基础查询SQL
query_sql = "SELECT * FROM script_logs"
params = []
# 动态添加筛选条件(避免字符串拼接,用参数化查询)
conditions = []
if status:
conditions.append("status = ?")
params.append(status)
if script_name:
conditions.append("script_name = ?")
params.append(script_name)
# 拼接筛选条件
if conditions:
query_sql += " WHERE " + " AND ".join(conditions)
# 按运行时间倒序(最新日志在前)
query_sql += " ORDER BY run_time DESC;"
# 执行查询
cursor.execute(query_sql, params)
# 获取所有结果(若数据量大,用fetchmany分批获取,避免内存占用过高)
results = cursor.fetchall()
print(f"查询到 {len(results)} 条日志:")
# 遍历结果(每条记录的索引对应表字段顺序:id[0], run_time[1], script_name[2], status[3], log_content[4])
for idx, log in enumerate(results, 1):
print(f"\n{idx}. 日志ID:{log[0]}")
print(f" 运行时间:{log[1]}")
print(f" 脚本名称:{log[2]}")
print(f" 运行状态:{log[3]}")
print(f" 日志内容:{log[4]}")
return results
except Error as e:
print(f"日志查询失败:{e}")
return []
# 调用函数查询(示例1:查询所有失败日志;示例2:查询指定脚本的日志)
if cursor:
print("=== 查询所有失败日志 ===")
query_logs(cursor, status="failed")
print("\n=== 查询excel_updater.py的所有日志 ===")
query_logs(cursor, script_name="excel_updater.py")
3.3 更新数据(Update)
通过cursor.execute()执行UPDATE语句,更新已有数据(如修正错误的日志状态):
def update_log_status(cursor, conn, log_id, new_status):
"""
更新日志的运行状态
Args:
cursor: 游标对象
conn: 数据库连接对象
log_id (int): 日志ID(主键,用于定位数据)
new_status (str): 新状态("success"或"failed")
"""
try:
update_sql = "UPDATE script_logs SET status = ? WHERE id = ?;"
params = (new_status, log_id)
cursor.execute(update_sql, params)
conn.commit()
# rowcount获取受影响的行数
if cursor.rowcount > 0:
print(f"日志ID {log_id} 状态更新成功(新状态:{new_status})")
else:
print(f"未找到日志ID {log_id},无数据更新")
except Error as e:
conn.rollback()
print(f"日志更新失败:{e}")
# 调用函数更新日志(假设将ID为2的日志状态从failed改为success)
if conn and cursor:
update_log_status(cursor, conn, log_id=2, new_status="success")
3.4 删除数据(Delete)
通过cursor.execute()执行DELETE语句,删除数据(如清理 30 天前的旧日志):
def delete_old_logs(cursor, conn, days=30):
"""
删除指定天数前的旧日志(避免数据库文件过大)
Args:
cursor: 游标对象
conn: 数据库连接对象
days (int): 保留天数(默认保留30天内的日志)
"""
try:
# 计算30天前的日期(SQLite支持日期函数,strftime('%Y-%m-%d', 'now', '-? days'))
delete_sql = '''
DELETE FROM script_logs
WHERE run_time < strftime('%Y-%m-%d', 'now', '-? days');
'''
params = (days,)
cursor.execute(delete_sql, params)
conn.commit()
print(f"成功删除 {cursor.rowcount} 条 {days} 天前的旧日志")
except Error as e:
conn.rollback()
print(f"日志删除失败:{e}")
# 调用函数删除90天前的旧日志
if conn and cursor:
delete_old_logs(cursor, conn, days=90)
步骤 4:关闭连接
操作完成后,需关闭游标和连接,释放资源:
def close_sqlite_connection(conn, cursor):
"""关闭SQLite连接和游标"""
try:
if cursor:
cursor.close()
print("游标已关闭")
if conn:
conn.close()
print("数据库连接已关闭")
except Error as e:
print(f"关闭连接失败:{e}")
# 调用函数关闭连接
if conn and cursor:
close_sqlite_connection(conn, cursor)
4.3.2.3 实战示例:脚本运行日志管理器
结合上述操作,编写一个完整的 “脚本运行日志管理器” 脚本,实现日志的自动记录、查询、清理:
import sqlite3
from sqlite3 import Error
from datetime import datetime
class SQLiteLogManager:
"""SQLite日志管理类(封装连接、CRUD、关闭操作)"""
def __init__(self, db_file="script_log.db"):
self.db_file = db_file
self.conn, self.cursor = self._create_connection()
def _create_connection(self):
"""内部方法:创建数据库连接"""
conn = None
try:
conn = sqlite3.connect(self.db_file, check_same_thread=False)
cursor = conn.cursor()
self._create_log_table(cursor) # 初始化时自动创建表
return conn, cursor
except Error as e:
print(f"连接失败:{e}")
return None, None
def _create_log_table(self, cursor):
"""内部方法:创建日志表"""
create_sql = '''
CREATE TABLE IF NOT EXISTS script_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_time TEXT NOT NULL,
script_name TEXT NOT NULL,
status TEXT NOT NULL,
log_content TEXT
);
'''
try:
cursor.execute(create_sql)
except Error as e:
print(f"创建表失败:{e}")
def record_log(self, script_name, status, log_content=""):
"""记录日志"""
if not self.conn or not self.cursor:
print("未建立数据库连接,无法记录日志")
return
try:
insert_sql = '''
INSERT INTO script_logs (run_time, script_name, status, log_content)
VALUES (?, ?, ?, ?);
'''
run_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.cursor.execute(insert_sql, (run_time, script_name, status, log_content))
self.conn.commit()
print(f"日志记录成功(ID:{self.cursor.lastrowid})")
except Error as e:
self.conn.rollback()
print(f"记录日志失败:{e}")
def query_logs(self, status=None, script_name=None):
"""查询日志"""
if not self.cursor:
print("未建立数据库连接,无法查询日志")
return []
try:
query_sql = "SELECT * FROM script_logs"
params = []
conditions = []
if status:
conditions.append("status = ?")
params.append(status)
if script_name:
conditions.append("script_name = ?")
params.append(script_name)
if conditions:
query_sql += " WHERE " + " AND ".join(conditions)
query_sql += " ORDER BY run_time DESC;"
self.cursor.execute(query_sql, params)
results = self.cursor.fetchall()
print(f"查询到 {len(results)} 条日志:")
for idx, log in enumerate(results, 1):
print(f"\n{idx}. ID:{log[0]} | 时间:{log[1]} | 脚本:{log[2]} | 状态:{log[3]}")
print(f" 内容:{log[4]}")
return results
except Error as e:
print(f"查询日志失败:{e}")
return []
def clean_old_logs(self, days=30):
"""清理旧日志"""
if not self.conn or not self.cursor:
print("未建立数据库连接,无法清理日志")
return
try:
delete_sql = '''
DELETE FROM script_logs
WHERE run_time < strftime('%Y-%m-%d', 'now', '-? days');
'''
self.cursor.execute(delete_sql, (days,))
self.conn.commit()
print(f"清理完成,共删除 {self.cursor.rowcount} 条旧日志")
except Error as e:
self.conn.rollback()
print(f"清理日志失败:{e}")
def close(self):
"""关闭连接"""
try:
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
print("连接已关闭")
except Error as e:
print(f"关闭连接失败:{e}")
# ------------------- 使用示例 -------------------
if __name__ == "__main__":
# 1. 初始化日志管理器
log_manager = SQLiteLogManager()
# 2. 记录2条脚本运行日志
log_manager.record_log(
script_name="excel_updater.py",
status="success",
log_content="更新Excel的L列数据完成,共13个unitid"
)
log_manager.record_log(
script_name="api_fetcher.py",
status="failed",
log_content="请求API时超时,错误:Connection timed out after 10s"
)
# 3. 查询失败日志
print("\n=== 查询所有失败日志 ===")
log_manager.query_logs(status="failed")
# 4. 清理90天前的旧日志
print("\n=== 清理90天前的旧日志 ===")
log_manager.clean_old_logs(days=90)
# 5. 关闭连接
log_manager.close()
4.3.2.4 常见问题与解决
- “数据库被锁定” 错误:
原因:多个脚本同时操作同一个 SQLite 文件,或前一个连接未关闭。
解决:① 确保操作完成后关闭连接;② 对写入操作加锁(用threading.Lock());③ 高并发场景改用 MySQL。
- 日期筛选无效:
原因:SQLite 的日期以字符串存储,需用strftime函数处理。
解决:参考delete_old_logs方法,用strftime('%Y-%m-%d', run_time)转换日期格式后筛选。
- 中文乱码:
原因:数据库默认编码不支持中文。
解决:建立连接时指定编码(conn = sqlite3.connect(db_file, check_same_thread=False, detect_types=sqlite3.PARSE_DECLTYPES)),且插入中文时确保字符串为 UTF-8 编码。
4.3.3 第三方库 pymysql:操作 MySQL 数据库
MySQL 是工业级关系型数据库,支持网络访问、多用户协作、高并发,适合多脚本共享数据的场景(如团队的业务数据统计、批量数据处理)。Python 通过pymysql库操作 MySQL,需先安装库并配置 MySQL 环境。
4.3.3.1 环境准备
1. 安装 MySQL 服务
- Windows/MacOS:下载 MySQL 安装包(https://dev.mysql.com/downloads/mysql/),按向导安装,记住 root 用户密码。
- Linux(Ubuntu):执行命令sudo apt update && sudo apt install mysql-server,安装后用sudo mysql_secure_installation设置密码。
2. 安装 pymysql 库
用pip安装:
pip install pymysql
3. 准备 MySQL 环境
- 登录 MySQL:mysql -u root -p(输入密码);
- 创建数据库(如script_db):CREATE DATABASE IF NOT EXISTS script_db DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_unicode_ci;;
- 授权用户(允许 Python 脚本访问):GRANT ALL PRIVILEGES ON script_db.* TO 'python_user'@'%' IDENTIFIED BY '123456';(python_user为用户名,123456为密码,%允许所有 IP 访问);
- 刷新权限:FLUSH PRIVILEGES;。
4.3.3.2 核心操作:从连接到 CRUD
步骤 1:建立数据库连接
通过pymysql.connect()创建连接,参数需与 MySQL 配置一致:
import pymysql
from pymysql import Error
from datetime import datetime
def create_mysql_connection(db_name="script_db", host="localhost", user="python_user", password="123456", port=3306):
"""
创建MySQL数据库连接
Args:
db_name (str): 数据库名
host (str): MySQL服务器地址(本地为localhost)
user (str): MySQL用户名
password (str): MySQL密码
port (int): MySQL端口(默认3306)
Returns:
pymysql.connections.Connection: 连接对象,失败则返回None
"""
conn = None
try:
conn = pymysql.connect(
host=host,
user=user,
password=password,
database=db_name,
port=port,
charset="utf8mb4", # 支持中文
cursorclass=pymysql.cursors.DictCursor # 游标结果以字典返回(默认元组)
)
print("MySQL连接成功")
return conn
except Error as e:
print(f"MySQL连接失败:{e}")
# 常见错误排查:① 用户名/密码错误;② 数据库不存在;③ 端口被占用;④ 远程访问未授权
if "Unknown database" in str(e):
print(f"提示:数据库 {db_name} 不存在,请先创建")
elif "Access denied" in str(e):
print("提示:用户名或密码错误,或无访问权限")
return None
# 调用函数创建连接
conn = create_mysql_connection()
步骤 2:创建表(Create Table)
通过conn.cursor()创建游标,执行CREATE TABLE语句(如创建 “员工信息表”,存储 ID、姓名、部门、入职时间、薪资):
def create_employee_table(conn):
"""创建员工信息表"""
if not conn:
print("未建立连接,无法创建表")
return
try:
# 创建游标
with conn.cursor() as cursor: # with语句自动关闭游标
create_table_sql = '''
CREATE TABLE IF NOT EXISTS employees (
id INT PRIMARY KEY AUTO_INCREMENT COMMENT '员工ID(主键)',
name VARCHAR(50) NOT NULL COMMENT '员工姓名',
department VARCHAR(50) NOT NULL COMMENT '部门(如技术部、人事部)',
hire_date DATE NOT NULL COMMENT '入职日期(YYYY-MM-DD)',
salary DECIMAL(10, 2) NOT NULL COMMENT '薪资(保留2位小数)',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='员工信息表';
'''
# 执行SQL
cursor.execute(create_table_sql)
# 提交事务(MySQL默认开启事务,需手动提交)
conn.commit()
print("员工信息表创建成功(若不存在)")
except Error as e:
# 回滚事务
conn.rollback()
print(f"创建表失败:{e}")
# 调用函数创建表
if conn:
create_employee_table(conn)
步骤 3:核心操作(CRUD)
3.1 插入数据(Create)
支持单条插入和批量插入,批量插入效率远高于单条循环插入(适合脚本批量处理数据):
def insert_employees(conn, employees):
"""
批量插入员工数据
Args:
conn: MySQL连接对象
employees (list): 员工数据列表(每个元素为字典,对应一条记录)
"""
if not conn or not employees:
print("无连接或无数据,无法插入")
return
try:
with conn.cursor() as cursor:
# 批量插入SQL(%s为参数占位符,与pymysql的参数化查询匹配)
insert_sql = '''
INSERT INTO employees (name, department, hire_date, salary)
VALUES (%s, %s, %s, %s);
'''
# 准备参数(将列表中的字典转换为元组,顺序与SQL占位符一致)
params = [
(emp["name"], emp["department"], emp["hire_date"], emp["salary"])
for emp in employees
]
# 批量执行(executemany效率高于多次execute)
cursor.executemany(insert_sql, params)
conn.commit()
print(f"批量插入成功,共 {cursor.rowcount} 条员工数据")
except Error as e:
conn.rollback()
print(f"插入失败:{e}")
# 准备批量插入的员工数据(示例:从Excel读取后整理成此格式)
employee_data = [
{"name": "张三", "department": "技术部", "hire_date": "2023-01-15", "salary": 8500.00},
{"name": "李四", "department": "人事部", "hire_date": "2023-03-20", "salary": 6800.00},
{"name": "王五", "department": "技术部", "hire_date": "2023-05-10", "salary": 9200.00},
{"name": "赵六", "department": "财务部", "hire_date": "2023-07-05", "salary": 7500.00}
]
# 调用函数批量插入
if conn:
insert_employees(conn, employee_data)
3.2 查询数据(Read)
通过cursor.fetchone()/fetchall()获取结果,支持条件筛选、排序、分页(适合脚本数据统计):
def query_employees(conn, department=None, min_salary=None, page=1, page_size=10):
"""
查询员工数据(支持按部门、薪资筛选,分页查询)
Args:
conn: MySQL连接对象
department (str): 部门筛选(可选)
min_salary (float): 最低薪资筛选(可选)
page (int): 页码(默认第1页)
page_size (int): 每页条数(默认10条)
Returns:
dict: 查询结果(total:总条数,data:当前页数据)
"""
if not conn:
print("未建立连接,无法查询")
return {"total": 0, "data": []}
try:
with conn.cursor() as cursor:
# 1. 查询总条数(用于分页)
count_sql = "SELECT COUNT(*) AS total FROM employees"
params = []
conditions = []
# 动态添加筛选条件
if department:
conditions.append("department = %s")
params.append(department)
if min_salary is not None:
conditions.append("salary >= %s")
params.append(min_salary)
if conditions:
count_sql += " WHERE " + " AND ".join(conditions)
cursor.execute(count_sql, params)
total = cursor.fetchone()["total"] # 因cursorclass是DictCursor,结果为字典
# 2. 查询当前页数据(分页公式:OFFSET (page-1)*page_size)
query_sql = "SELECT * FROM employees"
if conditions:
query_sql += " WHERE " + " AND ".join(conditions)
# 按入职时间倒序,分页
query_sql += f" ORDER BY hire_date DESC LIMIT %s OFFSET %s;"
# 分页参数(LIMIT后为page_size,OFFSET后为偏移量)
query_params = params + [page_size, (page-1)*page_size]
cursor.execute(query_sql, query_params)
data = cursor.fetchall()
# 打印结果
print(f"\n=== 第 {page} 页员工数据(共 {total} 条) ===")
for emp in data:
print(f"ID:{emp['id']} | 姓名:{emp['name']} | 部门:{emp['department']}")
print(f"入职时间:{emp['hire_date']} | 薪资:{emp['salary']} | 创建时间:{emp['create_time']}")
return {"total": total, "data": data}
except Error as e:
print(f"查询失败:{e}")
return {"total": 0, "data": []}
# 调用函数查询(示例1:查询技术部薪资≥8000的员工,第1页,每页2条)
if conn:
query_result = query_employees(
conn,
department="技术部",
min_salary=8000.00,
page=1,
page_size=2
)
print(f"\n查询结果:共 {query_result['total']} 条,当前页 {len(query_result['data'])} 条")
3.3 更新数据(Update)
通过UPDATE语句更新数据,支持批量更新(如调整某部门所有员工的薪资):
def update_employee_salary(conn, department, salary_increase):
"""
批量调整某部门员工的薪资(加薪)
Args:
conn: MySQL连接对象
department (str): 目标部门
salary_increase (float): 加薪金额(如500.00表示每人加500)
"""
if not conn or salary_increase <= 0:
print("未建立连接或加薪金额无效,无法更新")
return
try:
with conn.cursor() as cursor:
# SQL:salary = salary + %s(批量加薪)
update_sql = '''
UPDATE employees
SET salary = salary + %s
WHERE department = %s;
'''
params = (salary_increase, department)
cursor.execute(update_sql, params)
conn.commit()
if cursor.rowcount > 0:
print(f"部门 {department} 薪资更新成功,共 {cursor.rowcount} 名员工加薪 {salary_increase} 元")
else:
print(f"部门 {department} 无员工数据,未更新")
except Error as e:
conn.rollback()
print(f"薪资更新失败:{e}")
# 调用函数:给技术部员工每人加薪500元
if conn:
update_employee_salary(conn, department="技术部", salary_increase=500.00)
3.4 删除数据(Delete)
通过DELETE语句删除数据,需谨慎操作(建议先查询确认,或添加软删除字段如is_deleted):
def delete_employee_by_id(conn, emp_id):
"""
按员工ID删除数据(硬删除,谨慎使用)
Args:
conn: MySQL连接对象
emp_id (int): 员工ID
"""
if not conn:
print("未建立连接,无法删除")
return
# 先查询确认(避免误删)
with conn.cursor() as cursor:
cursor.execute("SELECT name FROM employees WHERE id = %s;", (emp_id,))
emp = cursor.fetchone()
if not emp:
print(f"未找到ID为 {emp_id} 的员工,无需删除")
return
print(f"确认删除员工:{emp['name']}(ID:{emp_id})")
# 确认后删除
try:
with conn.cursor() as cursor:
delete_sql = "DELETE FROM employees WHERE id = %s;"
cursor.execute(delete_sql, (emp_id,))
conn.commit()
print(f"员工ID {emp_id} 删除成功")
except Error as e:
conn.rollback()
print(f"删除失败:{e}")
# 调用函数删除ID为4的员工(示例,实际需谨慎)
if conn:
delete_employee_by_id(conn, emp_id=4)
步骤 4:关闭连接
def close_mysql_connection(conn):
"""关闭MySQL连接"""
if conn:
try:
conn.close()
print("MySQL连接已关闭")
except Error as e:
print(f"关闭连接失败:{e}")
# 调用函数关闭连接
if conn:
close_mysql_connection(conn)
4.3.3.3 实战示例:员工薪资统计脚本
结合pandas(之前章节讲解的 Excel 处理库),编写一个 “读取 Excel 员工数据→批量插入 MySQL→统计薪资→导出统计结果到 Excel” 的完整脚本:
import pymysql
from pymysql import Error
import pandas as pd
from datetime import datetime
class EmployeeSalaryManager:
"""员工薪资管理类(MySQL操作+Excel交互)"""
def __init__(self, db_config, excel_file="员工数据.xlsx"):
"""
初始化
Args:
db_config (dict): MySQL配置(host, user, password, db, port)
excel_file (str): Excel文件路径
"""
self.db_config = db_config
self.excel_file = excel_file
self.conn = self._create_mysql_connection()
self._create_employee_table() # 初始化表
def _create_mysql_connection(self):
"""创建MySQL连接"""
try:
conn = pymysql.connect(
host=self.db_config["host"],
user=self.db_config["user"],
password=self.db_config["password"],
database=self.db_config["db"],
port=self.db_config["port"],
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor
)
return conn
except Error as e:
print(f"MySQL连接失败:{e}")
return None
def _create_employee_table(self):
"""创建员工表"""
if not self.conn:
return
create_sql = '''
CREATE TABLE IF NOT EXISTS employees (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(50) NOT NULL,
department VARCHAR(50) NOT NULL,
hire_date DATE NOT NULL,
salary DECIMAL(10,2) NOT NULL,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
'''
try:
with self.conn.cursor() as cursor:
cursor.execute(create_sql)
self.conn.commit()
except Error as e:
self.conn.rollback()
print(f"创建表失败:{e}")
def import_from_excel(self):
"""从Excel读取员工数据并批量插入MySQL"""
if not self.conn:
print("未建立MySQL连接,无法导入")
return
try:
# 1. 读取Excel(假设Excel表头为:姓名、部门、入职日期、薪资)
df = pd.read_excel(self.excel_file, sheet_name="员工表")
print(f"从Excel读取到 {len(df)} 条员工数据")
# 2. 数据预处理(确保日期格式正确,薪资为数值)
df["入职日期"] = pd.to_datetime(df["入职日期"]).dt.strftime("%Y-%m-%d") # 转换为MySQL日期格式
df["薪资"] = df["薪资"].astype(float)
# 3. 批量插入MySQL
insert_sql = "INSERT INTO employees (name, department, hire_date, salary) VALUES (%s, %s, %s, %s);"
params = [
(row["姓名"], row["部门"], row["入职日期"], row["薪资"])
for _, row in df.iterrows()
]
with self.conn.cursor() as cursor:
cursor.executemany(insert_sql, params)
self.conn.commit()
print(f"成功导入 {cursor.rowcount} 条数据到MySQL")
except Exception as e:
if self.conn:
self.conn.rollback()
print(f"从Excel导入失败:{e}")
def salary_statistics(self, export_excel="薪资统计结果.xlsx"):
"""统计各部门薪资情况(平均薪资、最高/最低薪资、员工数量),并导出到Excel"""
if not self.conn:
print("未建立MySQL连接,无法统计")
return
try:
with self.conn.cursor() as cursor:
# 统计SQL:按部门分组,计算薪资指标
stat_sql = '''
SELECT
department AS 部门,
COUNT(*) AS 员工数量,
AVG(salary) AS 平均薪资,
MAX(salary) AS 最高薪资,
MIN(salary) AS 最低薪资
FROM employees
GROUP BY department
ORDER BY 平均薪资 DESC;
'''
cursor.execute(stat_sql)
stat_result = cursor.fetchall()
# 转换为DataFrame(便于导出Excel)
df_stat = pd.DataFrame(stat_result)
# 格式化薪资(保留2位小数)
df_stat["平均薪资"] = df_stat["平均薪资"].round(2)
df_stat["最高薪资"] = df_stat["最高薪资"].round(2)
df_stat["最低薪资"] = df_stat["最低薪资"].round(2)
# 打印统计结果
print("\n=== 各部门薪资统计结果 ===")
print(df_stat.to_string(index=False))
# 导出到Excel
df_stat.to_excel(export_excel, index=False, sheet_name="薪资统计")
print(f"\n统计结果已导出到 {export_excel}")
return df_stat
except Error as e:
print(f"薪资统计失败:{e}")
return None
def close(self):
"""关闭连接"""
if self.conn:
try:
self.conn.close()
print("MySQL连接已关闭")
except Error as e:
print(f"关闭连接失败:{e}")
# ------------------- 使用示例 -------------------
if __name__ == "__main__":
# MySQL配置(替换为你的实际配置)
mysql_config = {
"host": "localhost",
"user": "python_user",
"password": "123456",
"db": "script_db",
"port": 3306
}
# 初始化薪资管理器
salary_manager = EmployeeSalaryManager(
db_config=mysql_config,
excel_file="员工数据.xlsx" # 提前准备好此Excel文件
)
# 1. 从Excel导入数据到MySQL
print("=== 从Excel导入员工数据 ===")
salary_manager.import_from_excel()
# 2. 统计薪资并导出结果
print("\n=== 各部门薪资统计 ===")
salary_manager.salary_statistics(export_excel="2025年薪资统计.xlsx")
# 3. 关闭连接
salary_manager.close()
4.3.3.4 常见问题与解决
- “Access denied for user” 错误:
原因:用户名 / 密码错误,或无访问目标数据库的权限。
解决:① 核对create_mysql_connection的user和password参数;② 登录 MySQL 执行GRANT语句授权(参考环境准备步骤)。
- 中文乱码:
原因:数据库 / 表的编码不支持中文,或连接时未指定 charset。
解决:① 建立连接时添加charset="utf8mb4";② 创建数据库时指定DEFAULT CHARSET utf8mb4(参考环境准备步骤)。
- 批量插入效率低:
原因:未使用executemany,或 MySQL 的autocommit开启(默认关闭)。
解决:① 用cursor.executemany()批量执行;② 确保事务批量提交(避免每条插入都 commit)。
- 日期格式错误:
原因:Python 传递的日期格式与 MySQL 不匹配(MySQL 要求YYYY-MM-DD)。
解决:用datetime.strftime("%Y-%m-%d")或pandas转换日期格式后再插入。
4.3.4 第三方库 pymongo:操作 MongoDB 数据库
MongoDB 是主流的非关系型数据库(文档型),无固定 schema,适合存储非结构化 / 半结构化数据(如脚本爬取的网页内容、用户行为日志、JSON 格式数据)。Python 通过pymongo库操作 MongoDB,支持灵活的文档存储与查询。
4.3.4.1 环境准备
1. 安装 MongoDB 服务
- Windows/MacOS:下载 MongoDB Community Server(Download MongoDB Community Server | MongoDB),按向导安装。
- Linux(Ubuntu):执行命令sudo apt update && sudo apt install mongodb-org,安装后启动服务sudo systemctl start mongod。
2. 安装 pymongo 库
用pip安装:
pip install pymongo
3. 准备 MongoDB 环境
- 启动 MongoDB 后,默认端口为 27017,无需密码(本地开发);
- 若需远程访问或密码认证,需修改 MongoDB 配置文件(如/etc/mongod.conf),开启认证并创建用户。
4.3.4.2 核心概念
MongoDB 的核心概念与关系型数据库不同,需先理解对应关系:
关系型数据库 | MongoDB | 说明 |
数据库(DB) | 数据库(DB) | 存储数据的容器,一个 MongoDB 可包含多个数据库 |
表(Table) | 集合(Collection) | 存储文档的集合,类似表,但无固定结构 |
行(Row) | 文档(Document) | 一条数据,格式为 JSON(MongoDB 中为 BSON) |
列(Column) | 字段(Field) | 文档中的键值对,字段可动态添加 |
4.3.4.3 核心操作:从连接到 CRUD
步骤 1:建立 MongoDB 连接
通过pymongo.MongoClient()创建连接,获取数据库和集合(若不存在,操作时会自动创建):
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, PyMongoError
from datetime import datetime
import json
def create_mongo_connection(host="localhost", port=27017, db_name="script_db"):
"""
创建MongoDB连接,获取数据库对象
Args:
host (str): MongoDB服务器地址
port (int): MongoDB端口(默认27017)
db_name (str): 数据库名
Returns:
pymongo.database.Database: 数据库对象,失败则返回None
"""
try:
# 创建客户端连接
client = MongoClient(host, port, serverSelectionTimeoutMS=5000) # 5秒超时
# 验证连接(避免假连接)
client.admin.command('ping')
# 获取数据库(不存在则自动创建)
db = client[db_name]
print("MongoDB连接成功")
return db
except ConnectionFailure as e:
print(f"MongoDB连接失败(服务器不可达):{e}")
return None
except PyMongoError as e:
print(f"MongoDB操作错误:{e}")
return None
# 调用函数创建连接,获取数据库对象
db = create_mongo_connection(db_name="script_log_db")
步骤 2:获取集合(Collection)
集合类似关系型数据库的表,通过数据库对象获取(不存在则自动创建):
# 获取“脚本运行日志”集合(类似表)
log_collection = db["script_run_logs"]
# 获取“API请求日志”集合
api_collection = db["api_request_logs"]
print(f"当前数据库的所有集合:{db.list_collection_names()}") # 查看已存在的集合
步骤 3:核心操作(CRUD)
MongoDB 的 CRUD 操作通过集合对象实现,文档格式为 Python 字典(自动转换为 BSON)。
3.1 插入文档(Create)
支持单文档插入(insert_one())和多文档插入(insert_many()):
def insert_script_log(log_collection, script_name, status, log_content="", extra_info=None):
"""
插入一条脚本运行日志到MongoDB
Args:
log_collection: 集合对象(script_run_logs)
script_name (str): 脚本名
status (str): 运行状态(success/failed)
log_content (str): 日志内容
extra_info (dict): 额外信息(如运行时间、错误码,可选)
Returns:
str: 插入文档的ID(ObjectId),失败则返回None
"""
try:
# 构造文档(JSON格式,字段可动态添加)
log_document = {
"script_name": script_name,
"status": status,
"log_content": log_content,
"run_time": datetime.now(), # MongoDB自动存储为datetime类型
"created_at": datetime.now()
}
# 添加额外信息(若有)
if extra_info and isinstance(extra_info, dict):
log_document.update(extra_info)
# 插入单文档
result = log_collection.insert_one(log_document)
print(f"脚本日志插入成功,文档ID:{result.inserted_id}")
return str(result.inserted_id) # ObjectId转换为字符串,便于后续使用
except PyMongoError as e:
print(f"插入脚本日志失败:{e}")
return None
def insert_api_logs(api_collection, api_logs):
"""
批量插入API请求日志到MongoDB
Args:
api_collection: 集合对象(api_request_logs)
api_logs (list): API日志列表(每个元素为字典)
Returns:
list: 插入文档的ID列表,失败则返回空列表
"""
if not api_logs or not isinstance(api_logs, list):
print("API日志数据无效,无法插入")
return []
try:
# 为每条日志添加创建时间
for log in api_logs:
log["created_at"] = datetime.now()
# 批量插入多文档
result = api_collection.insert_many(api_logs)
inserted_ids = [str(_id) for _id in result.inserted_ids]
print(f"批量插入API日志成功,共 {len(inserted_ids)} 条,IDs:{inserted_ids}")
return inserted_ids
except PyMongoError as e:
print(f"批量插入API日志失败:{e}")
return []
# ------------------- 插入示例 -------------------
if log_collection and api_collection:
# 1. 插入一条脚本运行日志(含额外信息)
insert_script_log(
log_collection,
script_name="excel_updater.py",
status="success",
log_content="更新Excel L列完成,13个unitid均成功",
extra_info={"execution_time": 12.5, "affected_rows": 13} # 额外添加“执行时间”和“影响行数”
)
# 2. 批量插入API请求日志
api_log_data = [
{
"api_url": "http://111.41.51.214:8088/sqjz/private/sqjz/sxzhcx/jzrylist",
"method": "POST",
"status_code": 200,
"response_time": 0.8,
"unitid": "00020001",
"params": {"jjlx": "02"}
},
{
"api_url": "http://111.41.51.214:8088/sqjz/private/sqjz/sxzhcx/jzrylist",
"method": "POST",
"status_code": 200,
"response_time": 0.6,
"unitid": "00020002",
"params": {"jjlx": "02"}
},
{
"api_url": "http://111.41.51.214:8088/sqjz/private/sqjz/sxzhcx/jzrylist",
"method": "POST",
"status_code": 500,
"response_time": 1.2,
"unitid": "00020003",
"params": {"jjlx": "02"},
"error": "Server internal error"
}
]
insert_api_logs(api_collection, api_log_data)
3.2 查询文档(Read)
通过find()(查询多条)、find_one()(查询一条)实现,支持条件筛选、排序、分页、字段投影(只返回指定字段):
def query_script_logs(log_collection, status=None, min_exec_time=None, page=1, page_size=10):
"""
查询脚本运行日志
Args:
log_collection: 集合对象
status (str): 筛选状态(success/failed,可选)
min_exec_time (float): 最小执行时间(可选)
page (int): 页码
page_size (int): 每页条数
Returns:
dict: 查询结果(total:总条数,data:当前页数据)
"""
try:
# 1. 构建查询条件(字典格式,支持多条件组合)
query_filter = {}
if status:
query_filter["status"] = status # 精确匹配状态
if min_exec_time is not None:
# 大于等于:$gte(MongoDB查询操作符,类似SQL的>=)
query_filter["execution_time"] = {"$gte": min_exec_time}
# 2. 计算分页参数(跳过前 (page-1)*page_size 条)
skip = (page - 1) * page_size
# 3. 查询总条数
total = log_collection.count_documents(query_filter)
# 4. 查询当前页数据(按运行时间倒序,只返回指定字段)
# 字段投影:1表示返回,0表示不返回(_id默认返回,需显式设为0)
projection = {
"_id": 0, # 不返回_id
"script_name": 1,
"status": 1,
"run_time": 1,
"execution_time": 1,
"log_content": 1
}
# 执行查询:find(条件, 投影) -> 排序(-1表示倒序) -> 分页(skip+limit)
cursor = log_collection.find(query_filter, projection) \
.sort("run_time", -1) \
.skip(skip) \
.limit(page_size)
# 转换为列表(cursor是迭代器,需遍历获取数据)
data = list(cursor)
# 格式化时间(MongoDB的datetime转换为字符串,便于查看)
for item in data:
if "run_time" in item:
item["run_time"] = item["run_time"].strftime("%Y-%m-%d %H:%M:%S")
# 打印结果
print(f"\n=== 脚本日志查询结果(第 {page} 页,共 {total} 条) ===")
for idx, log in enumerate(data, 1):
print(f"\n{idx}. 脚本名:{log['script_name']}")
print(f" 状态:{log['status']}")
print(f" 运行时间:{log['run_time']}")
print(f" 执行时间:{log.get('execution_time', '未知')}秒")
print(f" 日志内容:{log['log_content']}")
return {"total": total, "data": data}
except PyMongoError as e:
print(f"查询脚本日志失败:{e}")
return {"total": 0, "data": []}
def query_failed_api_logs(api_collection, unitid=None):
"""
查询失败的API请求日志(status_code != 200)
Args:
api_collection: 集合对象
unitid (str): 筛选指定unitid(可选)
Returns:
list: 失败日志列表
"""
try:
# 查询条件:status_code != 200($ne:不等于),可选筛选unitid
query_filter = {"status_code": {"$ne": 200}}
if unitid:
query_filter["unitid"] = unitid
# 按响应时间倒序,返回所有字段
cursor = api_collection.find(query_filter).sort("response_time", -1)
failed_logs = list(cursor)
# 格式化输出
print(f"\n=== 失败的API请求日志(共 {len(failed_logs)} 条) ===")
for log in failed_logs:
print(f"\nAPI URL:{log['api_url']}")
print(f"Method:{log['method']}")
print(f"UnitID:{log['unitid']}")
print(f"状态码:{log['status_code']}")
print(f"响应时间:{log['response_time']}秒")
print(f"错误信息:{log.get('error', '无')}")
print(f"请求参数:{json.dumps(log['params'], ensure_ascii=False)}")
return failed_logs
except PyMongoError as e:
print(f"查询失败API日志失败:{e}")
return []
# ------------------- 查询示例 -------------------
if log_collection and api_collection:
# 1. 查询执行时间≥10秒的成功日志(第1页,每页5条)
query_script_logs(
log_collection,
status="success",
min_exec_time=10,
page=1,
page_size=5
)
# 2. 查询所有失败的API请求日志
query_failed_api_logs(api_collection)
3.3 更新文档(Update)
通过update_one()(更新一条)、update_many()(更新多条)实现,支持字段修改、数值增减、数组操作等:
def update_api_log_status(api_collection, unitid, new_status_code, new_error=None):
"""
更新指定unitid的API日志状态码和错误信息
Args:
api_collection: 集合对象
unitid (str): 目标unitid
new_status_code (int): 新状态码
new_error (str): 新错误信息(可选)
Returns:
int: 受影响的文档数,失败则返回0
"""
try:
# 查询条件:匹配unitid和状态码非200(只更新失败的日志)
query_filter = {"unitid": unitid, "status_code": {"$ne": 200}}
# 更新操作:$set表示设置字段值
update_operation = {"$set": {"status_code": new_status_code, "updated_at": datetime.now()}}
if new_error:
update_operation["$set"]["error"] = new_error
# 更新一条匹配的文档(若需更新所有匹配文档,用update_many())
result = api_collection.update_one(query_filter, update_operation)
if result.matched_count > 0:
print(f"成功更新 {result.modified_count} 条API日志(unitid:{unitid})")
return result.modified_count
else:
print(f"未找到unitid为 {unitid} 的失败API日志,无需更新")
return 0
except PyMongoError as e:
print(f"更新API日志失败:{e}")
return 0
def increase_script_exec_time(log_collection, script_name, increase_seconds):
"""
增加指定脚本的执行时间(如修正统计误差)
Args:
log_collection: 集合对象
script_name (str): 脚本名
increase_seconds (float): 增加的秒数
"""
try:
query_filter = {"script_name": script_name, "status": "success"}
# $inc:数值增减(+为增,-为减)
update_operation = {"$inc": {"execution_time": increase_seconds}, "$set": {"updated_at": datetime.now()}}
result = log_collection.update_many(query_filter, update_operation)
print(f"成功为 {result.modified_count} 条 {script_name} 的日志增加 {increase_seconds} 秒执行时间")
except PyMongoError as e:
print(f"增加执行时间失败:{e}")
# ------------------- 更新示例 -------------------
if log_collection and api_collection:
# 1. 更新unitid为00020003的API日志(状态码从500改为503,错误信息更新)
update_api_log_status(
api_collection,
unitid="00020003",
new_status_code=503,
new_error="Service unavailable (server maintenance)"
)
# 2. 为excel_updater.py的所有成功日志增加1.2秒执行时间(修正统计误差)
increase_script_exec_time(log_collection, "excel_updater.py", 1.2)
3.4 删除文档(Delete)
通过delete_one()(删除一条)、delete_many()(删除多条)实现,需谨慎操作(建议先查询确认):
def delete_old_logs(collection, days=30, log_type="script"):
"""
删除指定天数前的旧日志(支持脚本日志和API日志)
Args:
collection: 集合对象
days (int): 保留天数(删除days天前的日志)
log_type (str): 日志类型(script/api,用于确定时间字段)
Returns:
int: 删除的文档数,失败则返回0
"""
try:
# 计算days天前的时间(MongoDB的datetime类型)
cutoff_time = datetime.now() - pd.Timedelta(days=days) # 用pandas计算时间差(也可手动计算)
# 确定时间字段(脚本日志用run_time,API日志用created_at)
time_field = "run_time" if log_type == "script" else "created_at"
# 查询条件:时间字段 < cutoff_time($lt:小于)
query_filter = {time_field: {"$lt": cutoff_time}}
# 删除所有匹配的文档
result = collection.delete_many(query_filter)
print(f"成功删除 {result.deleted_count} 条 {days} 天前的{log_type}日志")
return result.deleted_count
except Exception as e:
print(f"删除旧日志失败:{e}")
return 0
# ------------------- 删除示例 -------------------
if log_collection and api_collection:
# 1. 删除90天前的脚本日志
delete_old_logs(log_collection, days=90, log_type="script")
# 2. 删除60天前的API日志
delete_old_logs(api_collection, days=60, log_type="api")
步骤 4:索引优化(提升查询效率)
MongoDB 默认无索引(除_id外),若脚本频繁按某字段查询(如unitid、script_name),需创建索引提升效率:
def create_indexes(collection, index_fields):
"""
为集合创建索引
Args:
collection: 集合对象
index_fields (list): 索引字段列表(如[("unitid", 1), ("status_code", -1)])
1表示升序,-1表示降序
"""
try:
# 创建复合索引(若需单字段索引,传入单元素列表即可)
index_name = collection.create_index(index_fields)
print(f"成功为集合 {collection.name} 创建索引:{index_name}(字段:{index_fields})")
except PyMongoError as e:
print(f"创建索引失败:{e}")
# ------------------- 索引示例 -------------------
if api_collection:
# 为API日志集合创建复合索引(unitid升序 + status_code降序)
create_indexes(api_collection, [("unitid", 1), ("status_code", -1)])
if log_collection:
# 为脚本日志集合创建单字段索引(script_name升序)
create_indexes(log_collection, [("script_name", 1)])
4.3.4.4 实战示例:API 请求日志分析脚本
编写一个 “记录 API 请求日志→存储到 MongoDB→分析请求成功率→生成可视化报告” 的脚本,结合matplotlib实现数据可视化:
from pymongo import MongoClient
from pymongo.errors import PyMongoError
from datetime import datetime
import matplotlib.pyplot as plt
import pandas as pd
import json
# 设置中文字体(避免matplotlib中文乱码)
plt.rcParams['font.sans-serif'] = ['WenQuanYi Zen Hei']
plt.rcParams['axes.unicode_minus'] = False
class APILogAnalyzer:
"""API请求日志分析器(MongoDB存储+可视化分析)"""
def __init__(self, host="localhost", port=27017, db_name="api_analysis_db"):
self.db = self._create_mongo_connection(host, port, db_name)
self.api_collection = self.db["</doubaocanvas>