【Python】并发——线程
目录
- 并发
- 并发与并行的基本概念
- 线程
- 线程基础
- 守护线程
- 线程同步
- 线程安全与竞态条件
- 锁
- 可重入锁 RLock
- 死锁
- 信号量
- 条件变量
- 线程池
并发
并发与并行的基本概念
什么是并发(Concurrency)
- 并发:同一时间段内,程序可以交替处理多个任务。
(并不一定是同时执行,而是“切换得很快”) - 例子:你边下载文件边看视频,电脑在不同任务之间快速切换。
并发更像是“时间片轮转”。
什么是并行(Parallelism)
- 并行:多个任务在真正的同一时刻同时执行。
- 例子:一台电脑有 4 个 CPU 核心,4 个任务可以同时各跑在一个核心上。
并行是“同时进行”。
Python 的 GIL(全局解释器锁)
- Python(CPython 实现)有一个 全局解释器锁 (Global Interpreter Lock)。
- 它的作用:保证同一时刻只有一个线程执行 Python 字节码。
- 影响:
- 多线程在 CPU 密集型任务上不能提升速度(因为 GIL 限制)。
- 多线程在 IO 密集型任务上仍然有效(因为等待 IO 时会释放 GIL)。
IO 密集型 vs CPU 密集型
- IO 密集型任务:主要消耗在输入/输出(磁盘读写、网络请求)。适合用 多线程 或 异步 IO。
- CPU 密集型任务:主要消耗在计算(数学运算、图像处理)。适合用 多进程。
Python 并发的三大方式
- 多线程 (
threading
) → IO 密集型最佳 - 多进程 (
multiprocessing
) → CPU 密集型最佳 - 异步 IO (
asyncio
) → 大量 IO(高并发场景)
模拟 并发下载 和 并行计算 的区别:
import timedef io_task():print("开始下载...")time.sleep(2) # 模拟网络 IOprint("下载完成!")def cpu_task():print("开始计算...")sum(x * x for x in range(10**6)) # 模拟CPU计算print("计算完成!")# 串行执行
start = time.time()
io_task()
cpu_task()
print("总耗时:", time.time() - start)
运行后你会看到:先花 2 秒下载,再花 1 秒计算,总共约在 2.05~2.1 秒之间。
线程
线程基础
什么是线程?
- 线程(Thread) 是程序中最小的执行单元。
- 一个进程(Process)里可以有多个线程,线程共享进程的内存空间。
- 在 Python 里,多线程由
threading
模块 提供。
适合 IO 密集型任务(等待网络、文件读写)。
创建线程的两种方式
方式一:直接用 threading.Thread(target=...)
import threading
import timedef worker():print("线程开始工作")time.sleep(2)print("线程结束")# 创建线程
t = threading.Thread(target=worker)# 启动线程
t.start()# 等待线程结束
t.join()print("主线程结束")
输出顺序:
线程开始工作
(等待 2 秒)
线程结束
主线程结束
-
threading.Thread(target=worker)
:创建线程对象-
target
参数指定子线程要执行的函数(这里是worker
),但此时子线程并未启动(只是创建了对象)。 -
若需要给
worker
传参,可通过args
或kwargs
参数(例如threading.Thread(target=worker, args=(10,))
)。
-
-
t.start()
:启动子线程-
必须调用
start()
才能让子线程真正运行(不能直接调用t.run()
,否则会变成 “主线程串行执行worker
,失去多线程意义”)。 -
启动后,子线程会独立于主线程执行
target
函数,两者的执行逻辑是 “并行” 的(但受 Python GIL 影响,CPU 密集型任务无法真正多核并行,IO 密集型任务可体现并发优势)。
-
-
t.join()
:主线程等待子线程-
作用:让主线程暂停执行,直到子线程终止(相当于 “主线程等子线程下班,再一起下班”)。
-
若不加
t.join()
,主线程会 “不等子线程”,直接执行后续代码,输出顺序会变成:线程开始工作 主线程结束 # 主线程先退出 (等待 2 秒后) 线程结束 # 子线程后退出(但主线程已结束,程序可能提前终止)
-
方式二:继承 threading.Thread
类
import threading
import timeclass MyThread(threading.Thread):def run(self):print(f"{self.name} 开始工作")time.sleep(2)print(f"{self.name} 结束")# 创建并启动线程
t = MyThread()
t.start()
t.join()
具体执行流程:
- 创建线程实例:
t = MyThread()
→ 实例化自定义线程类,此时子线程未启动。 - 启动子线程:
t.start()
→ 触发Thread
基类的start()
方法,该方法会创建新的子线程,并在子线程中调用你重写的run()
方法。 - 子线程执行
run()
:- 打印
{self.name} 开始工作
(self.name
是线程名,默认格式为Thread-1
、Thread-2
等); - 执行
time.sleep(2)
(子线程阻塞 2 秒,模拟 IO 耗时); - 打印
{self.name} 结束
,子线程终止。
- 打印
- 主线程等待:
t.join()
→ 主线程阻塞,直到子线程执行完run()
方法,再继续往下执行(此处主线程后续无额外代码,程序直接退出)。
注意事项:
- 必须重写
run()
方法:若自定义线程类不重写run()
,会执行Thread
基类的run()
(空实现),子线程什么都不做。 - 不能直接调用
run()
:若写t.run()
而非t.start()
,会变成 “主线程直接执行run()
方法”(无新线程创建,失去多线程意义)。 - GIL 影响:Python 的全局解释器锁(GIL)导致多线程在 CPU 密集型任务(如大量计算)中无法真正 “多核并行”,但在 IO 密集型任务(如
time.sleep
、网络请求)中能体现并发优势(避免主线程阻塞)。
两种创建线程的方式
实现方式 | 核心逻辑 | 优点 | 适用场景 |
---|---|---|---|
继承 Thread 类 | 重写 run() 方法,start() 自动调用 | 可封装更多属性 / 方法(如自定义线程名、状态变量),逻辑更集中 | 线程逻辑复杂(需维护线程专属状态、多方法协作) |
传 target 函数 | Thread(target=函数) ,start() 调用该函数 | 代码简洁,无需定义类,适合简单逻辑 | 线程仅执行单一函数(如简单的下载、计算任务) |
推荐 方式一(函数式),更简单;方式二(类继承)更适合大型项目。
查看线程信息
import threading
import timedef task():print("当前线程:", threading.current_thread().name)time.sleep(1)# 主线程
print("主线程:", threading.main_thread().name)# 子线程
t1 = threading.Thread(target=task, name="子线程1")
t2 = threading.Thread(target=task, name="子线程2")t1.start()
t2.start()
t1.join()
t2.join()
关键函数:
threading.current_thread()
→ 获取当前线程对象threading.main_thread()
→ 获取主线程threading.active_count()
→ 获取当前活跃线程数threading.enumerate()
→ 列出所有活跃线程
守护线程
守护线程是什么?
- 守护线程是随着主线程(Main Thread)结束而自动退出的线程。
- **非守护线程(普通线程)**在主线程结束后,如果仍有线程在运行,程序会等待这些线程完成才退出。
简单比喻:
- 主线程 = 主体程序
- 守护线程 = 背景服务(比如日志、监控、自动保存)
- 程序结束时,后台服务不阻止程序退出。
如何创建守护线程
- 可以在 创建线程时指定
daemon=True
:
import threading
import timedef background_task():while True:print("后台线程运行中...")time.sleep(1)t = threading.Thread(target=background_task, daemon=True)
t.start()time.sleep(3)
print("主线程结束")
特点:
- 守护线程会在主线程结束时立刻被杀掉。
- 不会阻止程序退出。
- 常用于执行后台、周期性、辅助性的任务。
守护线程与非守护线程的区别
特性 | 守护线程(Daemon) | 非守护线程(普通线程) |
---|---|---|
是否阻止程序退出 | 不阻止 | 阻止,程序会等它结束 |
主线程退出后 | 会自动被杀掉 | 会继续运行,程序等它完成 |
常用场景 | 日志记录、监控、心跳、后台清理 | 业务逻辑任务、必须完成的计算任务 |
注意事项
-
不要在守护线程中执行关键任务
因为主线程一结束,守护线程会被立即杀掉,未完成的工作可能丢失。 -
守护线程可以通过
setDaemon(True)
方法设置(旧用法)t = threading.Thread(target=background_task) t.setDaemon(True) t.start()
-
主线程退出时守护线程不会执行
finally
或清理代码
所以不要在守护线程中依赖资源释放逻辑。
实例:守护线程 vs 普通线程
import threading
import timedef daemon_task():while True:print("守护线程运行中...")time.sleep(1)def normal_task():for i in range(3):print("普通线程运行中...", i)time.sleep(1)# 守护线程
t1 = threading.Thread(target=daemon_task, daemon=True)
t1.start()# 普通线程
t2 = threading.Thread(target=normal_task)
t2.start()time.sleep(2)
print("主线程结束")
运行结果分析:
- 守护线程 t1 会在主线程结束时立即停止(可能只打印 1-2 次)。
- 普通线程 t2 会继续运行,直到完成 3 次打印,程序才真正退出。
典型使用场景
- 后台日志:程序运行时收集日志,但程序退出时无需等待日志线程完成。
- 监控/心跳:周期性检查任务状态或网络连接。
- 定时清理:后台删除临时文件或缓存。
线程同步
线程安全与竞态条件
什么是线程安全?
- 线程安全(Thread-Safety) 指的是:当多个线程同时访问共享资源时,程序仍能正确执行,不会导致数据错误或程序崩溃。
- 如果一个函数、类或者数据结构是“线程安全”的,意味着我们可以在多线程环境中放心使用它,而不需要额外的同步措施。
比如:Python 的 queue.Queue
就是线程安全的,因为它内部已经做了同步控制。
什么是竞态条件?
- 竞态条件(race condition)发生在多个线程同时读写共享资源,而结果取决于线程调度顺序,导致程序行为不可预测。
- 竞态条件不是必然出错,而是 有时候对,有时候错,这种“偶发性”很危险。
通俗比喻:
- 两个人一起往同一个储蓄罐里存钱,每个人都“先取出罐子 → 计算新余额 → 放回去”。
- 如果他们同时操作,可能会导致一个人的修改被另一个覆盖,结果存进去的钱比实际少。
为什么会发生竞态条件?
因为线程在执行时不是原子的,它可能被操作系统“切片打断”。
来看这个例子:
balance = 0def add_money():global balancefor _ in range(100000):balance += 1
这句 balance += 1
背后其实包含了三个步骤:
- 读取 balance 的值
- 将值加 1
- 把结果写回 balance
如果两个线程几乎同时执行,就可能出现这种情况:
线程A:读取 balance = 100
线程B:读取 balance = 100
线程A:加 1 → 101
线程B:加 1 → 101
线程A:写回 balance = 101
线程B:写回 balance = 101
最终结果应该是 102,但实际上还是 101 —— 这就是 数据丢失(lost update)。
举个直观例子
import threadingbalance = 0def add_money():global balancefor _ in range(100000):balance += 1threads = []
for _ in range(2):t = threading.Thread(target=add_money)threads.append(t)t.start()for t in threads:t.join()print("最终余额:", balance)
理论上应该输出 200000 但实际可能得到的是小于 200000 的随机值。
这是因为两个线程的操作交错执行,产生了竞态条件。
竞态条件的危害
- 数据丢失:上例中的余额变少。
- 数据重复:可能出现重复写入的情况。
- 程序崩溃:某些情况下,内存状态损坏会导致运行时异常。
- 安全问题:银行系统、支付系统如果有竞态条件,可能造成严重后果。
如何解决竞态条件?
核心思想就是:让临界区的操作变成“互斥”的,即同一时刻只能有一个线程执行。
常见方式:
- 使用 锁(Lock)
- 使用 线程安全的数据结构(如
queue.Queue
) - 使用 原子操作(某些库支持)
补充:哪些操作是“原子”的?
在 Python 中,有些简单操作是 原子操作(不会被中断),例如:
- 读取和写入单个变量(整数、小字符串等不可变对象)
x = y
赋值
但是 x += 1
不是原子的,因为它涉及 读 → 改 → 写 三个步骤。
锁
为什么需要锁?
在多线程环境下,如果多个线程同时修改共享数据,就可能出现 竞态条件。
解决办法就是:
在一段关键代码(临界区,critical section)执行时,只允许一个线程进入。
这就需要 互斥锁(Mutex Lock)。
Lock 的基本原理
- Lock(互斥锁) 是最简单的线程同步机制。
- 一个 Lock 只有两种状态:
- 未锁定(Unlocked)
- 已锁定(Locked)
- 使用流程:
- 线程调用
acquire()
获取锁- 如果锁是 Unlocked → 立即拿到锁,进入临界区。
- 如果锁是 Locked → 阻塞,直到锁释放。
- 线程执行临界区代码。
- 线程调用
release()
释放锁,让其他线程有机会进入。
- 线程调用
Lock 的基本用法
import threadingbalance = 0
lock = threading.Lock() # 创建锁def add_money():global balancefor _ in range(100000):lock.acquire() # 上锁balance += 1lock.release() # 解锁threads = [threading.Thread(target=add_money) for _ in range(2)]for t in threads:t.start()
for t in threads:t.join()print("最终余额:", balance)
运行后,你会稳定得到 200000,不会再丢数据。
上下文管理器写法(推荐)
Python 的 Lock
可以配合 with
语句使用,代码更简洁,避免忘记释放锁。
def add_money():global balancefor _ in range(100000):with lock: # 等价于 acquire() + finally: release()balance += 1
推荐写法,因为即使函数异常退出,也会自动释放锁。
try_lock 非阻塞获取
有时你不希望线程一直等待,可以用 非阻塞模式:
import threading
import timelock = threading.Lock()def task():if lock.acquire(blocking=False): # 尝试获取锁,不阻塞print("拿到锁,执行任务")time.sleep(2)lock.release()else:print("没拿到锁,放弃任务")t1 = threading.Thread(target=task)
t2 = threading.Thread(target=task)t1.start()
t2.start()
运行结果:一个线程会拿到锁并执行,另一个线程直接放弃。
死锁(Deadlock)
- 死锁:多个线程相互等待对方释放锁,导致程序卡死。
- 例子:
lock1 = threading.Lock()
lock2 = threading.Lock()def task1():with lock1:print("线程1拿到 lock1")with lock2:print("线程1拿到 lock2")def task2():with lock2:print("线程2拿到 lock2")with lock1:print("线程2拿到 lock1")t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t1.start()
t2.start()
如果 task1
拿到 lock1
,同时 task2
拿到 lock2
,两个线程就互相等待对方释放,程序卡死。
避免死锁的办法
-
加锁顺序一致
所有线程获取锁的顺序要保持一致。
例如:都先拿lock1
,再拿lock2
。 -
使用
RLock
(可重入锁)
如果同一个线程可能多次获取同一把锁,必须用RLock
,否则会死锁。 -
超时机制
在acquire(timeout=...)
中指定超时时间。if lock.acquire(timeout=2):try:# 临界区passfinally:lock.release() else:print("获取锁失败,避免死锁")
应用场景
- 银行转账
- 多个线程操作同一账户余额,必须加锁避免数据错误。
- 多线程日志写入
- 多个线程同时写入同一个文件时,必须加锁。
- 资源池管理
- 例如数据库连接池,多线程取用/归还连接时需要加锁。
方法总结
方法 | 说明 |
---|---|
acquire() | 获取锁,阻塞直到成功 |
acquire(blocking=False) | 尝试获取锁,失败立即返回 |
acquire(timeout=5) | 超时获取锁,避免死锁 |
release() | 释放锁 |
with lock: | 推荐写法,自动释放锁 |
可重入锁 RLock
什么是 RLock?
在并发编程中,锁(Lock) 的主要作用是保证临界区代码(共享资源操作)在同一时间只允许一个线程进入,防止数据竞争。
但是,普通的 Lock
有一个缺陷:
- 同一个线程 如果多次获取同一把
Lock
,会造成 死锁。
例如:
import threadinglock = threading.Lock()def task():lock.acquire()print("第一次获取锁")# 递归或调用其他函数时再次 acquirelock.acquire() # ❌ 死锁,永远阻塞print("第二次获取锁")lock.release()lock.release()t = threading.Thread(target=task)
t.start()
执行时,第二次 lock.acquire()
会阻塞自己,形成 自死锁。
为了解决这个问题,Python 提供了 RLock(可重入锁,Reentrant Lock)。
- RLock 的特点:
- 允许 同一个线程 多次获得同一把锁,而不会死锁。
- 内部会维护一个 计数器,记录该线程获取锁的次数。
- 只有在调用了相同次数的
release()
之后,锁才会真正被释放。
RLock 的使用方法
基本语法
rlock = threading.RLock()rlock.acquire()
# 临界区代码
rlock.release()
或者用 上下文管理器(推荐):
with rlock:# 临界区代码# 会自动 acquire 和 release
示例:避免自死锁
import threadingrlock = threading.RLock()def recursive_task(n):with rlock:print(f"{threading.current_thread().name} 获取锁,n={n}")if n > 0:recursive_task(n - 1)t = threading.Thread(target=recursive_task, args=(3,), name="Worker")
t.start()
t.join()
输出:
Worker 获取锁,n=3
Worker 获取锁,n=2
Worker 获取锁,n=1
Worker 获取锁,n=0
可以看到,同一个线程多次获取同一把 RLock 没有死锁问题。
RLock 的工作原理
- 内部维护两个数据:
- owner:记录当前持有锁的线程 ID。
- count:计数器,记录该线程获取锁的次数。
- 获取锁时:
- 如果锁空闲 → 线程获取锁,
owner
设为该线程,count=1
。 - 如果锁已被同一线程持有 →
count+1
,允许再次进入。 - 如果锁被其他线程持有 → 阻塞等待。
- 如果锁空闲 → 线程获取锁,
- 释放锁时:
count-1
。- 只有当
count==0
时,真正释放锁,owner=None
。
使用场景
-
递归调用需要加锁的函数
def safe_recursive(n):with rlock:if n > 0:safe_recursive(n - 1)
-
同一线程在不同函数中多次 acquire
def func1():with rlock:print("func1 获取锁")func2()def func2():with rlock:print("func2 获取锁")func1()
-
多线程访问共享资源(和普通 Lock 类似)
balance = 0 rlock = threading.RLock()def deposit(amount):global balancewith rlock:balance += amountprint(f"{threading.current_thread().name} 存入 {amount},余额 {balance}")def withdraw(amount):global balancewith rlock:if balance >= amount:balance -= amountprint(f"{threading.current_thread().name} 取出 {amount},余额 {balance}")else:print(f"{threading.current_thread().name} 取钱失败,余额不足")t1 = threading.Thread(target=deposit, args=(100,)) t2 = threading.Thread(target=withdraw, args=(50,)) t1.start() t2.start() t1.join() t2.join()
RLock 与 Lock 的对比
特性 | Lock | RLock |
---|---|---|
同一线程可否多次获取 | 会死锁 | 可以 |
内部实现 | 二进制锁(0/1) | 计数器 + 线程 ID |
适用场景 | 简单的互斥 | 递归、函数嵌套、复杂调用 |
经验法则:
- 如果你确定不会出现 同一线程重复加锁 → 用
Lock
(性能稍好)。 - 如果可能出现 递归调用或函数间重复加锁 → 用
RLock
。
注意事项
- 别忘记释放锁:获取多少次就要释放多少次(上下文管理器最安全)。
- 性能开销:RLock 内部多了计数器,性能略低于 Lock。
- 避免死锁:虽然 RLock 防止了自死锁,但多个 RLock 交叉使用时仍可能出现死锁(例如线程 A 拿 rlock1 等 rlock2,线程 B 拿 rlock2 等 rlock1)。
死锁
什么是死锁?
死锁 指的是:两个或多个线程在执行过程中,因竞争资源而造成的一种相互等待的状态。
在这种状态下,如果没有外力干涉,它们永远不会再执行下去。
死锁的必要条件(操作系统里经典的“四个条件”):
- 互斥条件:资源一次只能被一个线程使用。
- 请求与保持条件:线程已持有一个资源,同时又请求其他资源。
- 不剥夺条件:线程获得的资源不能被强行剥夺,只能主动释放。
- 循环等待条件:存在一个线程集合
{T1, T2, …, Tn}
,其中 T1 等待 T2 持有的资源,T2 等待 T3 的资源,…,Tn 等待 T1 的资源,形成环路。
只要这四个条件同时成立,就可能发生死锁。
死锁示例
import threading
import timelock1 = threading.Lock()
lock2 = threading.Lock()def task1():with lock1:print("线程1 获取 lock1")time.sleep(1)with lock2:print("线程1 获取 lock2")def task2():with lock2:print("线程2 获取 lock2")time.sleep(1)with lock1:print("线程2 获取 lock1")t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)t1.start()
t2.start()t1.join()
t2.join()
运行结果:
线程1 获取 lock1
线程2 获取 lock2
然后就 卡住不动 —— 因为:
- 线程1 拿着
lock1
,等lock2
。 - 线程2 拿着
lock2
,等lock1
。
→ 形成循环等待,发生死锁。
解决死锁的常见方法
-
避免循环等待:统一加锁顺序
只要保证所有线程 按照相同的顺序 请求资源,就不会死锁。
def task1():with lock1:print("线程1 获取 lock1")time.sleep(1)with lock2: # 先 lock1 再 lock2print("线程1 获取 lock2")def task2():with lock1: # 修改为同样的顺序:先 lock1 再 lock2print("线程2 获取 lock1")time.sleep(1)with lock2:print("线程2 获取 lock2")
所有线程都先拿
lock1
,再拿lock2
,不会形成环路。 -
使用
acquire(timeout=…)
加超时机制Lock.acquire()
方法支持 timeout 参数,如果在指定时间内拿不到锁,会自动返回 False,而不是无限等待。def task1():if lock1.acquire(timeout=1):print("线程1 获取 lock1")time.sleep(1)if lock2.acquire(timeout=1):print("线程1 获取 lock2")lock2.release()lock1.release()
即使死锁场景下,线程也不会无限阻塞,可以选择重试或放弃。
-
使用更高级的同步机制
有时候直接用锁太原始,Python 还提供了更高级的并发工具:
threading.Semaphore
:信号量,可以限制资源的最大并发访问数。threading.Condition
:条件变量,用于更灵活的线程通信。queue.Queue
:线程安全队列,通常替代手写锁,避免死锁。
例如:用
Queue
代替共享资源:import queueq = queue.Queue() # 所有线程通过 q.put() / q.get() 访问数据,内部自带锁
-
死锁检测与恢复(高级思路)
- 在复杂系统中,可以定期扫描线程状态,如果检测到可能的循环等待,可以主动中断某些线程。
- 不过 Python 标准库没有内置死锁检测机制,需要手动实现或借助第三方库。
信号量
什么是信号量 (Semaphore)?
- 信号量 是一种用来控制多个线程访问共享资源的 计数器型锁。
- 它和普通的
Lock
很像,但有个关键区别:Lock
只能同时允许 1 个线程 进入临界区。Semaphore(n)
最多允许 n 个线程 同时进入临界区。
可以理解为:
Lock
就是一把只能过一个人的门锁。Semaphore(n)
是一个有n
张通行证的闸机。
基本用法
Python 提供了 threading.Semaphore
类。
import threadingsema = threading.Semaphore(3) # 最多允许 3 个线程同时进入
常用方法:
acquire([blocking], [timeout])
- 获取一个信号量,内部计数减一。
- 如果计数为 0,则阻塞或超时返回。
release()
- 释放一个信号量,内部计数加一。
代码示例:
-
限制并发数
假设有 5 个线程同时执行任务,但我们希望最多只有 2 个线程能同时进入临界区。
import threading, timesema = threading.Semaphore(2) # 最多 2 个线程同时进入def worker(n):with sema: # acquire 和 release 的简写print(f"线程 {n} 开始工作")time.sleep(2)print(f"线程 {n} 完成工作")threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)] for t in threads: t.start() for t in threads: t.join()
输出(部分,顺序可能不同):
线程 0 开始工作 线程 1 开始工作 # 等待 2 秒 线程 0 完成工作 线程 1 完成工作 线程 3 开始工作 线程 2 开始工作 线程 3 完成工作 线程 2 完成工作 线程 4 开始工作 线程 4 完成工作
一次只有 2 个线程在执行,符合限制。
-
限制资源访问
假设我们有 3 个数据库连接,但有 10 个线程要访问数据库,我们可以用
Semaphore(3)
控制最多 3 个线程同时使用数据库。db_sema = threading.Semaphore(3)def access_db(tid):with db_sema:print(f"线程 {tid} 正在访问数据库")time.sleep(1)print(f"线程 {tid} 释放数据库")threads = [threading.Thread(target=access_db, args=(i,)) for i in range(10)] for t in threads: t.start() for t in threads: t.join()
保证了同时最多只有 3 个线程访问数据库。
信号量 vs 互斥锁 (Lock)
特性 | Lock | Semaphore |
---|---|---|
最大并发数 | 1 | n(可自定义) |
使用场景 | 互斥,保护临界区 | 限制资源并发访问 |
内部机制 | 二进制锁 | 计数器 |
是否可递归 | ❌ | ❌(不可递归) |
经验法则:
- 如果资源一次只能给 1 个线程用 → 用
Lock
。 - 如果资源有多个份(比如多个数据库连接) → 用
Semaphore
。
信号量的变体
threading.BoundedSemaphore
Semaphore
默认可以无限release()
,导致计数超过初始值。BoundedSemaphore
限制了最大计数,防止错误释放。
sema = threading.BoundedSemaphore(2)
sema.acquire()
sema.release()
sema.release() # 会报 ValueError
推荐使用 BoundedSemaphore
,更安全。
信号量的应用场景
- 限制并发线程数:比如同时最多 3 个线程下载文件。
- 控制对资源池的访问:比如数据库连接池、线程池、爬虫的请求并发数。
- 实现生产者-消费者模型:使用两个信号量分别表示“空槽”和“已用槽”。
- 限流 (Rate Limiting):限制某个接口的并发调用次数。
注意事项
- 不要多次 release(超过初始计数会出错,除非用普通 Semaphore)。
- 不适合递归调用(需要 RLock)。
- 性能比 Lock 略低,但更灵活。
- 和 Queue 搭配 更安全(Queue 内部就用了信号量)。
条件变量
什么是条件变量(Condition)?
- 条件变量 是一种高级同步原语,用于让一个或多个线程 等待某个条件,直到其他线程通知条件发生改变。
- 它通常和 Lock 或 RLock 配合使用。
- 条件变量的核心方法:
wait()
:线程等待条件,同时释放底层锁。notify()
:唤醒一个等待该条件的线程。notify_all()
:唤醒所有等待该条件的线程。
简单比喻:
- 条件变量 = “红绿灯”
- 线程 A 等红灯(wait),线程 B 变绿灯(notify),A 才能继续走。
条件变量的基本用法
-
wait() + notify()
import threading import timecondition = threading.Condition() data_ready = False # 条件状态def consumer():global data_readywith condition:print("消费者等待数据...")while not data_ready: # 条件不满足就 waitcondition.wait() # 释放锁并阻塞,等待通知print("消费者开始处理数据")def producer():global data_readytime.sleep(2)with condition:data_ready = Trueprint("生产者生产完数据,通知消费者")condition.notify() # 唤醒一个等待线程t1 = threading.Thread(target=consumer) t2 = threading.Thread(target=producer) t1.start() t2.start() t1.join() t2.join()
输出示例:
消费者等待数据... 生产者生产完数据,通知消费者 消费者开始处理数据
特点:
wait()
会自动释放锁,允许其他线程进入临界区。notify()
只能唤醒在 同一个 Condition 上等待的线程。- 推荐在
while
循环中检查条件,防止“虚假唤醒”。
-
notify_all() 唤醒所有线程
with condition:condition.notify_all() # 所有等待线程都会被唤醒
适用于多个消费者线程都在等待同一个条件时。
条件变量的应用场景
-
生产者-消费者模型
import threading import time import randomqueue = [] queue_size = 5 condition = threading.Condition()def producer():global queuewhile True:item = random.randint(1,100)with condition:while len(queue) >= queue_size:condition.wait() # 队列满,等待消费者queue.append(item)print(f"生产者放入: {item}")condition.notify() # 通知消费者time.sleep(1)def consumer():global queuewhile True:with condition:while not queue:condition.wait() # 队列空,等待生产者item = queue.pop(0)print(f"消费者取出: {item}")condition.notify() # 通知生产者time.sleep(2)t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) t1.start() t2.start()
这种模式可以保证:
- 队列满时生产者等待
- 队列空时消费者等待
- 线程安全、高效协作
-
等待某个条件满足再执行
- 比如线程 A 等待配置文件加载完成再启动处理。
- 线程 B 加载完毕后调用
notify()
唤醒等待线程。
条件变量的原理
- 内部依赖一个 Lock/RLock 来保证临界区安全。
wait()
执行流程:- 当前线程释放锁。
- 进入等待队列。
- 被
notify()
唤醒后,重新获取锁,继续执行。
notify()
执行流程:- 从等待队列中选择一个线程(FIFO 或操作系统调度)
- 唤醒线程,等待它重新获取锁
重要:Condition 只是线程间通信的机制,共享数据还要自己管理。
注意事项
-
必须在锁内调用
wait()
/notify()
,否则会报异常。 -
使用 while 检查条件,而不是 if:
while not condition_met:condition.wait()
防止“虚假唤醒”。
-
不要滥用 notify_all(),会唤醒所有线程,可能造成性能下降。
线程池
什么是线程池?
- 线程池(Thread Pool) 是一种线程管理机制:
- 提前创建一定数量的线程(池子),然后复用这些线程执行任务。
- 避免频繁创建和销毁线程的开销,提高性能。
- Python 的标准库
concurrent.futures
提供了ThreadPoolExecutor
类来实现线程池。
优势:
- 线程复用,降低创建销毁开销
- 任务提交方便,不用手动管理每个 Thread
- 支持任务结果返回,便于获取执行结果
- 内置异常处理机制
ThreadPoolExecutor 的创建
from concurrent.futures import ThreadPoolExecutor# 创建线程池,最多同时有 5 个线程
executor = ThreadPoolExecutor(max_workers=5)
max_workers
:线程池中同时存在的最大线程数- 如果未指定,Python 会根据 CPU 核心数自动设置一个默认值
提交任务
-
使用
submit()
提交任务import time from concurrent.futures import ThreadPoolExecutordef task(n):time.sleep(1)return f"任务 {n} 完成"with ThreadPoolExecutor(max_workers=3) as executor:future1 = executor.submit(task, 1) # 提交任务future2 = executor.submit(task, 2)print(future1.result()) # 获取返回值print(future2.result())
输出:
任务 1 完成 任务 2 完成
submit()
返回一个 Future 对象,表示异步执行的任务。- 可以用
future.result()
阻塞等待结果。
-
使用
map()
批量提交任务with ThreadPoolExecutor(max_workers=3) as executor:results = executor.map(task, range(5)) # 自动提交任务for r in results:print(r)
输出:
任务 0 完成 任务 1 完成 任务 2 完成 任务 3 完成 任务 4 完成
map()
返回一个迭代器,顺序与输入一致- 内部自动调度线程执行任务
线程池常用方法
方法 | 说明 |
---|---|
submit(fn, *args, **kwargs) | 提交单个任务,返回 Future |
map(fn, *iterables) | 批量提交任务,返回结果迭代器 |
shutdown(wait=True) | 关闭线程池,wait=True 会等待所有任务完成再退出 |
推荐使用 上下文管理器:
with ThreadPoolExecutor(max_workers=5) as executor:...
# 自动调用 shutdown(wait=True)
Future 对象
submit()
返回的 Future
对象常用方法:
result(timeout=None)
:获取任务结果,支持超时done()
:任务是否完成add_done_callback(fn)
:任务完成后自动回调
示例:
def callback(future):print("回调函数,结果:", future.result())with ThreadPoolExecutor(max_workers=2) as executor:future = executor.submit(task, 10)future.add_done_callback(callback)
输出:
回调函数,结果: 任务 10 完成
线程池异常处理
def task_error(n):if n == 2:raise ValueError("出现错误")return nwith ThreadPoolExecutor(max_workers=3) as executor:futures = [executor.submit(task_error, i) for i in range(4)]for f in futures:try:print(f.result())except Exception as e:print("捕获异常:", e)
输出:
0
1
捕获异常: 出现错误
3
- 异常会在
result()
时抛出,可以捕获。 - 避免线程直接崩掉,方便统一处理。
应用场景
- I/O 密集型任务:爬虫、文件下载、网络请求
- 任务数量多,但每个任务耗时不长:批量处理图片、数据分析
- 需要统一管理线程和结果:避免自己手动管理
Thread
和锁
注意事项
- 线程池适合 I/O 密集型任务:CPU 密集型任务可能受 GIL 限制,建议用
ProcessPoolExecutor
- 不要提交无限任务:提交任务过多会占用大量内存
- 任务异常要捕获:否则 Future.result() 会抛异常
- 线程池自动复用线程:避免频繁创建销毁 Thread,提高效率