当前位置: 首页 > news >正文

Python异步编程

Python异步编程

  • 1. 异步开发的几种实现方式
  • 2. 协程
  • 3. 第一个协程代码
  • 4. 并发多个协程
  • 5. 等待
  • 6. 超时
  • 7. 使用多线程执行同步任务
  • 8. Task对象


1. 异步开发的几种实现方式


1. 多进程

  • 进程是操作系统进行资源分配和调度的一个独立单位,每个进程都有自己独立的内存空间和系统资源。通过创建多个进程,各个进程可以同时执行不同的任务,实现并发编程。进程之间的运行互不影响,如果一个进程崩溃,不会影响其他进程的正常运行。但是,进程之间的切换成本和通信成本相对较高,每个进程需要占用独立的系统资源(如内存),因此对资源的消耗相对较大。在Python中,多进程适合处理CPU密集型任务,比如视频转码,科学计算等。

2. 多线程

  • 线程是进程内的一个执行单元,是操作系统进行任务调度的基本单位。一个进程内通常包含多个线程,这些线程共享进程的资源,包括内存空间、文件句柄等。通过创建多个线程,一个进程内的多个任务可以同时执行,实现并发编程。与进程相比,线程之间的切换成本更低,且线程之间的通信更方便,因为他们共享同一份内存空间。但是,线程之间的运行相互影响,如果一个线程崩溃,可能会影响同一进程内的其他线程。另外,线程的并发执行需要操作系统的支持,不同的操作系统对线程的支持程度不同。Python中的线程有GUL锁,即在同一个进程中,同一时刻只能有一个线程上CPU运行,所以Python多线程不适合处理CPU密集型任务。多线程适合处理I/O密集型任务,比如网络请求,磁盘读写等。

3. 协程

  • 协程(Coroutine)是一种程序组件,通过它可以实现多任务的并发执行。与传统进程和线程相比,协程提供了更轻量级的并发编程解决方案。接下来会详细介绍协程。

2. 协程


1. 协程的概念

  • 协程是一种用户态的轻量级线程,它允许程序在单个线程内实现多个任务的并发执行。写成通过协作式多任务来实现,这意味着协程会主动交出控制权,让其他协程运行。与进程和线程不同,协程的切换不需要操作系统内核的介入,从而降低了开销。

2. 协程实现并发编程

  • 协程实现并发编程的核心思想是利用函数的暂停和恢复。在协程中,函数可以在某个点暂停执行,并在适当的时候恢复执行,而不会影响其他协程的运行。这种机制使得多个协程可以在单个线程内交替执行,从而实现并发。
  • 协程的实现通常依赖于以下两个关键概念:
    • 生成器(Generator):生成器是一种特殊的函数,可以在执行过程中多次暂停和恢复。通过生成器,我们可以实现简单的协程功能。例如Python中使用yield关键字可以创建生成器。
    • 异步编程(Asynchronous Programming):异步编程是一种编程范式,允许程序在等待I/O操作完成时执行其他任务。在协程中,可以利用异步编程实现并发。

3. 协程与进程、线程的优势

  • 轻量级:协程的创建和切换开销远低于线程和进程。由于协程在用户态执行,因此不需要内核态的上下文切换,从而降低了开销。
  • 高并发性能:由于协程的轻量级特性,单个线程可以创建大量的协程,实现高并发处理。相比之下,线程和进程的数量受到系统资源的限制。
  • 资源共享:协程在单个线程内运行,可以轻松地共享资源,无序考虑线程和进程间的同步和通信问题。
  • 简化编程模型:协程的协作式多任务特性使得并发编程更加直观和简单。开发者可以专注于业务逻辑,而不是线程或进程的同步和竞争条件。

4. 主流协议

  • WSGI:同步。通过多进程+多线程的方式来实现并发。
  • ASGI:异步。通过多进程+主线程(不存在多线程)+协程来实现并发。

3. 第一个协程代码


在Python3.4中添加了asyncio库,这让我们利用Python编写协程代码变得更加简单(不要再用yield了,早过时了)。

import asyncioasync def main():print("hello")# 协程必须哟啊等待,也就是必须在前面加上await关键字await asyncio.sleep(1) # 这里不要使用同步的I/O,如time.sleep(),否则就是同步的,失去了并发性print('world')if __name__ == '__main__':# 创建一个协程对象# main(): 这样并不是直接执行main函数,而是创建一个协程cor = main()# 要把协程丢到事件循环中,才能运行协程asyncio.run(cor)
  • 以上代码有几点需要说明:
    • a. 协程不会自己运行,需要加入到事件循环中,让事件调度运行。我们可以通过asyncio.run()函数来将一个协程放到事件循环中;
    • b. 通过在函数前面加上async关键字,将一个普通的函数变为一个协程;
    • c. 在协程中,使用await关键字等待一个协程执行完成(必须加await)。关键字await,也必须放到async定义的函数中,否则会报错;
    • d. 以上asyncio.sleep函数不能用time.sleep来替换,后者是同步的,如果放到异步函数中,将无法发挥异步编程的优势。
  • 伪代码理解协程调度:
    • asyncio.run(main())就相当于把main()协程加入事件循环,即协程队列中。随后不断从队列中pop出协程。如果执行某个协程时,出现了资源等待,那么系统不会傻傻的等待这个协程资源就绪,而是直接进入下一次循环,pop出新的协程,继续执行。
# 有一个协程队列,存储所有需要执行的协程
queue = [cor1, cor2, ...]
while True:cor = queue.pop()result = await cor

4. 并发多个协程


先定义一个显示协程运行时间的装饰器:

# utils.py
import time
from functools import wrapsdef async_timed(func):@wraps(func)async def wrapper(*args, **kwargs):print(f'开始执行{func},参数为:{args}, {kwargs}')start = time.time()try:return await func(*args, **kwargs)finally:end = time.time()total = end - startprint(f'结束执行{func},耗时:{total:.4f}秒')return wrapper

1. 用创建任务的方式并发运行

  • 先展示一个错误的同步运行写法:
    • 最终main协程的运行时间为3s,没有实现并发。
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"# 同步运行
@async_timed
async def main():# 执行两个协程result1 = await greet(name='xxx', delay=1)print(f'result1: {result1}')result2 = await greet(name='yyy', delay=2)print(f'result2: {result2}')
  • 想实现并发,必须将协程包装成Task任务对象(下面详细解释一下代码的执行顺序):
    • 首先,(1)执行,asyncio.create_task将创建一个事件循环,并将协程greet(name='xxx', delay=1)放入队列中,开始执行;
    • 紧接着,(2)执行,将协程greet(name='yyy', delay=2)也加入队列,开始执行;
    • (3)执行,main()主程序等待task1执行完毕,时间为1s;
    • (4)执行,打印result1
    • (5)执行,由于两个协程几乎同时进入事件循环,所以此时task2也已经执行1s了,main()主程序继续等待task2执行完毕,剩余时间为1s;
    • 最后(6)执行,打印result
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"# 用创建任务的方式并发运行
@async_timed
async def main():# 必须要将协程包装成Task对象,才能够并发执行task1 = asyncio.create_task(greet(name='xxx', delay=1))	# (1)task2 = asyncio.create_task(greet(name='yyy', delay=2)) # (2)result1 = await task1	# (3)print(f'result1: {result1}') # (4)result2 = await task2 # (5)print(f'result2: {result2}') # (6)
  • 由此衍生出一个错误的写法:
    • 如果在第一个协程进入事件循环后,就直接await该协程,就会导致main()主程序在此等待task1完全执行完毕,1s后才会执行下一句代码,总时间还是3s;
    • 由此可见,想要实现并发,必须先让全部的协程都先进入事件循环,之后才能await
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"@async_timed
async def main():result1 = await asyncio.create_task(greet(name='xxx', delay=1))result2 = await asyncio.create_task(greet(name='yyy', delay=2))

2. 使用任务组

  • 除了创建完任务后,一个个await,还可以使用TaskGroup创建一个任务组,然后再任务组中创建多个任务,最终统一await这个任务组(这个操作不用我们自己做,是自动的),示例:
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"# 2. 用TaskGroup方式并发运行协程
@async_timed
async def main():async with asyncio.TaskGroup() as group:# 在这个里面创建任务task1 = group.create_task(greet(name='xxx', delay=1))task2 = group.create_task(greet(name='yyy', delay=2))print(task1.result())print(task2.result())
  • 但是如果其中有任务出现异常了,就会导致后面的任务被取消,从而提前退出协程的并发运行。
async def greet_group(name, delay):await asyncio.sleep(delay)if name == "xxx":raise ValueError("执行错误!")return f'hello {name}'# 2. 用TaskGroup方式并发运行协程
@async_timed
async def main():try:async with asyncio.TaskGroup() as group:# 在这个里面创建任务task1 = group.create_task(greet_group(name='xxx', delay=1))task2 = group.create_task(greet_group(name='yyy', delay=2))except Exception as e:print(e)# 其中有任务出现异常了,导致后面的任务被取消了,从而提前退出协程的并发运行# * done:代表该协程是否完成(被取消也算完成)# * cancelled:返回该协程是否被取消print(task1.done()) # trueprint(task2.cancelled()) # true

3. 使用gather

  • 以上代码是手动创建任务后运行,另外还可以通过一个更高级的API来实现并发运行,即asyncio.gather,这个函数的底层实际上也是将协程封装成Future对象,然后再并发运行。
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"async def greet_group(name, delay):await asyncio.sleep(delay)if name == "xxx":raise ValueError("执行错误!")return f'hello {name}'@async_timed
async def main1():# gather在将所有协程全部执行完之后,会按照协程入队的顺序(注意不是协程执行完的顺序),将协程的返回值存放在results中results = await asyncio.gather(greet('张三', 1),greet('李四', 3),greet('王五', 2))print(results)@async_timed
async def main2():results = await asyncio.gather(greet_group('xxx', 1),greet('李四', 3),greet('王五', 2),return_exceptions=True)print(results)# results:[ValueError('执行错误!'), 'hello 李四', 'hello 王五']
  • 补充:
    • 如果gather中的协程出现异常,那么会抛出异常。如果不想抛出异常,那么可以设置return_exceptions=True,就会把异常作为返回值,而不会抛出异常;
    • gather在将所有协程全部执行完后,会按照协程入队的顺序,将协程的返回值存放在results中;
    • TaskGroup相比:asyncio.gather函数即使其中有任务抛出异常,也不会取消后面的任务;而TaskGroup则是只要有一个任务抛出异常,后续的任务都会被取消。

4. 使用as_completed

  • as_completed在每运行完一个协程后就返回,使用方法如下:
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"@async_timed
async def main():aws = [greet('张三', 1),greet('李四', 3)]for coro in asyncio.as_completed(aws):result = await coroprint(result)
  • 上述代码中,会并发执行aws中的协程。as_completed函数会返回一个迭代器,最先执行完的任务会最先被遍历到。并且as_completed可以设置超时时间:
@async_timed
async def main():aws = [greet('张三', 1),greet('李四', 3)]# 可以指定超时时间# 如果超过指定超时时间,还有任务没有完成,那么会抛出TimeoutError异常# 剩余的任务不会被取消try:for coro in asyncio.as_completed(aws, timeout=2):result = await coroprint(result)except asyncio.TimeoutError:print('超时了!')tasks = asyncio.all_tasks()for task in tasks:if task.get_name() == 'Task-1': # main协程不等待continueelse:# 如果没有继续等待task执行,那么这个task就不会执行了,而是处于pending状态result = await taskprint(result)
  • as_completed方法在其中某个任务抛出异常后,剩余的任务也不会被取消掉。可以通过await再次激活,这一点同gather

5. 等待


有时候,我们期望某个协程或者任务最多运行多长时间,就可以使用wait_forwait函数。

1. wait_for(aw, timeout)

  • wait_for函数只能用于等待一个协程或者任务,可以指定超时时间。
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"# wait_for
@async_timed
async def main():try:result = await asyncio.wait_for(greet('张三', 2), timeout=1)print(result)except asyncio.TimeoutError:print('超时了!')tasks = asyncio.all_tasks()print(tasks) # 这里只会打印一个Task1,也就是main协程,超时的协程被取消了
  • 超时后的任务,没法继续让其执行了。

2. wait(aws, timeout=None, return_when=ALL_COMPLETED)

  • 这个函数可用于等待多个Task或者FutureTask对象的基类),并且可以指定在什么情况下才会返回,默认是ALL_COMPLETED(全部执行完后返回),并且注意,这个函数不会触发TimeoutError,而是将执行完的,以及超时的任务,通过元组的形式返回:
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"# wait
@async_timed
async def main():aws = [asyncio.create_task(greet('张三', 1)),asyncio.create_task(greet('李四', 3))]# wait函数返回的结果是一个元组(执行完成的任务,执行超时的任务)# 如果没有指定timeout,那么永远不会超时done_tasks, pending_tasks = await asyncio.wait(aws, timeout=2)print(done_tasks)print(pending_tasks)for task in pending_tasks:result = await task # 没有执行完的协程可以继续执行print(result)
  • 其中,return_when除了默认的ALL_COMPLETED外,还有以下可选值(当然超时就会直接返回,相当于这个参数就不起任何作用):
    • ALL_COMPLETED:等所有任务都执行完后,再返回;
    • FIRST_EXCEPTION:有任何任务发生异常后就立即返回,即使没有超时也会返回;
    • FIRST_COMPLETED:第一个任务执行完后就立即返回。

6. 超时


asyncio提供了专门的超时API,用于限制某些任务的最大执行时间。超时API有两个,分别是:asyncio.timeoutasyncio.timeout_at

1. asyncio.timeout(delay)

  • 该函数返回一个异步上下文管理器,也就意味着我们可以使用async with进行使用;
  • 其中delay可以为具体的秒数,也可以为None,如果为None,那么代表哦永远不会超时;
  • 如果超时delay的时间,那么下面所有的任务都将会被取消,并抛出TimeoutError异常。
# 传入delay设置延迟
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"@async_timed
async def main():try:async with asyncio.timeout(1):task1 = asyncio.create_task(greet('张三', 1), name='zhangsan')task2 = asyncio.create_task(greet('李四', 2), name='lisi')result1 = await task1print(result1)result2 = await task2print(result2)except asyncio.TimeoutError:print('超时了!')tasks = asyncio.all_tasks()print(tasks) # lisi协程被打印,处于pending状态

2. asyncio.timeout_at(when)

  • asyncio.timeout不同的是,asyncio.timeout_at中的when参数是一个绝对一时间,或者为None

7. 使用多线程执行同步任务


1. 有协程了,为什么还要用线程?

  • 在Python中,虽然协程比线程效率更高,但是很多库比如requests,并没有提供异步协程的版本。一旦在协程中使用了requests库中的同步方法,比如requests.get(),事件循环就会在这里阻塞,等待资源就绪,从而失去异步特性。
  • 但是,如果我们新开一个线程,把同步的代码放在其中执行,就不会影响主线程中的事件循环了。

2. 代码示例

async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"def get_url(url):print(f'开始获取{url}')# 同步阻塞2stime.sleep(2)print(f'结束获取{url}')return 'success'@async_timed
async def main():result = await asyncio.gather(asyncio.to_thread(get_url, url="https://www.baidu.com"),greet('张三', 2))print(result)if __name__ == "__main__":asyncio.run(main())

8. Task对象


Task对象是用于封装和管理协程的运行的,可以将协程并发执行。Task对象有以下方法:

  • done:用于获取该Task对象是否执行完成(正常完成,异常,被取消都算done);
  • result:用于获取该Task执行完后的返回值;
  • exception:如果Task对象执行过程中发生异常,则该方法会返回异常信息。如果任务没有发生异常,那么调用exception()方法将抛出asyncio.exceptions.InvalidStateError: Exception is ont set.异常
async def task_will_fail():await asyncio.sleep(1)raise ValueError("发生异常!")async def main1():# create_task创建完任务后,这个任务会立马加入到事件循环中进行调度task = asyncio.create_task(task_will_fail())# 如果任务中没有出现异常,那么调用exception()方法就会出现异常print(task.exception()) # 报错async def main2():task = asyncio.create_task(task_will_fail())await asyncio.sleep(2)print(task.exception()) # 打印异常,不报错
  • add_done_callback:添加任务执行完成后的回调。
async def greet(name, delay):await asyncio.sleep(delay)return f"hello {name}"def my_callback1(task):print('='*20)print(type(task)) # <class '_asyncio.Task'>print(task.result())print('='*20)def my_callback2(task, tag):print('='*20)print(type(task)) # <class '_asyncio.Task'>print(task.result())print('tag: ', tag)print('='*20)async def main1():task = asyncio.create_task(greet('张三', 2))task.add_done_callback(my_callback1)await taskasync def main2():task = asyncio.create_task(greet('张三', 2))# partial: 偏函数,可以提前准备好一些参数task.add_done_callback(partial(my_callback2, tag='zhangsan'))await task
  • cancel:取消任务的执行;
async def something():print('something start')await asyncio.sleep(20)async def main():task = asyncio.create_task(something())task.cancel()# 等待一个已经被取消的任务,会抛出CanceledError异常try:await taskexcept asyncio.CancelledError:print('是否被取消:', task.cancelled())
  • cancelled:判断任务是否被取消;
  • get_name:获取任务的名称;
  • set_name:设置任务的名称。

http://www.dtcms.com/a/278100.html

相关文章:

  • 57.第二阶段x64游戏实战-实时监控抓取lua内容
  • 利用低汇率国家苹果订阅,120 元开通 ChatGPT Plus
  • 14.使用GoogleNet/Inception网络进行Fashion-Mnist分类
  • docker基础部署
  • ID生成策略
  • 在新版本的微信开发者工具中使用npm包
  • 用信号量实现进程互斥,进程同步,进程前驱关系(操作系统os)
  • DOS下EXE文件的分析 <1>
  • MacBook Air通过VMware Fusion Pro安装Win11
  • 从代码学习深度强化学习 - DDPG PyTorch版
  • [Python 基础课程]列表
  • 【DataLoader的使用】
  • 力扣 hot100 Day43
  • Actor-Critic重要性采样原理
  • java valueOf方法
  • 【算法】贪心算法入门
  • SwiftUI 7 新 WebView:金蛇出洞,网页江湖换新天
  • 一些git命令
  • 若依框架集成阿里云OSS实现文件上传优化
  • 对于muduo我自己的理解
  • UniHttp生命周期钩子与公共参数实战:打造智能天气接口客户端
  • flask校园学科竞赛管理系统-计算机毕业设计源码12876
  • SPSSPRO:数据分析市场SaaS挑战者的战略分析
  • JAVA并发——什么是AQS?
  • Mapbox GL初探
  • 【unitrix】 5.0 第二套类型级二进制数基本结构体(types2.rs)
  • 16.使用ResNet网络进行Fashion-Mnist分类
  • css如何同时给元素设置背景和背景图?
  • 每日算法刷题Day47:7.13:leetcode 复习完滑动窗口一章,用时2h30min
  • 说实话,统计分析用Python这5个第三方库就够了