当前位置: 首页 > news >正文

多路径PKL文件读取与合并

文章目录

    • 一 核心概念对比
      • 1.1 进程: 操作系统资源分配单元
      • 1.2 线程: 轻量级执行流
      • 1.3 协程: 用户态异步
    • 二 示例: 多路径PKL文件读取与合并
      • 2.1 问题场景描述
      • 2.2 多进程方案实现
      • 2.3 多线程优化版本
      • 2.4 协程异步方案
    • 三 混合编程: 构建高性能复合方案
      • 3.1 进程+协程架构设计
      • 3.2 线程池+协程模式
      • 3.3 性能优化关键策略
    • 四 常见问题解决方案

一 核心概念对比

1.1 进程: 操作系统资源分配单元

进程是程序执行的独立实例, 每个进程拥有独立内存空间 (堆, 栈, 代码段) 和系统资源 (文件描述符, 环境变量) . 在Python中可通过multiprocessing模块创建, 适用于:

  • CPU密集型任务 (如图像处理, 数学计算)
  • 需要内存隔离的场景 (如不同业务模块)
  • 利用多核处理器实现真并行

1.2 线程: 轻量级执行流

线程共享进程的内存空间, 通过threading模块创建. 特点包括:

  • 共享堆内存, 通信成本低
  • 受GIL (全局解释器锁) 限制, 适合I/O密集型任务
  • 上下文切换开销约为进程的1/5

1.3 协程: 用户态异步

基于生成器和asyncio的协程实现单线程并发, 特点:

  • 上下文切换无需内核介入 (开销仅线程的3.7%)
  • 需要显式await让出控制权
  • 完美适配I/O密集型任务 (网络请求, 文件读写)

二 示例: 多路径PKL文件读取与合并

2.1 问题场景描述

假设存在以下文件结构:

/pkl_data/
├─ group1/
│  ├─ data_001.pkl
│  └─ data_002.pkl  
├─ group2/
│  ├─ info_001.pkl
│  └─ info_002.pkl
...

要求将1000+个分散在不同路径的pkl文件快速读取, 合并到统一的DataFrame, 并处理以下挑战:

  • 路径遍历与文件发现
  • 并行读取优化
  • DataFrame的线程安全写入

2.2 多进程方案实现

import os
import pickle
import pandas as pd
from multiprocessing import Pool, Manager

def read_pkl(path):
    with open(path, 'rb') as f:
        return pickle.load(f)

def process_group(group_dir, df_lock, shared_df):
    pkl_files = [os.path.join(group_dir, f) 
                for f in os.listdir(group_dir) if f.endswith('.pkl')]
    
    group_data = []
    for pkl in pkl_files:
        data = read_pkl(pkl)
        group_data.append(data)
    
    with df_lock:  # 进程级互斥锁
        temp_df = pd.concat([shared_df, pd.DataFrame(group_data)])
        shared_df[:] = temp_df.values.tolist()  # 共享内存操作

if __name__ == '__main__':
    base_path = '/pkl_data'
    groups = [os.path.join(base_path, g) for g in os.listdir(base_path)]
    
    with Manager() as manager:
        shared_df = manager.list()
        lock = manager.Lock()
        
        with Pool(processes=os.cpu_count()) as pool:
            pool.starmap(process_group, [(g, lock, shared_df) for g in groups])
        
        final_df = pd.DataFrame(list(shared_df))

2.3 多线程优化版本

from concurrent.futures import ThreadPoolExecutor
import threading

def thread_worker(pkl_path, df, lock):
    data = read_pkl(pkl_path)
    with lock:  # 线程级互斥
        df.append(data, ignore_index=True)

def main():
    all_pkl = [...]  # 所有pkl文件路径列表
    df = pd.DataFrame()
    lock = threading.Lock()
    
    with ThreadPoolExecutor(max_workers=32) as executor:
        futures = [executor.submit(thread_worker, p, df, lock) 
                  for p in all_pkl]
        for f in futures:
            f.result()

2.4 协程异步方案

import aiofiles
import asyncio

async def async_read(pkl_path, df, lock):
    async with aiofiles.open(pkl_path, 'rb') as f:
        data = pickle.loads(await f.read())
    
    async with lock:  # 协程级互斥
        df.append(data, ignore_index=True)

async def main():
    all_pkl = [...]  
    df = pd.DataFrame()
    lock = asyncio.Lock()
    
    tasks = [async_read(p, df, lock) for p in all_pkl]
    await asyncio.gather(*tasks)

三 混合编程: 构建高性能复合方案

3.1 进程+协程架构设计

from multiprocessing import Pool
import asyncio

async def async_worker(group_path):
    # 每个进程内运行独立事件循环
    all_pkl = [...]  
    df = pd.DataFrame()
    await asyncio.gather(*[async_read(p, df) for p in all_pkl])
    return df

def process_worker(group_path):
    loop = asyncio.new_event_loop()
    return loop.run_until_complete(async_worker(group_path))

if __name__ == '__main__':
    groups = [...]  # 分组路径
    with Pool() as pool:
        results = pool.map(process_worker, groups)
    
    final_df = pd.concat(results)

3.2 线程池+协程模式

def hybrid_worker():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(async_main())

with ThreadPoolExecutor(max_workers=4) as executor:
    executor.submit(hybrid_worker)  # 每个线程运行独立事件循环

3.3 性能优化关键策略

  1. 分层并发控制: 进程级处理不同磁盘分区, 线程级处理目录遍历, 协程处理文件读取
  2. 内存共享优化:
    • 使用Apache Arrow格式进行零拷贝传输
    • 通过共享内存 (multiprocessing.shared_memory) 传递大型数组
  3. 动态负载均衡:
    from concurrent.futures import ProcessPoolExecutor, as_completed
    
    def dynamic_dispatch():
        with ProcessPoolExecutor() as executor:
            futures = {executor.submit(task, param): param for param in params}
            for future in as_completed(futures):
                result = future.result()
                # 动态分配新任务
    

四 常见问题解决方案

问题类型现象解决方案
内存泄漏DataFrame合并后内存不释放使用dask.dataframe延迟加载
死锁多级锁嵌套导致阻塞统一获取锁的顺序
序列化错误跨进程传递复杂对象失败使用cloudpickle代替标准pickle

相关文章:

  • 云服务器怎么设置端口禁用呢?
  • Python 迭代器与生成器:深入理解与实践
  • 资源分配图(RAG)检测死锁算法实现
  • 【数据库】sql错题详解
  • Android 16开发实战指南|锁屏交互+Vulkan优化全解析
  • QuectPython 网络协议之TCP/UDP协议最祥解析
  • drizzleDumper:基于内存搜索的Android脱壳工具
  • 计算机视觉算法实战——相机标定技术
  • 無人機高空收集地形之linux server 的應用部署
  • 三相永磁同步电机的控制方法之六步换向控制(Six-Step Commutation)
  • 2、pytest核心功能(进阶用法)
  • CS实现票据样式效果
  • IIS漏洞再现
  • 七、GPIO中断控制器(2)—— pcf8575
  • 阅读li2019-DOT源码--逐步调试
  • 【机器学习】什么是逻辑回归?
  • 分页查询互动问题(管理端)
  • 测试工程 常用Python库
  • FPGA_DDS_IP核
  • 【RHCE】LVS-NAT模式负载均衡实验
  • 几天洗一次头发最好?终于有答案了...
  • 新华时评:需要“重新平衡”的是美国心态
  • 三家“券商系”公募同日变更掌门人,新董事长均为公司股东方老将
  • 网警查处编造传播“登顶泰山最高可得3万奖金”网络谣言者
  • 中国强镇密码丨洪泽湖畔的蒋坝,如何打破古镇刻板印象
  • 两部门预拨4000万元支持山西、广西、陕西做好抗旱救灾工作