Day05:Python中的并发和并行(3)
使用 multiprocessing
模块实现多进程
Python 中的 multiprocessing
模块提供了一种创建进程的方式,允许你并行运行代码。这对于受全局解释器锁(GIL)限制的多线程可能有限的 CPU 密集型任务特别有用。通过利用多个核心,multiprocessing 可以显著提高计算密集型操作的性能。本课程将涵盖使用 multiprocessing
模块的基础知识,包括进程创建、通信和同步。
理解 multiprocessing
模块
multiprocessing
模块是一个支持使用与 threading
模块相似的 API 来创建进程的包。它提供了本地和远程并发,通过使用子进程而不是线程来有效规避全局解释器锁(GIL)。正因如此,multiprocessing
模块允许程序员充分利用给定机器上的多个处理器。
关键概念
- 进程: 执行中的程序实例。每个进程都有自己独立的内存空间,这意味着变量默认情况下不会在进程之间共享。
- 池: 一组工作进程的集合。使用池比手动创建和销毁进程更高效,尤其是在你需要执行许多任务时。
- 队列: 一种允许进程通过发送和接收消息进行通信的数据结构。
- 管道: 两个进程之间的连接,允许它们发送和接收数据。
- 锁: 一种同步原语,防止多个进程同时访问共享资源,避免竞态条件。
- 值和数组: 多个进程可以访问的共享内存对象。
为什么使用多进程?
多进程特别适用于:
- CPU 密集型任务: 那些大部分时间用于执行计算的任务,例如图像处理、科学模拟和数据分析。
- 可并行化任务: 那些可以分解为更小、独立子任务的任务,这些子任务可以同时执行。
- 绕过 GIL: CPython 中的全局解释器锁限制了线程的真正并行性。多进程通过使用独立的进程绕过这一限制。
创建进程
使用 multiprocessing
模块的基本方式是创建 Process
对象。每个 Process
对象会在一个单独的进程中运行目标函数。
基本进程创建
import multiprocessing
import timedef worker(num):"""Worker function to be executed in a separate process"""print(f"Worker {num}: Starting")time.sleep(2) # 模拟一些工作print(f"Worker {num}: Finishing")if __name__ == '__main__':processes = []for i in range(3):p = multiprocessing.Process(target=worker, args=(i,))processes.append(p)p.start()for p in processes:p.join() # 等待所有进程完成print("All workers finished")
解释:
import multiprocessing
: 导入必要的模块。worker(num)
: 定义每个进程将执行的函数。它接受一个数字作为参数来标识工作进程。multiprocessing.Process(target=worker, args=(i,))
: 创建一个Process
对象,指定目标函数 (worker
) 及其参数 (i
)。注意args=(i,)
中的逗号,这是将其作为元组所必需的,即使只有一个元素。p.start()
: 启动进程,导致worker
函数在一个单独的进程中被执行。p.join()
: 等待进程完成后再继续。这确保了主程序不会在所有工作进程都完成之前退出。if __name__ == '__main__':
: 这至关重要。在一些平台(尤其是 Windows)上,multiprocessing
模块要求主程序的入口点受此条件保护,以避免递归地生成新进程。
向进程传递参数
args
参数允许你向目标函数传递参数。它必须是一个元组。
import multiprocessing
import timedef calculate_square(number, result, square_sum):"""Calculates the square of a number and stores the result in a shared memory array.Also updates the sum of squares in a shared memory value."""print(f"Calculating square of {number}")square = number * numberresult[number] = square # 将正方形存储在结果数组中with square_sum.get_lock():square_sum.value += squareprint(f"Square of {number} is {square}")if __name__ == '__main__':numbers = range(1, 6)result = multiprocessing.Array('i', len(numbers)) # 'i' for integersquare_sum = multiprocessing.Value('i', 0)processes = []for i, number in enumerate(numbers):p = multiprocessing.Process(target=calculate_square, args=(number, result, square_sum))processes.append(p)p.start()for p in processes:p.join()print("Result:", list(result))print("Sum of squares:", square_sum.value)
解释:
multiprocessing.Array('i', len(numbers))
: 创建一个共享内存的整型数组。第一个参数,'i'
,指定数据类型(此处为整型)。第二个参数指定数组的大小。multiprocessing.Value('i', 0)
: 创建一个整型的共享内存值,初始值为 0。with square_sum.get_lock():
: 在更新共享的square_sum
值之前获取锁。这防止了竞态条件,即多个进程可能同时尝试更新值,导致结果错误。使用with
语句确保锁在异常发生时自动释放。
进程名称和 ID
每个进程都有一个名称和进程 ID(PID)。您可以使用 name
和 pid
属性来访问 Process
对象。
import multiprocessing
import osdef worker():print(f"Worker name: {multiprocessing.current_process().name}")print(f"Worker PID: {os.getpid()}")if __name__ == '__main__':p = multiprocessing.Process(target=worker, name="MyWorkerProcess")p.start()p.join()
解释:
multiprocessing.current_process()
: 返回代表当前进程的Process
对象。os.getpid()
: 返回当前进程的进程 ID。这是获取 PID 的另一种方法,使用os
模块。name="MyWorkerProcess"
: 在创建进程时设置进程的名称。如果你不指定名称,它将默认为类似"Process-1"的名称。
进程池
进程池管理一组工作进程。当你有许多任务需要执行时,使用进程池通常比手动创建和销毁进程更高效。
使用 Pool.apply()
apply()
方法在一个工作进程中执行带有给定参数的函数,直到结果准备好才会解除阻塞。
import multiprocessing
import timedef cube(x):"""Calculates the cube of a number"""time.sleep(1) # 模拟一些工作return x * x * xif __name__ == '__main__':pool = multiprocessing.Pool(processes=4) # 创建一个由 4 个工作进程组成的池numbers = [1, 2, 3, 4, 5]results = []for num in numbers:result = pool.apply(cube, args=(num,))results.append(result)pool.close() # 阻止将更多任务提交到池中pool.join() # 等待所有任务完成print("Results:", results)
解释:
multiprocessing.Pool(processes=4)
: 创建一个包含 4 个工作进程的进程池。参数processes
指定要使用的工作进程数量。如果省略,则默认为系统中的 CPU 数量。pool.apply(cube, args=(num,))
: 在其中一个工作进程中应用cube
函数到参数num
。它会阻塞直到结果可用。pool.close()
: 防止向进程池提交更多任务。在调用join()
之前必须调用close()
。pool.join()
: 等待池中的所有任务完成。
使用 Pool.map()
map()
方法将一个函数应用于可迭代对象中的每个元素,并返回一个包含结果列表。这是一种并行化简单循环的便捷方式。
import multiprocessing
import timedef square(x):"""Calculates the square of a number"""time.sleep(1) # Simulate some workreturn x * xif __name__ == '__main__':pool = multiprocessing.Pool(processes=4)numbers = [1, 2, 3, 4, 5]results = pool.map(square, numbers) # Apply the square function to each number in parallelpool.close()pool.join()print("Results:", results)
解释:
pool.map(square, numbers)
: 将函数square 并行应用于 `numbers 列表中的每个数字。它返回一个包含结果的列表。结果的顺序与输入可迭代对象的顺序相对应。`
使用 Pool.imap()
imap()
方法与 map()
类似,但它返回一个迭代器,该迭代器在结果可用时产生结果。如果你不需要一次性获得所有结果,这种方式可能更高效。
import multiprocessing
import timedef double(x):"""Calculates double of a number"""time.sleep(1) # Simulate some workreturn x * 2if __name__ == '__main__':pool = multiprocessing.Pool(processes=4)numbers = [1, 2, 3, 4, 5]results = pool.imap(double, numbers) # Returns an iteratorfor result in results:print("Result:", result)pool.close()pool.join()
解释:
pool.imap(double, numbers)
:将double
函数应用于numbers
列表中的每个数字,并返回一个迭代器。for result in results:
:在结果可用时进行迭代。这避免了在处理第一个结果之前等待所有任务完成。
使用 Pool.apply_async()
和 Pool.map_async()
这些是 apply()
和 map()
的异步版本。它们返回一个 AsyncResult
对象,允许你检查任务状态并在稍后获取结果。
import multiprocessing
import timedef triple(x):"""Calculates the triple of a number"""time.sleep(1) # Simulate some workreturn x * 3if __name__ == '__main__':pool = multiprocessing.Pool(processes=4)numbers = [1, 2, 3, 4, 5]# Asynchronous mapresult = pool.map_async(triple, numbers)pool.close()pool.join()print("Results:", result.get()) # Get the results
解释:
result = pool.map_async(triple, numbers)
: 异步地将triple
函数应用于numbers
列表中的每个数字。它立即返回一个AsyncResult
对象,而无需等待任务完成。result.get()
: 等待所有任务完成并返回结果。如果任务已经完成,它会立即返回结果。
进程间通信 (IPC)
由于进程拥有独立的内存空间,你需要使用特殊的机制来在它们之间共享数据。multiprocessing
模块提供了多种实现这一功能的方法。
队列
队列是一种线程和进程安全的在进程间传递消息的方式。
import multiprocessingdef producer(queue):"""Sends messages to the queue"""for i in range(5):message = f"Message {i}"print(f"Producer: Sending {message}")queue.put(message)def consumer(queue):"""Receives messages from the queue"""while True:message = queue.get()if message == "DONE":breakprint(f"Consumer: Received {message}")if __name__ == '__main__':queue = multiprocessing.Queue()producer_process = multiprocessing.Process(target=producer, args=(queue,))consumer_process = multiprocessing.Process(target=consumer, args=(queue,))producer_process.start()consumer_process.start()producer_process.join()queue.put("DONE") # 发出信号让消费者退出consumer_process.join()print("Done")
解释:
multiprocessing.Queue()
: 创建一个进程安全的队列。queue.put(message)
: 将消息放入队列。message = queue.get()
: 从队列中检索消息。它会阻塞直到消息可用。queue.put("DONE")
: 发送一个特殊的"DONE"消息以通知消费者进程退出。这是终止等待消息的消费者进程的常用方法。
管道
管道为两个进程提供了一种简单的通信方式。它们通常用于单向通信。
import multiprocessingdef sender(conn, messages):"""Sends messages through the connection"""for message in messages:print(f"Sender: Sending {message}")conn.send(message)conn.close()def receiver(conn):"""Receives messages from the connection"""while True:try:message = conn.recv()print(f"Receiver: Received {message}")except EOFError:break # 连接已关闭conn.close()if __name__ == '__main__':parent_conn, child_conn = multiprocessing.Pipe() # Create a pipemessages = ["Hello", "World", "From", "Pipe"]sender_process = multiprocessing.Process(target=sender, args=(child_conn, messages))receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))sender_process.start()receiver_process.start()sender_process.join()receiver_process.join()print("Done")
解释:
parent_conn, child_conn = multiprocessing.Pipe()
: 创建一个管道。它返回两个连接对象:parent_conn
和child_conn
。conn.send(message)
: 通过连接发送消息。message = conn.recv()
: 从连接中接收消息。它会在消息可用时阻塞。conn.close()
: 关闭连接。完成使用后关闭连接很重要。EOFError
: 当sender
关闭管道的端点时,receiver
捕获EOFError
,表示没有更多消息可以接收。
共享内存
multiprocessing
模块提供了可以被多个进程访问的共享内存对象。这对于共享大量数据很有用。
import multiprocessing
import timedef increment(number, lock):"""Increments the shared number"""for _ in range(100000):with lock:number.value += 1if __name__ == '__main__':number = multiprocessing.Value('i', 0) # 共享整数值lock = multiprocessing.Lock() # 锁定同步processes = []for _ in range(3):p = multiprocessing.Process(target=increment, args=(number, lock))processes.append(p)p.start()for p in processes:p.join()print("Final value:", number.value)
解释:
number = multiprocessing.Value('i', 0)
: 创建一个整型的共享内存值,初始值为 0。lock = multiprocessing.Lock()
: 创建一个锁,用于同步对共享内存的访问。with lock:
: 在增加共享数字之前获取锁。这可以防止竞态条件。
同步原语
同步原语用于协调对共享资源的访问并防止竞态条件。《multiprocessing
》模块提供了多种同步原语,包括:
- 锁: 一种基本的锁,每次只允许一个进程获取。
- RLock: 可被同一进程多次获取的可重入锁。
- 信号量: 一种更通用的同步原语,允许有限数量的进程访问一个资源。
- 条件: 允许进程等待直到某个条件满足。
- 事件: 一种简单的信号机制,允许一个进程通知其他进程某个事件已经发生。
之前的示例展示了 Lock
的使用。这里是一个使用 Semaphore
的示例:
import multiprocessing
import time
import randomdef worker(semaphore, worker_id):"""Worker function that acquires and releases a semaphore"""semaphore.acquire()try:print(f"Worker {worker_id}: Acquired semaphore")time.sleep(random.random()) # 模拟一些工作finally:print(f"Worker {worker_id}: Releasing semaphore")semaphore.release()if __name__ == '__main__':semaphore = multiprocessing.Semaphore(2) # 每次只允许 2 名processes = []for i in range(5):p = multiprocessing.Process(target=worker, args=(semaphore, i))processes.append(p)p.start()for p in processes:p.join()print("All workers finished")
解释:
semaphore = multiprocessing.Semaphore(2)
:创建一个信号量,一次只允许 2 个进程获取它。semaphore.acquire()
:获取信号量。如果信号量已经达到最大计数,进程将会阻塞,直到另一个进程释放它。semaphore.release()
: 释放信号量,允许另一个进程获取它。finally:
: 确保信号量总是被释放,即使发生异常也是如此。