当前位置: 首页 > news >正文

【ReText】1.3 Python multiprocessing 库详解

multiprocessing 是 Python 的一个标准库,用于创建多进程程序。它通过创建子进程来绕过 GIL(全局解释器锁),允许程序真正地同时利用多个 CPU 核心,从而显著提高 CPU 密集型任务的性能。


核心概念:为什么需要多进程?

Python 的 threading 多线程由于 GIL 的存在,只适合处理 I/O 密集型任务(如网络请求、文件读写)。对于 CPU 密集型任务(如数学计算、图像处理),多个线程无法真正并行执行。

多进程的优势:

  • 真正的并行:每个进程有独立的 Python 解释器和内存空间,因此每个进程都有自己的 GIL,可以同时在不同的 CPU 核心上运行。
  • 避免 GIL 限制:彻底解决了 GIL 对 CPU 密集型任务的限制。
  • 稳定性更高:一个进程崩溃通常不会影响其他进程。

多进程的劣势:

  • 资源开销大:创建进程比创建线程消耗更多的系统资源(内存、CPU 初始化时间)。
  • 进程间通信(IPC)复杂:不能像线程那样直接共享内存,需要通过队列(Queue)、管道(Pipe)等机制进行通信,速度较慢。

核心组件与用法

1. Process 类:创建子进程

这是最基本的方式,用于创建一个独立的进程来执行任务。

import multiprocessing
import os
import timedef worker(task_id, duration):"""子进程要执行的任务"""print(f"进程 {task_id} (PID: {os.getpid()}) 开始执行,父进程PID: {os.getppid()}")time.sleep(duration) # 模拟工作耗时print(f"进程 {task_id} 结束")return f"任务 {task_id} 完成"if __name__ == '__main__': # 必须使用保护,尤其是在Windows上processes = []start_time = time.time()# 创建3个进程for i in range(3):# target: 要执行的函数,args: 传给函数的参数p = multiprocessing.Process(target=worker, args=(i, 2))processes.append(p)p.start() # 启动进程# 等待所有进程结束for p in processes:p.join() # 阻塞主进程,直到子进程p运行完毕end_time = time.time()print(f"所有进程执行完毕,总耗时: {end_time - start_time:.2f} 秒")

关键点:

  • target:指定子进程要运行的函数。
  • args/kwargs:传递给目标函数的参数。
  • start():启动进程。
  • join():等待进程终止。
  • if __name__ == '__main__'::在 Windows 系统上必须使用,否则创建新进程时会递归执行代码,导致错误。

2. Pool 类:进程池

对于需要创建大量进程的任务,使用进程池可以更好地管理资源,避免频繁创建和销毁进程的开销。

import multiprocessing
import timedef cpu_intensive_task(n):"""模拟一个CPU密集型任务:计算n的平方"""print(f"处理数字: {n}")time.sleep(0.5) # 模拟计算耗时return n * nif __name__ == '__main__':# 创建一个包含4个worker的进程池(通常为CPU核心数)with multiprocessing.Pool(processes=4) as pool:# 方法一: map - 同步执行,阻塞主进程直到所有任务完成numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]results_map = pool.map(cpu_intensive_task, numbers)print("map 结果:", results_map)# 方法二: apply_async - 异步执行,立即返回AsyncResult对象async_result = pool.apply_async(cpu_intensive_task, (25,))# 之后可以通过 async_result.get() 获取结果(会阻塞)print("异步结果:", async_result.get())# 方法三: map_async - 异步版的mapasync_map_result = pool.map_async(cpu_intensive_task, numbers)results_async = async_map_result.get() # 获取所有结果print("map_async 结果:", results_async)

Pool 常用方法:

  • map(func, iterable):同步映射,最常用。
  • apply_async(func, args):异步提交单个任务。
  • map_async(func, iterable):异步映射。
  • close():关闭进程池,不再接受新任务。
  • terminate():立即终止所有工作进程。
  • join():等待所有工作进程退出(必须在 close()terminate() 之后调用)。

3. 进程间通信 (IPC)

由于进程不共享内存,必须使用特殊的对象进行数据交换。

a. Queue (队列)

一个先进先出(FIFO)的管道,用于进程间安全地传递消息。

import multiprocessing
import timedef producer(queue, items):"""生产者进程:向队列中放入数据"""for item in items:print(f"生产: {item}")queue.put(item)time.sleep(0.1)queue.put(None) # 发送结束信号def consumer(queue):"""消费者进程:从队列中取出数据"""while True:item = queue.get()if item is None: # 收到结束信号breakprint(f"消费: {item}")time.sleep(0.2)if __name__ == '__main__':# 创建队列q = multiprocessing.Queue()# 创建进程p1 = multiprocessing.Process(target=producer, args=(q, [1, 2, 3, 4, 5]))p2 = multiprocessing.Process(target=consumer, args=(q,))p1.start()p2.start()p1.join()p2.join()
b. Pipe (管道)

提供一个双向或单向的连接,返回一对连接对象 (conn1, conn2)

def sender(conn, messages):for message in messages:conn.send(message) # 发送消息print(f"发送: {message}")conn.close() # 关闭连接def receiver(conn):while True:try:message = conn.recv() # 接收消息print(f"接收: {message}")except EOFError: # 当连接已关闭且没有数据可读时breakif __name__ == '__main__':parent_conn, child_conn = multiprocessing.Pipe() # 创建管道p1 = multiprocessing.Process(target=sender, args=(child_conn, ['Hello', 'World', None]))p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))p1.start()p2.start()p1.join()child_conn.close() # 确保子连接关闭p2.join()

4. 进程间共享状态

虽然进程内存独立,但 multiprocessing 提供了在进程间共享数据的方式。

a. Value / Array:共享内存
import multiprocessingdef worker(n, shared_value, shared_array):"""修改共享值"""with n.get_lock(): # 获取锁以确保原子操作shared_value.value += nshared_array[n] = n * n # 修改共享数组if __name__ == '__main__':# 'i' 表示整数类型, 'd' 表示双精度浮点数counter = multiprocessing.Value('i', 0) # 共享整数值,初始为0squares = multiprocessing.Array('i', 10) # 共享整数数组,长度为10processes = []for i in range(5):p = multiprocessing.Process(target=worker, args=(i, counter, squares))processes.append(p)p.start()for p in processes:p.join()print(f"最终计数: {counter.value}")print(f"平方数组: {list(squares)}")
b. Manager:服务器进程

Manager 可以创建共享的列表、字典等更复杂的数据结构,但速度比共享内存慢。

def worker(shared_dict, key, value):shared_dict[key] = valueif __name__ == '__main__':with multiprocessing.Manager() as manager:shared_dict = manager.dict() # 共享字典shared_list = manager.list(range(5)) # 共享列表processes = []for i in range(3):p = multiprocessing.Process(target=worker, args=(shared_dict, f'key_{i}', i))processes.append(p)p.start()for p in processes:p.join()print(f"共享字典: {dict(shared_dict)}")print(f"共享列表: {list(shared_list)}")

适用场景与最佳实践

适用场景:

  1. CPU 密集型任务:数学计算、图像处理、数据压缩/加密、科学模拟。
  2. 需要真正并行的任务。
  3. 需要更高稳定性的任务(进程隔离)。

最佳实践:

  1. 合理设置进程数:通常设置为 CPU 核心数量(multiprocessing.cpu_count())。
  2. 优先使用进程池 (Pool):避免频繁创建销毁进程的开销。
  3. 尽量减少进程间通信:IPC 是主要性能瓶颈,尽量让每个进程独立处理数据。
  4. 使用 if __name__ == '__main__'::尤其是在 Windows 上,这是必须的。
  5. 妥善处理异常:子进程的异常不会自动传递给父进程,需要在目标函数内部捕获和处理。

总结

multiprocessing 库是 Python 中进行真正并行计算的首选工具。它通过创建独立的进程来充分利用多核 CPU,完美规避了 GIL 的限制。虽然进程间通信比线程复杂且开销更大,但对于需要显著提升计算性能的场景,multiprocessing 是不可或缺的强大工具。掌握 ProcessPoolQueue 等核心组件,能够帮助你构建出高效可靠的并行程序。

http://www.dtcms.com/a/391081.html

相关文章:

  • Liunx系统下出现“Could not resolve host: mirrorlist.centos.org; 未知的错误”地解决方案
  • CentOS Stream 9安装系统(LVM扩容案例)
  • Docusign AI 全球化:构建安全、合规的多语言协议管理
  • C# 基于halcon的视觉工作流-章37-零件测量
  • 第二部分:VTK核心类详解(第38章 vtkPointData点数据类)
  • 木卫四科技 × 一汽解放商用车开发院: 共驱商用车 AI 研发新程
  • 【C++闯关笔记】STL:stack与queue的学习和使用
  • [HCTF 2018] WarmUp
  • Vue 学习随笔系列二十六 —— 动态表头
  • BIM 可视化运维平台 + IBMS 中央集成系统一体化解决方案:构建虚实融合的智慧运营中枢
  • XSUN_DESKTOP_PET(桌面宠物)
  • 具身智能VR遥操开发记录
  • 构建AI智能体:三十八、告别“冷启动”:看大模型如何解决推荐系统的世纪难题
  • [重学Rust]之结构体打印和转换
  • 数据结构(陈越,何钦铭) 第十一讲 散列查找
  • 2025年JBD SCI2区TOP,基于改进蚁群算法的应急路径规划,深度解析+性能实测
  • UIKit-layer
  • 一物一码公司推荐再互动平台
  • Wireshark捕获MQTT报文
  • Docker镜像核心作战手册:镜像命令全解析+离线迁移实战+压缩共享储存,打造无缝跨环境部署!
  • Static Deinitialization Order Fiasco
  • 如何使用 Qt Creator 高效调试
  • 保障路灯用电安全!配电箱漏电检测,为城市照明筑牢防线
  • 不同版本tensorflow推理报错解决方法
  • 嵌入式铁头山羊STM32-各章节详细笔记-查阅传送门
  • 在没有随机对照的情况下如果做实验对比:双重差分法(结合虚拟变量回归)(五)
  • 材质、效率双突破:Rendercool 解决室内渲染核心痛点
  • 【ThreeJs】【材质Material】核心材质参数解析手册
  • 无人机桨叶的材质与工艺对飞行速度的影响
  • PMBOK第六版项目沟通管理总结