当前位置: 首页 > news >正文

深深浅浅地理解 Python 中的 `Barrier` 对象

在 Python 的多线程编程中,threading.Barrier(屏障)对象是一个强大的同步原语,它允许多个线程在达到某个共同点时同步,并且只有当所有参与的线程都达到这个点后,它们才能一起继续执行。你可以把它想象成一场赛跑的起点线,所有选手必须到达起点线,发令枪响后才能一起开跑。

为什么需要 Barrier

在某些多线程场景中,我们可能需要确保一组线程在执行特定操作之前,都完成了各自的前置任务。例如:

  • 并行计算的分阶段任务: 多个线程独立计算数据的不同部分,但在进入下一个计算阶段之前,所有线程都必须完成当前阶段的计算结果。
  • 模拟仿真: 多个模拟实体需要在一个时间步长内独立更新状态,然后等待所有实体更新完毕后,再一起进入下一个时间步长。
  • 资源初始化: 在应用程序启动时,多个线程可能并行初始化不同的模块,但主程序只有在所有模块都初始化完毕后才能正式启动。

如果没有 Barrier,你可能需要使用复杂的 LockConditionSemaphore 组合来实现这种“汇合点”逻辑,这会非常容易出错且难以维护。Barrier 提供了一种简洁、优雅的方式来解决这种**“多线程集合点”**的需求。

Barrier 的基本原理

Barrier 对象内部维护着一个计数器,这个计数器代表了当前还没有到达屏障的线程数量。

  1. 创建屏障: 创建 Barrier 时,你需要指定参与屏障的线程总数 (parties)。
  2. 线程到达: 当一个线程调用 barrier.wait() 方法时:
    • Barrier 内部的计数器会减 1。
    • 如果这个线程是最后一个到达屏障的(即计数器减到 0),那么所有在 wait() 处阻塞的线程都会被唤醒,并且 wait() 方法会返回一个非零整数(通常是该线程在所有参与者中到达的序号,从 0 到 parties - 1)。
    • 如果这个线程不是最后一个到达的,那么它会阻塞,直到所有 parties 数量的线程都调用了 wait()
  3. 重置屏障: 一旦所有线程都通过了屏障,Barrier 会自动重置其内部状态,为下一轮的同步做好准备。

Barrier 对象的创建与使用

1. 创建 Barrier 对象
import threading# 创建一个 Barrier,需要 3 个线程参与同步
b = threading.Barrier(3)

threading.Barrier(parties, action=None, timeout=None)

  • parties必需参数,指定必须调用 wait() 才能解除阻塞的线程数量。
  • action可选参数,一个可调用对象(函数),当所有线程都到达屏障并准备释放时,由最后一个到达的线程执行。这个函数没有参数,其返回值会被 wait() 方法返回给所有被释放的线程(除了最后一个到达的线程,它会返回自己的序号)。如果 action 抛出异常,屏障会进入破损状态。
  • timeout可选参数,指定 wait() 方法的默认超时时间(秒)。如果 wait() 在超时时间内没有被所有线程达到,屏障会进入破损状态。
2. Barrier 的主要方法
  • wait(timeout=None):

    • 表示当前线程已经到达屏障,并等待其他线程。
    • 如果当前线程是最后一个到达的,所有等待的线程都会被唤醒。
    • 返回一个整数,表示当前线程在所有参与者中到达的序号(从 0 开始)。
    • 如果提供了 timeout,并且在超时时间内没有所有 parties 数量的线程到达,wait() 会抛出 BrokenBarrierError 异常。
  • abort():

    • 手动将屏障设置为“破损”状态。
    • 一旦屏障破损,所有当前和未来调用 wait() 的线程都会立即抛出 BrokenBarrierError 异常。
    • 这通常用于错误处理或提前终止同步过程。
  • reset():

    • 将屏障重置为初始的空状态。
    • 只有在所有参与者都通过了屏障,或者屏障处于破损状态时,才能安全地调用此方法。
    • 如果在屏障活跃(有线程在等待)时调用 reset(),可能会导致未定义的行为。
  • parties 属性:

    • 只读属性,返回创建屏障时设定的参与线程总数。
  • n_waiting 属性:

    • 只读属性,返回当前正在等待屏障的线程数量。

BrokenBarrierError

当以下情况发生时,wait() 方法会抛出 BrokenBarrierError 异常:

  • wait() 方法超时。
  • 某个线程在 wait() 处收到信号(如中断),但不是由 Barrier 自身触发的。
  • 有线程调用了 barrier.abort()
  • action 函数执行时抛出异常。

一旦屏障进入破损状态,它将不再可用,所有后续尝试通过 wait() 的线程都会立即抛出 BrokenBarrierError。要重新使用,必须创建一个新的 Barrier 或调用 reset()(但通常推荐创建新的)。

Barrier 的典型使用模式

让我们用一个简单的例子来演示 Barrier 的使用:模拟多线程分阶段任务。

import threading
import time
import random# 定义参与屏障的线程数量
NUM_THREADS = 3
# 创建一个 Barrier 对象,需要 NUM_THREADS 个线程参与
# action 参数会在所有线程到达屏障时由最后一个线程执行
barrier = threading.Barrier(NUM_THREADS, action=lambda: print("\n--- 所有线程已完成当前阶段,屏障已解除!---\n"))def worker(name):print(f"线程 {name}: 开始第一阶段任务...")time.sleep(random.uniform(0.5, 2)) # 模拟第一阶段任务print(f"线程 {name}: 第一阶段任务完成,到达屏障。")# 等待所有线程完成第一阶段任务try:# wait() 返回当前线程在所有参与者中的序号# 最后一个到达的线程会执行 action 函数index = barrier.wait()print(f"线程 {name}: 序号 {index},通过屏障,开始第二阶段任务...")except threading.BrokenBarrierError:print(f"线程 {name}: 屏障已破损,无法继续第二阶段任务。")return # 屏障破损,线程退出time.sleep(random.uniform(0.5, 2)) # 模拟第二阶段任务print(f"线程 {name}: 第二阶段任务完成。")if __name__ == "__main__":threads = []for i in range(NUM_THREADS):t = threading.Thread(target=worker, args=(f"Worker-{i+1}",))threads.append(t)t.start()for t in threads:t.join()print("\n所有线程任务完成。")

运行上述代码,你会看到:

  1. 所有线程会并行执行“第一阶段任务”。
  2. 当一个线程完成第一阶段后,它会打印“到达屏障”,然后阻塞。
  3. 只有当所有 3 个线程都到达屏障后,action 函数会被执行(打印屏障解除信息),然后所有阻塞的线程会同时解除阻塞,并继续执行“第二阶段任务”。
  4. 最后,所有线程完成各自的第二阶段任务。

Barrier 与其他同步原语的区别

  • Lock (互斥锁): Lock 用于保护临界区,确保在任何时候只有一个线程能访问共享资源。Barrier 则是协调多个线程在特定点上进行同步。
  • Condition (条件变量): Condition 用于线程间的通信,让一个或多个线程等待某个条件满足,并由另一个线程在条件满足时发出通知。Barrier 更侧重于“会合点”,所有线程必须在同一时间点集合。虽然可以用 Condition 模拟 Barrier,但 Barrier 提供了更简洁、更专门的接口。
  • Semaphore (信号量): Semaphore 用于控制同时访问某个资源的线程数量。它维护一个计数器,但其目的是控制并发量,而不是强制所有线程在某个点同步。
  • Event (事件): Event 只是一个简单的信号开关,一个线程设置事件,其他线程等待。它不像 Barrier 那样需要固定数量的线程参与才能解除阻塞,也不具备 action 函数。

总结

threading.Barrier 对象是 Python 多线程编程中一个非常有用的高级同步工具,它为需要“所有线程都到达某一点才能继续”的并行任务提供了优雅的解决方案。通过 Barrier,你可以确保一组线程在分阶段执行任务时的同步性,简化了复杂的多线程协调逻辑。掌握 Barrier 能够让你在处理多线程协作问题时拥有更灵活、更强大的工具。

http://www.dtcms.com/a/270150.html

相关文章:

  • emscripten编译cocos2dx项目输入框支持中文
  • MySQL 全库表记录统计与空间估算教程
  • 猿人学js逆向比赛第一届第十五题
  • SpringAI学习笔记-MCP服务器简单示例
  • 软考(软件设计师)数据库原理-SQL
  • HTML+JS+CSS制作一个数独游戏
  • CSS揭秘:9.自适应的椭圆
  • 记一次mount point is busy问题排查
  • 数据结构 —— 栈(stack)在算法思维中的巧妙运用
  • C++进阶—二叉树进阶
  • 笔记/TCP/IP四层模型
  • Fence-音视频设备资源同步
  • IT 技术领域创作者三周年纪念日
  • 【CodeTop】每日练习 2025.7.8
  • Java 阻塞队列:7种类型全解析
  • 起重机械的工作循环门限值计算逻辑
  • 容器技术入门与Docker环境部署
  • Ntfs!LfsRestartLogFile函数分析之两次调用Ntfs!LfsReadRestart函数的目的
  • (生活比喻-图文并茂)http2.0和http3.0的队头阻塞,http2.0应用层解决,TCP层存在,3.0就是彻底解决,到底怎么理解区别???
  • AI健康小屋“15分钟服务圈”:如何重构社区健康生态?
  • MyBatis-Plus:深入探索与最佳实践
  • C#,js如何对网页超文本内容按行拆分,选择第A-B个字符返回HTM?
  • stack_queue扩展学习 --- 反向迭代器
  • 戴尔3670装win11和ubuntu双系统踩坑教程
  • 自动驾驶传感器的标定与数据融合
  • 【Android】组件及布局介绍
  • CAN主站转Modbus TCP网关:高铁门控系统的“毫秒级响应”密码
  • 【ZYNQ Linux开发】BRAM的几种驱动方式
  • 微服务集成snail-job分布式定时任务系统实践
  • Mac安装Docker(使用orbstack代替)