Python实现高效导入Excel数据到ClickHouse
设计一个从Excel文件导入Clickhouse表的程序流程,并用Python来实现。单进程读取Excel文件,分批读取,每批读取1000条,使用INSERT的语句带多组值来写入数据库,比如100组值,可以创建多个列表来缓存这些插入数据库的语句,实现连续读取数据不中断,创建单个进程来读取顺序这些列表来写入表,读取完列表的数据之后,清空列表以便下一次再写入插入数据库的依据,我希望读取和写入数据库的操作,尽量减少中间锁对象和等待完成的时间,以最高的性能运行。
import pandas as pd
import queue
import threading
from clickhouse_driver import Client
from time import perf_counterclass ExcelToClickHouse:def __init__(self, excel_path, ch_host, ch_database, ch_table, batch_size=1000, insert_batch=100):""":param excel_path: Excel文件路径:param ch_host: ClickHouse服务器地址:param ch_database: 数据库名:param ch_table: 表名:param batch_size: 每批读取行数:param insert_batch: 每次插入的批次数"""self.excel_path = excel_pathself.ch_client = Client(host=ch_host, database=ch_database)self.ch_table = ch_tableself.batch_size = batch_sizeself.insert_batch = insert_batchself.data_queue = queue.Queue(maxsize=10) # 控制内存使用self.producer_done = threading.Event()self.total_rows = 0self.inserted_rows = 0self.lock = threading.Lock()def _producer(self):"""生产者:读取Excel并分批次放入队列"""try:# 首次读取获取列名df_sample = pd.read_excel(self.excel_path, nrows=1)columns = df_sample.columns.tolist()# 重新创建迭代器reader = pd.read_excel(self.excel_path, chunksize=self.batch_size,iterator=True)for chunk in reader:# 转换为元组列表 (更高效的数据格式)data = [tuple(row) for row