协程:单线程并发开发的高效利器
一、什么是协程?
协程(Coroutine),又称微线程(Microthread),是一种比线程更轻量的用户态多任务调度方式。它运行在单个线程中,由开发者控制其执行流程,而非操作系统内核进行调度。
特点:
- 轻量级:创建和切换的成本远低于线程。
- 协作式调度:协程主动让出 CPU,不会被强制中断。
- 自带上下文:每个协程保存自己的寄存器状态和栈信息。
- 适用于 I/O 密集型任务:如网络请求、文件读写等。
注意:
- 线程/进程的切换由操作系统完成,而协程的切换由程序自己控制。
- 协程不是并行的,而是通过协作实现伪并发,在单线程中交替执行多个任务。
二、协程与线程的区别
对比项 | 进程 | 线程 | 协程 |
---|---|---|---|
资源占用 | 最大 | 中等 | 最小 |
切换开销 | 大(需系统调用) | 中等(需系统调用) | 小(用户态切换) |
调度机制 | 操作系统抢占式调度 | 操作系统抢占式调度 | 用户代码协作式调度 |
并发能力 | 多核并行 | 多任务并发 | 单线程多任务“并发” |
使用场景 | CPU密集型任务 | I/O密集型任务 | 高并发 I/O 任务 |
三、协程的基本原理
协程的本质是在函数执行过程中可以暂停并恢复执行,并通过事件循环来管理这些协程的调度。
核心机制:
- 挂起(Yield):当前协程主动让出 CPU。
- 恢复(Resume):当某个外部事件(如 I/O 完成)完成后,协程被重新唤醒继续执行。
- 事件循环(Event Loop):负责监听事件并调度协程执行。
四、协程的基本实现
1. 使用生成器模拟协程
理解原理:
实现边下载变处理文件
为了更好地演示协程的概念,我们采用一个更加实际的例子:模拟两个任务,一个是下载文件(download_file
),另一个是处理数据(process_data
)。我们将看到如何通过生成器实现任务间的简单切换。
def download_file():for i in range(3):print(f"正在下载文件... {i+1}/3")yield # 让出CPU,模拟I/O等待time.sleep(1) # 模拟下载耗时def process_data():for i in range(3):print(f"正在处理数据... {i+1}/3")yield # 让出CPU,模拟处理过程中的等待time.sleep(1) # 模拟处理耗时if __name__ == "__main__":d = download_file()p = process_data()while True:try:next(d)next(p)except StopIteration:break
说明:
- 此示例展示了如何利用生成器实现简单的协程行为。
yield
关键字用于让出控制权,并允许其他任务运行。- 这种方式需要手动切换任务,适合学习目的。
2. 使用 greenlet 实现协程(手动切换)
安装:
pip install greenlet
特点:
- 支持任意函数间切换。
- 需要手动调用 .switch() 控制切换顺序。
❌ 缺点:
- 不支持自动切换。
- 遇到阻塞(如 time.sleep())时仍会阻塞整个线程。
greenlet
提供了更为直接的协程控制方法,但仍然需要手动切换任务。这里我们使用一个更贴近实际应用的例子:模拟两个不同的服务端点请求(fetch_endpoint_a
和 fetch_endpoint_b
)。
- 实现了模拟I/O阻塞请求时切换到另一个任务
- sleep也就是模拟的I/O阻塞,也就是I/O阻塞结束后,又切换回任务
from greenlet import greenlet
import timedef fetch_endpoint_a():print("开始请求Endpoint A...")gr2.switch() # 切换到Btime.sleep(1) # 模拟请求耗时print("从Endpoint A获取的数据已准备好")def fetch_endpoint_b():print("开始请求Endpoint B...")time.sleep(1) # 模拟请求耗时print("从Endpoint B获取的数据已准备好")gr1.switch() # 返回Agr1 = greenlet(fetch_endpoint_a)
gr2 = greenlet(fetch_endpoint_b)gr1.switch() # 启动第一个协程
调用方式
- 如果在上面的示例中我们调用方式不是gr1.switch()
- 而是直接调用函数fetch_endpoint_a(),他的结果会不同
- switch是协程入口会进入协程的上下文,而直接调用函数是普通函数的调用不会进入协程上下文。
五、gevent 实现协程(自动切换)
✅ 说明:
- 只要创建了协程,并且创建协程的线程没有退出,那么他就会被加入事件循环中自动执行。
gevent.spawn() 创建协程对象。
gevent.sleep() 可以触发自动切换。gevent.join:阻塞主线程,等待该协程完成,也是开启协程
gevent.joinall() 阻塞主线程,等待所有协程完成。 是一个list列表
gevent
是基于 greenlet
的高级封装库,提供了自动切换的能力,尤其擅长处理 I/O 密集型任务。
安装:
pip install gevent
# 推荐使用国内镜像加速安装
pip install gevent -i https://pypi.tuna.tsinghua.edu.cn/simple --default-timeout=100
示例代码:
join方式:
- 这里即使你只调用g1.join,他也会进行自动切换协程的。
import gevent
import timedef fetch_data(endpoint_name):for i in range(3):print(f"{endpoint_name}正在请求数据... {i+1}/3")gevent.sleep(1) # 使用gevent.sleep()以支持自动切换print(f"{endpoint_name}数据请求完成")if __name__ == '__main__':g1 = gevent.spawn(fetch_data, 'g1')g2 = gevent.spawn(fetch_data, 'g2')g1.join()g2.join()
joinall方式:
import gevent
import timedef fetch_data(endpoint_name):for i in range(3):print(f"{endpoint_name}正在请求数据... {i+1}/3")gevent.sleep(1) # 使用gevent.sleep()以支持自动切换print(f"{endpoint_name}数据请求完成")if __name__ == '__main__':gevent.joinall([gevent.spawn(fetch_data, "Endpoint A"),gevent.spawn(fetch_data, "Endpoint B"),gevent.spawn(fetch_data, "Endpoint C")])
输出结果(非严格顺序):
Endpoint A正在请求数据... 1/3
Endpoint B正在请求数据... 1/3
Endpoint C正在请求数据... 1/3
...
✅ 说明:
gevent.spawn()
创建协程对象。gevent.sleep()
可以触发自动切换。gevent.joinall()
阻塞主线程,等待所有协程完成。
六、Monkey Patch(猴子补丁)详解
默认情况下,gevent
不能自动识别标准库中的 I/O 函数(如 time.sleep
, socket.socket
)。为了使其生效,需要打上 Monkey 补丁。
示例:
from gevent import monkey
monkey.patch_all() # 替换所有标准库为 gevent 版本import gevent
import timedef sing(name):for i in range(3):time.sleep(1) # 现在会自动切换print(f"{name}在唱歌,第{i}次")if __name__ == '__main__':gevent.joinall([gevent.spawn(sing, "协程1"),gevent.spawn(sing, "协程2"),gevent.spawn(sing, "协程3")])
✅ 作用:
替换标准库中阻塞函数为非阻塞版本
使用 gevent
库可以将标准库中的阻塞函数替换为非阻塞版本,从而实现协程的自动切换。gevent
通过 monkey.patch_all()
方法来实现这一功能。该方法会修改标准库中的一些函数,使其支持协程的自动切换。
from gevent import monkey
monkey.patch_all()
应用场景
协程特别适合以下场景:
- 网络爬虫:同时发起大量 HTTP 请求,等待响应期间可切换到其他请求。
- Web 服务器:一个线程服务多个客户端连接,提高吞吐量。
- 高并发通信:如 WebSocket、聊天服务器等,保持大量长连接。
- 数据采集系统:异步获取多个 API 数据,提升效率。
七 总结对比
类型 | 是否系统级调度 | 是否自动切换 | 适用场景 | 切换开销 | 并发性 |
---|---|---|---|---|---|
进程 | ✅ | ❌ | CPU 密集型任务 | 高 | 真并行 |
线程 | ✅ | ❌ | I/O 密集型任务 | 中等 | 伪并行 |
协程(gevent) | ❌ | ✅ | 高并发 I/O 操作 | 极低 | 伪并发 |
常见问题
1.join()
方法的作用是什么?是否必须调用?
spawn()
创建协程后,协程立即进入就绪状态。join()
的作用是阻塞主线程,直到该协程执行完毕。如果不调用 join()
或 joinall()
,主线程可能提前退出,导致子协程未完全执行。
2. 协程遇到 time.sleep()
会不会自动切换?
默认不会。需要结合 monkey.patch_all()
才能让 time.sleep()
触发自动切换。
import time
from gevent import monkey, spawnmonkey.patch_all()def task():print("Task started")time.sleep(2)print("Task finished")spawn(task).join()
通过以上代码,time.sleep()
会被 gevent
替换为非阻塞版本,从而实现协程的自动切换。
八 高并发通信场景
在处理高并发通信场景,比如WebSocket服务器或聊天服务器等需要保持大量长连接的应用时,使用协程相较于传统的多线程方法有几个显著的优势。为什么在这种情况下协程通常是更好的选择。
线程:每个线程都需要一定的内存开销(例如栈空间),并且操作系统在线程之间进行上下文切换时也会消耗额外的资源。对于维持大量长连接的应用来说,创建和维护成千上万的线程是不切实际的,因为这会导致巨大的内存消耗和较高的CPU开销用于线程间的上下文切换。
协程:相比之下,协程非常轻量级。它们共享同一个线程的状态,因此不需要像线程那样为每个实例分配大量的内存。此外,协程之间的切换是由用户态代码控制的,而不是通过操作系统调度器,这意味着切换的成本更低,可以更高效地管理大量并发任务。
九 并行
4核心8线程的CPU意味着它具有4个物理核心,并通过超线程技术(Hyper-Threading)提供了8个线程,即8个逻辑处理器。理论上,操作系统可以同时调度和运行8个线程,从而在理想情况下实现并行执行8个线程的任务。