【ReText】1.3 Python multiprocessing 库详解
multiprocessing
是 Python 的一个标准库,用于创建多进程程序。它通过创建子进程来绕过 GIL(全局解释器锁),允许程序真正地同时利用多个 CPU 核心,从而显著提高 CPU 密集型任务的性能。
核心概念:为什么需要多进程?
Python 的 threading
多线程由于 GIL 的存在,只适合处理 I/O 密集型任务(如网络请求、文件读写)。对于 CPU 密集型任务(如数学计算、图像处理),多个线程无法真正并行执行。
多进程的优势:
- 真正的并行:每个进程有独立的 Python 解释器和内存空间,因此每个进程都有自己的 GIL,可以同时在不同的 CPU 核心上运行。
- 避免 GIL 限制:彻底解决了 GIL 对 CPU 密集型任务的限制。
- 稳定性更高:一个进程崩溃通常不会影响其他进程。
多进程的劣势:
- 资源开销大:创建进程比创建线程消耗更多的系统资源(内存、CPU 初始化时间)。
- 进程间通信(IPC)复杂:不能像线程那样直接共享内存,需要通过队列(Queue)、管道(Pipe)等机制进行通信,速度较慢。
核心组件与用法
1. Process
类:创建子进程
这是最基本的方式,用于创建一个独立的进程来执行任务。
import multiprocessing
import os
import timedef worker(task_id, duration):"""子进程要执行的任务"""print(f"进程 {task_id} (PID: {os.getpid()}) 开始执行,父进程PID: {os.getppid()}")time.sleep(duration) # 模拟工作耗时print(f"进程 {task_id} 结束")return f"任务 {task_id} 完成"if __name__ == '__main__': # 必须使用保护,尤其是在Windows上processes = []start_time = time.time()# 创建3个进程for i in range(3):# target: 要执行的函数,args: 传给函数的参数p = multiprocessing.Process(target=worker, args=(i, 2))processes.append(p)p.start() # 启动进程# 等待所有进程结束for p in processes:p.join() # 阻塞主进程,直到子进程p运行完毕end_time = time.time()print(f"所有进程执行完毕,总耗时: {end_time - start_time:.2f} 秒")
关键点:
target
:指定子进程要运行的函数。args
/kwargs
:传递给目标函数的参数。start()
:启动进程。join()
:等待进程终止。if __name__ == '__main__':
:在 Windows 系统上必须使用,否则创建新进程时会递归执行代码,导致错误。
2. Pool
类:进程池
对于需要创建大量进程的任务,使用进程池可以更好地管理资源,避免频繁创建和销毁进程的开销。
import multiprocessing
import timedef cpu_intensive_task(n):"""模拟一个CPU密集型任务:计算n的平方"""print(f"处理数字: {n}")time.sleep(0.5) # 模拟计算耗时return n * nif __name__ == '__main__':# 创建一个包含4个worker的进程池(通常为CPU核心数)with multiprocessing.Pool(processes=4) as pool:# 方法一: map - 同步执行,阻塞主进程直到所有任务完成numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]results_map = pool.map(cpu_intensive_task, numbers)print("map 结果:", results_map)# 方法二: apply_async - 异步执行,立即返回AsyncResult对象async_result = pool.apply_async(cpu_intensive_task, (25,))# 之后可以通过 async_result.get() 获取结果(会阻塞)print("异步结果:", async_result.get())# 方法三: map_async - 异步版的mapasync_map_result = pool.map_async(cpu_intensive_task, numbers)results_async = async_map_result.get() # 获取所有结果print("map_async 结果:", results_async)
Pool 常用方法:
map(func, iterable)
:同步映射,最常用。apply_async(func, args)
:异步提交单个任务。map_async(func, iterable)
:异步映射。close()
:关闭进程池,不再接受新任务。terminate()
:立即终止所有工作进程。join()
:等待所有工作进程退出(必须在close()
或terminate()
之后调用)。
3. 进程间通信 (IPC)
由于进程不共享内存,必须使用特殊的对象进行数据交换。
a. Queue
(队列)
一个先进先出(FIFO)的管道,用于进程间安全地传递消息。
import multiprocessing
import timedef producer(queue, items):"""生产者进程:向队列中放入数据"""for item in items:print(f"生产: {item}")queue.put(item)time.sleep(0.1)queue.put(None) # 发送结束信号def consumer(queue):"""消费者进程:从队列中取出数据"""while True:item = queue.get()if item is None: # 收到结束信号breakprint(f"消费: {item}")time.sleep(0.2)if __name__ == '__main__':# 创建队列q = multiprocessing.Queue()# 创建进程p1 = multiprocessing.Process(target=producer, args=(q, [1, 2, 3, 4, 5]))p2 = multiprocessing.Process(target=consumer, args=(q,))p1.start()p2.start()p1.join()p2.join()
b. Pipe
(管道)
提供一个双向或单向的连接,返回一对连接对象 (conn1, conn2)
。
def sender(conn, messages):for message in messages:conn.send(message) # 发送消息print(f"发送: {message}")conn.close() # 关闭连接def receiver(conn):while True:try:message = conn.recv() # 接收消息print(f"接收: {message}")except EOFError: # 当连接已关闭且没有数据可读时breakif __name__ == '__main__':parent_conn, child_conn = multiprocessing.Pipe() # 创建管道p1 = multiprocessing.Process(target=sender, args=(child_conn, ['Hello', 'World', None]))p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))p1.start()p2.start()p1.join()child_conn.close() # 确保子连接关闭p2.join()
4. 进程间共享状态
虽然进程内存独立,但 multiprocessing
提供了在进程间共享数据的方式。
a. Value
/ Array
:共享内存
import multiprocessingdef worker(n, shared_value, shared_array):"""修改共享值"""with n.get_lock(): # 获取锁以确保原子操作shared_value.value += nshared_array[n] = n * n # 修改共享数组if __name__ == '__main__':# 'i' 表示整数类型, 'd' 表示双精度浮点数counter = multiprocessing.Value('i', 0) # 共享整数值,初始为0squares = multiprocessing.Array('i', 10) # 共享整数数组,长度为10processes = []for i in range(5):p = multiprocessing.Process(target=worker, args=(i, counter, squares))processes.append(p)p.start()for p in processes:p.join()print(f"最终计数: {counter.value}")print(f"平方数组: {list(squares)}")
b. Manager
:服务器进程
Manager
可以创建共享的列表、字典等更复杂的数据结构,但速度比共享内存慢。
def worker(shared_dict, key, value):shared_dict[key] = valueif __name__ == '__main__':with multiprocessing.Manager() as manager:shared_dict = manager.dict() # 共享字典shared_list = manager.list(range(5)) # 共享列表processes = []for i in range(3):p = multiprocessing.Process(target=worker, args=(shared_dict, f'key_{i}', i))processes.append(p)p.start()for p in processes:p.join()print(f"共享字典: {dict(shared_dict)}")print(f"共享列表: {list(shared_list)}")
适用场景与最佳实践
适用场景:
- CPU 密集型任务:数学计算、图像处理、数据压缩/加密、科学模拟。
- 需要真正并行的任务。
- 需要更高稳定性的任务(进程隔离)。
最佳实践:
- 合理设置进程数:通常设置为 CPU 核心数量(
multiprocessing.cpu_count()
)。 - 优先使用进程池 (
Pool
):避免频繁创建销毁进程的开销。 - 尽量减少进程间通信:IPC 是主要性能瓶颈,尽量让每个进程独立处理数据。
- 使用
if __name__ == '__main__':
:尤其是在 Windows 上,这是必须的。 - 妥善处理异常:子进程的异常不会自动传递给父进程,需要在目标函数内部捕获和处理。
总结
multiprocessing
库是 Python 中进行真正并行计算的首选工具。它通过创建独立的进程来充分利用多核 CPU,完美规避了 GIL 的限制。虽然进程间通信比线程复杂且开销更大,但对于需要显著提升计算性能的场景,multiprocessing
是不可或缺的强大工具。掌握 Process
、Pool
、Queue
等核心组件,能够帮助你构建出高效可靠的并行程序。