当前位置: 首页 > news >正文

利用DeepSeek编写从xlsx数据源调用duckdb执行已保存的查询SQL语句,并把查询结果保存到xlsx文件的程序

1.生成测试数据,用tpch插件的dbgen函数生成
需要事先安装python duckdb、pandas、openpyxl模块

pip install duckdb pandas openpyxl

如果本地未安装duckdb并安装过tpch插件,需要将http://extensions.duckdb.org/v1.3.2/linux_arm64/tpch.duckdb_extension.gz下载并解压到程序脚本所在目录,并执行注释掉的那行。

import duckdb
import pandas as pd
from openpyxl import Workbook
import timedef generate_test_data_enhanced():"""增强版的测试数据生成函数"""print("=" * 50)print("TPCH测试数据生成程序")print("=" * 50)start_time = time.time()try:# 连接到内存数据库conn = duckdb.connect(database=':memory:')conn.execute("load tpch")#conn.execute("load 'tpch.duckdb_extension'")# 生成TPCH测试数据print("🚀 正在生成TPCH测试数据 (sf=0.001)...")conn.execute("CALL dbgen(sf=0.001)")# 获取所有表名tables_result = conn.execute("SHOW TABLES").fetchall()table_names = [table[0] for table in tables_result if not table[0].startswith('tpch_') and table[0] != 'queries']print(f"📊 找到 {len(table_names)} 张表: {', '.join(table_names)}")# 创建source.xlsx文件print("💾 正在导出数据到 source.xlsx...")with pd.ExcelWriter('source.xlsx', engine='openpyxl') as writer:for i, table_name in enumerate(table_names, 1):print(f"   [{i}/{len(table_names)}] 导出表: {table_name}")# 读取表数据df = conn.execute(f"SELECT * FROM {table_name}").fetchdf()# 保存到Excel的sheet中df.to_excel(writer, sheet_name=table_name, index=False)# 获取查询语句print("🔍 正在获取查询语句...")queries_df = conn.execute("SELECT query_nr, query FROM tpch_queries()").fetchdf()# 创建menu.xlsx文件print("💾 正在保存查询语句到 menu.xlsx...")with pd.ExcelWriter('menu.xlsx', engine='openpyxl') as writer:queries_df.to_excel(writer, sheet_name='queries', index=False)# 显示统计信息end_time = time.time()execution_time = end_time - start_timeprint("\n" + "=" * 50)print("✅ 数据生成完成!")print("=" * 50)print(f"📁 生成的文件:")print(f"   - source.xlsx: 包含 {len(table_names)} 张表")print(f"   - menu.xlsx: 包含 {len(queries_df)} 个查询语句")print(f"⏱️  执行时间: {execution_time:.2f} 秒")# 显示各表的数据量print("\n📈 各表数据量统计:")for table_name in table_names:count = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]print(f"   - {table_name}: {count} 行")# 显示查询语句示例print(f"\n📝 查询语句示例 (前3个):")for i, (query_nr, query) in enumerate(queries_df.head(3).itertuples(index=False)):print(f"   {query_nr}: {query[:50]}..." if len(query) > 50 else f"   {query_nr}: {query}")except Exception as e:print(f"❌ 发生错误: {e}")raisefinally:if 'conn' in locals():conn.close()if __name__ == "__main__":generate_test_data_enhanced()

2.执行测试(你可以修改source和menu的内容,执行自己的查询,最简单的方法是把tpch改成tpcds测试)

import duckdb
import pandas as pd
from openpyxl import Workbook
import time
import osdef execute_queries_on_excel():"""执行menu.xlsx中的查询语句,在source.xlsx数据上执行,结果保存到target.xlsx"""print("=" * 60)print("第二步:执行查询语句并保存结果")print("=" * 60)start_time = time.time()# 检查必要文件是否存在if not os.path.exists('source.xlsx'):print("❌ source.xlsx 文件不存在,请先运行第一步生成测试数据")returnif not os.path.exists('menu.xlsx'):print("❌ menu.xlsx 文件不存在,请先运行第一步生成查询语句")returnconn = Nonetry:# 连接到内存数据库conn = duckdb.connect(database=':memory:')# 1. 读取source.xlsx的所有sheet并创建视图print("📖 正在读取source.xlsx并创建视图...")# 使用pd.ExcelFile获取所有sheet名excel_file = pd.ExcelFile('source.xlsx')sheet_names = excel_file.sheet_namesfor sheet_name in sheet_names:print(f"   🗂️  创建视图: {sheet_name}")# 使用read_xlsx函数创建视图conn.execute(f"""CREATE OR REPLACE VIEW {sheet_name} AS SELECT * FROM read_xlsx('source.xlsx', sheet='{sheet_name}')""")print(f"✅ 成功创建 {len(sheet_names)} 个视图")# 2. 读取menu.xlsx中的查询语句print("🔍 正在读取menu.xlsx中的查询语句...")queries_df = pd.read_excel('menu.xlsx', sheet_name='queries')# 确保列名正确if 'query_nr' not in queries_df.columns or 'query' not in queries_df.columns:print("❌ menu.xlsx格式不正确,需要包含query_nr和query列")returnprint(f"📋 找到 {len(queries_df)} 个查询语句")# 3. 创建target.xlsx文件,并执行每个查询print("🚀 正在执行查询并保存结果...")with pd.ExcelWriter('target.xlsx', engine='openpyxl') as writer:successful_queries = 0for index, row in queries_df.iterrows():query_nr = row['query_nr']query = row['query']try:print(f"   [{index + 1}/{len(queries_df)}] 执行查询 Q{query_nr}...")# 执行查询result_df = conn.execute(query).fetchdf()# 保存结果到target.xlsx的q{query_nr} sheetsheet_name = f"q{query_nr}"result_df.to_excel(writer, sheet_name=sheet_name, index=False)print(f"   ✅ Q{query_nr} 执行成功,结果行数: {len(result_df)}")successful_queries += 1except Exception as e:print(f"   ❌ Q{query_nr} 执行失败: {e}")# 创建错误信息sheeterror_df = pd.DataFrame({'error_message': [f"查询执行失败: {str(e)}"],'query_number': [query_nr],'query': [query[:100] + '...' if len(query) > 100 else query]})error_sheet_name = f"q{query_nr}_error"error_df.to_excel(writer, sheet_name=error_sheet_name, index=False)# 4. 显示执行统计end_time = time.time()execution_time = end_time - start_timeprint("\n" + "=" * 60)print("✅ 查询执行完成!")print("=" * 60)print(f"📊 执行统计:")print(f"   - 总查询数: {len(queries_df)}")print(f"   - 成功执行: {successful_queries}")print(f"   - 失败查询: {len(queries_df) - successful_queries}")print(f"⏱️  执行时间: {execution_time:.2f} 秒")print(f"📁 结果文件: target.xlsx")# 显示前几个查询的示例结果if successful_queries > 0:print(f"\n📈 示例结果:")for i in range(min(3, successful_queries)):try:sample_query = queries_df.iloc[i]['query']sample_result = conn.execute(sample_query).fetchdf()print(f"   Q{queries_df.iloc[i]['query_nr']}: {len(sample_result)} 行结果")except:passexcept Exception as e:print(f"❌ 程序执行失败: {e}")raisefinally:if conn:conn.close()def enhanced_execute_queries():"""增强版的查询执行,包含更多错误处理和详细信息"""print("=" * 60)print("增强版查询执行")print("=" * 60)# 检查文件是否存在required_files = ['source.xlsx', 'menu.xlsx']for file in required_files:if not os.path.exists(file):print(f"❌ 文件 {file} 不存在")returnconn = Nonetry:conn = duckdb.connect(database=':memory:')# 读取并显示source.xlsx的信息print("📊 source.xlsx 文件信息:")excel_file = pd.ExcelFile('source.xlsx')print(f"   - 包含 {len(excel_file.sheet_names)} 个sheet")print(f"   - Sheet名称: {', '.join(excel_file.sheet_names)}")# 创建视图print("\n🏗️  创建数据库视图...")for sheet_name in excel_file.sheet_names:try:conn.execute(f"""CREATE OR REPLACE VIEW {sheet_name} AS SELECT * FROM read_xlsx('source.xlsx', sheet='{sheet_name}')""")# 显示表结构table_info = conn.execute(f"DESCRIBE {sheet_name}").fetchdf()print(f"   ✅ {sheet_name}: {len(table_info)} 列")except Exception as e:print(f"   ❌ 创建视图 {sheet_name} 失败: {e}")# 读取查询语句queries_df = pd.read_excel('menu.xlsx', sheet_name='queries')print(f"\n🔍 读取到 {len(queries_df)} 个查询语句")# 执行查询print("\n🚀 开始执行查询...")results = {}with pd.ExcelWriter('target.xlsx', engine='openpyxl') as writer:for index, row in queries_df.iterrows():query_nr = int(row['query_nr'])query = row['query']print(f"   🔄 执行 Q{query_nr}...")try:# 执行查询result_df = conn.execute(query).fetchdf()# 保存结果sheet_name = f"q{query_nr}"result_df.to_excel(writer, sheet_name=sheet_name, index=False)results[query_nr] = {'status': 'success','row_count': len(result_df),'columns': list(result_df.columns)}print(f"   ✅ Q{query_nr} 成功: {len(result_df)} 行, {len(result_df.columns)} 列")except Exception as e:error_msg = str(e)results[query_nr] = {'status': 'error','error': error_msg}print(f"   ❌ Q{query_nr} 失败: {error_msg}")# 保存错误信息error_df = pd.DataFrame({'query_number': [query_nr],'error_message': [error_msg],'query': [query[:200] + '...' if len(query) > 200 else query]})error_sheet_name = f"q{query_nr}_error"error_df.to_excel(writer, sheet_name=error_sheet_name, index=False)# 生成执行报告print("\n" + "=" * 60)print("📋 执行报告")print("=" * 60)success_count = sum(1 for r in results.values() if r['status'] == 'success')error_count = len(results) - success_countprint(f"✅ 成功: {success_count} 个查询")print(f"❌ 失败: {error_count} 个查询")if success_count > 0:print("\n📈 成功查询详情:")for q_nr, result in results.items():if result['status'] == 'success':print(f"   Q{q_nr}: {result['row_count']} 行, 列: {', '.join(result['columns'][:3])}{'...' if len(result['columns']) > 3 else ''}")if error_count > 0:print("\n⚠️  失败查询详情:")for q_nr, result in results.items():if result['status'] == 'error':print(f"   Q{q_nr}: {result['error']}")print(f"\n💾 结果已保存到: target.xlsx")except Exception as e:print(f"❌ 程序执行失败: {e}")raisefinally:if conn:conn.close()# 主程序
if __name__ == "__main__":# 执行查询execute_queries_on_excel()# 或者使用增强版# enhanced_execute_queries()

如果数据量较大,视图改成内存表更高效即把create view改为create table。

http://www.dtcms.com/a/345052.html

相关文章:

  • 电机驱动实现插补算法之脉冲和方向接收(以stm32主控为例)
  • 飞算JavaAI开发助手: 新手开发任务管理系统实战流程
  • STM32G4-比较器
  • Autosar之Com模块
  • Redis面试精讲 Day 27:Redis 7.0/8.0新特性深度解析
  • 基于STM32+Python+MySQL实现在线温度计设计和制作
  • 【高等数学笔记-极限(4)】极限的运算法则
  • 大麦盒子DM4036-精简固件包及教程
  • Vue2+Vue3前端开发_Day7
  • [TG开发]部署机器人
  • Java多线程编程与锁机制全解析(覆盖Java到Spring)
  • 从0到1打造一台机器人走起来
  • 技术解读|MatrixOne高效 CDC:基于快照的分布式数据库优化方案
  • AI如何赋能财务分析:1份财务报表录入从数小时到5分钟
  • 声网SDK更新,多场景抗弱网稳定性大幅增强
  • 制造企业用档案宝,档案清晰可查
  • ArrayList线程不安全问题及解决方案详解
  • AI:业务驱动与技术赋能:企业智能化应用的双向进化深度指南
  • 红酒数据集预处理实战:缺失值处理的 5 种打开方式,从入门到进阶一步到位
  • vue-admin-template权限管理
  • 信创认证是什么?怎么报考?
  • 特级资质信息化迎检核心流程经验分享
  • Pod控制器详解
  • STM32之ADC详解
  • [系统架构设计师]大数据架构设计理论与实践(十九)
  • ​维基框架 (Wiki Framework) 1.1.0 版本发布​ 提供多模型AI辅助开发
  • TNS(ORACLE)协议分析
  • [硬件电路-162]:PID参数受哪些因素影响?
  • 【Redis】缓存和分布式锁
  • MySQL - 视图,事务和索引