当前位置: 首页 > news >正文

Day05:Python中的并发和并行(3)

使用 multiprocessing 模块实现多进程

Python 中的 multiprocessing 模块提供了一种创建进程的方式,允许你并行运行代码。这对于受全局解释器锁(GIL)限制的多线程可能有限的 CPU 密集型任务特别有用。通过利用多个核心,multiprocessing 可以显著提高计算密集型操作的性能。本课程将涵盖使用 multiprocessing 模块的基础知识,包括进程创建、通信和同步。

理解 multiprocessing 模块

multiprocessing 模块是一个支持使用与 threading 模块相似的 API 来创建进程的包。它提供了本地和远程并发,通过使用子进程而不是线程来有效规避全局解释器锁(GIL)。正因如此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。

关键概念

  • 进程: 执行中的程序实例。每个进程都有自己独立的内存空间,这意味着变量默认情况下不会在进程之间共享。
  • 池: 一组工作进程的集合。使用池比手动创建和销毁进程更高效,尤其是在你需要执行许多任务时。
  • 队列: 一种允许进程通过发送和接收消息进行通信的数据结构。
  • 管道: 两个进程之间的连接,允许它们发送和接收数据。
  • 锁: 一种同步原语,防止多个进程同时访问共享资源,避免竞态条件。
  • 值和数组: 多个进程可以访问的共享内存对象。

为什么使用多进程?

多进程特别适用于:

  • CPU 密集型任务: 那些大部分时间用于执行计算的任务,例如图像处理、科学模拟和数据分析。
  • 可并行化任务: 那些可以分解为更小、独立子任务的任务,这些子任务可以同时执行。
  • 绕过 GIL: CPython 中的全局解释器锁限制了线程的真正并行性。多进程通过使用独立的进程绕过这一限制。

创建进程

使用 multiprocessing 模块的基本方式是创建 Process 对象。每个 Process 对象会在一个单独的进程中运行目标函数。

基本进程创建

import multiprocessing
import timedef worker(num):"""Worker function to be executed in a separate process"""print(f"Worker {num}: Starting")time.sleep(2)  # 模拟一些工作print(f"Worker {num}: Finishing")if __name__ == '__main__':processes = []for i in range(3):p = multiprocessing.Process(target=worker, args=(i,))processes.append(p)p.start()for p in processes:p.join()  # 等待所有进程完成print("All workers finished")

解释:

  1. import multiprocessing: 导入必要的模块。
  2. worker(num): 定义每个进程将执行的函数。它接受一个数字作为参数来标识工作进程。
  3. multiprocessing.Process(target=worker, args=(i,)) : 创建一个 Process 对象,指定目标函数 (worker) 及其参数 (i)。注意 args=(i,) 中的逗号,这是将其作为元组所必需的,即使只有一个元素。
  4. p.start(): 启动进程,导致 worker 函数在一个单独的进程中被执行。
  5. p.join(): 等待进程完成后再继续。这确保了主程序不会在所有工作进程都完成之前退出。
  6. if __name__ == '__main__':: 这至关重要。在一些平台(尤其是 Windows)上,multiprocessing 模块要求主程序的入口点受此条件保护,以避免递归地生成新进程。

向进程传递参数

args 参数允许你向目标函数传递参数。它必须是一个元组。

import multiprocessing
import timedef calculate_square(number, result, square_sum):"""Calculates the square of a number and stores the result in a shared memory array.Also updates the sum of squares in a shared memory value."""print(f"Calculating square of {number}")square = number * numberresult[number] = square  # 将正方形存储在结果数组中with square_sum.get_lock():square_sum.value += squareprint(f"Square of {number} is {square}")if __name__ == '__main__':numbers = range(1, 6)result = multiprocessing.Array('i', len(numbers))  # 'i' for integersquare_sum = multiprocessing.Value('i', 0)processes = []for i, number in enumerate(numbers):p = multiprocessing.Process(target=calculate_square, args=(number, result, square_sum))processes.append(p)p.start()for p in processes:p.join()print("Result:", list(result))print("Sum of squares:", square_sum.value)

解释:

  1. multiprocessing.Array('i', len(numbers)) : 创建一个共享内存的整型数组。第一个参数,'i',指定数据类型(此处为整型)。第二个参数指定数组的大小。
  2. multiprocessing.Value('i', 0): 创建一个整型的共享内存值,初始值为 0。
  3. with square_sum.get_lock():: 在更新共享的 square_sum 值之前获取锁。这防止了竞态条件,即多个进程可能同时尝试更新值,导致结果错误。使用 with 语句确保锁在异常发生时自动释放。

进程名称和 ID

每个进程都有一个名称和进程 ID(PID)。您可以使用 name 和 pid 属性来访问 Process 对象。

import multiprocessing
import osdef worker():print(f"Worker name: {multiprocessing.current_process().name}")print(f"Worker PID: {os.getpid()}")if __name__ == '__main__':p = multiprocessing.Process(target=worker, name="MyWorkerProcess")p.start()p.join()

解释:

  1. multiprocessing.current_process() : 返回代表当前进程的 Process 对象。
  2. os.getpid(): 返回当前进程的进程 ID。这是获取 PID 的另一种方法,使用 os 模块。
  3. name="MyWorkerProcess": 在创建进程时设置进程的名称。如果你不指定名称,它将默认为类似"Process-1"的名称。

进程池

进程池管理一组工作进程。当你有许多任务需要执行时,使用进程池通常比手动创建和销毁进程更高效。

使用 Pool.apply()

apply() 方法在一个工作进程中执行带有给定参数的函数,直到结果准备好才会解除阻塞。

import multiprocessing
import timedef cube(x):"""Calculates the cube of a number"""time.sleep(1)  # 模拟一些工作return x * x * xif __name__ == '__main__':pool = multiprocessing.Pool(processes=4)  # 创建一个由 4 个工作进程组成的池numbers = [1, 2, 3, 4, 5]results = []for num in numbers:result = pool.apply(cube, args=(num,))results.append(result)pool.close()  # 阻止将更多任务提交到池中pool.join()   # 等待所有任务完成print("Results:", results)

解释:

  1. multiprocessing.Pool(processes=4) : 创建一个包含 4 个工作进程的进程池。参数 processes 指定要使用的工作进程数量。如果省略,则默认为系统中的 CPU 数量。
  2. pool.apply(cube, args=(num,)): 在其中一个工作进程中应用 cube 函数到参数 num。它会阻塞直到结果可用。
  3. pool.close(): 防止向进程池提交更多任务。在调用 join() 之前必须调用 close()
  4. pool.join(): 等待池中的所有任务完成。

使用 Pool.map()

map() 方法将一个函数应用于可迭代对象中的每个元素,并返回一个包含结果列表。这是一种并行化简单循环的便捷方式。

import multiprocessing
import timedef square(x):"""Calculates the square of a number"""time.sleep(1)  # Simulate some workreturn x * xif __name__ == '__main__':pool = multiprocessing.Pool(processes=4)numbers = [1, 2, 3, 4, 5]results = pool.map(square, numbers)  # Apply the square function to each number in parallelpool.close()pool.join()print("Results:", results)

解释:

  1. pool.map(square, numbers): 将函数 square 并行应用于 `numbers 列表中的每个数字。它返回一个包含结果的列表。结果的顺序与输入可迭代对象的顺序相对应。`

使用 Pool.imap()

imap() 方法与 map() 类似,但它返回一个迭代器,该迭代器在结果可用时产生结果。如果你不需要一次性获得所有结果,这种方式可能更高效。

import multiprocessing
import timedef double(x):"""Calculates double of a number"""time.sleep(1)  # Simulate some workreturn x * 2if __name__ == '__main__':pool = multiprocessing.Pool(processes=4)numbers = [1, 2, 3, 4, 5]results = pool.imap(double, numbers)  # Returns an iteratorfor result in results:print("Result:", result)pool.close()pool.join()

解释:

  1. pool.imap(double, numbers):将 double 函数应用于 numbers 列表中的每个数字,并返回一个迭代器。
  2. for result in results::在结果可用时进行迭代。这避免了在处理第一个结果之前等待所有任务完成。

使用 Pool.apply_async() 和 Pool.map_async()

这些是 apply() 和 map() 的异步版本。它们返回一个 AsyncResult 对象,允许你检查任务状态并在稍后获取结果。

import multiprocessing
import timedef triple(x):"""Calculates the triple of a number"""time.sleep(1)  # Simulate some workreturn x * 3if __name__ == '__main__':pool = multiprocessing.Pool(processes=4)numbers = [1, 2, 3, 4, 5]# Asynchronous mapresult = pool.map_async(triple, numbers)pool.close()pool.join()print("Results:", result.get())  # Get the results

解释:

  1. result = pool.map_async(triple, numbers) : 异步地将 triple 函数应用于 numbers 列表中的每个数字。它立即返回一个 AsyncResult 对象,而无需等待任务完成。
  2. result.get(): 等待所有任务完成并返回结果。如果任务已经完成,它会立即返回结果。

进程间通信 (IPC)

由于进程拥有独立的内存空间,你需要使用特殊的机制来在它们之间共享数据。multiprocessing 模块提供了多种实现这一功能的方法。

队列

队列是一种线程和进程安全的在进程间传递消息的方式。

import multiprocessingdef producer(queue):"""Sends messages to the queue"""for i in range(5):message = f"Message {i}"print(f"Producer: Sending {message}")queue.put(message)def consumer(queue):"""Receives messages from the queue"""while True:message = queue.get()if message == "DONE":breakprint(f"Consumer: Received {message}")if __name__ == '__main__':queue = multiprocessing.Queue()producer_process = multiprocessing.Process(target=producer, args=(queue,))consumer_process = multiprocessing.Process(target=consumer, args=(queue,))producer_process.start()consumer_process.start()producer_process.join()queue.put("DONE")  # 发出信号让消费者退出consumer_process.join()print("Done")

解释:

  1. multiprocessing.Queue(): 创建一个进程安全的队列。
  2. queue.put(message): 将消息放入队列。
  3. message = queue.get(): 从队列中检索消息。它会阻塞直到消息可用。
  4. queue.put("DONE"): 发送一个特殊的"DONE"消息以通知消费者进程退出。这是终止等待消息的消费者进程的常用方法。

管道

管道为两个进程提供了一种简单的通信方式。它们通常用于单向通信。

import multiprocessingdef sender(conn, messages):"""Sends messages through the connection"""for message in messages:print(f"Sender: Sending {message}")conn.send(message)conn.close()def receiver(conn):"""Receives messages from the connection"""while True:try:message = conn.recv()print(f"Receiver: Received {message}")except EOFError:break  # 连接已关闭conn.close()if __name__ == '__main__':parent_conn, child_conn = multiprocessing.Pipe()  # Create a pipemessages = ["Hello", "World", "From", "Pipe"]sender_process = multiprocessing.Process(target=sender, args=(child_conn, messages))receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))sender_process.start()receiver_process.start()sender_process.join()receiver_process.join()print("Done")

解释:

  1. parent_conn, child_conn = multiprocessing.Pipe() : 创建一个管道。它返回两个连接对象:parent_conn 和 child_conn
  2. conn.send(message): 通过连接发送消息。
  3. message = conn.recv(): 从连接中接收消息。它会在消息可用时阻塞。
  4. conn.close(): 关闭连接。完成使用后关闭连接很重要。
  5. EOFError: 当 sender 关闭管道的端点时,receiver 捕获 EOFError,表示没有更多消息可以接收。

共享内存

multiprocessing 模块提供了可以被多个进程访问的共享内存对象。这对于共享大量数据很有用。

import multiprocessing
import timedef increment(number, lock):"""Increments the shared number"""for _ in range(100000):with lock:number.value += 1if __name__ == '__main__':number = multiprocessing.Value('i', 0)  # 共享整数值lock = multiprocessing.Lock()  # 锁定同步processes = []for _ in range(3):p = multiprocessing.Process(target=increment, args=(number, lock))processes.append(p)p.start()for p in processes:p.join()print("Final value:", number.value)

解释:

  1. number = multiprocessing.Value('i', 0) : 创建一个整型的共享内存值,初始值为 0。
  2. lock = multiprocessing.Lock(): 创建一个锁,用于同步对共享内存的访问。
  3. with lock:: 在增加共享数字之前获取锁。这可以防止竞态条件。

同步原语

同步原语用于协调对共享资源的访问并防止竞态条件。《multiprocessing》模块提供了多种同步原语,包括:

  • 锁: 一种基本的锁,每次只允许一个进程获取。
  • RLock: 可被同一进程多次获取的可重入锁。
  • 信号量: 一种更通用的同步原语,允许有限数量的进程访问一个资源。
  • 条件: 允许进程等待直到某个条件满足。
  • 事件: 一种简单的信号机制,允许一个进程通知其他进程某个事件已经发生。

之前的示例展示了 Lock 的使用。这里是一个使用 Semaphore 的示例:

import multiprocessing
import time
import randomdef worker(semaphore, worker_id):"""Worker function that acquires and releases a semaphore"""semaphore.acquire()try:print(f"Worker {worker_id}: Acquired semaphore")time.sleep(random.random())  # 模拟一些工作finally:print(f"Worker {worker_id}: Releasing semaphore")semaphore.release()if __name__ == '__main__':semaphore = multiprocessing.Semaphore(2)  # 每次只允许 2 名processes = []for i in range(5):p = multiprocessing.Process(target=worker, args=(semaphore, i))processes.append(p)p.start()for p in processes:p.join()print("All workers finished")

解释:

  1. semaphore = multiprocessing.Semaphore(2) :创建一个信号量,一次只允许 2 个进程获取它。
  2. semaphore.acquire():获取信号量。如果信号量已经达到最大计数,进程将会阻塞,直到另一个进程释放它。
  3. semaphore.release(): 释放信号量,允许另一个进程获取它。
  4. finally:: 确保信号量总是被释放,即使发生异常也是如此。
http://www.dtcms.com/a/267765.html

相关文章:

  • speech_sambert-hifigan_tts_zh-cn_16k的docker部署
  • 【电赛培训】运算放大器、滤波器
  • 关于 JNI 函数逆向(从 Java 到 native)
  • c++文字游戏_闯关打怪
  • 查看linux中steam游戏的兼容性
  • centos8.5安装jdk21详细安装教程
  • 网络编程(二)TCP和UDP
  • BM6 判断链表中是否有环(牛客)
  • 2025年- H92-Lc200-- 64.最小路径和(多维动态规划)--Java版
  • 详解存储单位、内存寻址及数据存储方式
  • Feign调用报“请求方法POST不支持“错误
  • WPF学习笔记(25)MVVM框架与项目实例
  • 基于pcl点云库实现激光雷达数据采集
  • java整合itext pdf实现自定义PDF文件格式导出
  • 调参——optuna
  • Python 面向对象编程(OOP)全面详解:类、对象与 API
  • 【算法刷题记录(简单题)002】字符串字符匹配(java代码实现)
  • 线程池的七个参数设计源于对高并发场景下资源管理、系统稳定性与性能平衡的深刻洞察
  • Policy Gradient【强化学习的数学原理】
  • 【C语言刷题】第十一天:加量加餐继续,代码题训练,融会贯通IO模式
  • JMM--数据原子操作
  • Python asyncio库与GIL之间的关系,是否能够解决核心问题?
  • Spring--循环依赖以及三级缓存详解
  • Linux安装java后没法运行
  • 计算机组成原理《浮点数的存储》
  • Python基础之字典(Dictionary)全面指南
  • 南山科技园的步行
  • Qt项目锻炼——TODO清单(三)
  • 【论文笔记】OctoThinker:突破 Llama 推理瓶颈的中期训练范式
  • 乌邦图(20.04)添加中文拼音(中文输入法)