多路径PKL文件读取与合并
文章目录
- 一 核心概念对比
- 1.1 进程: 操作系统资源分配单元
- 1.2 线程: 轻量级执行流
- 1.3 协程: 用户态异步
- 二 示例: 多路径PKL文件读取与合并
- 2.1 问题场景描述
- 2.2 多进程方案实现
- 2.3 多线程优化版本
- 2.4 协程异步方案
- 三 混合编程: 构建高性能复合方案
- 3.1 进程+协程架构设计
- 3.2 线程池+协程模式
- 3.3 性能优化关键策略
- 四 常见问题解决方案
一 核心概念对比
1.1 进程: 操作系统资源分配单元
进程是程序执行的独立实例, 每个进程拥有独立内存空间 (堆, 栈, 代码段) 和系统资源 (文件描述符, 环境变量) . 在Python中可通过multiprocessing
模块创建, 适用于:
- CPU密集型任务 (如图像处理, 数学计算)
- 需要内存隔离的场景 (如不同业务模块)
- 利用多核处理器实现真并行
1.2 线程: 轻量级执行流
线程共享进程的内存空间, 通过threading
模块创建. 特点包括:
- 共享堆内存, 通信成本低
- 受GIL (全局解释器锁) 限制, 适合I/O密集型任务
- 上下文切换开销约为进程的1/5
1.3 协程: 用户态异步
基于生成器和asyncio
的协程实现单线程并发, 特点:
- 上下文切换无需内核介入 (开销仅线程的3.7%)
- 需要显式
await
让出控制权 - 完美适配I/O密集型任务 (网络请求, 文件读写)
二 示例: 多路径PKL文件读取与合并
2.1 问题场景描述
假设存在以下文件结构:
/pkl_data/
├─ group1/
│ ├─ data_001.pkl
│ └─ data_002.pkl
├─ group2/
│ ├─ info_001.pkl
│ └─ info_002.pkl
...
要求将1000+个分散在不同路径的pkl文件快速读取, 合并到统一的DataFrame, 并处理以下挑战:
- 路径遍历与文件发现
- 并行读取优化
- DataFrame的线程安全写入
2.2 多进程方案实现
import os
import pickle
import pandas as pd
from multiprocessing import Pool, Manager
def read_pkl(path):
with open(path, 'rb') as f:
return pickle.load(f)
def process_group(group_dir, df_lock, shared_df):
pkl_files = [os.path.join(group_dir, f)
for f in os.listdir(group_dir) if f.endswith('.pkl')]
group_data = []
for pkl in pkl_files:
data = read_pkl(pkl)
group_data.append(data)
with df_lock: # 进程级互斥锁
temp_df = pd.concat([shared_df, pd.DataFrame(group_data)])
shared_df[:] = temp_df.values.tolist() # 共享内存操作
if __name__ == '__main__':
base_path = '/pkl_data'
groups = [os.path.join(base_path, g) for g in os.listdir(base_path)]
with Manager() as manager:
shared_df = manager.list()
lock = manager.Lock()
with Pool(processes=os.cpu_count()) as pool:
pool.starmap(process_group, [(g, lock, shared_df) for g in groups])
final_df = pd.DataFrame(list(shared_df))
2.3 多线程优化版本
from concurrent.futures import ThreadPoolExecutor
import threading
def thread_worker(pkl_path, df, lock):
data = read_pkl(pkl_path)
with lock: # 线程级互斥
df.append(data, ignore_index=True)
def main():
all_pkl = [...] # 所有pkl文件路径列表
df = pd.DataFrame()
lock = threading.Lock()
with ThreadPoolExecutor(max_workers=32) as executor:
futures = [executor.submit(thread_worker, p, df, lock)
for p in all_pkl]
for f in futures:
f.result()
2.4 协程异步方案
import aiofiles
import asyncio
async def async_read(pkl_path, df, lock):
async with aiofiles.open(pkl_path, 'rb') as f:
data = pickle.loads(await f.read())
async with lock: # 协程级互斥
df.append(data, ignore_index=True)
async def main():
all_pkl = [...]
df = pd.DataFrame()
lock = asyncio.Lock()
tasks = [async_read(p, df, lock) for p in all_pkl]
await asyncio.gather(*tasks)
三 混合编程: 构建高性能复合方案
3.1 进程+协程架构设计
from multiprocessing import Pool
import asyncio
async def async_worker(group_path):
# 每个进程内运行独立事件循环
all_pkl = [...]
df = pd.DataFrame()
await asyncio.gather(*[async_read(p, df) for p in all_pkl])
return df
def process_worker(group_path):
loop = asyncio.new_event_loop()
return loop.run_until_complete(async_worker(group_path))
if __name__ == '__main__':
groups = [...] # 分组路径
with Pool() as pool:
results = pool.map(process_worker, groups)
final_df = pd.concat(results)
3.2 线程池+协程模式
def hybrid_worker():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(async_main())
with ThreadPoolExecutor(max_workers=4) as executor:
executor.submit(hybrid_worker) # 每个线程运行独立事件循环
3.3 性能优化关键策略
- 分层并发控制: 进程级处理不同磁盘分区, 线程级处理目录遍历, 协程处理文件读取
- 内存共享优化:
- 使用Apache Arrow格式进行零拷贝传输
- 通过共享内存 (
multiprocessing.shared_memory
) 传递大型数组
- 动态负载均衡:
from concurrent.futures import ProcessPoolExecutor, as_completed def dynamic_dispatch(): with ProcessPoolExecutor() as executor: futures = {executor.submit(task, param): param for param in params} for future in as_completed(futures): result = future.result() # 动态分配新任务
四 常见问题解决方案
问题类型 | 现象 | 解决方案 |
---|---|---|
内存泄漏 | DataFrame合并后内存不释放 | 使用dask.dataframe 延迟加载 |
死锁 | 多级锁嵌套导致阻塞 | 统一获取锁的顺序 |
序列化错误 | 跨进程传递复杂对象失败 | 使用cloudpickle 代替标准pickle |