利用DeepSeek编写从xlsx数据源调用duckdb执行已保存的查询SQL语句,并把查询结果保存到xlsx文件的程序
1.生成测试数据,用tpch插件的dbgen函数生成
需要事先安装python duckdb、pandas、openpyxl模块
pip install duckdb pandas openpyxl
如果本地未安装duckdb并安装过tpch插件,需要将http://extensions.duckdb.org/v1.3.2/linux_arm64/tpch.duckdb_extension.gz下载并解压到程序脚本所在目录,并执行注释掉的那行。
import duckdb
import pandas as pd
from openpyxl import Workbook
import timedef generate_test_data_enhanced():"""增强版的测试数据生成函数"""print("=" * 50)print("TPCH测试数据生成程序")print("=" * 50)start_time = time.time()try:# 连接到内存数据库conn = duckdb.connect(database=':memory:')conn.execute("load tpch")#conn.execute("load 'tpch.duckdb_extension'")# 生成TPCH测试数据print("🚀 正在生成TPCH测试数据 (sf=0.001)...")conn.execute("CALL dbgen(sf=0.001)")# 获取所有表名tables_result = conn.execute("SHOW TABLES").fetchall()table_names = [table[0] for table in tables_result if not table[0].startswith('tpch_') and table[0] != 'queries']print(f"📊 找到 {len(table_names)} 张表: {', '.join(table_names)}")# 创建source.xlsx文件print("💾 正在导出数据到 source.xlsx...")with pd.ExcelWriter('source.xlsx', engine='openpyxl') as writer:for i, table_name in enumerate(table_names, 1):print(f" [{i}/{len(table_names)}] 导出表: {table_name}")# 读取表数据df = conn.execute(f"SELECT * FROM {table_name}").fetchdf()# 保存到Excel的sheet中df.to_excel(writer, sheet_name=table_name, index=False)# 获取查询语句print("🔍 正在获取查询语句...")queries_df = conn.execute("SELECT query_nr, query FROM tpch_queries()").fetchdf()# 创建menu.xlsx文件print("💾 正在保存查询语句到 menu.xlsx...")with pd.ExcelWriter('menu.xlsx', engine='openpyxl') as writer:queries_df.to_excel(writer, sheet_name='queries', index=False)# 显示统计信息end_time = time.time()execution_time = end_time - start_timeprint("\n" + "=" * 50)print("✅ 数据生成完成!")print("=" * 50)print(f"📁 生成的文件:")print(f" - source.xlsx: 包含 {len(table_names)} 张表")print(f" - menu.xlsx: 包含 {len(queries_df)} 个查询语句")print(f"⏱️ 执行时间: {execution_time:.2f} 秒")# 显示各表的数据量print("\n📈 各表数据量统计:")for table_name in table_names:count = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]print(f" - {table_name}: {count} 行")# 显示查询语句示例print(f"\n📝 查询语句示例 (前3个):")for i, (query_nr, query) in enumerate(queries_df.head(3).itertuples(index=False)):print(f" {query_nr}: {query[:50]}..." if len(query) > 50 else f" {query_nr}: {query}")except Exception as e:print(f"❌ 发生错误: {e}")raisefinally:if 'conn' in locals():conn.close()if __name__ == "__main__":generate_test_data_enhanced()
2.执行测试(你可以修改source和menu的内容,执行自己的查询,最简单的方法是把tpch改成tpcds测试)
import duckdb
import pandas as pd
from openpyxl import Workbook
import time
import osdef execute_queries_on_excel():"""执行menu.xlsx中的查询语句,在source.xlsx数据上执行,结果保存到target.xlsx"""print("=" * 60)print("第二步:执行查询语句并保存结果")print("=" * 60)start_time = time.time()# 检查必要文件是否存在if not os.path.exists('source.xlsx'):print("❌ source.xlsx 文件不存在,请先运行第一步生成测试数据")returnif not os.path.exists('menu.xlsx'):print("❌ menu.xlsx 文件不存在,请先运行第一步生成查询语句")returnconn = Nonetry:# 连接到内存数据库conn = duckdb.connect(database=':memory:')# 1. 读取source.xlsx的所有sheet并创建视图print("📖 正在读取source.xlsx并创建视图...")# 使用pd.ExcelFile获取所有sheet名excel_file = pd.ExcelFile('source.xlsx')sheet_names = excel_file.sheet_namesfor sheet_name in sheet_names:print(f" 🗂️ 创建视图: {sheet_name}")# 使用read_xlsx函数创建视图conn.execute(f"""CREATE OR REPLACE VIEW {sheet_name} AS SELECT * FROM read_xlsx('source.xlsx', sheet='{sheet_name}')""")print(f"✅ 成功创建 {len(sheet_names)} 个视图")# 2. 读取menu.xlsx中的查询语句print("🔍 正在读取menu.xlsx中的查询语句...")queries_df = pd.read_excel('menu.xlsx', sheet_name='queries')# 确保列名正确if 'query_nr' not in queries_df.columns or 'query' not in queries_df.columns:print("❌ menu.xlsx格式不正确,需要包含query_nr和query列")returnprint(f"📋 找到 {len(queries_df)} 个查询语句")# 3. 创建target.xlsx文件,并执行每个查询print("🚀 正在执行查询并保存结果...")with pd.ExcelWriter('target.xlsx', engine='openpyxl') as writer:successful_queries = 0for index, row in queries_df.iterrows():query_nr = row['query_nr']query = row['query']try:print(f" [{index + 1}/{len(queries_df)}] 执行查询 Q{query_nr}...")# 执行查询result_df = conn.execute(query).fetchdf()# 保存结果到target.xlsx的q{query_nr} sheetsheet_name = f"q{query_nr}"result_df.to_excel(writer, sheet_name=sheet_name, index=False)print(f" ✅ Q{query_nr} 执行成功,结果行数: {len(result_df)}")successful_queries += 1except Exception as e:print(f" ❌ Q{query_nr} 执行失败: {e}")# 创建错误信息sheeterror_df = pd.DataFrame({'error_message': [f"查询执行失败: {str(e)}"],'query_number': [query_nr],'query': [query[:100] + '...' if len(query) > 100 else query]})error_sheet_name = f"q{query_nr}_error"error_df.to_excel(writer, sheet_name=error_sheet_name, index=False)# 4. 显示执行统计end_time = time.time()execution_time = end_time - start_timeprint("\n" + "=" * 60)print("✅ 查询执行完成!")print("=" * 60)print(f"📊 执行统计:")print(f" - 总查询数: {len(queries_df)}")print(f" - 成功执行: {successful_queries}")print(f" - 失败查询: {len(queries_df) - successful_queries}")print(f"⏱️ 执行时间: {execution_time:.2f} 秒")print(f"📁 结果文件: target.xlsx")# 显示前几个查询的示例结果if successful_queries > 0:print(f"\n📈 示例结果:")for i in range(min(3, successful_queries)):try:sample_query = queries_df.iloc[i]['query']sample_result = conn.execute(sample_query).fetchdf()print(f" Q{queries_df.iloc[i]['query_nr']}: {len(sample_result)} 行结果")except:passexcept Exception as e:print(f"❌ 程序执行失败: {e}")raisefinally:if conn:conn.close()def enhanced_execute_queries():"""增强版的查询执行,包含更多错误处理和详细信息"""print("=" * 60)print("增强版查询执行")print("=" * 60)# 检查文件是否存在required_files = ['source.xlsx', 'menu.xlsx']for file in required_files:if not os.path.exists(file):print(f"❌ 文件 {file} 不存在")returnconn = Nonetry:conn = duckdb.connect(database=':memory:')# 读取并显示source.xlsx的信息print("📊 source.xlsx 文件信息:")excel_file = pd.ExcelFile('source.xlsx')print(f" - 包含 {len(excel_file.sheet_names)} 个sheet")print(f" - Sheet名称: {', '.join(excel_file.sheet_names)}")# 创建视图print("\n🏗️ 创建数据库视图...")for sheet_name in excel_file.sheet_names:try:conn.execute(f"""CREATE OR REPLACE VIEW {sheet_name} AS SELECT * FROM read_xlsx('source.xlsx', sheet='{sheet_name}')""")# 显示表结构table_info = conn.execute(f"DESCRIBE {sheet_name}").fetchdf()print(f" ✅ {sheet_name}: {len(table_info)} 列")except Exception as e:print(f" ❌ 创建视图 {sheet_name} 失败: {e}")# 读取查询语句queries_df = pd.read_excel('menu.xlsx', sheet_name='queries')print(f"\n🔍 读取到 {len(queries_df)} 个查询语句")# 执行查询print("\n🚀 开始执行查询...")results = {}with pd.ExcelWriter('target.xlsx', engine='openpyxl') as writer:for index, row in queries_df.iterrows():query_nr = int(row['query_nr'])query = row['query']print(f" 🔄 执行 Q{query_nr}...")try:# 执行查询result_df = conn.execute(query).fetchdf()# 保存结果sheet_name = f"q{query_nr}"result_df.to_excel(writer, sheet_name=sheet_name, index=False)results[query_nr] = {'status': 'success','row_count': len(result_df),'columns': list(result_df.columns)}print(f" ✅ Q{query_nr} 成功: {len(result_df)} 行, {len(result_df.columns)} 列")except Exception as e:error_msg = str(e)results[query_nr] = {'status': 'error','error': error_msg}print(f" ❌ Q{query_nr} 失败: {error_msg}")# 保存错误信息error_df = pd.DataFrame({'query_number': [query_nr],'error_message': [error_msg],'query': [query[:200] + '...' if len(query) > 200 else query]})error_sheet_name = f"q{query_nr}_error"error_df.to_excel(writer, sheet_name=error_sheet_name, index=False)# 生成执行报告print("\n" + "=" * 60)print("📋 执行报告")print("=" * 60)success_count = sum(1 for r in results.values() if r['status'] == 'success')error_count = len(results) - success_countprint(f"✅ 成功: {success_count} 个查询")print(f"❌ 失败: {error_count} 个查询")if success_count > 0:print("\n📈 成功查询详情:")for q_nr, result in results.items():if result['status'] == 'success':print(f" Q{q_nr}: {result['row_count']} 行, 列: {', '.join(result['columns'][:3])}{'...' if len(result['columns']) > 3 else ''}")if error_count > 0:print("\n⚠️ 失败查询详情:")for q_nr, result in results.items():if result['status'] == 'error':print(f" Q{q_nr}: {result['error']}")print(f"\n💾 结果已保存到: target.xlsx")except Exception as e:print(f"❌ 程序执行失败: {e}")raisefinally:if conn:conn.close()# 主程序
if __name__ == "__main__":# 执行查询execute_queries_on_excel()# 或者使用增强版# enhanced_execute_queries()
如果数据量较大,视图改成内存表更高效即把create view改为create table。