DeepSeek辅助编写的测试xlsx文件写入性能的程序
上文在测试tpcds sf=0.1 数据量时,写入xlsx较慢,于是让DeepSeek辅助编写了常见写入库的性能比较测试程序(下面的程序使用TPCH sf=0.003 的数据),代码如下:
import duckdb
import pandas as pd
import time
import os
from openpyxl import Workbook
import xlsxwriter
from pyxlsbwriter import XlsbWriter, XlsxWriter


def generate_tpch_data():
    """Generate TPC-H test data in an in-memory DuckDB database.

    Installs and loads the DuckDB ``tpch`` extension, runs ``dbgen`` with
    ``sf=0.003`` and reads every generated table into a DataFrame.

    Returns:
        A dict mapping table name to the DataFrame holding all of its rows.
        Tables whose name starts with ``tpch_`` or equals ``queries`` are
        skipped.
    """
    conn = duckdb.connect(database=':memory:')
    try:
        conn.execute("INSTALL tpch")
        conn.execute("LOAD tpch")
        conn.execute("CALL dbgen(sf=0.003)")

        tables_result = conn.execute("SHOW TABLES").fetchall()
        table_names = [
            row[0]
            for row in tables_result
            if not row[0].startswith('tpch_') and row[0] != 'queries'
        ]

        return {
            name: conn.execute(f"SELECT * FROM {name}").fetchdf()
            for name in table_names
        }
    finally:
        conn.close()


def _df_to_rows(df):
    """Return *df* as a list of row lists, with the column names first."""
    rows = [df.columns.tolist()]
    rows.extend(df.values.tolist())
    return rows


def _size_or_zero(path):
    """Return the size of *path* in bytes, or 0 when it does not exist."""
    return os.path.getsize(path) if os.path.exists(path) else 0


def _write_with_pandas(table_data, output_file, engine):
    """Write every table to one workbook via ``pd.ExcelWriter``.

    Returns:
        ``(elapsed_seconds, file_size_bytes)``.
    """
    # perf_counter is monotonic and high resolution, unlike time.time().
    start_time = time.perf_counter()
    with pd.ExcelWriter(output_file, engine=engine) as writer:
        for sheet_name, df in table_data.items():
            df.to_excel(writer, sheet_name=sheet_name, index=False)
    elapsed = time.perf_counter() - start_time
    return elapsed, os.path.getsize(output_file)


def _write_with_pyxlsbwriter(writer_cls, table_data, output_file):
    """Write every table as one sheet using a pyxlsbwriter writer class.

    The DataFrame-to-list conversion is deliberately inside the timed
    region, as in the pandas-based tests where conversion is part of
    ``to_excel``.

    Returns:
        ``(elapsed_seconds, file_size_bytes)``.
    """
    start_time = time.perf_counter()
    with writer_cls(output_file, compressionLevel=6) as writer:
        for sheet_name, df in table_data.items():
            writer.add_sheet(sheet_name, hidden=False)
            writer.write_sheet(_df_to_rows(df))
    elapsed = time.perf_counter() - start_time
    return elapsed, _size_or_zero(output_file)


def test_openpyxl_write(table_data, output_file):
    """Benchmark writing all tables with pandas and the openpyxl engine.

    Returns:
        ``(elapsed_seconds, file_size_bytes)``.
    """
    return _write_with_pandas(table_data, output_file, 'openpyxl')


def test_xlsxwriter_write(table_data, output_file):
    """Benchmark writing all tables with pandas and the xlsxwriter engine.

    Returns:
        ``(elapsed_seconds, file_size_bytes)``.
    """
    return _write_with_pandas(table_data, output_file, 'xlsxwriter')


def df_to_xlsb(df, output_file, sheet_name, hidden=False, compression_level=6):
    """Write a single DataFrame to an XLSB file as one sheet.

    Args:
        df: DataFrame to export; its column names become the first row.
        output_file: Path of the XLSB file to create.
        sheet_name: Name of the sheet.
        hidden: Passed through to ``XlsbWriter.add_sheet``.
        compression_level: Passed to ``XlsbWriter`` as ``compressionLevel``.
    """
    with XlsbWriter(output_file, compressionLevel=compression_level) as writer:
        writer.add_sheet(sheet_name, hidden=hidden)
        writer.write_sheet(_df_to_rows(df))


def test_pyxlsb_write_single_sheet(table_data, output_file):
    """Benchmark pyxlsbwriter writing only the first table to an XLSB file.

    Nothing is written when *table_data* is empty; the reported size is
    then 0.

    Returns:
        ``(elapsed_seconds, file_size_bytes)``.
    """
    start_time = time.perf_counter()
    if table_data:
        first_sheet_name = next(iter(table_data))
        df_to_xlsb(
            table_data[first_sheet_name],
            output_file,
            first_sheet_name,
            compression_level=6,
        )
    elapsed = time.perf_counter() - start_time
    return elapsed, _size_or_zero(output_file)


def test_pyxlsb_write_multiple_sheets(table_data, output_file):
    """Benchmark pyxlsbwriter writing all tables to one XLSB file.

    Returns:
        ``(elapsed_seconds, file_size_bytes)``.
    """
    return _write_with_pyxlsbwriter(XlsbWriter, table_data, output_file)


def test_pyxlsx_write_multiple_sheets(table_data, output_file):
    """Benchmark pyxlsbwriter's ``XlsxWriter`` writing all tables to XLSX.

    Returns:
        ``(elapsed_seconds, file_size_bytes)``.
    """
    return _write_with_pyxlsbwriter(XlsxWriter, table_data, output_file)


def test_pandas_xlsxwriter(table_data, output_file):
    """Benchmark pandas with the xlsxwriter engine.

    NOTE: this measures exactly the same code path as
    ``test_xlsxwriter_write``; it is kept so the comparison still reports
    both entries.

    Returns:
        ``(elapsed_seconds, file_size_bytes)``.
    """
    return _write_with_pandas(table_data, output_file, 'xlsxwriter')


def _run_case(label, output_file, write_func, table_data, results):
    """Run one benchmark on a fresh output file and record its result."""
    # Remove leftovers so the reported size belongs to this run only.
    if os.path.exists(output_file):
        os.remove(output_file)
    time_taken, file_size = write_func(table_data, output_file)
    results.append((label, time_taken, file_size))
    print(f" 时间: {time_taken:.2f}秒, 文件大小: {file_size/1024:.1f}KB")


def run_comparison_test():
    """Run every writer benchmark on the same TPC-H data and print a ranking.

    Returns:
        A list of ``(label, elapsed_seconds, file_size_bytes)`` tuples
        sorted by elapsed time, fastest first.
    """
    print("=" * 60)
    print("Excel写入性能比较测试")
    print("=" * 60)

    print("正在生成TPCH测试数据...")
    table_data = generate_tpch_data()
    print(f"生成 {len(table_data)} 张表的数据")

    total_rows = sum(len(df) for df in table_data.values())
    total_columns = sum(len(df.columns) for df in table_data.values())
    print(f"总行数: {total_rows}, 总列数: {total_columns}")

    print("\n各表信息:")
    for sheet_name, df in table_data.items():
        print(f" {sheet_name}: {len(df)} 行, {len(df.columns)} 列")

    # (announcement, result label, output file, benchmark function).
    # The sixth case used to reuse the fifth one's number and label, which
    # made the XLSB and XLSX results indistinguishable in the ranking.
    cases = [
        ("\n1. 测试 openpyxl...", "openpyxl",
         "test_openpyxl.xlsx", test_openpyxl_write),
        ("2. 测试 xlsxwriter...", "xlsxwriter",
         "test_xlsxwriter.xlsx", test_xlsxwriter_write),
        ("3. 测试 pandas + xlsxwriter...", "pandas+xlsxwriter",
         "test_pandas_xlsxwriter.xlsx", test_pandas_xlsxwriter),
        ("4. 测试 pyxlsbwriter (单sheet)...", "pyxlsbwriter (单sheet)",
         "test_pyxlsb_single.xlsb", test_pyxlsb_write_single_sheet),
        ("5. 测试 pyxlsbwriter (多sheet)...", "pyxlsbwriter (多sheet)",
         "test_pyxlsb_multi.xlsb", test_pyxlsb_write_multiple_sheets),
        ("6. 测试 pyxlsbwriter xlsx(多sheet)...", "pyxlsbwriter xlsx (多sheet)",
         "test_pyxlsb_multi.xlsx", test_pyxlsx_write_multiple_sheets),
    ]

    results = []
    for announcement, label, output_file, write_func in cases:
        print(announcement)
        _run_case(label, output_file, write_func, table_data, results)

    print("\n" + "=" * 60)
    print("性能比较结果")
    print("=" * 60)

    results.sort(key=lambda item: item[1])
    fastest = results[0]
    print(f"🏆 最快: {fastest[0]} - {fastest[1]:.2f}秒")

    print("\n详细排名:")
    for i, (name, time_taken, file_size) in enumerate(results, 1):
        print(f"{i}. {name}: {time_taken:.2f}秒, {file_size/1024:.1f}KB")

    base_time = fastest[1]
    # Skip the ratios when the baseline is zero to avoid dividing by zero.
    if len(results) > 1 and base_time > 0:
        print(f"\n相对性能 (以{fastest[0]}为基准):")
        for name, time_taken, _ in results:
            ratio = time_taken / base_time
            print(f" {name}: {ratio:.1f}x")

    return results


def quick_generate_with_xlsxwriter():
    """Export the TPC-H tables to ``source.xlsx`` and the queries to ``menu.xlsx``.

    Both workbooks are written with pandas and the xlsxwriter engine.
    """
    print("使用xlsxwriter快速生成TPCH数据...")

    table_data = generate_tpch_data()

    output_file = "source.xlsx"
    start_time = time.perf_counter()
    with pd.ExcelWriter(output_file, engine='xlsxwriter') as writer:
        for sheet_name, df in table_data.items():
            df.to_excel(writer, sheet_name=sheet_name, index=False)
    end_time = time.perf_counter()

    print(f"✅ 数据生成完成: {output_file}")
    print(f"⏱️ 生成时间: {end_time - start_time:.2f}秒")
    print(f"📊 包含 {len(table_data)} 个sheet")

    # The query texts come from the tpch extension's tpch_queries() table
    # function, so a fresh connection with the extension loaded is enough.
    conn = duckdb.connect(database=':memory:')
    try:
        conn.execute("INSTALL tpch")
        conn.execute("LOAD tpch")
        queries_df = conn.execute(
            "SELECT query_nr, query FROM tpch_queries()"
        ).fetchdf()
        with pd.ExcelWriter('menu.xlsx', engine='xlsxwriter') as writer:
            queries_df.to_excel(writer, sheet_name='queries', index=False)
        print(f"✅ 查询语句生成完成: menu.xlsx")
        print(f"📋 包含 {len(queries_df)} 个查询语句")
    finally:
        conn.close()


def generate_with_pyxlsbwriter():
    """Export the TPC-H tables to ``source.xlsb`` using pyxlsbwriter."""
    print("使用pyxlsbwriter生成TPCH数据 (XLSB格式)...")

    table_data = generate_tpch_data()

    output_file = "source.xlsb"
    start_time = time.perf_counter()
    with XlsbWriter(output_file, compressionLevel=6) as writer:
        for sheet_name, df in table_data.items():
            print(f"正在写入: {sheet_name}")
            writer.add_sheet(sheet_name, hidden=False)
            # One write_sheet call per sheet, header row first, exactly as
            # the benchmark writers do. The previous version called
            # write_sheet once per row through df.iterrows().
            # NOTE(review): assumes write_sheet emits a complete sheet per
            # call - confirm against the pyxlsbwriter documentation.
            writer.write_sheet(_df_to_rows(df))
    end_time = time.perf_counter()

    print(f"✅ 数据生成完成: {output_file}")
    print(f"⏱️ 生成时间: {end_time - start_time:.2f}秒")
    print(f"📊 包含 {len(table_data)} 个sheet")
    print(f"💾 文件大小: {os.path.getsize(output_file)/1024:.1f}KB")


if __name__ == "__main__":
    # Run the write-performance comparison.
    results = run_comparison_test()

    # Alternatives: generate the data files instead of benchmarking.
    # quick_generate_with_xlsxwriter()
    # generate_with_pyxlsbwriter()
测试结果显示,pyxlsbwriter生成xlsx数据的速度最快。用如下语句代替上文测试数据生成程序中写入xlsx的部分后,执行时间从418秒下降到41秒,速度提高到原来的约10倍,文件大小也从100MB减少到43MB:
# Export every table in table_names to its own sheet of source.xlsx.
# Relies on `conn` (an open DuckDB connection) and `table_names` being
# defined by the surrounding data-generation program.
with XlsxWriter('source.xlsx', compressionLevel=6) as writer:
    for i, table_name in enumerate(table_names, 1):
        print(f" [{i}/{len(table_names)}] 导出表: {table_name}")
        # Read the whole table into a DataFrame.
        df = conn.execute(f"SELECT * FROM {table_name}").fetchdf()
        # Convert to a list of rows, with the column names as the first row.
        data = [df.columns.tolist()]
        data.extend(df.values.tolist())
        writer.add_sheet(table_name, hidden=False)
        writer.write_sheet(data)
查询性能不变,但查询出错的语句多了一个Q49(使用旧的xlsx文件时只有Q72报错),原因需要进一步研究。