当前位置: 首页 > news >正文

【Python】Python并发与并行编程图解

【python】生成器和迭代器详解
【python】协程 (Coroutine) 详解
【python】元类 (Metaclass) 详解
【python】python装饰器-详解
【Python】 Python内存管理

Python并发与并行编程图解

文章目录

  • Python并发与并行编程图解
    • 1. 并发 vs 并行
    • 2. Python并发编程技术对比
    • 3. 多线程执行流程
      • 基础多线程
      • 线程锁和同步
      • 线程池
    • 4. 多进程执行流程
      • 基础多进程
      • 进程间通信
      • 进程池高级用法
    • 5. 异步编程执行流程
      • 基础异步编程
      • 异步生产者消费者模式
    • 6. 线程池工作流程
    • 7. GIL(全局解释器锁)影响
    • 8. 实际应用场景选择
      • 1. 选择合适的工具
      • 2. 避免常见陷阱
      • 3. 资源管理
    • 9. 性能优化决策流程

1. 并发 vs 并行

  • 并发:多个任务交替执行,看起来同时运行(单核)
  • 并行:多个任务真正同时执行(多核)
并发与并行
并发 Concurrency
并行 Parallelism
多个任务交替执行
单核/多核都可
提高资源利用率
多个任务同时执行
需要多核CPU
提高执行速度
看起来同时运行
时间片轮转
真正物理同时
多核并行计算

2. Python并发编程技术对比

Python并发技术
多线程 Threading
多进程 Multiprocessing
异步编程 Asyncio
I/O密集型任务
受GIL限制
共享内存
适合网络请求/文件操作
CPU密集型任务
绕过GIL限制
独立内存空间
适合计算密集型任务
高并发I/O操作
单线程事件循环
协程异步执行
适合Web服务器/爬虫

3. 多线程执行流程

基础多线程

import threading
import timedef worker(name, delay):print(f"Worker {name} started")time.sleep(delay)print(f"Worker {name} finished")# 创建线程
threads = []
for i in range(3):t = threading.Thread(target=worker, args=(f"Thread-{i}", i+1))threads.append(t)t.start()# 等待所有线程完成
for t in threads:t.join()print("All threads completed")

线程锁和同步

import threadingclass Counter:def __init__(self):self.value = 0self.lock = threading.Lock()def increment(self):with self.lock:  # 自动获取和释放锁self.value += 1def increment_counter(counter, times):for _ in range(times):counter.increment()counter = Counter()
threads = []# 创建多个线程同时增加计数器
for _ in range(5):t = threading.Thread(target=increment_counter, args=(counter, 1000))threads.append(t)t.start()for t in threads:t.join()print(f"Final counter value: {counter.value}")  # 应该是5000

线程池

from concurrent.futures import ThreadPoolExecutor
import requests
import timedef download_url(url):try:response = requests.get(url, timeout=5)return f"{url}: {len(response.content)} bytes"except Exception as e:return f"{url}: Error - {e}"urls = ["https://httpbin.org/get","https://httpbin.org/ip","https://httpbin.org/user-agent","https://httpbin.org/headers"
]# 使用线程池
with ThreadPoolExecutor(max_workers=3) as executor:start_time = time.time()# 方法1: submitfutures = [executor.submit(download_url, url) for url in urls]results = [future.result() for future in futures]# 方法2: map# results = list(executor.map(download_url, urls))end_time = time.time()for result in results:print(result)print(f"Total time: {end_time - start_time:.2f} seconds")
主线程线程1线程2线程3创建并启动线程1创建并启动线程2创建并启动线程3线程并发执行时间片轮转执行任务(可能被中断)执行任务(可能被中断)执行任务(可能被中断)完成任务完成任务完成任务等待所有线程join继续主程序主线程线程1线程2线程3

4. 多进程执行流程

基础多进程

import multiprocessing
import time
import osdef cpu_intensive_task(n):"""模拟CPU密集型任务"""print(f"Process {os.getpid()} working on task {n}")result = sum(i * i for i in range(n))return resultif __name__ == "__main__":numbers = [1000000, 2000000, 3000000, 4000000]# 顺序执行start_time = time.time()results_sequential = [cpu_intensive_task(n) for n in numbers]sequential_time = time.time() - start_time# 并行执行start_time = time.time()with multiprocessing.Pool() as pool:results_parallel = pool.map(cpu_intensive_task, numbers)parallel_time = time.time() - start_timeprint(f"Sequential time: {sequential_time:.2f}s")print(f"Parallel time: {parallel_time:.2f}s")print(f"Speedup: {sequential_time/parallel_time:.2f}x")

进程间通信

import multiprocessing
import time# 使用队列进行进程间通信
def producer(queue, items):for item in items:print(f"Producing {item}")queue.put(item)time.sleep(0.1)queue.put(None)  # 结束信号def consumer(queue, name):while True:item = queue.get()if item is None:queue.put(None)  # 让其他消费者也能结束breakprint(f"Consumer {name} got {item}")time.sleep(0.2)if __name__ == "__main__":queue = multiprocessing.Queue()# 创建生产者和消费者进程producer_process = multiprocessing.Process(target=producer, args=(queue, range(10)))consumers = [multiprocessing.Process(target=consumer, args=(queue, i))for i in range(3)]# 启动所有进程producer_process.start()for c in consumers:c.start()# 等待完成producer_process.join()for c in consumers:c.join()print("All processes completed")

进程池高级用法

import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, as_completeddef process_task(data):name, value = datatime.sleep(1)  # 模拟工作return f"Processed {name} with result {value * 2}"if __name__ == "__main__":data = [("task1", 10), ("task2", 20), ("task3", 30), ("task4", 40), ("task5", 50)]with ProcessPoolExecutor(max_workers=3) as executor:# 提交所有任务future_to_name = {executor.submit(process_task, item): item[0] for item in data}# 按完成顺序处理结果for future in as_completed(future_to_name):name = future_to_name[future]try:result = future.result()print(f"Completed: {name} -> {result}")except Exception as e:print(f"Error in {name}: {e}")
主进程
创建进程1
创建进程2
创建进程3
进程1
独立内存空间
进程2
独立内存空间
进程3
独立内存空间
CPU核心1
CPU核心2
CPU核心3
结果1
结果2
结果3
结果收集
最终结果

5. 异步编程执行流程

基础异步编程

import asyncio
import aiohttp
import timeasync def fetch_url(session, url):"""异步获取URL内容"""try:async with session.get(url, timeout=5) as response:data = await response.text()return f"{url}: {len(data)} bytes"except Exception as e:return f"{url}: Error - {e}"async def main():urls = ["https://httpbin.org/get","https://httpbin.org/ip", "https://httpbin.org/user-agent","https://httpbin.org/headers","https://httpbin.org/delay/1",  # 模拟延迟"https://httpbin.org/delay/2"]async with aiohttp.ClientSession() as session:# 创建所有任务tasks = [fetch_url(session, url) for url in urls]# 等待所有任务完成results = await asyncio.gather(*tasks, return_exceptions=True)for result in results:print(result)# 运行异步程序
start_time = time.time()
asyncio.run(main())
end_time = time.time()
print(f"Total time: {end_time - start_time:.2f} seconds")

异步生产者消费者模式

import asyncio
import randomasync def producer(queue, name, count):"""异步生产者"""for i in range(count):item = f"item-{name}-{i}"await asyncio.sleep(random.uniform(0.1, 0.5))  # 模拟生产时间await queue.put(item)print(f"Producer {name} produced {item}")await queue.put(None)  # 结束信号async def consumer(queue, name):"""异步消费者"""while True:item = await queue.get()if item is None:await queue.put(None)  # 让其他消费者也能结束breakprint(f"Consumer {name} consumed {item}")await asyncio.sleep(random.uniform(0.2, 0.8))  # 模拟处理时间queue.task_done()async def main():queue = asyncio.Queue(maxsize=5)# 创建生产者和消费者任务producers = [asyncio.create_task(producer(queue, "A", 5)),asyncio.create_task(producer(queue, "B", 3))]consumers = [asyncio.create_task(consumer(queue, "X")),asyncio.create_task(consumer(queue, "Y"))]# 等待所有生产者完成await asyncio.gather(*producers)# 等待队列清空await queue.join()# 取消消费者任务for c in consumers:c.cancel()asyncio.run(main())
创建任务1
创建任务2
创建任务3
遇到I/O操作
遇到I/O操作
继续执行
I/O完成
I/O完成
执行完毕
执行完毕
执行完毕
事件循环开始
任务1
任务2
任务3
等待I/O1
等待I/O2
任务1完成
任务2完成
任务3完成

6. 线程池工作流程

线程池管理
任务队列
线程池管理器
工作线程1
工作线程2
工作线程3
工作线程4
结果1
结果2
结果3
结果4
结果收集器
最终输出

7. GIL(全局解释器锁)影响

GIL全局解释器锁
影响
限制多线程并行执行CPU代码
同一时间只有一个线程执行Python字节码
简化内存管理
避免竞争条件
使用多进程绕过GIL
使用C扩展
使用异步编程
适用场景
I/O密集型: 多线程有效
CPU密集型: 多进程更佳

8. 实际应用场景选择

1. 选择合适的工具

  • I/O密集型:使用 asyncio 或多线程
  • CPU密集型:使用多进程
  • 混合型:结合使用多种技术

2. 避免常见陷阱

# 错误:在多进程中使用lambda
# pool.map(lambda x: x*2, range(10))  # 可能无法序列化# 正确:使用普通函数或functools.partial
def double(x):return x * 2pool.map(double, range(10))

3. 资源管理

import concurrent.futures
import contextlib@contextlib.contextmanager
def timing(description):start = time.time()yieldelapsed = time.time() - startprint(f"{description}: {elapsed:.2f}s")# 使用上下文管理器确保资源清理
with timing("Thread pool execution"):with ThreadPoolExecutor(max_workers=5) as executor:futures = [executor.submit(io_bound_task, url) for url in urls]results = [f.result() for f in concurrent.futures.as_completed(futures)]
I/O密集型
CPU密集型
混合型
选择并发方案
任务类型?
I/O密集型
CPU密集型
混合型
网络请求
文件操作
数据库查询
解决方案: 多线程/异步
数学计算
图像处理
数据分析
解决方案: 多进程
Web服务器
科学计算
解决方案: 混合使用
推荐: asyncio > threading
推荐: multiprocessing
推荐: 进程池 + 线程池

9. 性能优化决策流程

主要瓶颈是I/O等待?
使用异步编程 asyncio
需要真正并行计算?
使用多进程 multiprocessing
任务数量多但每个任务轻量?
使用线程池 ThreadPoolExecutor
考虑顺序执行或少量线程
实施并测试性能
性能满意?
完成
调整参数并重新测试
http://www.dtcms.com/a/581279.html

相关文章:

  • 清城网站seodiscuz自适应模板
  • 优秀网页设计网站是wordpress php开发
  • 内部网关协议——OSPF 协议(开放最短路径优先)(链路状态路由协议)
  • rman-08137:warning:archived log not deleted
  • 专业的开发网站建设价格虚拟云电脑
  • [Linux——Lesson21.进程信号:信号概念 信号的产生]
  • 浙江英文网站建设嘉兴高档网站建设
  • ERP与WMS一体化构建方案
  • python+django/flask的眼科患者随访管理系统 AI智能模型
  • 实战案例:用 Guava ImmutableList 优化缓存查询系统,解决多线程数据篡改与内存浪费问题
  • AR短视频SDK,打造差异化竞争壁垒
  • 什么是AR人脸特效sdk?
  • Angular由一个bug说起之二十:Table lazy load:防止重复渲染
  • 从0到1做一个“字母拼词”Unity小游戏(含源码/GIF)- 字母拼词正确错误判断
  • 网站建设自查情况报告做淘宝联盟网站要多少钱?
  • 重新思考 weapp-tailwindcss 的未来
  • RuoYi .net-实现商城秒杀下单(redis,rabbitmq)
  • Langchain 和LangGraph 为何是AI智能体开发的核心技术
  • C++与C#布尔类型深度解析:从语言设计到跨平台互操作
  • 贵阳 网站建设设计企业门户网站
  • Rust 练习册 :Matching Brackets与栈数据结构
  • Java基础——常用算法3
  • 【JAVA 进阶】SpringAI人工智能框架深度解析:从理论到实战的企业级AI应用开发指南
  • 对话百胜软件产品经理CC:胜券POS如何用“一个APP”,撬动智慧零售的万千场景?
  • 用ps怎么做短视频网站建立网站的步骤 实湖南岚鸿
  • wordpress使用latex乱码长沙优化网站厂家
  • 【uniapp】解决小程序分包下的json文件编译后生成到主包的问题
  • MySQL-5-触发器和储存过程
  • HTTPS是什么端口?443端口的工作原理与网络安全重要性
  • 从零搭建一个 PHP 登录注册系统(含完整源码)