python asyncio的作用
协程是可以暂停运行和恢复运行的函数。协程函数是用async
定义的函数。它与普通的函数最大的区别是,当执行的时候不会真的执行里面的代码,而是返回一个协程对象,在执行协程对象时才执行里面真正的代码。
例如代码:
async def coroutine_function():print("this is a coroutine function")print(coroutine_function())
执行结果:
<coroutine object coroutine_function at 0x10a7fc7c0>
/Users/4bu/code/neimeng-python/test/test_async.py:5: RuntimeWarning: coroutine 'coroutine_function' was never awaitedprint(coroutine_function())
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
从打印结果可以看出返回的是一个协程对象coroutine object
,当运行协程对象才会执行里面的代码,并且可以暂停运行和恢复运行。
当出现await
时,就会暂停运行,让出控制权,等await
后的函数执行完成后,再请求控制权回来恢复运行(拥有控制权的协程可以运行,没有控制权的只有等待)。
当协程暂停运行的时候,CPU开始事件循环
,用来调度协程执行,握着控制权,循环往复做三件事情:
- 检查协程。拿到控制权后,就开始检查有没有可以执行的协程。
- 让出控制。将控制权传递给可以执行的协程。
- 等待协程。等当前协程暂停或者执行完成,放开控制权给自己。然后再回到第一步。
事件循环如何知道哪些协程可以执行,哪些协程不可以执行,这就需要任务
。任务是对协程的封装,除了包含协程本身,还包含协程的状态,比如准备执行,正在执行,已完成等等。让事件循环知道协程是否可以运行。只要一个协程被封装为任务,那么就会被事件循环调度执行。
不添加协程的代码
例如不添加协程的代码:
from time import sleep, perf_counterdef fetch_url(url):print(f"Fetching {url}")sleep(1)print(f"Finished {url}")return 'url_content'def read_file(file_path):print(f"Reading {file_path}")sleep(1)print(f"Finished {file_path}")def main():url = 'example.com'file_path = 'example.txt'fetch_result = fetch_url(url)read_result = read_file(file_path)if __name__ == '__main__':start = perf_counter()main()end = perf_counter()print(f"Time taken: {end - start}")
输出结果:
Fetching example.com
Finished example.com
Reading example.txt
Finished example.txt
Time taken: 2.0090410669999983
没有使用协程的方式,是用时2秒。
编写协程
- 定义协程函数,在需要暂停的地方使用
await
- 将协程包装为
任务
- 建立
事件循环
将其改写为使用协程的方式进行:
from time import sleep, perf_counter
import asyncioasync def fetch_url(url):print(f"Fetching {url}")# 如何保证在当前协程暂停的情况下,await后的函数能够执行# 那就是await后的函数也必须是协程函数,因此需要使用asyncio.sleep()替换sleep()# await同时会将后面的协程包装乘任务,让事件循环调度await asyncio.sleep(1)print(f"Finished {url}")return 'url_content'async def read_file(file_path):print(f"Reading {file_path}")await asyncio.sleep(1)print(f"Finished {file_path}")async def main():url = 'example.com'file_path = 'example.txt'task1 = asyncio.create_task(fetch_url(url))task2 = asyncio.create_task(read_file(file_path))fetch_result = await task1read_result = await task2print(fetch_result)print(read_result)if __name__ == '__main__':start = perf_counter()# asyncio.run(main())main()end = perf_counter()print(f"Time taken: {end - start}")
await
表明了当前协程要暂停运行,等完成了,后面的语句才会继续运行。如果await
后面是一个协程,则需要包装成一个任务,但如果已经是一个协程任务了,那就不需要再包装了。等到await
后的协程执行完了,返回await
的协程执行结果。
即await
的作用:
- 暂停当前协程
- 包装
await
后的协程为任务 - 获取
await
后的协程结果
输出结果如下:
Fetching example.com
Reading example.txt
Finished example.com
Finished example.txt
url_content
file_content
Time taken: 1.002050409999356
实现协程异步的方式:
- 定义协程函数
- 包装协程为任务
- 建立事件循环
将协程包装为任务有两种方式:
- 手动。先用一个语句创建
task
,然后再用另一个语句获取执行结果。这种方式可以检查task
的执行状态,或者执行取消task
。上面的方式就是手动方式。 - 自动。一个语句接收协程后,直接返回执行结果。这种方式更方便简洁。
自动包装协程任务
- asyncio.gather(),会等所有协程执行完成后才返回结果,代码如下:
from os import read
from time import sleep, perf_counter
import asyncioasync def fetch_url(url):print(f"Fetching {url}")# 如何保证在当前协程暂停的情况下,await后的函数能够执行# 那就是await后的函数也必须是协程函数,因此需要使用asyncio.sleep()替换sleep()# await同时会将后面的协程包装成任务,让事件循环调度await asyncio.sleep(1)print(f"Finished {url}")return 'url_content'async def read_file(file_path):print(f"Reading {file_path}")await asyncio.sleep(1)print(f"Finished {file_path}")return 'file_content'async def main():url = 'example.com'file_path = 'example.txt'result = await asyncio.gather(fetch_url(url), read_file(file_path))print(result)if __name__ == '__main__':start = perf_counter()# asyncio.run()会创建一个事件循环,然后将main()函数包装成任务,让事件循环调度asyncio.run(main())end = perf_counter()print(f"Time taken: {end - start}")
输出结果如下:
Fetching example.com
Reading example.txt
Finished example.com
Finished example.txt
['url_content', 'file_content']
Time taken: 1.0032507400010218
- asyncio.as_completed,不会等所有协程都完成后才返回,而是有一个运行完就返回一个结果
from os import read
from time import sleep, perf_counter
import asyncioasync def fetch_url(url):print(f"Fetching {url}")# 如何保证在当前协程暂停的情况下,await后的函数能够执行# 那就是await后的函数也必须是协程函数,因此需要使用asyncio.sleep()替换sleep()# await同时会将后面的协程包装成任务,让事件循环调度await asyncio.sleep(1)print(f"Finished {url}")return 'url_content'async def read_file(file_path):print(f"Reading {file_path}")await asyncio.sleep(1)print(f"Finished {file_path}")return 'file_content'async def main():url = 'example.com'file_path = 'example.txt'# 返回迭代器,按照协程完成的顺序依次输出results = asyncio.as_completed([fetch_url(url), read_file(file_path)])for result in results:# 使用await获取执行结果print(await result)if __name__ == '__main__':start = perf_counter()# asyncio.run()会创建一个事件循环,然后将main()函数包装成任务,让事件循环调度asyncio.run(main())end = perf_counter()print(f"Time taken: {end - start}")
其他的异步库
除了asyncio
之外,处理请求可以使用aiohttp
,处理文件可以使用aiofiles
pip install aiohttp
pip install aiofiles
from os import read
from time import sleep, perf_counter
import asyncio
import aiohttp
import aiofiles
import sslasync def fetch_url(url):async with aiohttp.ClientSession() as session:async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:return await response.text()async def read_file(file_path):async with aiofiles.open(file_path, 'r') as f:return await f.read() # read the entire file as a string and return it as a coroutine.async def main():# url = 'http://jsonplaceholder.typicode.com/posts'url = 'http://www.baidu.com'file_path = 'example.txt'results = asyncio.as_completed([fetch_url(url), read_file(file_path)])for result in results:print(await result)if __name__ == '__main__':start = perf_counter()# asyncio.run()会创建一个事件循环,然后将main()函数包装成任务,让事件循环调度asyncio.run(main())end = perf_counter()print(f"Time taken: {end - start}")
在新线程中运行同步函数
如何在新线程中运行同步函数,不阻塞事件循环。
from os import read
from time import sleep, perf_counter
import asyncio
import aiohttp
import aiofiles
import sslasync def fetch_url(url):async with aiohttp.ClientSession() as session:async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:return await response.text()async def read_file(file_path):async with aiofiles.open(file_path, 'r') as f:return await f.read() # read the entire file as a string and return it as a coroutine.# 这个函数可以在异步上下文中运行,但是它是一个阻塞函数,所以需要使用asyncio.to_thread()将其包装成一个协程。
# 这样,这个函数就可以在异步上下文中运行了。
def foo(*args):sleep(1)return 'foo'async def main():# url = 'http://jsonplaceholder.typicode.com/posts'url = 'http://www.baidu.com'file_path = 'example.txt'results = asyncio.as_completed([fetch_url(url), read_file(file_path), asyncio.to_thread(foo, 'bar')])for result in results:print(await result)if __name__ == '__main__':start = perf_counter()# asyncio.run()会创建一个事件循环,然后将main()函数包装成任务,让事件循环调度asyncio.run(main())end = perf_counter()print(f"Time taken: {end - start}")
参考B站学习视频