Python异步编程
Python异步编程
- 1. 异步开发的几种实现方式
- 2. 协程
- 3. 第一个协程代码
- 4. 并发多个协程
- 5. 等待
- 6. 超时
- 7. 使用多线程执行同步任务
- 8. Task对象
1. 异步开发的几种实现方式
1. 多进程
- 进程是操作系统进行资源分配和调度的一个独立单位,每个进程都有自己独立的内存空间和系统资源。通过创建多个进程,各个进程可以同时执行不同的任务,实现并发编程。进程之间的运行互不影响,如果一个进程崩溃,不会影响其他进程的正常运行。但是,进程之间的切换成本和通信成本相对较高,每个进程需要占用独立的系统资源(如内存),因此对资源的消耗相对较大。在Python中,多进程适合处理CPU密集型任务,比如视频转码,科学计算等。
2. 多线程
- 线程是进程内的一个执行单元,是操作系统进行任务调度的基本单位。一个进程内通常包含多个线程,这些线程共享进程的资源,包括内存空间、文件句柄等。通过创建多个线程,一个进程内的多个任务可以同时执行,实现并发编程。与进程相比,线程之间的切换成本更低,且线程之间的通信更方便,因为他们共享同一份内存空间。但是,线程之间的运行相互影响,如果一个线程崩溃,可能会影响同一进程内的其他线程。另外,线程的并发执行需要操作系统的支持,不同的操作系统对线程的支持程度不同。Python中的线程有GUL锁,即在同一个进程中,同一时刻只能有一个线程上CPU运行,所以Python多线程不适合处理CPU密集型任务。多线程适合处理I/O密集型任务,比如网络请求,磁盘读写等。
3. 协程
- 协程(Coroutine)是一种程序组件,通过它可以实现多任务的并发执行。与传统进程和线程相比,协程提供了更轻量级的并发编程解决方案。接下来会详细介绍协程。
2. 协程
1. 协程的概念
- 协程是一种用户态的轻量级线程,它允许程序在单个线程内实现多个任务的并发执行。写成通过协作式多任务来实现,这意味着协程会主动交出控制权,让其他协程运行。与进程和线程不同,协程的切换不需要操作系统内核的介入,从而降低了开销。
2. 协程实现并发编程
- 协程实现并发编程的核心思想是利用函数的暂停和恢复。在协程中,函数可以在某个点暂停执行,并在适当的时候恢复执行,而不会影响其他协程的运行。这种机制使得多个协程可以在单个线程内交替执行,从而实现并发。
- 协程的实现通常依赖于以下两个关键概念:
- 生成器(Generator):生成器是一种特殊的函数,可以在执行过程中多次暂停和恢复。通过生成器,我们可以实现简单的协程功能。例如Python中使用
yield
关键字可以创建生成器。 - 异步编程(Asynchronous Programming):异步编程是一种编程范式,允许程序在等待I/O操作完成时执行其他任务。在协程中,可以利用异步编程实现并发。
- 生成器(Generator):生成器是一种特殊的函数,可以在执行过程中多次暂停和恢复。通过生成器,我们可以实现简单的协程功能。例如Python中使用
3. 协程与进程、线程的优势
- 轻量级:协程的创建和切换开销远低于线程和进程。由于协程在用户态执行,因此不需要内核态的上下文切换,从而降低了开销。
- 高并发性能:由于协程的轻量级特性,单个线程可以创建大量的协程,实现高并发处理。相比之下,线程和进程的数量受到系统资源的限制。
- 资源共享:协程在单个线程内运行,可以轻松地共享资源,无序考虑线程和进程间的同步和通信问题。
- 简化编程模型:协程的协作式多任务特性使得并发编程更加直观和简单。开发者可以专注于业务逻辑,而不是线程或进程的同步和竞争条件。
4. 主流协议
- WSGI:同步。通过多进程+多线程的方式来实现并发。
- ASGI:异步。通过多进程+主线程(不存在多线程)+协程来实现并发。
3. 第一个协程代码
在Python3.4中添加了asyncio
库,这让我们利用Python编写协程代码变得更加简单(不要再用yield
了,早过时了)。
import asyncioasync def main():print("hello")# 协程必须哟啊等待,也就是必须在前面加上await关键字await asyncio.sleep(1) # 这里不要使用同步的I/O,如time.sleep(),否则就是同步的,失去了并发性print('world')if __name__ == '__main__':# 创建一个协程对象# main(): 这样并不是直接执行main函数,而是创建一个协程cor = main()# 要把协程丢到事件循环中,才能运行协程asyncio.run(cor)
- 以上代码有几点需要说明:
- a. 协程不会自己运行,需要加入到事件循环中,让事件调度运行。我们可以通过
asyncio.run()
函数来将一个协程放到事件循环中; - b. 通过在函数前面加上
async
关键字,将一个普通的函数变为一个协程; - c. 在协程中,使用
await
关键字等待一个协程执行完成(必须加await
)。关键字await
,也必须放到async
定义的函数中,否则会报错; - d. 以上
asyncio.sleep
函数不能用time.sleep
来替换,后者是同步的,如果放到异步函数中,将无法发挥异步编程的优势。
- a. 协程不会自己运行,需要加入到事件循环中,让事件调度运行。我们可以通过
- 伪代码理解协程调度:
asyncio.run(main())
就相当于把main()
协程加入事件循环,即协程队列中。随后不断从队列中pop
出协程。如果执行某个协程时,出现了资源等待,那么系统不会傻傻的等待这个协程资源就绪,而是直接进入下一次循环,pop
出新的协程,继续执行。
# 有一个协程队列,存储所有需要执行的协程
queue = [cor1, cor2, ...]
while True:cor = queue.pop()result = await cor
4. 并发多个协程
先定义一个显示协程运行时间的装饰器:
# utils.py
import time
from functools import wrapsdef async_timed(func):@wraps(func)async def wrapper(*args, **kwargs):print(f'开始执行{func},参数为:{args}, {kwargs}')start = time.time()try:return await func(*args, **kwargs)finally:end = time.time()total = end - startprint(f'结束执行{func},耗时:{total:.4f}秒')return wrapper
1. 用创建任务的方式并发运行
- 先展示一个错误的同步运行写法:
- 最终
main
协程的运行时间为3s,没有实现并发。
- 最终
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"# 同步运行
@async_timed
async def main():# 执行两个协程result1 = await greet(name='xxx', delay=1)print(f'result1: {result1}')result2 = await greet(name='yyy', delay=2)print(f'result2: {result2}')
- 想实现并发,必须将协程包装成
Task
任务对象(下面详细解释一下代码的执行顺序):- 首先,(1)执行,
asyncio.create_task
将创建一个事件循环,并将协程greet(name='xxx', delay=1)
放入队列中,开始执行; - 紧接着,(2)执行,将协程
greet(name='yyy', delay=2)
也加入队列,开始执行; - (3)执行,
main()
主程序等待task1
执行完毕,时间为1s; - (4)执行,打印
result1
; - (5)执行,由于两个协程几乎同时进入事件循环,所以此时
task2
也已经执行1s了,main()
主程序继续等待task2
执行完毕,剩余时间为1s; - 最后(6)执行,打印
result
。
- 首先,(1)执行,
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"# 用创建任务的方式并发运行
@async_timed
async def main():# 必须要将协程包装成Task对象,才能够并发执行task1 = asyncio.create_task(greet(name='xxx', delay=1)) # (1)task2 = asyncio.create_task(greet(name='yyy', delay=2)) # (2)result1 = await task1 # (3)print(f'result1: {result1}') # (4)result2 = await task2 # (5)print(f'result2: {result2}') # (6)
- 由此衍生出一个错误的写法:
- 如果在第一个协程进入事件循环后,就直接
await
该协程,就会导致main()
主程序在此等待task1
完全执行完毕,1s后才会执行下一句代码,总时间还是3s; - 由此可见,想要实现并发,必须先让全部的协程都先进入事件循环,之后才能
await
。
- 如果在第一个协程进入事件循环后,就直接
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"@async_timed
async def main():result1 = await asyncio.create_task(greet(name='xxx', delay=1))result2 = await asyncio.create_task(greet(name='yyy', delay=2))
2. 使用任务组
- 除了创建完任务后,一个个
await
,还可以使用TaskGroup
创建一个任务组,然后再任务组中创建多个任务,最终统一await
这个任务组(这个操作不用我们自己做,是自动的),示例:
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"# 2. 用TaskGroup方式并发运行协程
@async_timed
async def main():async with asyncio.TaskGroup() as group:# 在这个里面创建任务task1 = group.create_task(greet(name='xxx', delay=1))task2 = group.create_task(greet(name='yyy', delay=2))print(task1.result())print(task2.result())
- 但是如果其中有任务出现异常了,就会导致后面的任务被取消,从而提前退出协程的并发运行。
async def greet_group(name, delay):await asyncio.sleep(delay)if name == "xxx":raise ValueError("执行错误!")return f'hello {name}'# 2. 用TaskGroup方式并发运行协程
@async_timed
async def main():try:async with asyncio.TaskGroup() as group:# 在这个里面创建任务task1 = group.create_task(greet_group(name='xxx', delay=1))task2 = group.create_task(greet_group(name='yyy', delay=2))except Exception as e:print(e)# 其中有任务出现异常了,导致后面的任务被取消了,从而提前退出协程的并发运行# * done:代表该协程是否完成(被取消也算完成)# * cancelled:返回该协程是否被取消print(task1.done()) # trueprint(task2.cancelled()) # true
3. 使用gather
- 以上代码是手动创建任务后运行,另外还可以通过一个更高级的API来实现并发运行,即
asyncio.gather
,这个函数的底层实际上也是将协程封装成Future
对象,然后再并发运行。
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"async def greet_group(name, delay):await asyncio.sleep(delay)if name == "xxx":raise ValueError("执行错误!")return f'hello {name}'@async_timed
async def main1():# gather在将所有协程全部执行完之后,会按照协程入队的顺序(注意不是协程执行完的顺序),将协程的返回值存放在results中results = await asyncio.gather(greet('张三', 1),greet('李四', 3),greet('王五', 2))print(results)@async_timed
async def main2():results = await asyncio.gather(greet_group('xxx', 1),greet('李四', 3),greet('王五', 2),return_exceptions=True)print(results)# results:[ValueError('执行错误!'), 'hello 李四', 'hello 王五']
- 补充:
- 如果gather中的协程出现异常,那么会抛出异常。如果不想抛出异常,那么可以设置
return_exceptions=True
,就会把异常作为返回值,而不会抛出异常; - gather在将所有协程全部执行完后,会按照协程入队的顺序,将协程的返回值存放在
results
中; - 与
TaskGroup
相比:asyncio.gather
函数即使其中有任务抛出异常,也不会取消后面的任务;而TaskGroup
则是只要有一个任务抛出异常,后续的任务都会被取消。
- 如果gather中的协程出现异常,那么会抛出异常。如果不想抛出异常,那么可以设置
4. 使用as_completed
as_completed
在每运行完一个协程后就返回,使用方法如下:
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"@async_timed
async def main():aws = [greet('张三', 1),greet('李四', 3)]for coro in asyncio.as_completed(aws):result = await coroprint(result)
- 上述代码中,会并发执行
aws
中的协程。as_completed
函数会返回一个迭代器,最先执行完的任务会最先被遍历到。并且as_completed
可以设置超时时间:
@async_timed
async def main():aws = [greet('张三', 1),greet('李四', 3)]# 可以指定超时时间# 如果超过指定超时时间,还有任务没有完成,那么会抛出TimeoutError异常# 剩余的任务不会被取消try:for coro in asyncio.as_completed(aws, timeout=2):result = await coroprint(result)except asyncio.TimeoutError:print('超时了!')tasks = asyncio.all_tasks()for task in tasks:if task.get_name() == 'Task-1': # main协程不等待continueelse:# 如果没有继续等待task执行,那么这个task就不会执行了,而是处于pending状态result = await taskprint(result)
as_completed
方法在其中某个任务抛出异常后,剩余的任务也不会被取消掉。可以通过await
再次激活,这一点同gather
。
5. 等待
有时候,我们期望某个协程或者任务最多运行多长时间,就可以使用
wait_for
或wait
函数。
1. wait_for(aw, timeout)
wait_for
函数只能用于等待一个协程或者任务,可以指定超时时间。
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"# wait_for
@async_timed
async def main():try:result = await asyncio.wait_for(greet('张三', 2), timeout=1)print(result)except asyncio.TimeoutError:print('超时了!')tasks = asyncio.all_tasks()print(tasks) # 这里只会打印一个Task1,也就是main协程,超时的协程被取消了
- 超时后的任务,没法继续让其执行了。
2. wait(aws, timeout=None, return_when=ALL_COMPLETED)
- 这个函数可用于等待多个
Task
或者Future
(Task
对象的基类),并且可以指定在什么情况下才会返回,默认是ALL_COMPLETED
(全部执行完后返回),并且注意,这个函数不会触发TimeoutError
,而是将执行完的,以及超时的任务,通过元组的形式返回:
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"# wait
@async_timed
async def main():aws = [asyncio.create_task(greet('张三', 1)),asyncio.create_task(greet('李四', 3))]# wait函数返回的结果是一个元组(执行完成的任务,执行超时的任务)# 如果没有指定timeout,那么永远不会超时done_tasks, pending_tasks = await asyncio.wait(aws, timeout=2)print(done_tasks)print(pending_tasks)for task in pending_tasks:result = await task # 没有执行完的协程可以继续执行print(result)
- 其中,
return_when
除了默认的ALL_COMPLETED
外,还有以下可选值(当然超时就会直接返回,相当于这个参数就不起任何作用):ALL_COMPLETED
:等所有任务都执行完后,再返回;FIRST_EXCEPTION
:有任何任务发生异常后就立即返回,即使没有超时也会返回;FIRST_COMPLETED
:第一个任务执行完后就立即返回。
6. 超时
asyncio
提供了专门的超时API,用于限制某些任务的最大执行时间。超时API有两个,分别是:asyncio.timeout
和asyncio.timeout_at
。
1. asyncio.timeout(delay)
- 该函数返回一个异步上下文管理器,也就意味着我们可以使用
async with
进行使用; - 其中
delay
可以为具体的秒数,也可以为None
,如果为None
,那么代表哦永远不会超时; - 如果超时
delay
的时间,那么下面所有的任务都将会被取消,并抛出TimeoutError
异常。
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"@async_timed
async def main():try:async with asyncio.timeout(1):task1 = asyncio.create_task(greet('张三', 1), name='zhangsan')task2 = asyncio.create_task(greet('李四', 2), name='lisi')result1 = await task1print(result1)result2 = await task2print(result2)except asyncio.TimeoutError:print('超时了!')tasks = asyncio.all_tasks()print(tasks) # lisi协程被打印,处于pending状态
2. asyncio.timeout_at(when)
- 与
asyncio.timeout
不同的是,asyncio.timeout_at
中的when
参数是一个绝对一时间,或者为None
。
7. 使用多线程执行同步任务
1. 有协程了,为什么还要用线程?
- 在Python中,虽然协程比线程效率更高,但是很多库比如
requests
,并没有提供异步协程的版本。一旦在协程中使用了requests
库中的同步方法,比如requests.get()
,事件循环就会在这里阻塞,等待资源就绪,从而失去异步特性。 - 但是,如果我们新开一个线程,把同步的代码放在其中执行,就不会影响主线程中的事件循环了。
2. 代码示例
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"def get_url(url):print(f'开始获取{url}')# 同步阻塞2stime.sleep(2)print(f'结束获取{url}')return 'success'@async_timed
async def main():result = await asyncio.gather(asyncio.to_thread(get_url, url="https://www.baidu.com"),greet('张三', 2))print(result)if __name__ == "__main__":asyncio.run(main())
8. Task对象
Task对象是用于封装和管理协程的运行的,可以将协程并发执行。Task对象有以下方法:
done
:用于获取该Task对象是否执行完成(正常完成,异常,被取消都算done
);result
:用于获取该Task执行完后的返回值;exception
:如果Task对象执行过程中发生异常,则该方法会返回异常信息。如果任务没有发生异常,那么调用exception()
方法将抛出asyncio.exceptions.InvalidStateError: Exception is ont set.
异常
async def task_will_fail():await asyncio.sleep(1)raise ValueError("发生异常!")async def main1():# create_task创建完任务后,这个任务会立马加入到事件循环中进行调度task = asyncio.create_task(task_will_fail())# 如果任务中没有出现异常,那么调用exception()方法就会出现异常print(task.exception()) # 报错async def main2():task = asyncio.create_task(task_will_fail())await asyncio.sleep(2)print(task.exception()) # 打印异常,不报错
add_done_callback
:添加任务执行完成后的回调。
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"def my_callback1(task):print('='*20)print(type(task)) # <class '_asyncio.Task'>print(task.result())print('='*20)def my_callback2(task, tag):print('='*20)print(type(task)) # <class '_asyncio.Task'>print(task.result())print('tag: ', tag)print('='*20)async def main1():task = asyncio.create_task(greet('张三', 2))task.add_done_callback(my_callback1)await taskasync def main2():task = asyncio.create_task(greet('张三', 2))# partial: 偏函数,可以提前准备好一些参数task.add_done_callback(partial(my_callback2, tag='zhangsan'))await task
cancel
:取消任务的执行;
async def something():print('something start')await asyncio.sleep(20)async def main():task = asyncio.create_task(something())task.cancel()# 等待一个已经被取消的任务,会抛出CanceledError异常try:await taskexcept asyncio.CancelledError:print('是否被取消:', task.cancelled())
cancelled
:判断任务是否被取消;get_name
:获取任务的名称;set_name
:设置任务的名称。