当前位置: 首页 > news >正文

【Python】并发——线程

目录

  • 并发
    • 并发与并行的基本概念
    • 线程
      • 线程基础
      • 守护线程
      • 线程同步
        • 线程安全与竞态条件
        • 可重入锁 RLock
        • 死锁
        • 信号量
        • 条件变量
      • 线程池

并发

并发与并行的基本概念

什么是并发(Concurrency)

  • 并发:同一时间段内,程序可以交替处理多个任务。
    (并不一定是同时执行,而是“切换得很快”)
  • 例子:你边下载文件边看视频,电脑在不同任务之间快速切换。

并发更像是“时间片轮转”。

什么是并行(Parallelism)

  • 并行:多个任务在真正的同一时刻同时执行。
  • 例子:一台电脑有 4 个 CPU 核心,4 个任务可以同时各跑在一个核心上。

并行是“同时进行”。

Python 的 GIL(全局解释器锁)

  • Python(CPython 实现)有一个 全局解释器锁 (Global Interpreter Lock)。
  • 它的作用:保证同一时刻只有一个线程执行 Python 字节码。
  • 影响:
    • 多线程在 CPU 密集型任务上不能提升速度(因为 GIL 限制)。
    • 多线程在 IO 密集型任务上仍然有效(因为等待 IO 时会释放 GIL)。

IO 密集型 vs CPU 密集型

  • IO 密集型任务:主要消耗在输入/输出(磁盘读写、网络请求)。适合用 多线程异步 IO
  • CPU 密集型任务:主要消耗在计算(数学运算、图像处理)。适合用 多进程

Python 并发的三大方式

  1. 多线程 (threading) → IO 密集型最佳
  2. 多进程 (multiprocessing) → CPU 密集型最佳
  3. 异步 IO (asyncio) → 大量 IO(高并发场景)

模拟 并发下载并行计算 的区别:

import timedef io_task():print("开始下载...")time.sleep(2)   # 模拟网络 IOprint("下载完成!")def cpu_task():print("开始计算...")sum(x * x for x in range(10**6))  # 模拟CPU计算print("计算完成!")# 串行执行
start = time.time()
io_task()
cpu_task()
print("总耗时:", time.time() - start)

运行后你会看到:先花 2 秒下载,再花 1 秒计算,总共约在 2.05~2.1 秒之间。

线程

线程基础

什么是线程?

  • 线程(Thread) 是程序中最小的执行单元。
  • 一个进程(Process)里可以有多个线程,线程共享进程的内存空间。
  • 在 Python 里,多线程由 threading 模块 提供。

适合 IO 密集型任务(等待网络、文件读写)。

创建线程的两种方式

方式一:直接用 threading.Thread(target=...)

import threading
import timedef worker():print("线程开始工作")time.sleep(2)print("线程结束")# 创建线程
t = threading.Thread(target=worker)# 启动线程
t.start()# 等待线程结束
t.join()print("主线程结束")

输出顺序:

线程开始工作
(等待 2 秒)
线程结束
主线程结束
  1. threading.Thread(target=worker):创建线程对象

    • target 参数指定子线程要执行的函数(这里是 worker),但此时子线程并未启动(只是创建了对象)。

    • 若需要给 worker 传参,可通过 argskwargs 参数(例如 threading.Thread(target=worker, args=(10,)))。

  2. t.start():启动子线程

    • 必须调用 start() 才能让子线程真正运行(不能直接调用 t.run(),否则会变成 “主线程串行执行 worker,失去多线程意义”)。

    • 启动后,子线程会独立于主线程执行 target 函数,两者的执行逻辑是 “并行” 的(但受 Python GIL 影响,CPU 密集型任务无法真正多核并行,IO 密集型任务可体现并发优势)。

  3. t.join():主线程等待子线程

    • 作用:让主线程暂停执行,直到子线程终止(相当于 “主线程等子线程下班,再一起下班”)。

    • 若不加 t.join(),主线程会 “不等子线程”,直接执行后续代码,输出顺序会变成:

      线程开始工作
      主线程结束  # 主线程先退出
      (等待 2 秒后)
      线程结束    # 子线程后退出(但主线程已结束,程序可能提前终止)
      

方式二:继承 threading.Thread

import threading
import timeclass MyThread(threading.Thread):def run(self):print(f"{self.name} 开始工作")time.sleep(2)print(f"{self.name} 结束")# 创建并启动线程
t = MyThread()
t.start()
t.join()

具体执行流程:

  1. 创建线程实例t = MyThread() → 实例化自定义线程类,此时子线程未启动。
  2. 启动子线程t.start() → 触发 Thread 基类的 start() 方法,该方法会创建新的子线程,并在子线程中调用你重写的 run() 方法。
  3. 子线程执行 run()
    • 打印 {self.name} 开始工作self.name 是线程名,默认格式为 Thread-1Thread-2 等);
    • 执行 time.sleep(2)(子线程阻塞 2 秒,模拟 IO 耗时);
    • 打印 {self.name} 结束,子线程终止。
  4. 主线程等待t.join() → 主线程阻塞,直到子线程执行完 run() 方法,再继续往下执行(此处主线程后续无额外代码,程序直接退出)。

注意事项:

  1. 必须重写 run() 方法:若自定义线程类不重写 run(),会执行 Thread 基类的 run()(空实现),子线程什么都不做。
  2. 不能直接调用 run():若写 t.run() 而非 t.start(),会变成 “主线程直接执行 run() 方法”(无新线程创建,失去多线程意义)。
  3. GIL 影响:Python 的全局解释器锁(GIL)导致多线程在 CPU 密集型任务(如大量计算)中无法真正 “多核并行”,但在 IO 密集型任务(如 time.sleep、网络请求)中能体现并发优势(避免主线程阻塞)。

两种创建线程的方式

实现方式核心逻辑优点适用场景
继承 Thread重写 run() 方法,start() 自动调用可封装更多属性 / 方法(如自定义线程名、状态变量),逻辑更集中线程逻辑复杂(需维护线程专属状态、多方法协作)
target 函数Thread(target=函数)start() 调用该函数代码简洁,无需定义类,适合简单逻辑线程仅执行单一函数(如简单的下载、计算任务)

推荐 方式一(函数式),更简单;方式二(类继承)更适合大型项目。

查看线程信息

import threading
import timedef task():print("当前线程:", threading.current_thread().name)time.sleep(1)# 主线程
print("主线程:", threading.main_thread().name)# 子线程
t1 = threading.Thread(target=task, name="子线程1")
t2 = threading.Thread(target=task, name="子线程2")t1.start()
t2.start()
t1.join()
t2.join()

关键函数:

  • threading.current_thread() → 获取当前线程对象
  • threading.main_thread() → 获取主线程
  • threading.active_count() → 获取当前活跃线程数
  • threading.enumerate() → 列出所有活跃线程

守护线程

守护线程是什么?

  • 守护线程是随着主线程(Main Thread)结束而自动退出的线程。
  • **非守护线程(普通线程)**在主线程结束后,如果仍有线程在运行,程序会等待这些线程完成才退出。

简单比喻:

  • 主线程 = 主体程序
  • 守护线程 = 背景服务(比如日志、监控、自动保存)
  • 程序结束时,后台服务不阻止程序退出。

如何创建守护线程

  • 可以在 创建线程时指定 daemon=True
import threading
import timedef background_task():while True:print("后台线程运行中...")time.sleep(1)t = threading.Thread(target=background_task, daemon=True)
t.start()time.sleep(3)
print("主线程结束")

特点

  1. 守护线程会在主线程结束时立刻被杀掉。
  2. 不会阻止程序退出。
  3. 常用于执行后台、周期性、辅助性的任务

守护线程与非守护线程的区别

特性守护线程(Daemon)非守护线程(普通线程)
是否阻止程序退出不阻止阻止,程序会等它结束
主线程退出后会自动被杀掉会继续运行,程序等它完成
常用场景日志记录、监控、心跳、后台清理业务逻辑任务、必须完成的计算任务

注意事项

  1. 不要在守护线程中执行关键任务
    因为主线程一结束,守护线程会被立即杀掉,未完成的工作可能丢失。

  2. 守护线程可以通过 setDaemon(True) 方法设置(旧用法)

    t = threading.Thread(target=background_task)
    t.setDaemon(True)
    t.start()
    
  3. 主线程退出时守护线程不会执行 finally 或清理代码
    所以不要在守护线程中依赖资源释放逻辑。

实例:守护线程 vs 普通线程

import threading
import timedef daemon_task():while True:print("守护线程运行中...")time.sleep(1)def normal_task():for i in range(3):print("普通线程运行中...", i)time.sleep(1)# 守护线程
t1 = threading.Thread(target=daemon_task, daemon=True)
t1.start()# 普通线程
t2 = threading.Thread(target=normal_task)
t2.start()time.sleep(2)
print("主线程结束")

运行结果分析

  • 守护线程 t1 会在主线程结束时立即停止(可能只打印 1-2 次)。
  • 普通线程 t2 会继续运行,直到完成 3 次打印,程序才真正退出。

典型使用场景

  1. 后台日志:程序运行时收集日志,但程序退出时无需等待日志线程完成。
  2. 监控/心跳:周期性检查任务状态或网络连接。
  3. 定时清理:后台删除临时文件或缓存。

线程同步

线程安全与竞态条件

什么是线程安全?

  • 线程安全(Thread-Safety) 指的是:当多个线程同时访问共享资源时,程序仍能正确执行,不会导致数据错误或程序崩溃。
  • 如果一个函数、类或者数据结构是“线程安全”的,意味着我们可以在多线程环境中放心使用它,而不需要额外的同步措施。

比如:Python 的 queue.Queue 就是线程安全的,因为它内部已经做了同步控制。

什么是竞态条件?

  • 竞态条件(race condition)发生在多个线程同时读写共享资源,而结果取决于线程调度顺序,导致程序行为不可预测。
  • 竞态条件不是必然出错,而是 有时候对,有时候错,这种“偶发性”很危险。

通俗比喻:

  • 两个人一起往同一个储蓄罐里存钱,每个人都“先取出罐子 → 计算新余额 → 放回去”。
  • 如果他们同时操作,可能会导致一个人的修改被另一个覆盖,结果存进去的钱比实际少。

为什么会发生竞态条件?

因为线程在执行时不是原子的,它可能被操作系统“切片打断”。
来看这个例子:

balance = 0def add_money():global balancefor _ in range(100000):balance += 1

这句 balance += 1 背后其实包含了三个步骤:

  1. 读取 balance 的值
  2. 将值加 1
  3. 把结果写回 balance

如果两个线程几乎同时执行,就可能出现这种情况:

线程A:读取 balance = 100
线程B:读取 balance = 100
线程A:加 1 → 101
线程B:加 1 → 101
线程A:写回 balance = 101
线程B:写回 balance = 101

最终结果应该是 102,但实际上还是 101 —— 这就是 数据丢失(lost update)

举个直观例子

import threadingbalance = 0def add_money():global balancefor _ in range(100000):balance += 1threads = []
for _ in range(2):t = threading.Thread(target=add_money)threads.append(t)t.start()for t in threads:t.join()print("最终余额:", balance)

理论上应该输出 200000 但实际可能得到的是小于 200000 的随机值。

这是因为两个线程的操作交错执行,产生了竞态条件。

竞态条件的危害

  • 数据丢失:上例中的余额变少。
  • 数据重复:可能出现重复写入的情况。
  • 程序崩溃:某些情况下,内存状态损坏会导致运行时异常。
  • 安全问题:银行系统、支付系统如果有竞态条件,可能造成严重后果。

如何解决竞态条件?

核心思想就是:让临界区的操作变成“互斥”的,即同一时刻只能有一个线程执行。
常见方式:

  • 使用 锁(Lock)
  • 使用 线程安全的数据结构(如 queue.Queue
  • 使用 原子操作(某些库支持)

补充:哪些操作是“原子”的?

在 Python 中,有些简单操作是 原子操作(不会被中断),例如:

  • 读取和写入单个变量(整数、小字符串等不可变对象)
  • x = y 赋值

但是 x += 1 不是原子的,因为它涉及 读 → 改 → 写 三个步骤。

为什么需要锁?

在多线程环境下,如果多个线程同时修改共享数据,就可能出现 竞态条件
解决办法就是:

在一段关键代码(临界区,critical section)执行时,只允许一个线程进入。

这就需要 互斥锁(Mutex Lock)

Lock 的基本原理

  • Lock(互斥锁) 是最简单的线程同步机制。
  • 一个 Lock 只有两种状态:
    • 未锁定(Unlocked)
    • 已锁定(Locked)
  • 使用流程:
    1. 线程调用 acquire() 获取锁
      • 如果锁是 Unlocked → 立即拿到锁,进入临界区。
      • 如果锁是 Locked → 阻塞,直到锁释放。
    2. 线程执行临界区代码。
    3. 线程调用 release() 释放锁,让其他线程有机会进入。

Lock 的基本用法

import threadingbalance = 0
lock = threading.Lock() # 创建锁def add_money():global balancefor _ in range(100000):lock.acquire()       # 上锁balance += 1lock.release()       # 解锁threads = [threading.Thread(target=add_money) for _ in range(2)]for t in threads:t.start()
for t in threads:t.join()print("最终余额:", balance)

运行后,你会稳定得到 200000,不会再丢数据。

上下文管理器写法(推荐)

Python 的 Lock 可以配合 with 语句使用,代码更简洁,避免忘记释放锁

def add_money():global balancefor _ in range(100000):with lock:   # 等价于 acquire() + finally: release()balance += 1

推荐写法,因为即使函数异常退出,也会自动释放锁。

try_lock 非阻塞获取

有时你不希望线程一直等待,可以用 非阻塞模式

import threading
import timelock = threading.Lock()def task():if lock.acquire(blocking=False):  # 尝试获取锁,不阻塞print("拿到锁,执行任务")time.sleep(2)lock.release()else:print("没拿到锁,放弃任务")t1 = threading.Thread(target=task)
t2 = threading.Thread(target=task)t1.start()
t2.start()

运行结果:一个线程会拿到锁并执行,另一个线程直接放弃。

死锁(Deadlock)

  • 死锁:多个线程相互等待对方释放锁,导致程序卡死。
  • 例子:
lock1 = threading.Lock()
lock2 = threading.Lock()def task1():with lock1:print("线程1拿到 lock1")with lock2:print("线程1拿到 lock2")def task2():with lock2:print("线程2拿到 lock2")with lock1:print("线程2拿到 lock1")t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t1.start()
t2.start()

如果 task1 拿到 lock1,同时 task2 拿到 lock2,两个线程就互相等待对方释放,程序卡死。

避免死锁的办法

  1. 加锁顺序一致
    所有线程获取锁的顺序要保持一致。
    例如:都先拿 lock1,再拿 lock2

  2. 使用 RLock(可重入锁)
    如果同一个线程可能多次获取同一把锁,必须用 RLock,否则会死锁。

  3. 超时机制
    acquire(timeout=...) 中指定超时时间。

    if lock.acquire(timeout=2):try:# 临界区passfinally:lock.release()
    else:print("获取锁失败,避免死锁")
    

应用场景

  1. 银行转账
    • 多个线程操作同一账户余额,必须加锁避免数据错误。
  2. 多线程日志写入
    • 多个线程同时写入同一个文件时,必须加锁。
  3. 资源池管理
    • 例如数据库连接池,多线程取用/归还连接时需要加锁。

方法总结

方法说明
acquire()获取锁,阻塞直到成功
acquire(blocking=False)尝试获取锁,失败立即返回
acquire(timeout=5)超时获取锁,避免死锁
release()释放锁
with lock:推荐写法,自动释放锁
可重入锁 RLock

什么是 RLock?

在并发编程中,锁(Lock) 的主要作用是保证临界区代码(共享资源操作)在同一时间只允许一个线程进入,防止数据竞争。
但是,普通的 Lock 有一个缺陷:

  • 同一个线程 如果多次获取同一把 Lock,会造成 死锁

例如:

import threadinglock = threading.Lock()def task():lock.acquire()print("第一次获取锁")# 递归或调用其他函数时再次 acquirelock.acquire()  # ❌ 死锁,永远阻塞print("第二次获取锁")lock.release()lock.release()t = threading.Thread(target=task)
t.start()

执行时,第二次 lock.acquire() 会阻塞自己,形成 自死锁

为了解决这个问题,Python 提供了 RLock(可重入锁,Reentrant Lock)

  • RLock 的特点:
    • 允许 同一个线程 多次获得同一把锁,而不会死锁。
    • 内部会维护一个 计数器,记录该线程获取锁的次数。
    • 只有在调用了相同次数的 release() 之后,锁才会真正被释放。

RLock 的使用方法

基本语法

rlock = threading.RLock()rlock.acquire()
# 临界区代码
rlock.release()

或者用 上下文管理器(推荐):

with rlock:# 临界区代码# 会自动 acquire 和 release

示例:避免自死锁

import threadingrlock = threading.RLock()def recursive_task(n):with rlock:print(f"{threading.current_thread().name} 获取锁,n={n}")if n > 0:recursive_task(n - 1)t = threading.Thread(target=recursive_task, args=(3,), name="Worker")
t.start()
t.join()

输出:

Worker 获取锁,n=3
Worker 获取锁,n=2
Worker 获取锁,n=1
Worker 获取锁,n=0

可以看到,同一个线程多次获取同一把 RLock 没有死锁问题。

RLock 的工作原理

  • 内部维护两个数据:
    1. owner:记录当前持有锁的线程 ID。
    2. count:计数器,记录该线程获取锁的次数。
  • 获取锁时:
    • 如果锁空闲 → 线程获取锁,owner 设为该线程,count=1
    • 如果锁已被同一线程持有 → count+1,允许再次进入。
    • 如果锁被其他线程持有 → 阻塞等待。
  • 释放锁时:
    • count-1
    • 只有当 count==0 时,真正释放锁,owner=None

使用场景

  1. 递归调用需要加锁的函数

    def safe_recursive(n):with rlock:if n > 0:safe_recursive(n - 1)
    
  2. 同一线程在不同函数中多次 acquire

    def func1():with rlock:print("func1 获取锁")func2()def func2():with rlock:print("func2 获取锁")func1()
    
  3. 多线程访问共享资源(和普通 Lock 类似)

    balance = 0
    rlock = threading.RLock()def deposit(amount):global balancewith rlock:balance += amountprint(f"{threading.current_thread().name} 存入 {amount},余额 {balance}")def withdraw(amount):global balancewith rlock:if balance >= amount:balance -= amountprint(f"{threading.current_thread().name} 取出 {amount},余额 {balance}")else:print(f"{threading.current_thread().name} 取钱失败,余额不足")t1 = threading.Thread(target=deposit, args=(100,))
    t2 = threading.Thread(target=withdraw, args=(50,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    

RLock 与 Lock 的对比

特性LockRLock
同一线程可否多次获取会死锁可以
内部实现二进制锁(0/1)计数器 + 线程 ID
适用场景简单的互斥递归、函数嵌套、复杂调用

经验法则:

  • 如果你确定不会出现 同一线程重复加锁 → 用 Lock(性能稍好)。
  • 如果可能出现 递归调用或函数间重复加锁 → 用 RLock

注意事项

  1. 别忘记释放锁:获取多少次就要释放多少次(上下文管理器最安全)。
  2. 性能开销:RLock 内部多了计数器,性能略低于 Lock。
  3. 避免死锁:虽然 RLock 防止了自死锁,但多个 RLock 交叉使用时仍可能出现死锁(例如线程 A 拿 rlock1 等 rlock2,线程 B 拿 rlock2 等 rlock1)。
死锁

什么是死锁?

死锁 指的是:两个或多个线程在执行过程中,因竞争资源而造成的一种相互等待的状态。
在这种状态下,如果没有外力干涉,它们永远不会再执行下去。

死锁的必要条件(操作系统里经典的“四个条件”):

  1. 互斥条件:资源一次只能被一个线程使用。
  2. 请求与保持条件:线程已持有一个资源,同时又请求其他资源。
  3. 不剥夺条件:线程获得的资源不能被强行剥夺,只能主动释放。
  4. 循环等待条件:存在一个线程集合 {T1, T2, …, Tn},其中 T1 等待 T2 持有的资源,T2 等待 T3 的资源,…,Tn 等待 T1 的资源,形成环路。

只要这四个条件同时成立,就可能发生死锁。

死锁示例

import threading
import timelock1 = threading.Lock()
lock2 = threading.Lock()def task1():with lock1:print("线程1 获取 lock1")time.sleep(1)with lock2:print("线程1 获取 lock2")def task2():with lock2:print("线程2 获取 lock2")time.sleep(1)with lock1:print("线程2 获取 lock1")t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)t1.start()
t2.start()t1.join()
t2.join()

运行结果:

线程1 获取 lock1
线程2 获取 lock2

然后就 卡住不动 —— 因为:

  • 线程1 拿着 lock1,等 lock2
  • 线程2 拿着 lock2,等 lock1
    → 形成循环等待,发生死锁。

解决死锁的常见方法

  1. 避免循环等待:统一加锁顺序

    只要保证所有线程 按照相同的顺序 请求资源,就不会死锁。

    def task1():with lock1:print("线程1 获取 lock1")time.sleep(1)with lock2:  # 先 lock1 再 lock2print("线程1 获取 lock2")def task2():with lock1:  # 修改为同样的顺序:先 lock1 再 lock2print("线程2 获取 lock1")time.sleep(1)with lock2:print("线程2 获取 lock2")
    

    所有线程都先拿 lock1,再拿 lock2,不会形成环路。

  2. 使用 acquire(timeout=…) 加超时机制

    Lock.acquire() 方法支持 timeout 参数,如果在指定时间内拿不到锁,会自动返回 False,而不是无限等待。

    def task1():if lock1.acquire(timeout=1):print("线程1 获取 lock1")time.sleep(1)if lock2.acquire(timeout=1):print("线程1 获取 lock2")lock2.release()lock1.release()
    

    即使死锁场景下,线程也不会无限阻塞,可以选择重试或放弃。

  3. 使用更高级的同步机制

    有时候直接用锁太原始,Python 还提供了更高级的并发工具:

    • threading.Semaphore:信号量,可以限制资源的最大并发访问数。
    • threading.Condition:条件变量,用于更灵活的线程通信。
    • queue.Queue:线程安全队列,通常替代手写锁,避免死锁。

    例如:用 Queue 代替共享资源:

    import queueq = queue.Queue()
    # 所有线程通过 q.put() / q.get() 访问数据,内部自带锁
    
  4. 死锁检测与恢复(高级思路)

    • 在复杂系统中,可以定期扫描线程状态,如果检测到可能的循环等待,可以主动中断某些线程。
    • 不过 Python 标准库没有内置死锁检测机制,需要手动实现或借助第三方库。
信号量

什么是信号量 (Semaphore)?

  • 信号量 是一种用来控制多个线程访问共享资源的 计数器型锁
  • 它和普通的 Lock 很像,但有个关键区别:
    • Lock 只能同时允许 1 个线程 进入临界区。
    • Semaphore(n) 最多允许 n 个线程 同时进入临界区。

可以理解为:

  • Lock 就是一把只能过一个人的门锁。
  • Semaphore(n) 是一个有 n 张通行证的闸机。

基本用法

Python 提供了 threading.Semaphore 类。

import threadingsema = threading.Semaphore(3)  # 最多允许 3 个线程同时进入

常用方法:

  • acquire([blocking], [timeout])
    • 获取一个信号量,内部计数减一。
    • 如果计数为 0,则阻塞或超时返回。
  • release()
    • 释放一个信号量,内部计数加一。

代码示例:

  1. 限制并发数

    假设有 5 个线程同时执行任务,但我们希望最多只有 2 个线程能同时进入临界区。

    import threading, timesema = threading.Semaphore(2)  # 最多 2 个线程同时进入def worker(n):with sema:  # acquire 和 release 的简写print(f"线程 {n} 开始工作")time.sleep(2)print(f"线程 {n} 完成工作")threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
    for t in threads: t.start()
    for t in threads: t.join()
    

    输出(部分,顺序可能不同):

    线程 0 开始工作
    线程 1 开始工作
    # 等待 2 秒
    线程 0 完成工作
    线程 1 完成工作
    线程 3 开始工作
    线程 2 开始工作
    线程 3 完成工作
    线程 2 完成工作
    线程 4 开始工作
    线程 4 完成工作
    

    一次只有 2 个线程在执行,符合限制。

  2. 限制资源访问

    假设我们有 3 个数据库连接,但有 10 个线程要访问数据库,我们可以用 Semaphore(3) 控制最多 3 个线程同时使用数据库。

    db_sema = threading.Semaphore(3)def access_db(tid):with db_sema:print(f"线程 {tid} 正在访问数据库")time.sleep(1)print(f"线程 {tid} 释放数据库")threads = [threading.Thread(target=access_db, args=(i,)) for i in range(10)]
    for t in threads: t.start()
    for t in threads: t.join()
    

    保证了同时最多只有 3 个线程访问数据库。

信号量 vs 互斥锁 (Lock)

特性LockSemaphore
最大并发数1n(可自定义)
使用场景互斥,保护临界区限制资源并发访问
内部机制二进制锁计数器
是否可递归❌(不可递归)

经验法则

  • 如果资源一次只能给 1 个线程用 → 用 Lock
  • 如果资源有多个份(比如多个数据库连接) → 用 Semaphore

信号量的变体

threading.BoundedSemaphore

  • Semaphore 默认可以无限 release(),导致计数超过初始值。
  • BoundedSemaphore 限制了最大计数,防止错误释放。
sema = threading.BoundedSemaphore(2)
sema.acquire()
sema.release()
sema.release()  # 会报 ValueError

推荐使用 BoundedSemaphore,更安全。

信号量的应用场景

  1. 限制并发线程数:比如同时最多 3 个线程下载文件。
  2. 控制对资源池的访问:比如数据库连接池、线程池、爬虫的请求并发数。
  3. 实现生产者-消费者模型:使用两个信号量分别表示“空槽”和“已用槽”。
  4. 限流 (Rate Limiting):限制某个接口的并发调用次数。

注意事项

  1. 不要多次 release(超过初始计数会出错,除非用普通 Semaphore)。
  2. 不适合递归调用(需要 RLock)。
  3. 性能比 Lock 略低,但更灵活。
  4. 和 Queue 搭配 更安全(Queue 内部就用了信号量)。
条件变量

什么是条件变量(Condition)?

  • 条件变量 是一种高级同步原语,用于让一个或多个线程 等待某个条件,直到其他线程通知条件发生改变。
  • 它通常和 Lock 或 RLock 配合使用。
  • 条件变量的核心方法:
    1. wait():线程等待条件,同时释放底层锁。
    2. notify():唤醒一个等待该条件的线程。
    3. notify_all():唤醒所有等待该条件的线程。

简单比喻:

  • 条件变量 = “红绿灯”
  • 线程 A 等红灯(wait),线程 B 变绿灯(notify),A 才能继续走。

条件变量的基本用法

  1. wait() + notify()

    import threading
    import timecondition = threading.Condition()
    data_ready = False  # 条件状态def consumer():global data_readywith condition:print("消费者等待数据...")while not data_ready:   # 条件不满足就 waitcondition.wait()   # 释放锁并阻塞,等待通知print("消费者开始处理数据")def producer():global data_readytime.sleep(2)with condition:data_ready = Trueprint("生产者生产完数据,通知消费者")condition.notify()     # 唤醒一个等待线程t1 = threading.Thread(target=consumer)
    t2 = threading.Thread(target=producer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    

    输出示例:

    消费者等待数据...
    生产者生产完数据,通知消费者
    消费者开始处理数据
    

    特点:

    • wait() 会自动释放锁,允许其他线程进入临界区。
    • notify() 只能唤醒在 同一个 Condition 上等待的线程
    • 推荐在 while 循环中检查条件,防止“虚假唤醒”。
  2. notify_all() 唤醒所有线程

    with condition:condition.notify_all()  # 所有等待线程都会被唤醒
    

    适用于多个消费者线程都在等待同一个条件时。

条件变量的应用场景

  1. 生产者-消费者模型

    import threading
    import time
    import randomqueue = []
    queue_size = 5
    condition = threading.Condition()def producer():global queuewhile True:item = random.randint(1,100)with condition:while len(queue) >= queue_size:condition.wait()  # 队列满,等待消费者queue.append(item)print(f"生产者放入: {item}")condition.notify()  # 通知消费者time.sleep(1)def consumer():global queuewhile True:with condition:while not queue:condition.wait()  # 队列空,等待生产者item = queue.pop(0)print(f"消费者取出: {item}")condition.notify()  # 通知生产者time.sleep(2)t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    

    这种模式可以保证:

    • 队列满时生产者等待
    • 队列空时消费者等待
    • 线程安全、高效协作
  2. 等待某个条件满足再执行

    • 比如线程 A 等待配置文件加载完成再启动处理。
    • 线程 B 加载完毕后调用 notify() 唤醒等待线程。

条件变量的原理

  • 内部依赖一个 Lock/RLock 来保证临界区安全。
  • wait() 执行流程:
    1. 当前线程释放锁。
    2. 进入等待队列。
    3. notify() 唤醒后,重新获取锁,继续执行。
  • notify() 执行流程:
    1. 从等待队列中选择一个线程(FIFO 或操作系统调度)
    2. 唤醒线程,等待它重新获取锁

重要:Condition 只是线程间通信的机制,共享数据还要自己管理

注意事项

  1. 必须在锁内调用 wait() / notify(),否则会报异常。

  2. 使用 while 检查条件,而不是 if:

    while not condition_met:condition.wait()
    

    防止“虚假唤醒”。

  3. 不要滥用 notify_all(),会唤醒所有线程,可能造成性能下降。

线程池

什么是线程池?

  • 线程池(Thread Pool) 是一种线程管理机制:
    • 提前创建一定数量的线程(池子),然后复用这些线程执行任务。
    • 避免频繁创建和销毁线程的开销,提高性能。
  • Python 的标准库 concurrent.futures 提供了 ThreadPoolExecutor 类来实现线程池。

优势:

  1. 线程复用,降低创建销毁开销
  2. 任务提交方便,不用手动管理每个 Thread
  3. 支持任务结果返回,便于获取执行结果
  4. 内置异常处理机制

ThreadPoolExecutor 的创建

from concurrent.futures import ThreadPoolExecutor# 创建线程池,最多同时有 5 个线程
executor = ThreadPoolExecutor(max_workers=5)
  • max_workers:线程池中同时存在的最大线程数
  • 如果未指定,Python 会根据 CPU 核心数自动设置一个默认值

提交任务

  1. 使用 submit() 提交任务

    import time
    from concurrent.futures import ThreadPoolExecutordef task(n):time.sleep(1)return f"任务 {n} 完成"with ThreadPoolExecutor(max_workers=3) as executor:future1 = executor.submit(task, 1)  # 提交任务future2 = executor.submit(task, 2)print(future1.result())  # 获取返回值print(future2.result())
    

    输出:

    任务 1 完成
    任务 2 完成
    
    • submit() 返回一个 Future 对象,表示异步执行的任务。
    • 可以用 future.result() 阻塞等待结果。
  2. 使用 map() 批量提交任务

    with ThreadPoolExecutor(max_workers=3) as executor:results = executor.map(task, range(5))  # 自动提交任务for r in results:print(r)
    

    输出:

    任务 0 完成
    任务 1 完成
    任务 2 完成
    任务 3 完成
    任务 4 完成
    
    • map() 返回一个迭代器,顺序与输入一致
    • 内部自动调度线程执行任务

线程池常用方法

方法说明
submit(fn, *args, **kwargs)提交单个任务,返回 Future
map(fn, *iterables)批量提交任务,返回结果迭代器
shutdown(wait=True)关闭线程池,wait=True 会等待所有任务完成再退出

推荐使用 上下文管理器

with ThreadPoolExecutor(max_workers=5) as executor:...
# 自动调用 shutdown(wait=True)

Future 对象

submit() 返回的 Future 对象常用方法:

  • result(timeout=None):获取任务结果,支持超时
  • done():任务是否完成
  • add_done_callback(fn):任务完成后自动回调

示例:

def callback(future):print("回调函数,结果:", future.result())with ThreadPoolExecutor(max_workers=2) as executor:future = executor.submit(task, 10)future.add_done_callback(callback)

输出:

回调函数,结果: 任务 10 完成

线程池异常处理

def task_error(n):if n == 2:raise ValueError("出现错误")return nwith ThreadPoolExecutor(max_workers=3) as executor:futures = [executor.submit(task_error, i) for i in range(4)]for f in futures:try:print(f.result())except Exception as e:print("捕获异常:", e)

输出:

0
1
捕获异常: 出现错误
3
  • 异常会在 result() 时抛出,可以捕获。
  • 避免线程直接崩掉,方便统一处理。

应用场景

  1. I/O 密集型任务:爬虫、文件下载、网络请求
  2. 任务数量多,但每个任务耗时不长:批量处理图片、数据分析
  3. 需要统一管理线程和结果:避免自己手动管理 Thread 和锁

注意事项

  1. 线程池适合 I/O 密集型任务:CPU 密集型任务可能受 GIL 限制,建议用 ProcessPoolExecutor
  2. 不要提交无限任务:提交任务过多会占用大量内存
  3. 任务异常要捕获:否则 Future.result() 会抛异常
  4. 线程池自动复用线程:避免频繁创建销毁 Thread,提高效率
http://www.dtcms.com/a/420775.html

相关文章:

  • 网站开发专业前景设计分享网站
  • Kafka03-知识速记
  • dede网站名称不能中文网上在线看视频为什么卡
  • 网站怎么做排名呢wordpress怎么取当前点击的tag
  • 网站制作公司代理云浮北京网站建设
  • 网站建设设计文档模板化妆网站源码
  • 网站 购买小企业网站建设哪找
  • 程序员做音乐网站wordpress 执行流程
  • 建设银行租房网站6网站建设发展历程
  • 怎样做网站模板网站站点建设中端口号的作用
  • 在哪做网站鞍钢节能公司网站开发
  • 网站放到iis如何做指向内网安装wordpress
  • 去他的4.3a
  • 成都网站seo厂家wordpress rss解析
  • 网站开发 接口还是ajax如何选择o2o网站建设
  • 个人介绍网站模板舆情分析报告范文
  • 传输层协议与 Socket API 网络编程
  • 山东网站制作上海建站系统
  • 甘肃网站建设哪家好js模拟点击网站的按钮
  • 摇一摇抽签用什么网站做阿里邮箱企业版app下载
  • 做网站傻瓜软件淮安网站制作设计
  • 常州网站建设方案维护建设银行etc信用卡申请网站
  • 企业网站的网址通常包括网站色调设计方案
  • 建设网站深圳seo点击排名
  • Linux之vi编辑器
  • 范例网站怎么做制作ppt免费软件
  • 网站做桌面应用 iOS网站做影集安全吗
  • 企业静态网站需要备案吗成都网站制作网站设计
  • 设计的网站都有哪些上海有哪些做网站的公司
  • 网站建设公司宣传语网页制作哪家服务好