当前位置: 首页 > news >正文

一文入门python中的进程、线程和协程

文章目录

  • 进程(Process)和线程(Thread)
    • 多进程
      • multiprocessing
      • Pool
      • 子进程
      • 进程间通信
    • 多线程
      • Lock
        • 单例模式
      • 多核CPU
      • ThreadLocal
    • 进程 vs. 线程
      • 线程切换
      • 计算密集型 vs. IO密集型
      • 异步IO
  • 协程
    • 关键字
        • async
        • await
    • asyncio.sleep
    • 任务
      • 创建任务
      • 同时运行多个任务
      • 取消任务和设置超时
        • 取消任务
        • 设置超时并使用wait_for执行取消
  • 参考文献

进程(Process)和线程(Thread)

现代操作系统(Mac OS X,UNIX,Linux,Windows)都是支持“多任务”的操作系统
“多任务”:操作系统可以同时运行多个任务⇒\Rightarrow打个比方,你一边在用浏览器上网,一边在听MP3,一边在用Word赶作业,至少同时有3个任务正在运行
单核CPU执行多任务的方式:操作系统轮流让各个任务交替执行→\to任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,执行0.01秒……这样反复执行下去
只能在多核CPU上实现「真正的并行执行多任务」
对于操作系统而言,一个任务就是一个进程(Process)⇒\Rightarrow比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就是启动一个记事本进程
有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,进程内的这些“子任务”被称为线程(Thread)
由于每个进程至少要干一件事,所以,一个进程至少有一个线程
线程是最小的执行单元,而进程由至少一个线程组成
在python中,多任务的实现方式:多进程模式;多线程模式;多进程+多线程模式

多进程

fork():Unix/Linux操作系统中的系统调用,不同于普通函数的一次调用返回一次,fork()的一次调用将返回两次⇒\Rightarrow原因:操作系统自动把当前父进程复制一份为子进程,然后分别在父进程和子进程内返回→\to子进程永远返回0,而父进程返回子进程的ID
在python的父进程中创建子进程

import osprint('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/macOS:
pid = os.fork()
if pid == 0:print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

运行结果

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

在Unix/Linux操作系统中,通过fork()调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务

multiprocessing

multiprocessing模块通过提供Process类来表示一个进程对象

from multiprocessing import Process
import os# 子进程要执行的代码
def run_proc(name):print('Run child process %s (%s)...' % (name, os.getpid()))if __name__=='__main__':print('Parent process %s.' % os.getpid())p = Process(target=run_proc, args=('test',))print('Child process will start.')p.start()p.join()print('Child process end.')

执行结果

Parent process 928.
Child process will start.
Run child process test (929)...
Process end.
  • start():启动进程
  • join():等待子进程结束后再继续往下运行

Pool

可以通过进程池的方式启动大量的子进程

from multiprocessing import Pool
import os, time, randomdef long_time_task(name):print('Run task %s (%s)...' % (name, os.getpid()))start = time.time()time.sleep(random.random() * 3)end = time.time()print('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__=='__main__':print('Parent process %s.' % os.getpid())p = Pool(4)for i in range(5):p.apply_async(long_time_task, args=(i,))print('Waiting for all subprocesses done...')p.close()p.join()print('All subprocesses done.')

执行结果

Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.
  • apply_async()异步提交任务,任务提交后,主进程可以立即继续执行,不会阻塞等待结果
  • apply():同步提交任务,必须等函数执行完成才能继续下一步
  • close():调用之后不能继续添加新的Process
  • join():等待所有子进程执行完毕

子进程

在外部进程作为子进程时,需要控制子进程的输入和输出
subprocess模块:启动一个子进程,然后控制其输入和输出
在python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的

import subprocessprint('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
  • nslookup(name server lookup):用于查询 DNS(域名系统) 信息
    运行结果
$ nslookup www.python.org
Server:		192.168.19.4
Address:	192.168.19.4#53Non-authoritative answer:
www.python.org	canonical name = python.map.fastly.net.
Name:	python.map.fastly.net
Address: 199.27.79.223Exit code: 0

如果子进程还需要输入,则可以通过communicate()方法输入

import subprocessprint('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
  • communicate():向子进程 stdin 写入字节串→\to等待子进程执行完成→\to返回输出和错误信息

上面的代码相当于在命令行执行命令nslookup,然后手动输入

set q=mx       # 设置查询类型为 MX(邮件服务器)
python.org     # 查询 python.org 域名
exit           # 退出 nslookup

运行结果

$ nslookup
Server:		192.168.19.4
Address:	192.168.19.4#53Non-authoritative answer:
python.org	mail exchanger = 50 mail.python.org.Authoritative answers can be found from:
mail.python.org	internet address = 82.94.164.166
mail.python.org	has AAAA address 2001:888:2000:d::a6Exit code: 0

进程间通信

multiprocessing模块提供了Queue、Pipes等多种方式来交换数据
以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据

from multiprocessing import Process, Queue
import os, time, random# 写数据进程执行的代码:
def write(q):print('Process to write: %s' % os.getpid())for value in ['A', 'B', 'C']:print('Put %s to queue...' % value)q.put(value)time.sleep(random.random())# 读数据进程执行的代码:
def read(q):print('Process to read: %s' % os.getpid())while True:value = q.get(True)print('Get %s from queue.' % value)if __name__=='__main__':# 父进程创建Queue,并传给各个子进程:q = Queue()pw = Process(target=write, args=(q,))pr = Process(target=read, args=(q,))# 启动子进程pw,写入:pw.start()# 启动子进程pr,读取:pr.start()# 等待pw结束:pw.join()# pr进程里是死循环,无法等待其结束,只能强行终止:pr.terminate()

运行结果

Process to write: 50563
Put A to queue...
Process to read: 50564
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

多线程

多任务可以由多进程完成,也可以由一个进程内的多线程完成
进程是由若干线程组成的,一个进程至少有一个线程
python的标准库提供了两个模块:_threadthreading_thread是低级模块,threading是高级模块,对_thread进行了封装

import time, threading# 新线程执行的代码:
def loop():print('thread %s is running...' % threading.current_thread().name)n = 0while n < 5:n = n + 1print('thread %s >>> %s' % (threading.current_thread().name, n))time.sleep(1)print('thread %s ended.' % threading.current_thread().name)print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

执行结果

thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程

Lock

多线程和多进程

  • 多进程:同一个变量,各自有一份拷贝存在于每个进程中,互不影响
  • 多线程:所有变量都由所有线程共享,任何一个变量都可以被任何一个线程修改

线程共享数据的最大风险在于,当多个线程同时修改同一个变量时,可能会导致数据被破坏或出现错误

# multithread
import time, threading# 假定这是你的银行存款:
balance = 0def change_it(n):# 先存后取,结果应该为0:global balancebalance = balance + nbalance = balance - ndef run_thread(n):for i in range(10000000):change_it(n)t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

我们先定义了一个共享变量 balance,初始值设为 0。然后启动两个线程:一个负责存入,一个负责取出。按理说,最终结果应该仍然是 0。然而,由于线程调度由操作系统控制,当线程 t1t2 交替执行时,如果循环次数足够多,balance 的最终结果就可能偏离 0
高级语言中的一条语句在 CPU 中往往需要分解为多条指令来执行

balance = balance + n

也分两步:

  1. 计算balance + n,存入临时变量中
  2. 将临时变量的值赋给balance

由于x是局部变量,两个线程各自都有自己的x

初始值 balance = 0t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1     # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1     # balance = 0t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2     # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2     # balance = 0结果 balance = 0

但是t1t2是交替运行的,如果操作系统以下面的顺序执行t1t2

初始值 balance = 0t1: x1 = balance + 5  # x1 = 0 + 5 = 5t2: x2 = balance + 8  # x2 = 0 + 8 = 8
t2: balance = x2      # balance = 8t1: balance = x1      # balance = 5
t1: x1 = balance - 5  # x1 = 5 - 5 = 0
t1: balance = x1      # balance = 0t2: x2 = balance - 8  # x2 = 0 - 8 = -8
t2: balance = x2      # balance = -8结果 balance = -8

由于修改 balance 并不是一条原子操作,而是由多条语句组成的。在这些语句执行的过程中,线程可能会被切换,从而导致多个线程同时修改同一个对象,结果就把数据弄乱了
为了确保 balance 的计算结果正确,需要在 change_it() 方法外加上一把锁。当某个线程开始执行 change_it() 时,它会先获取锁,因此其他线程必须等待,直到该锁被释放后才能继续执行。由于同一时间最多只有一个线程能够持有这把锁,就避免了多个线程同时修改 balance 所造成的冲突。在 python 中,可以通过 threading.Lock() 来创建这样一把锁

balance = 0
lock = threading.Lock()def run_thread(n):for i in range(100000):# 先要获取锁:lock.acquire()try:# 放心地改吧:change_it(n)finally:# 改完了一定要释放锁:lock.release()

当多个线程同时调用 lock.acquire() 时,只有一个线程能成功拿到锁并继续执行,其余线程则会阻塞等待,直到锁被释放后才能依次获取并运行
线程在获取锁后,必须在使用完毕后及时释放。如果不释放,其他等待锁的线程就会一直被阻塞,最终造成死锁。为避免这种情况,通常会使用 try...finally 结构来确保锁一定能被释放
锁的优点在于,它能保证某段关键代码只能由一个线程完整执行,从而避免并发冲突。但缺点也很明显:首先,锁会阻碍多线程的并行性,使相关代码实际退化为单线程执行,效率因此下降。其次,如果系统中存在多个锁,不同线程各自持有一部分锁并同时尝试获取对方的锁,就可能陷入死锁状态,导致所有相关线程都无法继续运行,也不能正常结束,只能依靠操作系统强制终止

单例模式

单例模式

  • 类只能创建一个实例
  • 全局共享这个实例
import threadingclass Singleton:_instance = None_lock = threading.Lock()  # 类级别锁,保证线程安全def __new__(cls, *args, **kwargs):if not cls._instance:  # 第一层检查,提高效率with cls._lock:    # 加锁,防止多线程同时创建if not cls._instance:  # 第二层检查,防止重复创建cls._instance = super().__new__(cls)return cls._instancedef __init__(self, value=None):self.value = value# 测试多线程
def task(name):obj = Singleton(name)print(f"{name} -> {obj.value}, id: {id(obj)}")threads = []
for i in range(5):t = threading.Thread(target=task, args=(f"Thread-{i}",))threads.append(t)t.start()for t in threads:t.join()

多核CPU

在多核 CPU 上,很多人会认为多个线程能够真正并行执行。那如果我们编写一个死循环线程,会发生什么呢?
通过 Mac OS X 的活动监视器或 Windows 的任务管理器观察,可以发现:一个死循环线程会占满一个 CPU 核心(CPU 使用率 100%)。如果同时运行两个死循环线程,在多核环境下就会占用两个核心(CPU 使用率 200%)。因此,要想让 N 核 CPU 的所有核心都跑满,就需要同时启动 N 个死循环线程

import threading, multiprocessingdef loop():x = 0while True:x = x ^ 1for i in range(multiprocessing.cpu_count()):t = threading.Thread(target=loop)t.start()

如果启动与 CPU 核心数量相同的 N 个线程,在 4 核 CPU 上观察,CPU 使用率可能只有约 102%,也就是只真正使用了一核
而用 C、C++ 或 Java 实现相同的死循环,则可以轻松把所有核心跑满:4 核跑到 400%,8 核跑到 800%。为什么 python 做不到呢?
原因在于 python 虽然有真正的线程,但解释器存在一个 GIL(Global Interpreter Lock,全局解释器锁)。任何 Python 线程在执行前都必须先获取 GIL,每执行大约 100 条字节码后,解释器才会释放 GIL,让其他线程有机会执行。由于这个全局锁,python 的多线程只能交替运行,即使 100 个线程在 100 核 CPU 上,也只能用到一个核心
GIL 是 python 解释器设计的历史遗留问题,尤其是官方的 CPython。要真正利用多核,除非使用不带 GIL 的解释器或进行 C 扩展,否则 python 多线程无法有效利用多核
不过不用太担心:python 虽然不能用多线程充分利用多核,但可以通过多进程实现多核并行。每个 python 进程都有独立的 GIL,互不干扰,因此可以充分利用多核 CPU

ThreadLocal

在多线程环境中,每个线程都有自己的数据。使用线程的局部变量比使用全局变量更安全,因为局部变量只对该线程可见,不会影响其他线程;而全局变量的修改则必须加锁才能避免冲突
不过,局部变量也有缺点:在函数调用时,需要手动传递,使用起来比较麻烦

def process_student(name):std = Student(name)# std是局部变量,但是每个函数都要用它,因此必须传进去:do_task_1(std)do_task_2(std)def do_task_1(std):do_subtask_1(std)do_subtask_2(std)def do_task_2(std):do_subtask_2(std)do_subtask_2(std)

如果每个函数都一层层传递参数,那就太麻烦了。直接用全局变量也不行,因为每个线程处理的都是不同的 Student 对象,不能共享
有没有办法把所有 Student 对象放在一个全局字典里,然后用线程自身作为 key 来获取对应线程的 Student 对象呢?

global_dict = {}def std_thread(name):std = Student(name)# 把std放到全局变量global_dict中:global_dict[threading.current_thread()] = stddo_task_1()do_task_2()def do_task_1():# 不传入std,而是根据当前线程查找:std = global_dict[threading.current_thread()]...def do_task_2():# 任何函数都可以查找出当前线程的std变量:std = global_dict[threading.current_thread()]...

这种方法理论上是可行的,它最大的优点是解决了 Student 对象在每层函数中传递的问题。不过,每个函数都需要去字典里获取对象,代码看起来有些冗长。有没有更简单的办法呢?这时,ThreadLocal 应运而生:它可以自动为每个线程管理数据,不需要手动查找字典

import threading# 创建全局ThreadLocal对象:
local_school = threading.local()def process_student():# 获取当前线程关联的student:std = local_school.studentprint('Hello, %s (in %s)' % (std, threading.current_thread().name))def process_thread(name):# 绑定ThreadLocal的student:local_school.student = nameprocess_student()t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

执行结果

import threading# 创建全局ThreadLocal对象:
local_school = threading.local()def process_student():# 获取当前线程关联的student:std = local_school.studentprint('Hello, %s (in %s)' % (std, threading.current_thread().name))def process_thread(name):# 绑定ThreadLocal的student:local_school.student = nameprocess_student()t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

全局变量 local_school 是一个 ThreadLocal 对象,每个线程都可以读写它的 student 属性,但互不干扰。你可以把 local_school 看作一个全局变量,但它的属性(如 local_school.student)实际上是线程的局部变量,可以自由读写,不需要手动管理锁,ThreadLocal 会在内部处理这些细节
可以把 ThreadLocal 想象成一个特殊的字典,不仅可以用 local_school.student,还可以绑定其他变量,比如 local_school.teacher 等等

进程 vs. 线程

要实现多任务,通常会采用 Master-Worker 模式:Master 负责分配任务,Worker 负责执行任务。因此,多任务环境下一般是一个 Master 配合多个 Worker

  • 多进程实现:主进程作为 Master,其他子进程作为 Worker
    • 优点:稳定性高,一个子进程崩溃不会影响主进程或其他子进程(当然如果 Master 挂掉,所有进程都会受影响,但 Master 只负责分配任务,挂掉的概率低)
    • 缺点:创建进程开销大。在 Unix/Linux 下用 fork 开销还可以接受,但在 Windows 上创建进程代价很高。此外,操作系统能同时调度的进程数有限,如果同时运行几千个进程,操作系统甚至可能连调度都成问题
  • 多线程实现:主线程作为 Master,其他线程作为 Worker
    • 优点:通常比多进程稍快,但提升有限
    • 缺点:危险性大,任何一个线程出错都可能导致整个进程崩溃,因为所有线程共享进程内存。在 Windows 上,如果某个线程执行异常,你经常会看到“该程序执行了非法操作,即将关闭”的提示,实际上只是某个线程出错,但操作系统会强制结束整个进程

线程切换

无论是多进程还是多线程,当数量一多,效率往往不会提升,为什么呢?
我们打个比方:假设你正在准备中考,每天晚上需要完成语文、数学、英语、物理、化学五科作业,每科作业耗时 1 小时

  • 单任务模型:你先花 1 小时做语文作业,再花 1 小时做数学作业,依次完成所有作业,一共需要 5 小时
  • 多任务模型:你尝试轮流做每科作业,每科做 1 分钟再切换到下一科,如此循环。只要切换够快,从表面上看,你似乎同时在做 5 科作业,就像单核 CPU 在执行多任务一样

问题是,切换作业是有代价的。比如从语文切换到数学,你要先收拾语文书、钢笔(保存现场),再打开数学课本、找出直尺圆规(准备新环境)。操作系统在切换进程或线程时也类似:需要先保存当前执行环境(CPU 寄存器、内存页等),再恢复新任务的环境,才能开始执行。虽然切换很快,但仍然消耗时间
如果同时有几千个任务,操作系统可能主要忙于切换任务,而没有足够时间真正执行任务。结果就是,多任务一旦过多,就会耗尽系统资源,效率急剧下降,所有任务都完成得很慢

计算密集型 vs. IO密集型

在决定是否采用多任务时,另一个重要因素是任务类型。任务大体可分为计算密集型和 IO 密集型两类

  • 计算密集型任务:主要消耗 CPU 资源,需要大量计算,例如计算圆周率、高清视频解码等。这类任务虽然可以使用多任务,但任务越多,花在切换任务上的时间也越多,CPU 执行效率反而下降。因此,为了最有效地利用 CPU,计算密集型任务同时运行的数量最好与 CPU 核心数相等。由于计算密集型任务依赖 CPU 性能,代码运行效率非常重要。像 python 这样的脚本语言运行速度较慢,不适合处理计算密集型任务;这类任务最好使用 C 语言编写
  • IO 密集型任务:涉及网络或磁盘操作,CPU 消耗很少,大部分时间都在等待 IO 完成(因为 IO 速度远低于 CPU 和内存)。对于 IO 密集型任务,增加任务数量可以提升 CPU 利用率,但也有上限。常见的 Web 应用大多属于 IO 密集型任务。由于执行期间绝大部分时间花在等待 IO,使用运行速度更快的 C 语言替换 python 并不会显著提高效率。因此,对于 IO 密集型任务,选择开发效率高、代码量少的语言最合适,脚本语言是首选,而 C 语言开发效率相对较低

异步IO

由于 CPU 和 IO 速度差异巨大,任务在执行过程中大部分时间都在等待 IO,如果仍然使用单进程单线程模型,其他任务就无法并行执行。因此,我们才需要多进程或多线程来实现多任务并发
现代操作系统在 IO 方面做了大量优化,其中最重要的就是对异步 IO 的支持。如果充分利用异步 IO,即便是单进程单线程,也能高效执行多任务,这就是所谓的事件驱动模型。例如 Nginx 就是一款支持异步 IO 的 Web 服务器:在单核 CPU 上,它用单进程就能高效处理大量请求;在多核 CPU 上,可以启动与核心数相等的进程,充分利用多核资源。由于系统总进程数有限,操作系统的调度也非常高效
在 python 中,单线程的异步编程模型称为协程

协程

可将协程想象成一个普通的 python 函数,但它具有一个超能力:在遇到可能需要一段时间才能完成的操作时,能够暂停执行。当长时间运行的操作完成时,可唤醒暂停的协程,并执行该协程中的其他代码。当一个暂停的协程正在等待操作完成时,可运行其他代码,等待时其他代码的运行是应用程序并发的原因。还可同时运行多个耗时的操作,这能大大提高应用程序的性能。
当有一个长时间运行的操作时,await 关键字可以让我们暂停协程

关键字

async

async 关键字将函数标记为协程函数,而不是普通的 python 函数

async def coroutine():print("hello world")

这是一个简单的协程函数,不执行任何长时间的操作,它只是输出信息并返回。将上述协程放在事件循环中时,它将立即执行,因为没有任何阻塞I/O,没有任何操作暂停执行

async def coroutine_add_one(number):return number + 1
def add_one(number):return number + 1
function_result = add_one(1)
coroutine_result = coroutine_add_one(1)
print(function_result)
print(type(function_result))
"""
2
<class 'int'>
"""
print(coroutine_result)
print(type(coroutine_result))
"""
<coroutine object coroutine_add_one at 0x000002977045BAC0>
<class 'coroutine'>
"""

调用普通的 add_one 函数时,它会立即执行并返回我们期望的一个整数。但当调用 coroutine_add_one 时,并不会执行协程中的代码,而是得到一个协程对象。当直接调用协程函数时,协程不会被执行。相反,它创建了一个可以稍后执行的协程对象,要执行协程,需要在事件循环中显式执行它。那么如何创建个事件循环并执行协程呢?
使用asyncio.run()函数来运行协程

import asyncio
async def coroutine_add_one(number):return number + 1
coroutine_result = asyncio.run(coroutine_add_one(1))
print(coroutine_result)  # 2

正如我们期望的一样,我们已经正确地将协程放在事件循环中,并且已经执行了它
asyncio.run 在这种情况下完成了一些重要的事情,首先创建了一个全新的事件循环。一旦成功创建,就会接受我们传递给它的任何协程,并运行它直到完成,然后返回结果,一切完成后,它会关闭并结束事件循环

await

asyncio 的真正优势是能暂停执行,让事件循环在长时间运行的操作期间,运行其他任务

import asyncio
async def add_one(number):return number + 1
async def main():# main() 协程将暂停执行,直到 add_one(1) 运行完毕one_plus_one = await add_one(1)# main() 协程将暂停执行,直到 add_one(2) 运行完毕two_plus_one = await add_one(2)print(one_plus_one)print(two_plus_one)
asyncio.run(main())
"""
2
3
"""

在上面的代码中,我们两次暂停执行。首先等待对 add_one(1) 的调用,一旦得到结果,主函数将取消暂停并将 add_one(1) 的返回值分配给变量 one_plus_one。然后对 add_one(2) 执行相同的操作,并输出结果。我们来将应用程序的执行流程可视化一下,如下图所示,图中的每个块代表一行或多行代码在任何给定时刻发生的事情
在这里插入图片描述

asyncio.sleep

使用 asyncio.sleep 让协程休眠给定的秒数,这将在预定的时间内暂停协程。由于 asyncio.sleep 本身是一个协程,所以必须将它与 await 关键字一起使用,如果单独调用它,会得到一个协程对象。既然 asyncio.sleep 是一个协程,这意味着当协程等待它时,其他代码也能够运行

import asyncio
async def hello_world():# 暂停 hello_world 协程一秒钟await asyncio.sleep(1)return "hello world"
async def main():# 暂停 main 协程,直到 hello_world 协程运行完毕message = await hello_world()print(message)
asyncio.run(main())
"""
hello world
"""

运行这个应用程序时,程序将等待 1 秒钟,然后输出打印信息。由于 hello_world 是一个协程,使用 asyncio.sleep 将其暂停 1 秒,因此现在有 1 秒的时间可以同时运行其他代码

import asyncioasync def delay(seconds):print(f"开始休眠 {seconds} 秒")await asyncio.sleep(seconds)print(f"休眠完成")return secondsasync def add_one(number):return number + 1async def hello_world():await delay(1)return "hello world"async def main():# 暂停 main(),直到 add_one(1) 返回    one_plus_one = await add_one(1)# 暂停 main(),直到 hello_world() 返回message = await hello_world()print(one_plus_one)print(message)asyncio.run(main())
"""
开始休眠 1 秒
休眠完成
2
hello world
"""

main 协程里面分别通过 await 驱动 add_one(1)hello_world() 两个协程执行,然后打印它们的返回值,但是在打印 one_plus_one 之前需要等待一秒,因为在 hello_world() 协程里面 sleep 了一秒。但我们真正想要的结果是,在 await sleep 的时候,立刻执行其它的代码,比如立刻打印 one_plus_one,但实际情况却没有。
这是为什么呢?答案是在 await 暂停当前的协程之后、以及 await 表达式给我们一个值之前不会执行该协程中的其他任何代码。因为 hello_world_message 函数需要 1 秒后才能给出一个值,所以主协程将暂停 1 秒。这种情况下,代码表现得好像它是串行的
在这里插入图片描述
事实上从源代码本身也能够理解,因为代码是一行一行写的,所以自然也要一行一行执行。而 await 后面跟一个协程之后,会驱动协程执行,并等到驱动的协程运行完毕之后才往下执行。因此这个逻辑就决定了,await 是串行的,一个 await 执行完毕之后才能执行下一个 await。如果我们想摆脱这种顺序模型,同时运行 add_onehello_world,那么需要引入一个被称为 “任务” 的概念

任务

直接调用协程时,并没有把它放在事件循环中运行,相反会得到一个协程对象。如果想运行,要么通过 asyncio.run,要么在一个协程里面通过 await 关键字进行驱动(在 A 协程里面 await B 协程,如果 A 协程运行了,那么 B 协程也会被驱动)

创建任务

创建任务是通过 asyncio.create_task 函数来实现的,当调用这个函数时,需要给它传递一个协程,然后返回一个任务对象。一旦有了一个任务对象,就可以把它放在一个 await 表达式中,它完成后就会提取返回值

import asyncioasync def delay(seconds):print(f"开始休眠 {seconds} 秒")await asyncio.sleep(seconds)print(f"休眠完成")return secondsasync def main():# 将 delay(3) 包装成任务,注:包装完之后直接就丢到事件循环里面运行了# 因此这里会立即返回,而返回值是一个 asyncio.Task 对象sleep_for_three = asyncio.create_task(delay(3))print("sleep_for_three:", sleep_for_three.__class__)# 至于协程究竟有没有运行完毕,我们可以通过 Task 对象来查看# 当协程运行完毕或者报错,都看做是运行完毕了,那么调用 Task 对象的 done 方法会返回 True# 否则返回 False,由于代码是立即执行,还没有到 3 秒钟,因此打印结果为 Falseprint("协程(任务)是否执行完毕:", sleep_for_three.done())# 这里则保证必须等到 Task 对象里面的协程运行完毕后,才能往下执行result = await sleep_for_threeprint("协程(任务)是否执行完毕:", sleep_for_three.done())print("返回值:", result)asyncio.run(main())
"""
开始休眠 3 秒
sleep_for_three: <class '_asyncio.Task'>
协程(任务)是否执行完毕: False
休眠完成
协程(任务)是否执行完毕: True
返回值: 3
"""

如果我们直接 await delay(3),那么在打印之前需要至少等待 3 秒,但通过将它包装成任务,会立即扔到事件循环里面运行。此时主程序可以直接往下执行,至于协程到底什么时候执行完毕、有没有执行完毕,则通过 Task 对象(任务)来查看。当然你也可以 await 一个 Task 对象,保证里面的协程运行完毕后才能往下执行

同时运行多个任务

任务是立即创建并计划尽快运行,这允许同时运行许多长时间的任务

import asyncioasync def delay(seconds):print(f"开始休眠 {seconds} 秒")await asyncio.sleep(seconds)print(f"休眠完成")return secondsasync def main():sleep_for_three = asyncio.create_task(delay(3))sleep_again = asyncio.create_task(delay(3))sleep_once_more = asyncio.create_task(delay(3))await sleep_for_threeawait sleep_againawait sleep_once_moreasyncio.run(main())
"""
开始休眠 3 秒
开始休眠 3 秒
开始休眠 3 秒
休眠完成
休眠完成
休眠完成
"""

在上面的代码中启动了三个任务,每个任务需要 3 秒才能完成。但由于create_task 的每次调用都会立即返回,因此会立即到达 await sleep_for_three语句,并且三个任务都丢到了事件循环,开启执行。由于 asyncio.sleep 属于 IO,因此会进行切换,所以三个任务是并发执行的,这也意味着整个程序会在 3 秒钟左右完成,而不是 9 秒钟
在这里插入图片描述
随着我们添加更多任务,性能提升效果会更明显,比如启动了 10 个这样的任务,仍然只需要大约 3 秒,从而使速度提高 10 倍

import asyncioasync def delay(seconds):print(f"开始休眠 {seconds} 秒")await asyncio.sleep(seconds)print(f"休眠完成")return secondsasync def hello_from_second():for i in range(10):await asyncio.sleep(1)print("你好,我每秒钟负责打印一次")async def main():sleep_for_three = asyncio.create_task(delay(3))sleep_again = asyncio.create_task(delay(3))await hello_from_second()asyncio.run(main())
"""
开始休眠 3 秒
开始休眠 3 秒
你好,我每秒钟负责打印一次
你好,我每秒钟负责打印一次
休眠完成
休眠完成
你好,我每秒钟负责打印一次
你好,我每秒钟负责打印一次
你好,我每秒钟负责打印一次
你好,我每秒钟负责打印一次
你好,我每秒钟负责打印一次
你好,我每秒钟负责打印一次
你好,我每秒钟负责打印一次
你好,我每秒钟负责打印一次
"""

在实际工作中,不要直接 await 一个协程,而是将协程包装成任务来让它运行。当你的代码逻辑依赖某个任务的执行结果时,再对该任务执行 await,拿到它的返回值

取消任务和设置超时

网络连接可能不可靠,用户的连接可能因为网速变慢而中断,或者网络服务器崩溃导致现有的请求无法处理。因此对于发出的请求,需要特别小心,不要无限期地等待。如果无限期等待一个不会出现的结果,可能导致应用程序挂起,从而导致精糕的用户体验
在之前的示例中,如果任务一直持续下去,我们将被困在等待 await 语句完成而没有反馈的情况,也没有办法阻止这样的事情发生。因此 asyncio 提供了一个机制,允许我们手动取消任务,或者超时之后自动取消

取消任务

取消任务很简单,每个任务对象都有一个名为 cancel 的方法,可以在想要停止任务时调用它。取消一个任务将导致该任务在执行 await 时引发 CancelledError,然后再根据需要处理它
为说明这一点,假设启动了一个长时间运行的任务,但我们不希望它运行的时间超过 5 秒。如果任务没有在 5 秒内完成,就可以停止该任务,并向用户报告:该任务花费了太长时间,我们正在停止它。我们还希望每秒钟都输出一个状态更新,为用户提供最新信息,这样就可以让用户了解任务的运行状态

import asyncioasync def delay(seconds):print(f"开始休眠 {seconds} 秒")await asyncio.sleep(seconds)print(f"休眠完成")return secondsasync def main():long_task = asyncio.create_task(delay(10))seconds_elapsed = 0while not long_task.done():print("检测到任务尚未完成,一秒钟之后继续检测")await asyncio.sleep(1)seconds_elapsed += 1# 时间超过 5 秒,取消任务if seconds_elapsed == 5:long_task.cancel()try:# 等待 long_task 完成,显然执行到这里的时候,任务已经被取消# 不管是 await 一个已经取消的任务,还是 await 的时候任务被取消# 都会引发 asyncio.CancelledErrorawait long_taskexcept asyncio.CancelledError:print("任务被取消")asyncio.run(main())
"""
检测到任务尚未完成,一秒钟之后继续检测
开始休眠 10 秒
检测到任务尚未完成,一秒钟之后继续检测
检测到任务尚未完成,一秒钟之后继续检测
检测到任务尚未完成,一秒钟之后继续检测
检测到任务尚未完成,一秒钟之后继续检测
检测到任务尚未完成,一秒钟之后继续检测
任务被取消
"""

在代码中我们创建了一个任务,它需要花费 10 秒的时间才能运行完成。然后创建一个 while 循环来检查该任务是否已完成,任务的 done 方法在任务完成时返回 True,否则返回 False。每一秒,我们检查任务是否已经完成,并记录到目前为止经历了多少秒。如果任务已经花费了 5 秒,就取消这个任务。然后来到 await long_task,将输出 “任务被取消”,这表明捕获了一个 CancelledError
关于取消任务需要注意的是,CancelledError 只能从 await 语句抛出

import asyncioasync def delay(seconds):print(f"开始休眠 {seconds} 秒")await asyncio.sleep(seconds)print(f"休眠完成")return secondsasync def main():long_task = asyncio.create_task(delay(3))# 立刻取消long_task.cancel()# 但 CancelledError 只有在 await 取消的协程时才会触发# 所以下面的语句会正常执行print("我会正常执行")print("Hello World")print(list(range(10)))await asyncio.sleep(5)try:# 引发 CancelledErrorawait long_taskexcept asyncio.CancelledError:print("任务被取消")asyncio.run(main())
"""
我会正常执行
Hello World
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
任务被取消
"""

注意:如果任务在取消的时候已经运行完毕了,那么 await 的时候就不会抛 CancelledError

import asyncioasync def delay(seconds):print(f"开始休眠 {seconds} 秒")await asyncio.sleep(seconds)print(f"休眠完成")return secondsasync def main():long_task = asyncio.create_task(delay(3))await asyncio.sleep(5)# 显然执行到这里,任务已经结束了long_task.cancel()try:await long_taskprint("任务执行完毕")except asyncio.CancelledError:print("任务被取消")asyncio.run(main())
"""
开始休眠 3 秒
休眠完成
任务执行完毕
"""

所以对一个已完成的任务调用 cancel 方法,没有任何影响

设置超时并使用wait_for执行取消

每秒(或其他时间间隔)执行检查然后取消任务,并不是处理超时的最简单方法。理想情况下,我们应该有一个辅助函数,它允许指定超时并自动取消任务。
asyncio 通过名为 asyncio.wait_for 的函数提供此功能,该函数接收协程或任务对象,以及以秒为单位的超时时间。如果任务完成所需的时间超过了设定的超时时间,则会引发 TimeoutException,任务将自动取消
为说明 wait_for 的工作原理,我们使用一个案例来说明:有一个任务需要 2 秒才能完成,但我们将它的超时时间设定为 1 秒。当得到一个 TimeoutError 异常时,我们将捕获异常,并检查任务是否被取消

import asyncioasync def delay(seconds):print(f"开始休眠 {seconds} 秒")await asyncio.sleep(seconds)print(f"休眠完成")return secondsasync def main():delay_task = asyncio.create_task(delay(2))try:result = await asyncio.wait_for(delay_task, 1)print("返回值:", result)except asyncio.TimeoutError:print("超时啦")# delay_task.cancelled() 用于判断任务是否被取消# 任务被取消:返回 True,没有被取消:返回 Falseprint("任务是否被取消:", delay_task.cancelled())asyncio.run(main())
"""
开始休眠 2 秒
超时啦
任务是否被取消: True
"""

应用程序运行 1 秒后,wait_for 语句将引发 TimeoutError,然后我们对其进行处理,并且 delay_task 被取消了。所以当一个任务超时的时候,会被自动取消
所以通过 wait_for 语句就很方便,如果直接 await 一个任务,那么必须等到任务完成之后才能继续往下执行。如果任务一直完成不了,那么就会一直陷入阻塞。我们的目的是希望这个任务的执行时间是可控的,那么便可以使用 wait_for 并指定超时时间。注:使用 wait_for 必须要搭配 await,阻塞等待任务完成并拿到返回值、或者达到超时时间引发 TimeoutError 之后,程序才能往下执行
因此 await 任务” 和 “await asyncio.wait_for(任务, timeout)” 的效果是类似的,都是等待后面的任务完成并拿到它的返回值。但使用 wait_for 可以指定超时时间,在规定时间内如果没有完成,则抛出 TimeoutError,而不会一直陷入阻塞
如果任务花费的时间比预期的长,在引发 TimeoutError 之后自动取消任务通常是个好主意。否则,可能有一个协程无限期地等待,占用永远不会释放的资源。但在某些情况下,我们可能希望保持协程运行。例如,我们可能想通知用户:某任务花费的时间比预期的要长,但即便超过了规定的超时时间,也不取消该任务。为此,可使用 asyncio.shield 函数包装任务,这个函数将防止传入的协程被取消,会给它一个屏蔽,将取消请求将忽略掉

import asyncioasync def delay(seconds):print(f"开始休眠 {seconds} 秒")await asyncio.sleep(seconds)print(f"休眠完成")return secondsasync def main():delay_task = asyncio.create_task(delay(2))try:# 通过 asyncio.shield 将 delay_task 保护起来result = await asyncio.wait_for(asyncio.shield(delay_task), 1)print("返回值:", result)except asyncio.TimeoutError:print("超时啦")# 如果超时依旧会引发 TimeoutError,但和之前不同的是# 此时任务不会被取消了,因为 asyncio.shield 会将取消请求忽略掉print("任务是否被取消:", delay_task.cancelled())# 从出现超时的地方,继续执行,并等待它完成result = await delay_taskprint("返回值:", result)asyncio.run(main())
"""
开始休眠 2 秒
超时啦
任务是否被取消: False
休眠完成
返回值: 2
"""

参考文献

1、进程和线程
2、《asyncio 系列》详解 asyncio 的协程、任务、future,以及事件循环


文章转载自:

http://AQuWe3dr.rfmzs.cn
http://Qq4OSEtC.rfmzs.cn
http://3y4RXsMW.rfmzs.cn
http://qabTzlW6.rfmzs.cn
http://D6A3eiji.rfmzs.cn
http://B2x57H0S.rfmzs.cn
http://ofLu5sKI.rfmzs.cn
http://W8ycNZN6.rfmzs.cn
http://gMm1TlxS.rfmzs.cn
http://ZqTCgiSP.rfmzs.cn
http://Wqvqbrtv.rfmzs.cn
http://TDGHkfl8.rfmzs.cn
http://UHrlxs5r.rfmzs.cn
http://kG4OhMW1.rfmzs.cn
http://fqNOcCuk.rfmzs.cn
http://ebe0pwgI.rfmzs.cn
http://uQfHgoph.rfmzs.cn
http://nnjrK75s.rfmzs.cn
http://z1TXEDnz.rfmzs.cn
http://nDWRR2Ix.rfmzs.cn
http://ozSKAzYj.rfmzs.cn
http://8B8cXY97.rfmzs.cn
http://41r1E6by.rfmzs.cn
http://vIcJEkPP.rfmzs.cn
http://GMRk2KsC.rfmzs.cn
http://LTf8seoU.rfmzs.cn
http://Ubaunt1l.rfmzs.cn
http://UnSbleGm.rfmzs.cn
http://HQPpBQKc.rfmzs.cn
http://ToZ2pk5z.rfmzs.cn
http://www.dtcms.com/a/383278.html

相关文章:

  • Tempus Fugit: 3靶场
  • XXL-JOB-Admin后台手动执行任务传参过长被截断问题解决
  • 【AI推理部署】Docker篇02—Docker 快速入手
  • 【C语言描述】《数据结构和算法》一 绪论与时间、空间复杂度
  • 服务器 - 从一台服务器切换至另一台服务器(损失数十条访客记录)
  • 【Android】View 交互的事件处理机制
  • 软考中级信息安全与病毒防护知识点
  • 贪心算法应用:量子密钥路径选择问题详解
  • 【算法】【链表】160.相交链表--通俗讲解
  • v-model与.aync的区别
  • 淘宝返利app的前端性能优化:从资源加载到首屏渲染的全链路优化
  • 【LeetCode】38. 外观数列
  • ZYNQ7020 Bank划分
  • 【2025】Office核心组件Microsoft word,Excel,PowerPoint详细使用指南
  • ARM编译器的__inline和 __forceinline
  • Zookeeper介绍与部署(Linux)
  • [硬件电路-216]:电场是什么?只有正电荷或只有负电荷,能产生电场吗?
  • pthread_mutex_lock函数深度解析
  • 【记录】初赛复习 Day1
  • 深入理解跳表(Skip List):原理、实现与应用
  • SciKit-Learn 全面分析 20newsgroups 新闻组文本数据集(文本分类)
  • 使用 Neo4j 和 Ollama 在本地构建知识图谱
  • 【愚公系列】《人工智能70年》018-语音识别的历史性突破(剑桥语音的黄金十年)
  • Debezium日常分享系列之:MongoDB 新文档状态提取
  • Linux 日志分析:用 ELK 搭建个人运维监控平台
  • docker内如何用ollama启动大模型
  • Flask学习笔记(二)--路由和变量
  • FlashAttention(V3)深度解析:从原理到工程实现-Hopper架构下的注意力机制优化革命
  • 一文入门:机器学习
  • Uniswap:DeFi领域的革命性交易协议