【Python】Python并发与并行编程图解
【python】生成器和迭代器详解
【python】协程 (Coroutine) 详解
【python】元类 (Metaclass) 详解
【python】python装饰器-详解
【Python】 Python内存管理
Python并发与并行编程图解
文章目录
- Python并发与并行编程图解
- 1. 并发 vs 并行
- 2. Python并发编程技术对比
- 3. 多线程执行流程
- 基础多线程
- 线程锁和同步
- 线程池
- 4. 多进程执行流程
- 基础多进程
- 进程间通信
- 进程池高级用法
- 5. 异步编程执行流程
- 基础异步编程
- 异步生产者消费者模式
- 6. 线程池工作流程
- 7. GIL(全局解释器锁)影响
- 8. 实际应用场景选择
- 1. 选择合适的工具
- 2. 避免常见陷阱
- 3. 资源管理
- 9. 性能优化决策流程
1. 并发 vs 并行
- 并发:多个任务交替执行,看起来同时运行(单核)
- 并行:多个任务真正同时执行(多核)
2. Python并发编程技术对比
3. 多线程执行流程
基础多线程
import threading
import time


def worker(name, delay):
    """Simulate a unit of work: announce start, sleep, announce finish.

    Args:
        name: label used in the progress messages.
        delay: seconds to sleep, standing in for real work.
    """
    print(f"Worker {name} started")
    time.sleep(delay)
    print(f"Worker {name} finished")


if __name__ == "__main__":
    # Guard the demo so importing this module does not spawn threads.
    # Create and start three threads with staggered delays (1s, 2s, 3s).
    threads = []
    for i in range(3):
        t = threading.Thread(target=worker, args=(f"Thread-{i}", i + 1))
        threads.append(t)
        t.start()

    # Wait for every thread to finish before reporting.
    for t in threads:
        t.join()

    print("All threads completed")
线程锁和同步
import threading


class Counter:
    """An integer counter whose increments are serialized by a Lock."""

    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        # `with` acquires the lock on entry and releases it on exit,
        # even if the body raises.
        with self.lock:
            self.value += 1


def increment_counter(counter, times):
    """Bump `counter` exactly `times` times."""
    for _ in range(times):
        counter.increment()


counter = Counter()
threads = []

# Several threads all hammer the same shared counter concurrently.
for _ in range(5):
    worker_thread = threading.Thread(target=increment_counter, args=(counter, 1000))
    threads.append(worker_thread)
    worker_thread.start()

for worker_thread in threads:
    worker_thread.join()

print(f"Final counter value: {counter.value}")  # expected: 5000
线程池
from concurrent.futures import ThreadPoolExecutor
import requests
import time


def download_url(url):
    """Fetch `url` and describe its size, or the error that occurred."""
    try:
        response = requests.get(url, timeout=5)
        return f"{url}: {len(response.content)} bytes"
    except Exception as e:
        return f"{url}: Error - {e}"


urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpbin.org/headers"
]

# Fan the downloads out across a small pool of worker threads.
with ThreadPoolExecutor(max_workers=3) as executor:
    start_time = time.time()

    # Option 1: submit futures one by one and collect results in order.
    futures = [executor.submit(download_url, url) for url in urls]
    results = [future.result() for future in futures]

    # Option 2: executor.map does the same thing in a single call.
    # results = list(executor.map(download_url, urls))

    end_time = time.time()

    for result in results:
        print(result)
    print(f"Total time: {end_time - start_time:.2f} seconds")
4. 多进程执行流程
基础多进程
import multiprocessing
import time
import os


def cpu_intensive_task(n):
    """Simulate a CPU-bound task: sum of squares of 0..n-1."""
    print(f"Process {os.getpid()} working on task {n}")
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    numbers = [1000000, 2000000, 3000000, 4000000]

    # Baseline: run every task one after another in this process.
    start_time = time.time()
    results_sequential = [cpu_intensive_task(n) for n in numbers]
    sequential_time = time.time() - start_time

    # Parallel: spread the same tasks across a pool of worker processes.
    start_time = time.time()
    with multiprocessing.Pool() as pool:
        results_parallel = pool.map(cpu_intensive_task, numbers)
    parallel_time = time.time() - start_time

    print(f"Sequential time: {sequential_time:.2f}s")
    print(f"Parallel time: {parallel_time:.2f}s")
    print(f"Speedup: {sequential_time/parallel_time:.2f}x")
进程间通信
import multiprocessing
import time


# Inter-process communication via a multiprocessing.Queue.
def producer(queue, items):
    """Push each item onto the queue, then a None sentinel to mark the end."""
    for item in items:
        print(f"Producing {item}")
        queue.put(item)
        time.sleep(0.1)
    queue.put(None)  # end-of-stream signal


def consumer(queue, name):
    """Drain items until the None sentinel arrives, re-posting it for peers."""
    while True:
        item = queue.get()
        if item is None:
            # Put the sentinel back so the remaining consumers can exit too.
            queue.put(None)
            break
        print(f"Consumer {name} got {item}")
        time.sleep(0.2)


if __name__ == "__main__":
    queue = multiprocessing.Queue()

    # One producer feeding three consumers.
    producer_process = multiprocessing.Process(target=producer, args=(queue, range(10)))
    consumers = [
        multiprocessing.Process(target=consumer, args=(queue, i))
        for i in range(3)
    ]

    producer_process.start()
    for c in consumers:
        c.start()

    producer_process.join()
    for c in consumers:
        c.join()

    print("All processes completed")
进程池高级用法
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


def process_task(data):
    """Unpack a (name, value) pair, simulate a second of work, report value doubled."""
    name, value = data
    time.sleep(1)  # simulate work
    return f"Processed {name} with result {value * 2}"


if __name__ == "__main__":
    data = [("task1", 10), ("task2", 20), ("task3", 30), ("task4", 40), ("task5", 50)]

    with ProcessPoolExecutor(max_workers=3) as executor:
        # Submit everything up front, remembering which future maps to which name.
        future_to_name = {executor.submit(process_task, item): item[0] for item in data}

        # Handle results in completion order rather than submission order.
        for future in as_completed(future_to_name):
            name = future_to_name[future]
            try:
                result = future.result()
                print(f"Completed: {name} -> {result}")
            except Exception as e:
                print(f"Error in {name}: {e}")
5. 异步编程执行流程
基础异步编程
import asyncio
import aiohttp
import time


async def fetch_url(session, url):
    """Fetch `url` asynchronously and return a short size-or-error summary."""
    try:
        async with session.get(url, timeout=5) as response:
            data = await response.text()
            return f"{url}: {len(data)} bytes"
    except Exception as e:
        return f"{url}: Error - {e}"


async def main():
    """Download several URLs concurrently and print one summary line each."""
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/headers",
        "https://httpbin.org/delay/1",  # simulated latency
        "https://httpbin.org/delay/2"
    ]

    async with aiohttp.ClientSession() as session:
        # Kick off every fetch at once; gather preserves input order.
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            print(result)


if __name__ == "__main__":
    # Guarded entry point: importing this module must not trigger network I/O.
    start_time = time.time()
    asyncio.run(main())
    end_time = time.time()
    print(f"Total time: {end_time - start_time:.2f} seconds")
异步生产者消费者模式
import asyncio
import random


async def producer(queue, name, count):
    """Produce `count` items onto the queue with randomized delays."""
    for i in range(count):
        item = f"item-{name}-{i}"
        await asyncio.sleep(random.uniform(0.1, 0.5))  # simulate production time
        await queue.put(item)
        print(f"Producer {name} produced {item}")


async def consumer(queue, name):
    """Consume items forever; main() cancels this task once the queue drains."""
    while True:
        item = await queue.get()
        print(f"Consumer {name} consumed {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))  # simulate processing time
        queue.task_done()


async def main():
    queue = asyncio.Queue(maxsize=5)

    producers = [
        asyncio.create_task(producer(queue, "A", 5)),
        asyncio.create_task(producer(queue, "B", 3))
    ]
    consumers = [
        asyncio.create_task(consumer(queue, "X")),
        asyncio.create_task(consumer(queue, "Y"))
    ]

    # Wait until every producer has finished putting items...
    await asyncio.gather(*producers)
    # ...then until every queued item has been marked task_done().
    # NOTE: the previous version pushed None sentinels that were re-posted but
    # never task_done()'d, so queue.join() could never reach zero unfinished
    # items and main() deadlocked here.
    await queue.join()

    # Consumers are now idle inside queue.get(); cancel and reap them.
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)


if __name__ == "__main__":
    # Guarded so importing this module does not run the demo.
    asyncio.run(main())
6. 线程池工作流程
7. GIL(全局解释器锁)影响
8. 实际应用场景选择
1. 选择合适的工具
- I/O密集型:使用asyncio或多线程
- CPU密集型:使用多进程
- 混合型:结合使用多种技术
2. 避免常见陷阱
import multiprocessing


# WRONG: lambdas cannot be pickled, so sending one to worker processes fails.
# pool.map(lambda x: x * 2, range(10))

# RIGHT: use a plain module-level function (or functools.partial).
def double(x):
    """Return twice `x`; picklable because it is a named top-level function."""
    return x * 2


if __name__ == "__main__":
    # The pool must actually exist before calling map; the original snippet
    # referenced an undefined `pool` variable.
    with multiprocessing.Pool() as pool:
        pool.map(double, range(10))
3. 资源管理
import time
import concurrent.futures
import contextlib
from concurrent.futures import ThreadPoolExecutor


@contextlib.contextmanager
def timing(description):
    """Context manager that prints how long its body took to run.

    Fixes in this version: `time` is now imported (the original raised
    NameError), and the yield is wrapped in try/finally so the elapsed
    time is reported even when the body raises.
    """
    start = time.time()
    try:
        yield
    finally:
        elapsed = time.time() - start
        print(f"{description}: {elapsed:.2f}s")


def io_bound_task(url):
    """Placeholder I/O-bound task; swap in a real fetch/download in practice."""
    return f"done: {url}"


if __name__ == "__main__":
    urls = ["https://example.com/a", "https://example.com/b"]

    # Nested context managers guarantee the pool is shut down and the timer
    # reports, even if a task raises.
    with timing("Thread pool execution"):
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(io_bound_task, url) for url in urls]
            results = [f.result() for f in concurrent.futures.as_completed(futures)]
