多线程六脉神剑第六剑:事件同步 (AutoResetEvent/ManualResetEvent)
文章目录
- 1、举栗子
- 2、事件同步的核心本质
- 2.1 内核对象结构
- 2.2 两种事件类型
- 3、底层原理深入解析
- 3.1 状态管理机制
- 3.2 内核等待队列的工作原理
- 4、事件同步的完整使用
- 4.1 ManualResetEvent 使用示例
- 4.2 AutoResetEvent 使用示例
- 4.3 生产者-消费者模式
- 4.4 复杂的线程协调
- 4.5 超时控制和等待多个事件
- 4.6 现代替代品 - ManualResetEventSlim
- 5、事件同步的内部等待机制
- 5.1 内核切换的代价
- 5.2 轻量级版本的优化
- 6、最佳实践和注意事项
- 6.1 正确模式
- 6.2 常见陷阱
- 7、性能考虑
- 7.1 事件类型选择指南
- 7.2 性能对比
- 8、总结
1、举栗子
场景:火车站台与列车
-
ManualResetEvent:像火车站的大门
-
关闭时(无信号):所有乘客在站外等待
-
打开时(有信号):所有乘客可以同时进入
-
需要人工控制门的开关
-
-
AutoResetEvent:像旋转闸机
-
关闭时(无信号):乘客排队等待
-
打开时(有信号):只允许一个人通过,然后自动关闭
-
每次只放行一个人
-
事件同步的本质:通过信号状态来协调线程的执行顺序,让线程能够等待某个条件满足后再继续执行。
2、事件同步的核心本质
2.1 内核对象结构
事件在操作系统内核中的概念结构
// 概念性结构 - 实际在操作系统内核中实现
class EventKernelObject
{bool IsSignaled; // 当前事件状态(true=有信号,false=无信号)bool AutoReset; // 是否自动重置Queue<Thread> WaitQueue; // 等待线程队列// ... 其他同步信息
}
2.2 两种事件类型
class EventTypes
{// ManualResetEvent - 手动重置事件// 设置信号后,所有等待线程都会被释放,直到手动重置ManualResetEvent manualEvent = new ManualResetEvent(false);// AutoResetEvent - 自动重置事件 // 设置信号后,只释放一个等待线程,然后自动重置AutoResetEvent autoEvent = new AutoResetEvent(false);
}
3、底层原理深入解析
3.1 状态管理机制
事件的核心是一个布尔状态标志:
class EventInternalState
{private const int SIGNALED = 1;private const int NON_SIGNALED = 0;private int _state = NON_SIGNALED;// 当线程调用 WaitOne() 时:public bool WaitOne(int timeoutMs){// 快速路径:如果已经有信号,立即返回if (_state == SIGNALED){if (AutoReset){// 自动重置:原子性地将状态改回无信号Interlocked.CompareExchange(ref _state, NON_SIGNALED, SIGNALED);}return true;}// 慢速路径:进入内核等待队列return KernelWait(timeoutMs);}// 当线程调用 Set() 时:public void Set(){if (AutoReset){// 自动重置事件:如果有等待线程,唤醒一个;否则设置信号状态if (WaitQueue.Count > 0){WakeOneWaitingThread(); // 不改变状态(保持无信号)}else{Interlocked.Exchange(ref _state, SIGNALED);}}else{// 手动重置事件:唤醒所有等待线程,并保持有信号状态WakeAllWaitingThreads();Interlocked.Exchange(ref _state, SIGNALED);}}
}
3.2 内核等待队列的工作原理
class EventWaitQueue
{// 当线程进入等待时:void EnterWaitQueue(Thread thread){// 1. 将线程状态改为"等待"thread.State = ThreadState.WaitSleepJoin;// 2. 将线程加入事件的等待队列WaitQueue.Enqueue(thread);// 3. 触发线程调度,切换到其他可运行线程Thread.SwitchToOtherThread();}// 当事件被设置时:void WakeThreads(bool autoReset){if (autoReset){// 自动重置:只唤醒一个线程if (WaitQueue.Count > 0){Thread threadToWake = WaitQueue.Dequeue();threadToWake.State = ThreadState.Running;// 注意:状态保持无信号}else{// 没有等待线程,设置信号状态_state = SIGNALED;}}else{// 手动重置:唤醒所有等待线程while (WaitQueue.Count > 0){Thread threadToWake = WaitQueue.Dequeue();threadToWake.State = ThreadState.Running;}// 设置信号状态_state = SIGNALED;}}
}
4、事件同步的完整使用
4.1 ManualResetEvent 使用示例
class ManualResetEventExample
{private ManualResetEvent _gateOpened = new ManualResetEvent(false);private int _visitorCount = 0;public void RunExample(){Console.WriteLine("=== ManualResetEvent 演示 ===\n");// 启动多个访客线程(在门外等待)for (int i = 0; i < 5; i++){int visitorId = i;Thread visitor = new Thread(() => VisitPark(visitorId));visitor.Start();}Thread.Sleep(2000);// 第一次开门 - 所有等待的访客同时进入Console.WriteLine("\n管理员:开门!所有访客可以进入了!");_gateOpened.Set();Thread.Sleep(3000);// 关门 - 新来的访客需要等待Console.WriteLine("\n管理员:关门!新访客需要等待。");_gateOpened.Reset();// 新访客到达(会被阻塞)Thread lateVisitor = new Thread(() => VisitPark(99));lateVisitor.Start();Thread.Sleep(2000);// 第二次开门Console.WriteLine("\n管理员:再次开门!");_gateOpened.Set();Thread.Sleep(2000);Console.WriteLine($"\n总访客数: {_visitorCount}");}private void VisitPark(int visitorId){Console.WriteLine($"访客 {visitorId} 到达公园门口,等待开门...");_gateOpened.WaitOne(); // 等待开门信号Interlocked.Increment(ref _visitorCount);Console.WriteLine($"访客 {visitorId} 进入公园游玩!");// 模拟在公园内游玩Thread.Sleep(1000);Console.WriteLine($"访客 {visitorId} 离开公园");}
}// 运行示例
// new ManualResetEventExample().RunExample();
4.2 AutoResetEvent 使用示例
class AutoResetEventExample
{private AutoResetEvent _ticketMachine = new AutoResetEvent(false);private int _servedCustomers = 0;public void RunExample(){Console.WriteLine("=== AutoResetEvent 演示 ===\n");// 启动多个顾客线程for (int i = 0; i < 5; i++){int customerId = i;Thread customer = new Thread(() => BuyTicket(customerId));customer.Start();}Thread.Sleep(1000);// 售票员依次放行顾客for (int i = 0; i < 5; i++){Thread.Sleep(1000);Console.WriteLine($"\n售票员:下一位顾客请进!");_ticketMachine.Set(); // 每次只放行一个人// 注意:AutoResetEvent 在 Set() 后会自动重置为无信号状态}Thread.Sleep(2000);Console.WriteLine($"\n总服务顾客数: {_servedCustomers}");}private void BuyTicket(int customerId){Console.WriteLine($"顾客 {customerId} 在售票窗口前等待...");_ticketMachine.WaitOne(); // 等待叫号Interlocked.Increment(ref _servedCustomers);Console.WriteLine($"顾客 {customerId} 正在购买门票...");// 模拟购票过程Thread.Sleep(500);Console.WriteLine($"顾客 {customerId} 完成购票,离开窗口");}
}
4.3 生产者-消费者模式
class ProducerConsumerWithEvents
{private AutoResetEvent _producerEvent = new AutoResetEvent(true); // 初始有信号,生产者可以先生产private AutoResetEvent _consumerEvent = new AutoResetEvent(false); // 初始无信号,消费者等待private Queue<int> _buffer = new Queue<int>();private const int BUFFER_SIZE = 3;private int _itemCount = 0;private bool _stopProduction = false;public void RunExample(){Console.WriteLine("=== 生产者-消费者模式演示 ===\n");// 启动生产者Thread producer = new Thread(Producer);producer.Start();// 启动消费者Thread consumer = new Thread(Consumer);consumer.Start();// 运行一段时间后停止Thread.Sleep(10000);_stopProduction = true;_producerEvent.Set(); // 唤醒生产者以检查停止条件producer.Join();consumer.Join();Console.WriteLine($"\n生产消费完成,总生产数: {_itemCount}");}private void Producer(){int item = 0;while (!_stopProduction){// 等待可以生产的信号_producerEvent.WaitOne();if (_stopProduction) break;// 生产一个项目lock (_buffer){if (_buffer.Count < BUFFER_SIZE){_buffer.Enqueue(++item);_itemCount++;Console.WriteLine($"生产者: 生产项目 {item}, 缓冲区大小: {_buffer.Count}");// 通知消费者有新项目_consumerEvent.Set();// 如果缓冲区未满,继续生产if (_buffer.Count < BUFFER_SIZE){_producerEvent.Set();}}else{// 缓冲区已满,等待消费者消费// 不设置_producerEvent,等待消费者唤醒}}Thread.Sleep(500); // 模拟生产时间}Console.WriteLine("生产者停止生产");}private void Consumer(){while (true){// 等待可以消费的信号_consumerEvent.WaitOne();lock (_buffer){if (_buffer.Count > 0){int item = _buffer.Dequeue();Console.WriteLine($"消费者: 消费项目 {item}, 缓冲区大小: {_buffer.Count}");// 通知生产者可以继续生产_producerEvent.Set();// 如果缓冲区还有项目,继续消费if (_buffer.Count > 0){_consumerEvent.Set();}}else if (_stopProduction){// 停止生产且缓冲区为空,退出break;}}Thread.Sleep(800); // 模拟消费时间}Console.WriteLine("消费者停止消费");}
}
4.4 复杂的线程协调
class ComplexCoordination
{private ManualResetEvent _phase1Completed = new ManualResetEvent(false);private ManualResetEvent _phase2Completed = new ManualResetEvent(false);private ManualResetEvent _allDone = new ManualResetEvent(false);public void RunComplexWorkflow(){Console.WriteLine("=== 复杂工作流协调演示 ===\n");// 阶段1的工作线程Thread phase1Worker1 = new Thread(() => DoPhase1Work("Worker1"));Thread phase1Worker2 = new Thread(() => DoPhase1Work("Worker2"));// 阶段2的工作线程(依赖阶段1完成)Thread phase2Worker = new Thread(DoPhase2Work);// 最终阶段线程(依赖阶段2完成)Thread finalWorker = new Thread(DoFinalWork);// 启动线程phase1Worker1.Start();phase1Worker2.Start();phase2Worker.Start();finalWorker.Start();// 等待所有工作完成_allDone.WaitOne();Console.WriteLine("\n所有工作流程完成!");}private void DoPhase1Work(string workerName){Console.WriteLine($"{workerName} 开始阶段1工作...");Thread.Sleep(2000); // 模拟工作Console.WriteLine($"{workerName} 完成阶段1工作");// 如果是最后一个完成阶段1的线程,设置事件if (workerName == "Worker2") // 假设Worker2是最后一个{Console.WriteLine("阶段1所有工作完成!");_phase1Completed.Set();}}private void DoPhase2Work(){Console.WriteLine("阶段2工作等待阶段1完成...");_phase1Completed.WaitOne(); // 等待阶段1完成Console.WriteLine("阶段2工作开始...");Thread.Sleep(1500); // 模拟工作Console.WriteLine("阶段2工作完成!");_phase2Completed.Set(); // 通知阶段2完成}private void DoFinalWork(){Console.WriteLine("最终工作等待阶段2完成...");_phase2Completed.WaitOne(); // 等待阶段2完成Console.WriteLine("最终工作开始...");Thread.Sleep(1000); // 模拟工作Console.WriteLine("最终工作完成!");_allDone.Set(); // 通知所有工作完成}
}
4.5 超时控制和等待多个事件
class TimeoutAndMultipleWait
{private ManualResetEvent _event1 = new ManualResetEvent(false);private ManualResetEvent _event2 = new ManualResetEvent(false);private ManualResetEvent _event3 = new ManualResetEvent(false);public void DemonstrateWaits(){Console.WriteLine("=== 超时控制和多事件等待演示 ===\n");// 启动多个工作线程new Thread(() => Worker("Worker1", _event1, 1000)).Start();new Thread(() => Worker("Worker2", _event2, 3000)).Start();new Thread(() => Worker("Worker3", _event3, 5000)).Start();// 1. 等待任意一个事件(WaitAny)Console.WriteLine("主线程: 等待任意一个工作完成...");int firstCompleted = WaitHandle.WaitAny(new WaitHandle[] { _event1, _event2, _event3 });Console.WriteLine($"第一个完成的工作: Worker{firstCompleted + 1}\n");// 2. 带超时的等待Console.WriteLine("主线程: 等待第二个工作完成(超时2秒)...");bool success = _event2.WaitOne(2000);if (success){Console.WriteLine("第二个工作在超时前完成了");}else{Console.WriteLine("等待第二个工作超时!");}// 3. 等待所有事件(WaitAll)Console.WriteLine("\n主线程: 等待所有剩余工作完成...");WaitHandle.WaitAll(new WaitHandle[] { _event1, _event2, _event3 });Console.WriteLine("所有工作都完成了!");// 重置事件以便重新使用_event1.Reset();_event2.Reset();_event3.Reset();// 4. 测试无限等待Console.WriteLine("\n测试无限等待(10秒超时)...");Thread testThread = new Thread(() => {bool signaled = _event1.WaitOne(Timeout.Infinite); // 无限等待Console.WriteLine($"无限等待结果: {signaled}");});testThread.Start();Thread.Sleep(2000);_event1.Set(); // 在2秒后设置事件testThread.Join();}private void Worker(string name, ManualResetEvent completionEvent, int workTimeMs){Console.WriteLine($"{name} 开始工作(需要 {workTimeMs}ms)...");Thread.Sleep(workTimeMs);Console.WriteLine($"{name} 完成工作");completionEvent.Set();}
}
4.6 现代替代品 - ManualResetEventSlim
class ManualResetEventSlimExample
{private ManualResetEventSlim _eventSlim = new ManualResetEventSlim(false);public async Task RunAsyncExample(){Console.WriteLine("=== ManualResetEventSlim 异步演示 ===\n");// 启动多个异步任务var tasks = new List<Task>();for (int i = 0; i < 3; i++){int taskId = i;tasks.Add(Task.Run(async () => await WaitForEventAsync(taskId)));}await Task.Delay(2000);Console.WriteLine("\n设置事件信号...");_eventSlim.Set();await Task.WhenAll(tasks);Console.WriteLine("\n所有任务完成");// 重置事件_eventSlim.Reset();// 测试异步等待Console.WriteLine("\n测试异步等待...");var waitingTask = Task.Run(async () => {Console.WriteLine("任务开始异步等待...");await _eventSlim.WaitHandle.AsTask(); // 转换为TaskConsole.WriteLine("任务收到信号!");});await Task.Delay(1000);_eventSlim.Set();await waitingTask;_eventSlim.Dispose(); // 记得释放资源}private async Task WaitForEventAsync(int taskId){Console.WriteLine($"任务 {taskId} 等待事件...");// 使用异步等待(通过Task.Run包装)await Task.Run(() => _eventSlim.Wait());Console.WriteLine($"任务 {taskId} 收到事件信号!");// 模拟一些工作await Task.Delay(500);Console.WriteLine($"任务 {taskId} 完成工作");}
}
5、事件同步的内部等待机制
5.1 内核切换的代价
class EventPerformance
{// 事件等待的完整代价链:public void ShowWaitCost(){// 当调用 WaitOne() 时:// 1. 用户态检查状态(快速路径)// 2. 如果需要等待,切换到内核态(约1000-2000 CPU周期)// 3. 线程状态保存(寄存器、栈指针等)// 4. 加入内核等待队列// 5. 线程调度器选择新线程// 6. 新线程的上下文恢复// 当调用 Set() 时:// 1. 如果有等待线程,唤醒它们// 2. 触发线程调度// 3. 被唤醒线程的上下文恢复}
}
5.2 轻量级版本的优化
ManualResetEventSlim 通过自旋等待减少内核切换:
class ManualResetEventSlimInternal
{private int _state;private readonly object _lock = new object();public void Wait(){// 第一阶段:短暂自旋等待for (int i = 0; i < 100; i++){if (_state == 1) return; // 快速成功Thread.SpinWait(100);}// 第二阶段:进入真正的等待lock (_lock){while (_state == 0){Monitor.Wait(_lock);}}}public void Set(){// 使用内存屏障确保状态可见性Interlocked.Exchange(ref _state, 1);lock (_lock){Monitor.PulseAll(_lock); // 唤醒所有等待线程}}
}
6、最佳实践和注意事项
6.1 正确模式
class CorrectUsage
{private ManualResetEvent _completionEvent = new ManualResetEvent(false);public void GoodPattern1(){// 总是考虑使用 using 语句或手动 Disposeusing (var localEvent = new ManualResetEvent(false)){// 使用局部事件}}public void GoodPattern2(){// 使用超时避免永久阻塞if (_completionEvent.WaitOne(5000)){// 成功收到信号}else{// 超时处理HandleTimeout();}}public void GoodPattern3(){// 重置事件状态以便重新使用_completionEvent.Reset();// 启动工作StartWork();// 等待完成_completionEvent.WaitOne();}private void HandleTimeout(){Console.WriteLine("操作超时!");}private void StartWork(){ThreadPool.QueueUserWorkItem(_ => {// 执行工作Thread.Sleep(2000);_completionEvent.Set();});}
}
6.2 常见陷阱
class CommonMistakes
{// 错误1:忘记设置事件导致死锁private ManualResetEvent _deadlockEvent = new ManualResetEvent(false);public void Mistake1_Deadlock(){Thread worker = new Thread(() => {// 长时间工作...Thread.Sleep(5000);// 忘记调用 _deadlockEvent.Set();});worker.Start();_deadlockEvent.WaitOne(); // 永久阻塞!}// 错误2:误用 AutoResetEventprivate AutoResetEvent _autoEvent = new AutoResetEvent(false);public void Mistake2_AutoResetConfusion(){// 启动多个等待线程for (int i = 0; i < 3; i++){Thread t = new Thread(() => {_autoEvent.WaitOne();Console.WriteLine("线程被唤醒");});t.Start();}// 只设置一次,但期望唤醒所有线程_autoEvent.Set(); // 错误!只会唤醒一个线程}// 错误3:资源泄漏public void Mistake3_ResourceLeak(){for (int i = 0; i < 1000; i++){var event = new ManualResetEvent(false); // 创建但不Dispose// 应该使用 using 语句或手动调用 Dispose()}}// 错误4:竞争条件private ManualResetEvent _raceEvent = new ManualResetEvent(false);private bool _dataReady = false;public void Mistake4_RaceCondition(){Thread producer = new Thread(() => {_dataReady = true;_raceEvent.Set(); // 可能重排,导致消费者看到 _dataReady 为 false});Thread consumer = new Thread(() => {_raceEvent.WaitOne();if (!_dataReady) // 可能为 false!{Console.WriteLine("竞争条件发生!");}});consumer.Start();producer.Start();}
}
7、性能考虑
7.1 事件类型选择指南
class EventSelectionGuide
{// 选择指南:public void ChooseEventType(){// 使用 ManualResetEvent 当:// - 需要一次性通知多个线程// - 信号状态需要保持一段时间// - 线程需要反复检查信号状态// 使用 AutoResetEvent 当:// - 每次信号只应该唤醒一个线程// - 实现类似信号量的行为// - 生产者-消费者模式的精确控制// 使用 ManualResetEventSlim 当:// - 等待时间通常很短// - 需要更好的性能// - 不需要跨进程同步}
}
7.2 性能对比
class EventPerformanceComparison
{public void ComparePerformance(){int iterations = 10000;// 测试 ManualResetEventvar manualEvent = new ManualResetEvent(false);TestEvent("ManualResetEvent", manualEvent, iterations);manualEvent.Dispose();// 测试 ManualResetEventSlimvar slimEvent = new ManualResetEventSlim(false);TestEventSlim("ManualResetEventSlim", slimEvent, iterations);slimEvent.Dispose();// 测试 AutoResetEventvar autoEvent = new AutoResetEvent(false);TestAutoEvent("AutoResetEvent", autoEvent, iterations);autoEvent.Dispose();}private void TestEvent(string name, ManualResetEvent eventObj, int iterations){var stopwatch = Stopwatch.StartNew();Thread setter = new Thread(() => {for (int i = 0; i < iterations; i++){eventObj.Set();eventObj.Reset();}});Thread waiter = new Thread(() => {for (int i = 0; i < iterations; i++){eventObj.WaitOne();}});waiter.Start();setter.Start();waiter.Join();setter.Join();Console.WriteLine($"{name}: {stopwatch.ElapsedMilliseconds}ms");}private void TestEventSlim(string name, ManualResetEventSlim eventObj, int iterations){var stopwatch = Stopwatch.StartNew();Thread setter = new Thread(() => {for (int i = 0; i < iterations; i++){eventObj.Set();eventObj.Reset();}});Thread waiter = new Thread(() => {for (int i = 0; i < iterations; i++){eventObj.Wait();}});waiter.Start();setter.Start();waiter.Join();setter.Join();Console.WriteLine($"{name}: {stopwatch.ElapsedMilliseconds}ms");}private void TestAutoEvent(string name, AutoResetEvent eventObj, int iterations){var stopwatch = Stopwatch.StartNew();Thread setter = new Thread(() => {for (int i = 0; i < iterations; i++){eventObj.Set();}});Thread waiter = new Thread(() => {for (int i = 0; i < iterations; i++){eventObj.WaitOne();}});waiter.Start();setter.Start();waiter.Join();setter.Join();Console.WriteLine($"{name}: {stopwatch.ElapsedMilliseconds}ms");}
}
8、总结
事件同步的本质:
-
是基于内核对象的信号通知机制
-
通过布尔状态标志控制线程的执行流
-
使用等待队列管理多个等待线程
-
支持跨进程同步(命名事件)
核心区别:
-
ManualResetEvent:手动重置,设置后保持信号状态,唤醒所有等待线程
-
AutoResetEvent:自动重置,设置后只唤醒一个线程,然后自动重置
-
ManualResetEventSlim:轻量级版本,性能更好但不支持跨进程
适用场景:
-
线程启动同步:确保工作线程准备好后再开始
-
阶段协调:多阶段任务的同步点
-
资源可用性通知:生产者-消费者模式
-
复杂工作流:多个线程间的依赖关系
使用要点:
-
进程内同步优先使用 ManualResetEventSlim
-
跨进程同步使用 ManualResetEvent/AutoResetEvent
-
总是使用 try-finally 或 using 确保资源释放
-
合理设置超时避免死锁
-
注意事件的重置时机
