线程通信模型
1 线程通信
不同语言对线程通信有不同的实现,但整体分为两种范式:共享内存 和 消息传递
优先使用消息传递。
2 共享内存
线程通过读写共享内存区域交换数据。关键问题:
- 同步:避免竞态条件(Race Condition),即并发操作的执行顺序的不确定性导致结果的不确定性
典型竞态条件:检查并修改、读-改-写、对象构造逃逸、多个变量的竞态条件 - 可见性:确保一个线程的修改对其他线程立即可见
- 有序性:编译器和CPU可能重排指令
常用工具:
- CAS(Compare-And-Swap):硬件原子指令,ABA问题
- 内存屏障:禁止指令重排,无锁队列 linux kfifo 使用了写屏障 smp_wmb()
- 互斥锁(Lock、Mutex):是否可重入,是否公平
- 读写锁(RWLock)
- 原子操作(Atomic):内部使用CAS实现
- 并发集合:Java ConcurrentHashMap, C# ConcurrentDictionary,内部使用CAS实现
- volatile(Java/C#):保证可见性(但不保证原子性),使用内存屏障实现
- 信号量(Semaphore):通过一个计数器和原子操作管理对共享资源的访问,常用于资源池、限流等
– wait() / P(),计数器减1,若计数器为0则阻塞线程,直到资源可用
– signal() / V(),计数器加1,若有线程在等待队列中,则唤醒其中一个线程 - 条件变量(wait/notify):等待特定条件成立,与一个互斥锁关联使用,常用于生产者-消费者模式等
- RCU (Read-Copy-Update):读操作不加锁,写操作通过复制-修改-原子替换指针的方式更新数据,适用于读多写少的场景
死锁的四个必要条件:
- 互斥访问 资源不能被共享,一次只能被一个线程占用
- 持有并等待 线程已持有部分资源,同时等待其他线程占有的资源
- 不可抢占 资源只能由持有者主动释放,不能被强制剥夺
- 循环等待 A等B,B等C,C等A。
3 消息传递
线程通过显式发送和接收消息进行通信,不直接共享状态。
天然减少共享状态,降低同步复杂度。
3.1 CSP模型与Actor模型
CSP (Communicating Sequential Processes - 通信顺序进程)
- 一种形式化并发模型,描述独立的、并发的进程 (Processes) 如何仅通过 Channel 进行同步通信。它更侧重于通信的机制和约束。
- Processes: 并发执行的实体,具体实现可以是 Goroutine、轻量级线程或操作系统线程
- Channel: Processes 之间通信的唯一方式。CSP 理论中,Channel 通常是同步的(无缓冲),通信是点对点的直接握手。发送方和接收方必须同时准备好才能完成通信。Go 的 Channel 扩展了有缓冲 Channel(异步)。
- Processes 是匿名的,它们通过共享的 Channel 进行通信,不需要知道对方是谁。
- 可通过多路复用处理多个通道的消息(如Go的select,Rust的select!宏)
- 典型代表:Go语言的Goroutine/Channel
Actor (参与者/演员)
- 封装了状态和行为的独立并发实体。强调通过消息进行通信的自治对象。
- 状态封装:每个 Actor 拥有自己的私有状态,外部无法直接访问或修改。
- 消息驱动:每个 Actor 都有一个消息队列用于接收消息,Actor 之间仅通过消息进行通信。
- 串行处理: Actor 一次只处理一条消息
- 地址 : 每个 Actor 有一个唯一地址,要向 Actor 发送消息,必须知道其地址。
- 监督层次: Actor 可以创建子 Actor 并监督它们。如果子 Actor 异常,父 Actor 会收到通知并决定如何处理。
- 典型代表: Erlang/Elixir语言、Java Akka框架、Akka.NET框架
3.2 不同语言消息队列的实现
- C++:queue + mutex + condition_variable
- Go:Channel
- Java:BlockingQueue
- Rust:MPSC,多生产者单消费者通道
- Erlang/Elixir:Actor模型
- Python:Queue + asyncio
- C#:Channel(.NET Core+)
消息传递同样有死锁的问题,比如Go中两个goroutine互相等待对方从channel读取
4 Future/Promise 与 结构化并发
Future/Promise 是一种获取异步计算结果的机制:
- Future:表示一个异步计算的结果容器。消费者可以获取结果(阻塞或非阻塞)、注册回调函数。
- Promise:表示异步计算的结果生产者。生产者通过它设置结果或异常,并通知关联的 Future。
不同语言的实现:
- C++:promise + future
- Java:CompletableFuture
- JavaScript:Promise
- C#:Task + TaskCompletionSource
- Python:asyncio.Future
结构化并发 (Structured Concurrency):子任务的生命周期严格嵌套在其父任务内,确保资源正确释放、错误传播和取消
- 生命周期绑定:任务必须在父任务退出前完成
- 错误传播:子任务失败(抛出异常),这个错误会向上传播到父任务
- 取消传播:如果父任务被取消,取消信号会传播给所有其创建的子任务,子任务需要支持协作式取消
- 资源管理:因为父任务会等待所有子任务完成,所以可以安全地释放作用域内的共享资源
- 可读性与可维护性:代码结构清晰反映了任务的并发结构,易于调试和推理
实现:Java StructuredTaskScope 、 Python asyncio.TaskGroup
Java CompletableFuture 和 StructuredTaskScope 代码风格对比:
用户订单查询场景:
1. 查询用户基本信息
2. 并发查询用户订单和地址
3. 合并结果返回
CompletableFuture 实现(异步回调风格):
public CompletableFuture<OrderResult> getUserOrderAsync(String userId) { // 1. 异步获取用户信息 CompletableFuture<UserInfo> userFuture = userService.getInfoAsync(userId); // 2. 并发查询订单和地址 CompletableFuture<List<Order>> ordersFuture = orderService.getOrdersAsync(userId); CompletableFuture<Address> addressFuture = addressService.getPrimaryAsync(userId); // 3. 合并结果(嵌套回调) return userFuture.thenCompose(user -> ordersFuture.thenCombine(addressFuture, (orders, address) -> { return new OrderResult(user, orders, address); }) ).exceptionally(ex -> { // 统一错误处理(需解包 CompletionException) if (ex instanceof CompletionException) { throw new ServiceException(ex.getCause()); } throw new ServiceException(ex); });
}
虚拟线程实现(同步风格):
// Java 21+
public OrderResult getUserOrder(String userId) throws Exception { try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 1. 提交子任务 Future<UserInfo> userFuture = scope.fork(() -> userService.getInfo(userId)); Future<List<Order>> ordersFuture = scope.fork(() -> orderService.getOrders(userId)); Future<Address> addressFuture = scope.fork(() -> addressService.getPrimary(userId)); // 2. 等待所有任务完成 scope.join(); scope.throwIfFailed(); // 统一异常处理 // 3. 获取结果(同步写法) UserInfo user = userFuture.resultNow(); List<Order> orders = ordersFuture.resultNow(); Address address = addressFuture.resultNow(); return new OrderResult(user, orders, address); } // 作用域退出自动取消未完成任务
}
5 任务集合
任务集合指一组可并行或串行执行的计算单元(Task),通常包含:
- 任务本身:待执行的逻辑(函数/闭包/Runnable)
- 任务间关系:依赖、顺序、优先级等约束
- 执行策略:并行度、调度机制、资源分配
核心目标:高效利用多核资源,解决复杂问题分解后的协同执行。
关键点:
问题 | 描述 | 示例场景 |
---|---|---|
任务分解 | 将大问题拆解为可并行执行的子任务 | 矩阵乘法 → 分块计算 |
依赖管理 | 处理任务间的先后顺序约束 | B任务依赖A任务的输出结果 |
资源竞争 | 多个任务争用有限资源(CPU/内存/I/O) | 线程池满时新任务等待 |
负载均衡 | 动态分配任务到工作线程避免空闲或过载 | 偷取工作(Work Stealing) |
错误处理 | 单个任务失败时整体策略(终止/重试/忽略) | 批量数据处理中的单条记录失败 |
结果聚合 | 合并子任务结果生成最终输出 | MapReduce中的Reduce阶段 |
任务间的通信依赖共享内存、消息传递、Future/Promise等机制。
按任务依赖关系分类:
- 独立任务集:任务间无依赖,可完全并行
- 有向无环图:任务拓扑结构,需按依赖顺序执行
- 流水线:任务链式处理数据(生产者-消费者模式)
不同语言的实现:
- Java:Executors
- Go:sync.WaitGroup
- C++:std::async
- Python:concurrent.futures.ThreadPoolExecutor