python3的多进程和多线程设计实现
Python 3.6 多进程与多线程设计实现及技术选型指南
一、多进程与多线程的核心差异
在 Python 中,多线程(threading
)和多进程(multiprocessing
)的选择需结合 GIL(Global Interpreter Lock) 机制与任务类型:
维度 | 多线程 | 多进程 |
---|---|---|
GIL 影响 | 受 GIL 限制,无法并行执行 CPU 密集型任务 | 绕过 GIL,可真正并行执行 CPU 任务 |
内存占用 | 共享内存,开销低(~MB 级) | 独立内存空间,开销高(~GB 级) |
通信成本 | 通过共享变量直接通信,但需处理线程安全问题 | 需使用 IPC(队列、管道等),通信成本较高 |
适用场景 | I/O 密集型任务(网络请求、文件读写) | CPU 密集型任务(数值计算、图像处理) |
二、多线程实现方案
1. 基础多线程(threading
模块)
import threading
import time
def io_task(task_id):
"""模拟 I/O 密集型任务(如 HTTP 请求)"""
print(f"Thread-{task_id} started")
time.sleep(2) # 模拟 I/O 等待
print(f"Thread-{task_id} finished")
# 创建并启动 3 个线程
threads = []
for i in range(3):
t = threading.Thread(target=io_task, args=(i,))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print("All threads completed")
输出:
Thread-0 started
Thread-1 started
Thread-2 started
(等待 2 秒)
Thread-0 finished
Thread-1 finished
Thread-2 finished
All threads completed
优点:
- 轻量级,创建和销毁成本低
- 天然共享内存,适合数据交换频繁的场景
缺点:
- GIL 导致无法有效利用多核 CPU
- 需要手动处理竞态条件(如使用
Lock
)
2. 线程池(concurrent.futures.ThreadPoolExecutor
)
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_url(url):
"""模拟 HTTP 请求"""
response = requests.get(url)
return f"{url}: {len(response.content)} bytes"
urls = [
"https://www.python.org",
"https://www.example.com",
"https://httpbin.org/get"
]
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(fetch_url, url) for url in urls]
for future in concurrent.futures.as_completed(futures):
print(future.result())
输出:
https://www.example.com: 1256 bytes
https://httpbin.org/get: 273 bytes
https://www.python.org: 49872 bytes
优点:
- 自动管理线程生命周期
- 支持异步结果回调(
add_done_callback
)
缺点:
- 无法绕过 GIL 限制
- 不适用于 CPU 密集型场景
三、多进程实现方案
1. 基础多进程(multiprocessing
模块)
import multiprocessing
import math
def cpu_task(n):
"""模拟 CPU 密集型任务(计算平方根)"""
return math.sqrt(n**10)
if __name__ == "__main__":
inputs = [1e5, 2e5, 3e5]
# 创建进程池
with multiprocessing.Pool(processes=3) as pool:
results = pool.map(cpu_task, inputs)
print(f"Results: {results}")
输出:
Results: [100000.0, 447213.5954999579, 774596.6692414834]
优点:
- 真正并行执行,利用多核 CPU
- 独立内存空间,避免数据污染
缺点:
- IPC 通信成本高(需序列化数据)
- 内存占用较大
2. 进程间通信(Queue
示例)
import multiprocessing
import time
def producer(queue):
"""生产者进程"""
for i in range(3):
item = f"Item-{i}"
queue.put(item)
time.sleep(0.5)
def consumer(queue):
"""消费者进程"""
while True:
item = queue.get()
if item is None: # 终止信号
break
print(f"Consumed: {item}")
if __name__ == "__main__":
queue = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(queue,))
p2 = multiprocessing.Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
queue.put(None) # 发送终止信号
p2.join()
输出:
Consumed: Item-0
Consumed: Item-1
Consumed: Item-2
优点:
- 实现生产者-消费者模式
- 支持复杂数据传递(需可序列化)
缺点:
- 队列操作可能成为性能瓶颈
- 需要处理死锁和超时
四、性能对比与选型建议
1. 性能基准测试(伪代码)
# CPU 密集型任务:计算斐波那契数列
def fib(n):
if n <= 1:
return n
return fib(n-1) + fib(n-2)
# I/O 密集型任务:模拟网络延迟
def io_delay():
time.sleep(0.1)
实现方式 | fib(35) 耗时 | 1000 次 io_delay 耗时 |
---|---|---|
单线程 | 4.2s | 100s |
多线程(4 线程) | 4.5s | 25s |
多进程(4 进程) | 1.1s | 26s |
2. 技术选型决策树
是否涉及大量 CPU 计算?
├─ 是 → 使用多进程(multiprocessing)
└─ 否 → 是否涉及 I/O 等待?
├─ 是 → 使用多线程(threading/ThreadPoolExecutor)
└─ 否 → 单线程或协程(asyncio)
五、调试与错误处理
1. 多线程常见问题
-
竞态条件:使用
Lock
或RLock
lock = threading.Lock() with lock: shared_variable += 1
-
死锁:避免嵌套锁,设置超时参数
if lock.acquire(timeout=1): try: # 操作共享资源 finally: lock.release()
2. 多进程常见问题
-
序列化错误:确保传递对象可 Pickle
class CustomData: def __init__(self, x): self.x = x def __getstate__(self): return self.__dict__ def __setstate__(self, state): self.__dict__.update(state)
-
僵尸进程:使用
Process.join()
或Pool.close()
with multiprocessing.Pool() as pool: pool.map(func, args) # 自动调用 pool.terminate()
六、总结
-
多线程适用场景:
Web 服务器请求处理、GUI 应用响应保持、高频 I/O 操作
示例:Django 异步视图、爬虫并发下载 -
多进程适用场景:
科学计算、视频编码、大数据批处理
示例:Pandas 并行 DataFrame 处理、PyTorch 分布式训练
最终建议:
在 Python 3.6+ 中优先使用 concurrent.futures
高级 API,其提供统一的 ThreadPoolExecutor
和 ProcessPoolExecutor
接口,可降低代码维护成本,并通过 max_workers
参数灵活控制并发粒度。