多线程六脉神剑第三剑:信号量 (Semaphore)
文章目录
- 1、举个栗子🌰
- 2、信号量的核心本质
- 2.1 内核对象结构
- 2.1 三种基本操作
- 3、底层原理深入解析
- 3.1 原子计数器的核心作用
- 3.2 两阶段获取策略
- 4、信号量的完整使用
- 4.1 基础用法 - 控制并发数
- 4.2 生产者-消费者模式
- 4.3 现代替代品 - SemaphoreSlim
- 4.4 高级用法 - 资源池管理
- 5、命名信号量(跨进程同步)
- 6、信号量的内部等待机制
- 6.1 等待队列管理
- 7、最佳实践和注意事项
- 7.1 正确模式
- 7.2 常见陷阱
- 8、性能考虑
- 8.1 Semaphore vs SemaphoreSlim
- 9、总结
1、举个栗子🌰
场景:游乐场的旋转木马
-
旋转木马有 5 个座位(共享资源)
-
Semaphore 就是售票员手中的 5 张门票
-
游客想玩 → 买票(WaitOne)
-
如果没有票了 → 排队等待(阻塞)
-
有游客玩完离开 → 归还一张票(Release)
-
新游客拿到票 → 开始玩
信号量的本质:通过计数器来控制同时访问某个资源的线程数量。
2、信号量的核心本质
2.1 内核对象结构
信号量在操作系统内核中的概念结构:
// 概念性结构 - 实际在操作系统内核中实现
class SemaphoreKernelObject
{int CurrentCount; // 当前可用许可数int MaximumCount; // 最大许可数Queue<Thread> WaitQueue; // 等待队列// ... 其他同步信息
}
2.1 三种基本操作
class SemaphoreEssence
{// 1. 初始化:创建时有 N 个许可Semaphore semaphore = new Semaphore(initialCount: 3, maximumCount: 3);// 2. 获取许可:如果没有许可就等待semaphore.WaitOne(); // 或 WaitOne(timeout)// 3. 释放许可:归还一个许可semaphore.Release();
}
3、底层原理深入解析
3.1 原子计数器的核心作用
信号量的核心是一个线程安全的计数器:
class SemaphoreInternal
{private int _count;private int _maxCount;public bool WaitOne(int timeout){// 快速路径:如果计数器 > 0,原子减1int oldCount;do{oldCount = _count;if (oldCount == 0)break; // 需要进入慢速路径// 尝试原子性地减少计数器} while (Interlocked.CompareExchange(ref _count, oldCount - 1, oldCount) != oldCount);if (oldCount > 0)return true; // 快速获取成功// 慢速路径:进入内核等待return KernelWait(timeout);}public void Release(){int oldCount = Interlocked.Increment(ref _count);if (oldCount <= 0){// 有线程在等待,唤醒一个WakeOneWaitingThread();}else if (oldCount > _maxCount){// 防止超过最大许可数Interlocked.Decrement(ref _count);throw new SemaphoreFullException();}}
}
3.2 两阶段获取策略
现代信号量实现采用优化策略:
class OptimizedSemaphore
{private int _userModeCount; // 用户态计数器private KernelObject _kernelObject;public bool WaitOne(int timeout){// 第一阶段:用户态自旋尝试for (int spinCount = 0; spinCount < 100; spinCount++){int current = _userModeCount;if (current > 0 && Interlocked.CompareExchange(ref _userModeCount, current - 1, current) == current){return true; // 用户态快速获取成功}if (current == 0) break; // 没有许可,直接进入内核Thread.SpinWait(50); // 稍微自旋等待}// 第二阶段:内核等待return KernelWaitWithTimeout(_kernelObject, timeout);}
}
4、信号量的完整使用
4.1 基础用法 - 控制并发数
class DatabaseConnectionPool
{private Semaphore _semaphore;private List<Connection> _connections;public DatabaseConnectionPool(int poolSize){// 初始化信号量,许可数等于连接池大小_semaphore = new Semaphore(poolSize, poolSize);_connections = new List<Connection>();// 初始化连接for (int i = 0; i < poolSize; i++){_connections.Add(new Connection($"Connection-{i}"));}}public Connection GetConnection(int timeoutMs = 5000){Console.WriteLine($"[{Thread.CurrentThread.ManagedThreadId}] 等待获取数据库连接...");// 等待获取许可if (!_semaphore.WaitOne(timeoutMs)){throw new TimeoutException("获取数据库连接超时");}// 成功获取许可,分配连接lock (_connections){var connection = _connections[0];_connections.RemoveAt(0);Console.WriteLine($"[{Thread.CurrentThread.ManagedThreadId}] 获得连接: {connection.Name}");return new PooledConnection(connection, this);}}private void ReturnConnection(Connection connection){lock (_connections){_connections.Add(connection);Console.WriteLine($"[{Thread.CurrentThread.ManagedThreadId}] 归还连接: {connection.Name}");}// 释放许可_semaphore.Release();}// 包装类,确保连接被正确归还private class PooledConnection : Connection{private readonly DatabaseConnectionPool _pool;private readonly Connection _realConnection;public PooledConnection(Connection realConnection, DatabaseConnectionPool pool) : base(realConnection.Name){_realConnection = realConnection;_pool = pool;}public override void Close(){_pool.ReturnConnection(_realConnection);}}
}// 使用示例
class Program
{static void Main(){var pool = new DatabaseConnectionPool(3); // 只有3个连接// 启动10个线程模拟并发请求var tasks = new List<Task>();for (int i = 0; i < 10; i++){int threadId = i;tasks.Add(Task.Run(() => UseDatabase(pool, threadId)));}Task.WaitAll(tasks.ToArray());Console.WriteLine("所有数据库操作完成");}static void UseDatabase(DatabaseConnectionPool pool, int threadId){try{using (var connection = pool.GetConnection()){Console.WriteLine($"[线程{threadId}] 使用连接执行查询...");Thread.Sleep(2000); // 模拟数据库操作Console.WriteLine($"[线程{threadId}] 查询完成");} // 自动调用 Close() 归还连接}catch (TimeoutException ex){Console.WriteLine($"[线程{threadId}] 错误: {ex.Message}");}}
}
4.2 生产者-消费者模式
信号量非常适合实现生产者-消费者模式:
class BoundedBuffer<T>
{private readonly Semaphore _emptySlots; // 空槽位信号量private readonly Semaphore _filledSlots; // 已填充槽位信号量private readonly T[] _buffer;private int _putIndex = 0;private int _takeIndex = 0;public BoundedBuffer(int capacity){_buffer = new T[capacity];_emptySlots = new Semaphore(capacity, capacity); // 初始有capacity个空槽_filledSlots = new Semaphore(0, capacity); // 初始没有填充的槽}public void Put(T item){// 等待空槽位_emptySlots.WaitOne();lock (_buffer){_buffer[_putIndex] = item;_putIndex = (_putIndex + 1) % _buffer.Length;}// 通知消费者有新数据_filledSlots.Release();}public T Take(){// 等待有数据的槽位_filledSlots.WaitOne();T item;lock (_buffer){item = _buffer[_takeIndex];_takeIndex = (_takeIndex + 1) % _buffer.Length;}// 通知生产者有新空位_emptySlots.Release();return item;}
}// 使用示例
class ProducerConsumerExample
{static void Main(){var buffer = new BoundedBuffer<int>(5); // 容量为5的缓冲区// 生产者var producer = Task.Run(() => {for (int i = 0; i < 20; i++){buffer.Put(i);Console.WriteLine($"生产: {i}");Thread.Sleep(100);}});// 消费者var consumer = Task.Run(() => {for (int i = 0; i < 20; i++){int item = buffer.Take();Console.WriteLine($"消费: {item}");Thread.Sleep(150);}});Task.WaitAll(producer, consumer);}
}
4.3 现代替代品 - SemaphoreSlim
SemaphoreSlim 是轻量级版本,适合大多数场景:
class SemaphoreSlimExample
{private SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(3, 3); // 最大并发数3public async Task ProcessRequestsAsync(){var tasks = new List<Task>();for (int i = 0; i < 10; i++){tasks.Add(ProcessRequestAsync(i));}await Task.WhenAll(tasks);}private async Task ProcessRequestAsync(int requestId){Console.WriteLine($"请求 {requestId} 等待处理...");// 异步等待许可await _semaphoreSlim.WaitAsync();try{Console.WriteLine($"请求 {requestId} 开始处理,当前并发: {3 - _semaphoreSlim.CurrentCount}");// 模拟异步工作await Task.Delay(2000);Console.WriteLine($"请求 {requestId} 处理完成");}finally{_semaphoreSlim.Release();}}
}
4.4 高级用法 - 资源池管理
class ResourcePool<T> where T : new()
{private readonly Semaphore _semaphore;private readonly ConcurrentStack<T> _resources;private readonly int _maxSize;public ResourcePool(int maxSize){_maxSize = maxSize;_semaphore = new Semaphore(maxSize, maxSize);_resources = new ConcurrentStack<T>();// 预创建资源for (int i = 0; i < maxSize; i++){_resources.Push(new T());}}public PooledResource<T> Acquire(int timeoutMs = -1){if (!_semaphore.WaitOne(timeoutMs))throw new TimeoutException("获取资源超时");T resource;if (!_resources.TryPop(out resource)){resource = new T(); // 动态创建(如果栈为空但信号量有许可)}return new PooledResource<T>(resource, this);}private void Release(T resource){_resources.Push(resource);_semaphore.Release();}public class PooledResource<TRes> : IDisposable{public TRes Resource { get; }private readonly ResourcePool<TRes> _pool;private bool _disposed = false;public PooledResource(TRes resource, ResourcePool<TRes> pool){Resource = resource;_pool = pool;}public void Dispose(){if (!_disposed){_pool.Release(Resource);_disposed = true;}}}
}// 使用示例
class ResourcePoolExample
{static void Main(){var pool = new ResourcePool<StringBuilder>(2); // 最多2个StringBuilderParallel.For(0, 10, i => {using (var pooledResource = pool.Acquire(1000)){var sb = pooledResource.Resource;sb.Clear();sb.Append($"线程 {i} 正在使用StringBuilder");Console.WriteLine(sb.ToString());Thread.Sleep(500);}});}
}
5、命名信号量(跨进程同步)
信号量可以跨进程使用,用于协调多个应用程序:
// 进程 A
class ProcessA
{static void Main(){// 创建命名信号量using (var semaphore = new Semaphore(2, 2, "Global\\MyCrossProcessSemaphore")){Console.WriteLine("进程A: 已创建信号量,按任意键开始工作...");Console.ReadKey();for (int i = 0; i < 5; i++){semaphore.WaitOne();try{Console.WriteLine($"进程A: 获取许可,执行工作 {i}");Thread.Sleep(2000);}finally{semaphore.Release();Console.WriteLine($"进程A: 释放许可");}}}}
}// 进程 B
class ProcessB
{static void Main(){try{// 打开已有的命名信号量using (var semaphore = Semaphore.OpenExisting("Global\\MyCrossProcessSemaphore")){Console.WriteLine("进程B: 已连接信号量,开始工作...");for (int i = 0; i < 5; i++){semaphore.WaitOne();try{Console.WriteLine($"进程B: 获取许可,执行工作 {i}");Thread.Sleep(1500);}finally{semaphore.Release();Console.WriteLine($"进程B: 释放许可");}}}}catch (WaitHandleCannotBeOpenedException){Console.WriteLine("进程B: 信号量不存在,请先启动进程A");}}
}
6、信号量的内部等待机制
6.1 等待队列管理
class SemaphoreWaitQueue
{// 当线程调用 WaitOne() 但计数器为0时:public bool WaitOneInternal(int timeout){// 1. 将当前线程加入等待队列WaitQueue.Enqueue(CurrentThread);// 2. 线程状态改为等待CurrentThread.State = ThreadState.Waiting;// 3. 设置超时计时器(如果有超时)if (timeout != Timeout.Infinite){StartTimeoutTimer(timeout);}// 4. 触发线程调度Thread.SwitchToOtherThread();// 5. 被唤醒后检查是否超时return !TimedOut;}// 当有线程调用 Release() 时:public void ReleaseInternal(){if (WaitQueue.Count > 0){// 从等待队列中唤醒一个线程Thread threadToWake = WaitQueue.Dequeue();threadToWake.State = ThreadState.Running;// 注意:这里不会增加计数器,因为许可直接给了等待的线程}else{// 没有等待线程,增加计数器Interlocked.Increment(ref _count);}}
}
7、最佳实践和注意事项
7.1 正确模式
class CorrectUsage
{private Semaphore _semaphore = new Semaphore(5, 5);public void GoodPattern1(){// 总是使用 try-finally_semaphore.WaitOne();try{// 执行工作DoWork();}finally{_semaphore.Release();}}public bool GoodPattern2(){// 使用超时避免死锁if (!_semaphore.WaitOne(5000)){// 超时处理LogTimeout();return false;}try{DoWork();return true;}finally{_semaphore.Release();}}
}
7.2 常见陷阱
class CommonMistakes
{// 错误1:释放次数多于获取次数public void Mistake1(){var semaphore = new Semaphore(1, 1);semaphore.WaitOne();semaphore.Release();semaphore.Release(); // 抛出 SemaphoreFullException!}// 错误2:忘记释放信号量public void Mistake2(){var semaphore = new Semaphore(1, 1);semaphore.WaitOne();// 如果这里发生异常,信号量永远不会释放!// 应该使用 try-finally}// 错误3:错误的初始值public void Mistake3(){// 初始许可数大于最大许可数!var semaphore = new Semaphore(5, 3); // 抛出 ArgumentException}
}
8、性能考虑
8.1 Semaphore vs SemaphoreSlim
class PerformanceComparison
{// 重量级 - 适用于跨进程、复杂同步private Semaphore _semaphore = new Semaphore(5, 5);// 轻量级 - 适用于进程内同步,性能更好private SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(5, 5);public async Task TestPerformance(){var stopwatch = Stopwatch.StartNew();// 测试 SemaphoreSlim(异步支持)var tasks = new List<Task>();for (int i = 0; i < 1000; i++){tasks.Add(UseSemaphoreSlimAsync());}await Task.WhenAll(tasks);Console.WriteLine($"SemaphoreSlim: {stopwatch.ElapsedMilliseconds}ms");stopwatch.Restart();// 测试 Semaphore(同步)var threads = new List<Thread>();for (int i = 0; i < 1000; i++){var thread = new Thread(UseSemaphore);thread.Start();threads.Add(thread);}foreach (var thread in threads) thread.Join();Console.WriteLine($"Semaphore: {stopwatch.ElapsedMilliseconds}ms");}
}
9、总结
信号量的本质:
-
是一个带计数器的同步原语
-
通过原子操作管理许可数量
-
使用等待队列处理竞争情况
-
支持跨进程同步(命名信号量)
核心优势:
-
精确控制并发数:限制同时访问资源的线程数量
-
资源池管理:数据库连接池、线程池等
-
生产者-消费者协调:缓冲区空/满状态管理
-
流量控制:限制系统负载
使用要点:
-
进程内同步优先使用 SemaphoreSlim
-
跨进程同步使用 Semaphore
-
总是使用 try-finally 确保释放
-
合理设置初始和最大许可数
-
考虑使用超时避免死锁
