当前位置: 首页 > news >正文

线程通信模型

1 线程通信

不同语言对线程通信有不同的实现,但整体分为两种范式:共享内存消息传递

优先使用消息传递。

2 共享内存

线程通过读写共享内存区域交换数据。关键问题:

  • 同步:避免竞态条件(Race Condition),即并发操作的执行顺序的不确定性导致结果的不确定性
    典型竞态条件:检查并修改、读-改-写、对象构造逃逸、多个变量的竞态条件
  • 可见性:确保一个线程的修改对其他线程立即可见
  • 有序性:编译器和CPU可能重排指令

常用工具:

  • CAS(Compare-And-Swap):硬件原子指令,ABA问题
  • 内存屏障:禁止指令重排,无锁队列 linux kfifo 使用了写屏障 smp_wmb()
  • 互斥锁(Lock、Mutex):是否可重入,是否公平
  • 读写锁(RWLock)
  • 原子操作(Atomic):内部使用CAS实现
  • 并发集合:Java ConcurrentHashMap, C# ConcurrentDictionary,内部使用CAS实现
  • volatile(Java/C#):保证可见性(但不保证原子性),使用内存屏障实现
  • 信号量(Semaphore):通过一个计数器和原子操作管理对共享资源的访问,常用于资源池、限流等
    – wait() / P(),计数器减1,若计数器为0则阻塞线程,直到资源可用
    – signal() / V(),计数器加1,若有线程在等待队列中,则唤醒其中一个线程
  • 条件变量(wait/notify):等待特定条件成立,与一个互斥锁关联使用,常用于生产者-消费者模式等
  • RCU (Read-Copy-Update):读操作不加锁,写操作通过复制-修改-原子替换指针的方式更新数据,适用于读多写少的场景

死锁的四个必要条件:

  1. 互斥访问 资源不能被共享,一次只能被一个线程占用
  2. 持有并等待 线程已持有部分资源,同时等待其他线程占有的资源
  3. 不可抢占 资源只能由持有者主动释放,不能被强制剥夺
  4. 循环等待 A等B,B等C,C等A。

3 消息传递

线程通过显式发送和接收消息进行通信,不直接共享状态。
天然减少共享状态,降低同步复杂度。

3.1 CSP模型与Actor模型

CSP (Communicating Sequential Processes - 通信顺序进程)

  • 一种形式化并发模型,描述独立的、并发的进程 (Processes) 如何仅通过 Channel 进行同步通信。它更侧重于通信的机制和约束。
  • Processes: 并发执行的实体,具体实现可以是 Goroutine、轻量级线程或操作系统线程
  • Channel: Processes 之间通信的唯一方式。CSP 理论中,Channel 通常是同步的(无缓冲),通信是点对点的直接握手。发送方和接收方必须同时准备好才能完成通信。Go 的 Channel 扩展了有缓冲 Channel(异步)。
  • Processes 是匿名的,它们通过共享的 Channel 进行通信,不需要知道对方是谁。
  • 可通过多路复用处理多个通道的消息(如Go的select,Rust的select!宏)
  • 典型代表:Go语言的Goroutine/Channel

Actor (参与者/演员)

  • 封装了状态和行为的独立并发实体。强调通过消息进行通信的自治对象。
  • 状态封装:每个 Actor 拥有自己的私有状态,外部无法直接访问或修改。
  • 消息驱动:每个 Actor 都有一个消息队列用于接收消息,Actor 之间仅通过消息进行通信。
  • 串行处理: Actor 一次只处理一条消息
  • 地址 : 每个 Actor 有一个唯一地址,要向 Actor 发送消息,必须知道其地址。
  • 监督层次: Actor 可以创建子 Actor 并监督它们。如果子 Actor 异常,父 Actor 会收到通知并决定如何处理。
  • 典型代表: Erlang/Elixir语言、Java Akka框架、Akka.NET框架

3.2 不同语言消息队列的实现

  • C++:queue + mutex + condition_variable
  • Go:Channel
  • Java:BlockingQueue
  • Rust:MPSC,多生产者单消费者通道
  • Erlang/Elixir:Actor模型
  • Python:Queue + asyncio
  • C#:Channel(.NET Core+)

消息传递同样有死锁的问题,比如Go中两个goroutine互相等待对方从channel读取

4 Future/Promise 与 结构化并发

Future/Promise 是一种获取异步计算结果的机制:

  • Future:表示一个异步计算的结果容器。消费者可以获取结果(阻塞或非阻塞)、注册回调函数。
  • Promise:表示异步计算的结果生产者。生产者通过它设置结果或异常,并通知关联的 Future。

不同语言的实现:

  • C++:promise + future
  • Java:CompletableFuture
  • JavaScript:Promise
  • C#:Task + TaskCompletionSource
  • Python:asyncio.Future

结构化并发 (Structured Concurrency):子任务的生命周期严格嵌套在其父任务内,确保资源正确释放、错误传播和取消

  • 生命周期绑定:任务必须在父任务退出前完成
  • 错误传播:子任务失败(抛出异常),这个错误会向上传播到父任务
  • 取消传播:如果父任务被取消,取消信号会传播给所有其创建的子任务,子任务需要支持协作式取消
  • 资源管理:因为父任务会等待所有子任务完成,所以可以安全地释放作用域内的共享资源
  • 可读性与可维护性:代码结构清晰反映了任务的并发结构,易于调试和推理

实现:Java StructuredTaskScope 、 Python asyncio.TaskGroup

Java CompletableFuture 和 StructuredTaskScope 代码风格对比:

用户订单查询场景:
1. 查询用户基本信息
2. 并发查询用户订单和地址
3. 合并结果返回

CompletableFuture 实现(异步回调风格):

public CompletableFuture<OrderResult> getUserOrderAsync(String userId) {  // 1. 异步获取用户信息  CompletableFuture<UserInfo> userFuture = userService.getInfoAsync(userId);  // 2. 并发查询订单和地址  CompletableFuture<List<Order>> ordersFuture = orderService.getOrdersAsync(userId);  CompletableFuture<Address> addressFuture = addressService.getPrimaryAsync(userId);  // 3. 合并结果(嵌套回调)  return userFuture.thenCompose(user ->  ordersFuture.thenCombine(addressFuture, (orders, address) -> {  return new OrderResult(user, orders, address);  })  ).exceptionally(ex -> {  // 统一错误处理(需解包 CompletionException)  if (ex instanceof CompletionException) {  throw new ServiceException(ex.getCause());  }  throw new ServiceException(ex);  });  
}  

虚拟线程实现(同步风格):

// Java 21+  
public OrderResult getUserOrder(String userId) throws Exception {  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 1. 提交子任务  Future<UserInfo> userFuture = scope.fork(() -> userService.getInfo(userId));  Future<List<Order>> ordersFuture = scope.fork(() -> orderService.getOrders(userId));  Future<Address> addressFuture = scope.fork(() -> addressService.getPrimary(userId));  // 2. 等待所有任务完成  scope.join();  scope.throwIfFailed();  // 统一异常处理  // 3. 获取结果(同步写法)  UserInfo user = userFuture.resultNow();  List<Order> orders = ordersFuture.resultNow();  Address address = addressFuture.resultNow();  return new OrderResult(user, orders, address);  } // 作用域退出自动取消未完成任务  
} 

5 任务集合

任务集合指一组可并行或串行执行的计算单元(Task),通常包含:

  • 任务本身:待执行的逻辑(函数/闭包/Runnable)
  • 任务间关系:依赖、顺序、优先级等约束
  • 执行策略:并行度、调度机制、资源分配

核心目标:高效利用多核资源,解决复杂问题分解后的协同执行。

关键点:

问题描述示例场景
任务分解将大问题拆解为可并行执行的子任务矩阵乘法 → 分块计算
依赖管理处理任务间的先后顺序约束B任务依赖A任务的输出结果
资源竞争多个任务争用有限资源(CPU/内存/I/O)线程池满时新任务等待
负载均衡动态分配任务到工作线程避免空闲或过载偷取工作(Work Stealing)
错误处理单个任务失败时整体策略(终止/重试/忽略)批量数据处理中的单条记录失败
结果聚合合并子任务结果生成最终输出MapReduce中的Reduce阶段

任务间的通信依赖共享内存、消息传递、Future/Promise等机制。

按任务依赖关系分类:

  • 独立任务集:任务间无依赖,可完全并行
  • 有向无环图:任务拓扑结构,需按依赖顺序执行
  • 流水线:任务链式处理数据(生产者-消费者模式)

不同语言的实现:

  • Java:Executors
  • Go:sync.WaitGroup
  • C++:std::async
  • Python:concurrent.futures.ThreadPoolExecutor
http://www.dtcms.com/a/295717.html

相关文章:

  • 中国西北典型绿洲区土壤水分特征(2018-2019年)
  • [火了]-----FastGPT 插件系统架构演进:从 Monorepo 到独立生态
  • Spring MVC 统一响应格式:ResponseBodyAdvice 从浅入深
  • 快速将前端得依赖打为tar包(yarn.lock版本)并且推送至nexus私有依赖仓库(笔记)
  • 【工具变量】省市县空气流通系数数据集(1940-2025.3年)
  • Dataease2.10 前端二次开发
  • Windows 系统中 CURL 命令使用指南及常见错误解析
  • Silly Tavern 教程②:首次启动与基础设置
  • 极客大挑战2019-HTTP
  • Vulnhub Matrix-Breakout-2-Morpheus靶机攻略
  • 网络资源模板--基于Android Studio 实现的线上点餐系统
  • 【Linux基础知识系列】第六十三篇 - 文件编辑器基础:vim
  • 自己动手造轮子:如何创建JAR并通过Maven在Spring Boot中引用
  • Opencv C# 重叠 粘连 Overlap 轮廓分割 (不知道不知道)
  • Unity 进行 3D 游戏开发如何入门
  • AUTOSAR进阶图解==>AUTOSAR_SWS_BSWModeManager
  • 智慧驾驶疲劳检测算法的实时性优化
  • 深入思考【九九八十一难】的意义,试用歌曲能否解释
  • 【论文阅读50】-融合领域知识与可解释深度学习
  • 如何构建企业级 Mentor EDA 仿真平台
  • 进程调度的艺术:从概念本质到 Linux 内核实现
  • 从“各玩各的”到“无缝贴贴”:Modbus转Profinet让机器人告别“信息孤岛”
  • 【自动化运维神器Ansible】Ansible常用模块之shell模块详解
  • 数据版本控制系统(Oxen)
  • Terraform与Ansible的关系
  • Mysql-UDF提权
  • 家政小程序系统开发:开启智慧家政新时代
  • 详解力扣高频 SQL 50 题-1757.可回收且低脂的产品【入门】
  • 使用phpstudy极简快速安装mysql
  • LLM层归一化:γβ与均值方差的协同奥秘