多线程之屏障(Barrier)
文章目录
- 1、举个栗子
- 2、屏障的核心本质
- 2.1 内核数据结构
- 2.2 状态机流转机制
- 3、底层原理深入解析
- 3.1 阶段管理机制
- 3.2 内存屏障和可见性
- 4、屏障的完整使用
- 4.1 基础用法 - 并行计算
- 4.2 复杂协调 - 多阶段数据处理
- 4.3 动态参与者管理
- 4.4 异常处理和屏障恢复
- 4.5 超时控制和取消操作
- 5、屏障的内部等待机制
- 5.1 阶段号检查机制
- 6、最佳实践和注意事项
- 6.1 正确模式
- 6.2 常见陷阱
- 7、性能考虑
- 7.1 屏障 vs 其他同步机制
- 8、总结
1、举个栗子
场景:团队自驾游
-
你们有 4 辆车(4个线程)一起旅行
-
每个景点都是一个 集合点(屏障点)
-
规则:
-
所有车必须到达当前景点后,才能一起出发去下一个景点
-
如果有车抛锚(线程异常),整个团队等待或调整计划
-
到达集合点时,队长可以宣布下一站路线(阶段后动作)
-
屏障的本质:让一组线程在某个点同步,所有线程都到达后才能继续执行下一阶段。
2、屏障的核心本质
2.1 内核数据结构
屏障在操作系统内核中的概念结构:
// 概念性结构 - 展示屏障的内部状态
class BarrierInternalState
{int ParticipantCount; // 总参与线程数int CurrentPhase; // 当前阶段号int ArrivedCount; // 已到达的线程数int RemainingCount; // 剩余等待线程数Queue<Thread> WaitQueue; // 等待队列Action<Barrier> PostPhaseAction; // 阶段后动作bool IsBroken; // 屏障是否已损坏// ... 其他同步信息
}
2.2 状态机流转机制
class BarrierStateFlow
{// 初始状态:等待所有参与者void InitialState(){ParticipantCount = 4;CurrentPhase = 0;ArrivedCount = 0;RemainingCount = 4;IsBroken = false;}// 当线程调用 SignalAndWait() 时:void OnThreadArrival(){ArrivedCount++;RemainingCount--;if (RemainingCount == 0){// 所有线程到达,进入下一阶段ExecutePostPhaseAction();CurrentPhase++;ArrivedCount = 0;RemainingCount = ParticipantCount;WakeAllWaitingThreads();}else{// 还有线程未到达,进入等待EnterWaitQueue();}}
}
3、底层原理深入解析
3.1 阶段管理机制
屏障的核心是基于阶段的同步:
class BarrierPhaseManagement
{private int _currentPhase = 0;private int _participants;private int _remainingParticipants;public void SignalAndWait(){int currentPhase = _currentPhase;// 原子性地减少剩余参与者计数int remaining = Interlocked.Decrement(ref _remainingParticipants);if (remaining == 0){// 最后一个到达的线程负责重置屏障ResetBarrierForNextPhase();}else{// 等待其他线程WaitForOtherParticipants(currentPhase);}}private void ResetBarrierForNextPhase(){// 执行阶段后动作if (_postPhaseAction != null){_postPhaseAction(this);}// 原子性地递增阶段号Interlocked.Increment(ref _currentPhase);// 重置剩余参与者计数Interlocked.Exchange(ref _remainingParticipants, _participants);// 唤醒所有等待线程WakeAllWaitingThreads();}private void WaitForOtherParticipants(int expectedPhase){// 自旋等待for (int i = 0; i < 100; i++){if (_currentPhase != expectedPhase)return;Thread.SpinWait(100);}// 进入内核等待KernelWait(expectedPhase);}
}
3.2 内存屏障和可见性
屏障隐含内存屏障,确保阶段间的内存可见性:
class BarrierMemoryVisibility
{private int _phaseData;private Barrier _barrier;public void ProcessPhase(int threadId){for (int phase = 0; phase < 3; phase++){// 阶段开始:每个线程处理自己的数据int localResult = DoWork(threadId, phase);// 屏障确保:// 1. 阶段内的所有内存写入对其他线程可见// 2. 阶段间的操作不会重排_barrier.SignalAndWait();// 阶段结束:所有线程看到一致的内存状态if (threadId == 0){// 只有在一个阶段结束后才能安全地聚合结果AggregateResults();}_barrier.SignalAndWait(); // 等待聚合完成}}
}
4、屏障的完整使用
4.1 基础用法 - 并行计算
class ParallelComputationWithBarrier
{private Barrier _barrier;private double[] _partialResults;private double _finalResult = 0;public double Compute(int numberOfThreads){_barrier = new Barrier(numberOfThreads, (barrier) =>{// 阶段后动作:当所有线程到达屏障时执行Console.WriteLine($"阶段 {barrier.CurrentPhaseNumber} 完成");if (barrier.CurrentPhaseNumber == 1){// 第一阶段:汇总部分结果_finalResult = _partialResults.Sum();Console.WriteLine($"部分结果汇总: {_finalResult}");}});_partialResults = new double[numberOfThreads];// 启动工作线程var threads = new Thread[numberOfThreads];for (int i = 0; i < numberOfThreads; i++){int threadId = i;threads[i] = new Thread(() => Worker(threadId));threads[i].Start();}// 等待所有线程完成foreach (var thread in threads){thread.Join();}_barrier.Dispose();return _finalResult;}private void Worker(int threadId){Console.WriteLine($"线程 {threadId} 开始阶段 0");// 阶段 0: 计算部分结果_partialResults[threadId] = CalculatePartialResult(threadId);Console.WriteLine($"线程 {threadId} 完成阶段 0,结果: {_partialResults[threadId]}");_barrier.SignalAndWait(); // 等待所有线程完成阶段0// 阶段 1: 使用汇总结果进行进一步计算Console.WriteLine($"线程 {threadId} 开始阶段 1,使用最终结果: {_finalResult}");// 这里可以进行第二阶段的计算_barrier.SignalAndWait(); // 等待所有线程完成阶段1Console.WriteLine($"线程 {threadId} 完成所有工作");}private double CalculatePartialResult(int threadId){// 模拟计算工作Thread.Sleep(1000);return threadId * 10.0 + 5.0;}
}// 使用示例
class ComputationExample
{static void Main(){var computer = new ParallelComputationWithBarrier();double result = computer.Compute(4);Console.WriteLine($"最终结果: {result}");}
}
4.2 复杂协调 - 多阶段数据处理
class MultiStageDataProcessing
{private Barrier _barrier;private int[][] _dataChunks;private int[][] _processedChunks;private int[] _finalResult;public void ProcessData(int numberOfWorkers){_barrier = new Barrier(numberOfWorkers, OnPhaseCompleted);InitializeData(numberOfWorkers);var workers = new Thread[numberOfWorkers];for (int i = 0; i < numberOfWorkers; i++){int workerId = i;workers[i] = new Thread(() => DataProcessor(workerId));workers[i].Start();}foreach (var worker in workers){worker.Join();}_barrier.Dispose();PrintResults();}private void OnPhaseCompleted(Barrier barrier){switch (barrier.CurrentPhaseNumber){case 1:Console.WriteLine($"=== 阶段 1 完成: 数据加载 ===");break;case 2:Console.WriteLine($"=== 阶段 2 完成: 数据预处理 ===");break;case 3:Console.WriteLine($"=== 阶段 3 完成: 数据分析 ===");break;case 4:Console.WriteLine($"=== 阶段 4 完成: 结果汇总 ===");break;}}private void DataProcessor(int workerId){// 阶段 1: 数据加载Console.WriteLine($"工作线程 {workerId} 加载数据块");LoadDataChunk(workerId);_barrier.SignalAndWait();// 阶段 2: 数据预处理Console.WriteLine($"工作线程 {workerId} 预处理数据");PreprocessData(workerId);_barrier.SignalAndWait();// 阶段 3: 数据分析Console.WriteLine($"工作线程 {workerId} 分析数据");AnalyzeData(workerId);_barrier.SignalAndWait();// 阶段 4: 结果汇总Console.WriteLine($"工作线程 {workerId} 汇总结果");AggregateResults(workerId);_barrier.SignalAndWait();Console.WriteLine($"工作线程 {workerId} 完成所有阶段");}private void InitializeData(int chunks){_dataChunks = new int[chunks][];_processedChunks = new int[chunks][];_finalResult = new int[chunks];Random rand = new Random();for (int i = 0; i < chunks; i++){_dataChunks[i] = new int[100];for (int j = 0; j < 100; j++){_dataChunks[i][j] = rand.Next(1, 100);}}}private void LoadDataChunk(int workerId){// 模拟数据加载Thread.Sleep(500);}private void PreprocessData(int workerId){_processedChunks[workerId] = _dataChunks[workerId].Select(x => x * 2) // 简单的预处理:乘以2.ToArray();Thread.Sleep(300);}private void AnalyzeData(int workerId){// 模拟数据分析:计算平均值_finalResult[workerId] = (int)_processedChunks[workerId].Average();Thread.Sleep(400);}private void AggregateResults(int workerId){// 最后一个阶段可以做一些最终处理if (workerId == 0){Console.WriteLine($"各数据块平均值: [{string.Join(", ", _finalResult)}]");}Thread.Sleep(200);}private void PrintResults(){Console.WriteLine($"\n处理完成!最终结果: {_finalResult.Average():F2}");}
}
4.3 动态参与者管理
class DynamicBarrierExample
{private Barrier _barrier;private int _activeWorkers;private readonly object _lock = new object();public void RunDynamicWorkload(){// 初始3个参与者_barrier = new Barrier(3, OnPhaseCompleted);_activeWorkers = 3;Console.WriteLine("=== 动态屏障演示 ===\n");// 启动初始工作线程for (int i = 0; i < 3; i++){StartWorker(i);}// 模拟动态添加和移除工作者Thread.Sleep(2000);AddWorker(3);Thread.Sleep(2000);RemoveWorker(1);Thread.Sleep(2000);AddWorker(4);// 让系统运行一段时间Thread.Sleep(5000);_barrier.Dispose();Console.WriteLine("演示完成");}private void StartWorker(int workerId){Thread worker = new Thread(() => DynamicWorker(workerId));worker.Start();Console.WriteLine($"启动工作线程 {workerId}");}private void DynamicWorker(int workerId){try{for (int phase = 0; phase < 10; phase++){if (!IsWorkerActive(workerId)){Console.WriteLine($"工作线程 {workerId} 被移除,退出");return;}Console.WriteLine($"工作线程 {workerId} 执行阶段 {phase}");Thread.Sleep(1000); // 模拟工作_barrier.SignalAndWait();// 检查是否应该继续if (!IsWorkerActive(workerId)){Console.WriteLine($"工作线程 {workerId} 在阶段 {phase} 后被移除");return;}}Console.WriteLine($"工作线程 {workerId} 完成所有阶段");}catch (BarrierPostPhaseException ex){Console.WriteLine($"工作线程 {workerId} 遇到屏障异常: {ex.InnerException?.Message}");}}private void AddWorker(int workerId){lock (_lock){_barrier.AddParticipant();_activeWorkers++;StartWorker(workerId);Console.WriteLine($"\n添加工作线程 {workerId},当前参与者: {_barrier.ParticipantCount}");}}private void RemoveWorker(int workerId){lock (_lock){_barrier.RemoveParticipant();_activeWorkers--;Console.WriteLine($"\n移除工作线程 {workerId},当前参与者: {_barrier.ParticipantCount}");}}private bool IsWorkerActive(int workerId){// 在实际应用中,这里会有更复杂的状态检查return true;}private void OnPhaseCompleted(Barrier barrier){Console.WriteLine($"\n[阶段 {barrier.CurrentPhaseNumber} 完成] 当前参与者: {barrier.ParticipantCount}");// 检查是否应该停止(所有工作完成)if (barrier.CurrentPhaseNumber >= 10){Console.WriteLine("达到最大阶段数,准备停止");}}
}
4.4 异常处理和屏障恢复
class BarrierExceptionHandling
{private Barrier _barrier;private volatile bool _hasError = false;private string _errorMessage = "";public void RunWithErrorHandling(){_barrier = new Barrier(4, (barrier) =>{if (_hasError){Console.WriteLine($"阶段 {barrier.CurrentPhaseNumber} 因错误而终止");// 在实际应用中,这里可以决定是否继续或中止}else{Console.WriteLine($"阶段 {barrier.CurrentPhaseNumber} 成功完成");}});var threads = new Thread[4];for (int i = 0; i < 4; i++){int threadId = i;threads[i] = new Thread(() => WorkerWithErrorHandling(threadId));threads[i].Start();}foreach (var thread in threads){thread.Join();}_barrier.Dispose();}private void WorkerWithErrorHandling(int threadId){try{for (int phase = 0; phase < 3; phase++){if (_hasError){Console.WriteLine($"线程 {threadId} 检测到错误,跳过阶段 {phase}");break;}Console.WriteLine($"线程 {threadId} 开始阶段 {phase}");// 模拟工作 - 线程2在阶段1模拟错误if (threadId == 2 && phase == 1){SimulateError(threadId, phase);}DoWork(threadId, phase);Console.WriteLine($"线程 {threadId} 完成阶段 {phase} 工作");// 信号和等待,处理可能的异常try{_barrier.SignalAndWait();}catch (BarrierPostPhaseException bppe){Console.WriteLine($"线程 {threadId} 捕获阶段后异常: {bppe.InnerException?.Message}");_hasError = true;_errorMessage = bppe.InnerException?.Message ?? "未知错误";}catch (Exception ex){Console.WriteLine($"线程 {threadId} 捕获屏障异常: {ex.Message}");_hasError = true;_errorMessage = ex.Message;}if (_hasError){Console.WriteLine($"线程 {threadId} 因错误提前退出");break;}}}catch (Exception ex){Console.WriteLine($"线程 {threadId} 发生未处理异常: {ex.Message}");_hasError = true;_errorMessage = ex.Message;}}private void DoWork(int threadId, int phase){Thread.Sleep(500); // 模拟工作}private void SimulateError(int threadId, int phase){Console.WriteLine($"线程 {threadId} 在阶段 {phase} 模拟错误");throw new InvalidOperationException($"线程 {threadId} 在阶段 {phase} 发生模拟错误");}
}
4.5 超时控制和取消操作
class BarrierWithTimeoutAndCancellation
{private Barrier _barrier;private CancellationTokenSource _cancellationTokenSource;public void RunWithTimeout(int timeoutPerPhaseMs){_cancellationTokenSource = new CancellationTokenSource();_barrier = new Barrier(4, OnPhaseCompleted);var tasks = new Task[4];for (int i = 0; i < 4; i++){int taskId = i;tasks[i] = Task.Run(() => WorkerWithTimeout(taskId, timeoutPerPhaseMs));}// 5秒后取消所有任务_cancellationTokenSource.CancelAfter(5000);try{Task.WaitAll(tasks);Console.WriteLine("所有任务正常完成");}catch (AggregateException ae){foreach (var ex in ae.InnerExceptions){Console.WriteLine($"任务异常: {ex.Message}");}}finally{_barrier.Dispose();_cancellationTokenSource.Dispose();}}private async Task WorkerWithTimeout(int taskId, int timeoutMs){var token = _cancellationTokenSource.Token;for (int phase = 0; phase < 5; phase++){if (token.IsCancellationRequested){Console.WriteLine($"任务 {taskId} 在阶段 {phase} 被取消");return;}Console.WriteLine($"任务 {taskId} 开始阶段 {phase}");try{// 模拟工作await DoWorkAsync(taskId, phase, token);Console.WriteLine($"任务 {taskId} 完成阶段 {phase} 工作,等待其他任务");// 带超时的屏障等待bool completed = _barrier.SignalAndWait(timeoutMs, token);if (!completed){Console.WriteLine($"任务 {taskId} 在阶段 {phase} 等待超时");break;}Console.WriteLine($"任务 {taskId} 通过阶段 {phase} 屏障");}catch (OperationCanceledException){Console.WriteLine($"任务 {taskId} 在阶段 {phase} 被取消");break;}catch (BarrierPostPhaseException bppe){Console.WriteLine($"任务 {taskId} 阶段后异常: {bppe.InnerException?.Message}");break;}catch (Exception ex){Console.WriteLine($"任务 {taskId} 异常: {ex.Message}");break;}}Console.WriteLine($"任务 {taskId} 退出");}private async Task DoWorkAsync(int taskId, int phase, CancellationToken token){// 模拟异步工作await Task.Delay(300 + (taskId * 100), token);// 模拟偶尔的长时间工作if (taskId == 2 && phase == 2){await Task.Delay(2000, token); // 这个任务会超时}}private void OnPhaseCompleted(Barrier barrier){Console.WriteLine($"[回调] 阶段 {barrier.CurrentPhaseNumber} 完成,参与者: {barrier.ParticipantCount}");// 检查是否应该提前结束if (barrier.CurrentPhaseNumber >= 3 && _cancellationTokenSource.Token.IsCancellationRequested){Console.WriteLine("[回调] 检测到取消请求");}}
}
5、屏障的内部等待机制
5.1 阶段号检查机制
屏障使用阶段号来区分不同的同步点:
class BarrierPhaseCheck
{private int _currentPhase = 0;public void SignalAndWait(){int phaseAtEntry = _currentPhase;// 减少剩余计数int remaining = Interlocked.Decrement(ref _remainingParticipants);if (remaining == 0){// 最后一个线程:重置屏障ResetBarrier();}else{// 等待阶段号变化SpinWait spinWait = new SpinWait();while (_currentPhase == phaseAtEntry){spinWait.SpinOnce();// 如果自旋等待时间过长,进入内核等待if (spinWait.Count > 100){KernelWait(phaseAtEntry);break;}}}}private void ResetBarrier(){// 执行阶段后动作try{_postPhaseAction?.Invoke(this);}catch (Exception ex){// 包装异常,让所有等待线程都能看到throw new BarrierPostPhaseException(ex);}// 递增阶段号Interlocked.Increment(ref _currentPhase);// 重置剩余计数Interlocked.Exchange(ref _remainingParticipants, _participantCount);// 唤醒所有等待线程WakeAllThreads();}
}
6、最佳实践和注意事项
6.1 正确模式
class BarrierBestPractices
{private Barrier _barrier;public void GoodPattern1(){// 总是使用 using 或手动 Disposeusing (_barrier = new Barrier(4, OnPhaseComplete)){RunParallelWork();}}public void GoodPattern2(){// 处理阶段后动作中的异常_barrier = new Barrier(4, (b) =>{try{OnPhaseComplete(b);}catch (Exception ex){Console.WriteLine($"阶段后动作异常: {ex.Message}");// 决定是否继续或中止}});}public void GoodPattern3(){// 使用超时避免永久阻塞if (!_barrier.SignalAndWait(5000)){Console.WriteLine("屏障等待超时");// 处理超时情况}}public void GoodPattern4(){// 动态调整参与者时的线程安全lock (this){_barrier.AddParticipant();StartNewWorker();}}private void OnPhaseComplete(Barrier barrier){Console.WriteLine($"阶段 {barrier.CurrentPhaseNumber} 完成");// 检查终止条件if (barrier.CurrentPhaseNumber >= 10){Console.WriteLine("达到最大阶段数");}}private void RunParallelWork(){Parallel.For(0, 4, i =>{for (int phase = 0; phase < 5; phase++){DoWork(i, phase);_barrier.SignalAndWait();}});}private void DoWork(int workerId, int phase){Thread.Sleep(100);}private void StartNewWorker(){// 启动新工作线程}
}
6.2 常见陷阱
class BarrierCommonMistakes
{// 错误1:死锁 - 线程数不匹配public void Mistake1_Deadlock(){var barrier = new Barrier(3); // 需要3个参与者// 但只启动了2个线程for (int i = 0; i < 2; i++){Thread t = new Thread(() => {barrier.SignalAndWait(); // 永远等待第三个参与者});t.Start();}}// 错误2:阶段后动作中的异常public void Mistake2_PostPhaseException(){var barrier = new Barrier(2, (b) =>{throw new Exception("阶段后动作失败!"); // 导致所有线程收到 BarrierPostPhaseException});Thread t1 = new Thread(() => {try{barrier.SignalAndWait();}catch (BarrierPostPhaseException ex){Console.WriteLine($"线程1捕获异常: {ex.InnerException.Message}");}});Thread t2 = new Thread(() => {try{barrier.SignalAndWait();}catch (BarrierPostPhaseException ex){Console.WriteLine($"线程2捕获异常: {ex.InnerException.Message}");}});t1.Start();t2.Start();t1.Join();t2.Join();}// 错误3:忘记处理屏障损坏public void Mistake3_BrokenBarrier(){var barrier = new Barrier(2);Thread t1 = new Thread(() => {try{barrier.SignalAndWait();}catch (Exception){// 线程异常退出,导致屏障损坏}});Thread t2 = new Thread(() => {Thread.Sleep(1000);try{barrier.SignalAndWait(); // 抛出 BrokenBarrierException}catch (BarrierPostPhaseException){Console.WriteLine("屏障已损坏");}});t1.Start();t2.Start();}// 错误4:不正确的阶段间数据共享private int _sharedData;private Barrier _barrier;public void Mistake4_DataRace(){_barrier = new Barrier(2);Thread t1 = new Thread(() => {_sharedData = 42;_barrier.SignalAndWait();// 这里可能看到不正确的 _sharedData 值Console.WriteLine($"线程1看到: {_sharedData}");});Thread t2 = new Thread(() => {_sharedData = 100;_barrier.SignalAndWait();// 这里可能看到不正确的 _sharedData 值Console.WriteLine($"线程2看到: {_sharedData}");});t1.Start();t2.Start();}
}
7、性能考虑
7.1 屏障 vs 其他同步机制
class BarrierPerformanceComparison
{public void CompareWithOtherSyncMethods(){int iterations = 1000;int threadCount = 4;TestBarrier(threadCount, iterations);TestManualSync(threadCount, iterations);TestCountdownEvent(threadCount, iterations);}private void TestBarrier(int threadCount, int iterations){var barrier = new Barrier(threadCount);var stopwatch = Stopwatch.StartNew();Parallel.For(0, threadCount, i => {for (int j = 0; j < iterations; j++){DoWork(i, j);barrier.SignalAndWait();}});Console.WriteLine($"Barrier: {stopwatch.ElapsedMilliseconds}ms");barrier.Dispose();}private void TestManualSync(int threadCount, int iterations){int completedCount = 0;object lockObj = new object();var stopwatch = Stopwatch.StartNew();Parallel.For(0, threadCount, i => {for (int j = 0; j < iterations; j++){DoWork(i, j);lock (lockObj){completedCount++;if (completedCount == threadCount){completedCount = 0;Monitor.PulseAll(lockObj);}else{Monitor.Wait(lockObj);}}}});Console.WriteLine($"Manual Sync: {stopwatch.ElapsedMilliseconds}ms");}private void TestCountdownEvent(int threadCount, int iterations){var stopwatch = Stopwatch.StartNew();Parallel.For(0, threadCount, i => {for (int j = 0; j < iterations; j++){DoWork(i, j);// CountdownEvent 需要重新初始化,不如屏障方便// 这里只是示意,实际需要更复杂的逻辑}});Console.WriteLine($"CountdownEvent: {stopwatch.ElapsedMilliseconds}ms");}private void DoWork(int threadId, int iteration){Thread.SpinWait(1000); // 模拟少量工作}
}
8、总结
屏障的本质:
-
是多阶段并行任务的同步协调器
-
通过阶段号管理不同的同步点
-
使用参与者计数确保所有线程就绪
-
支持阶段后动作执行聚合操作
核心优势:
-
精确的阶段同步:确保所有线程完成一个阶段后再进入下一阶段
-
内置聚合点:阶段后动作适合执行结果汇总
-
动态适应性:支持运行时调整参与者数量
-
内存一致性:隐含内存屏障,保证阶段间数据可见性
适用场景:
-
并行算法:多阶段计算、排序、搜索
-
模拟系统:离散事件模拟、时间步推进
-
数据处理:ETL管道、多阶段数据转换
-
复杂工作流:需要严格阶段协调的业务流程
使用要点:
-
参与者数量必须与实际线程数匹配
-
妥善处理阶段后动作中的异常
-
考虑使用超时避免永久阻塞
-
动态调整参与者时要保证线程安全
-
注意阶段间的数据依赖和内存可见性
屏障是多阶段并行计算的终极协调工具,在合适的场景下可以大大简化复杂并发逻辑的实现!
