多线程六脉神剑第五剑:原子操作 (Interlocked)
文章目录
- 1、举栗子
- 2、原子操作的核心本质
- 2.1 CPU 指令级的保证
- 2.2 内存屏障的作用
- 3、底层原理深入解析、
- 3.1 比较并交换(CAS)的核心机制
- 3.2 CPU 缓存一致性协议
- 4、Interlocked 类的完整使用
- 4.1 基础原子操作
- 4.2 无锁计数器实现
- 4.3 无锁栈实现
- 4.4 无锁对象池
- 4.5 线程安全的标志位管理
- 5、高级模式和技巧
- 5.1 无锁队列实现
- 5.2 性能计数器
- 6、最佳实践和注意事项
- 6.1 正确模式
- 6.2 常见陷阱
- 7、性能对比
- 8、总结
1、举栗子
场景:超市的收银台
-
非原子操作:像传统的收银方式,顾客把钱给收银员,收银员数钱,找零,整个过程可能被打断。
-
原子操作:像自助收银机,投入钱币 → 自动识别 → 完成交易,整个过程不可分割。
原子操作的本质:一个操作要么完全执行,要么完全不执行,不会出现执行到一半的状态。
2、原子操作的核心本质
2.1 CPU 指令级的保证
原子操作的底层是 CPU 提供的特殊指令:
// 概念性展示 - 原子操作在硬件层面的实现
class CpuAtomicInstructions
{// 1. 原子读-修改-写指令bool Atomic_CompareAndSwap(ref int location, int expected, int newValue){// 这个操作在CPU级别是原子的:// 如果 location == expected,则 location = newValue 并返回 true// 否则返回 false// 整个过程不会被线程调度打断}// 2. 原子加法指令int Atomic_Add(ref int location, int value){// 原子性地执行:location = location + value// 返回修改前的值}// 3. 原子交换指令 int Atomic_Exchange(ref int location, int value){// 原子性地执行:oldValue = location; location = value;// 返回修改前的值}
}
2.2 内存屏障的作用
原子操作还隐含内存屏障(Memory Barrier),确保内存访问的顺序性:
class MemoryBarrierExample
{private int _data;private bool _initialized = false;// 错误示例:没有内存屏障void ThreadA(){_data = 42;_initialized = true; // 编译器或CPU可能重排指令,导致这个先执行}void ThreadB(){if (_initialized){Console.WriteLine(_data); // 可能看到42,也可能看到0!}}// 正确示例:使用原子操作(隐含内存屏障)void ThreadA_Fixed(){_data = 42;Interlocked.Exchange(ref _initialized, true); // 保证指令不会重排}
}
3、底层原理深入解析、
3.1 比较并交换(CAS)的核心机制
CAS 是原子操作的基石:
// 详细展示 CAS 的工作原理
class CompareExchangeMechanism
{public static int CompareExchange(ref int location, int value, int comparand){// 第一步:读取当前值int current = location;// 第二步:比较当前值与期望值if (current == comparand){// 第三步:如果相等,原子性地更新为新值// 这个"比较+更新"在CPU层面是一个不可分割的指令location = value;}// 第四步:返回原始值return current;}
}// 实际使用模式
class CASPattern
{private int _counter = 0;public bool TryIncrement(){int original, newValue;do{original = _counter; // 读取当前值newValue = original + 1; // 计算新值// 尝试原子性地更新:如果_counter还是original,就设置为newValue// 如果返回值和original相等,说明更新成功// 如果不相等,说明其他线程修改了_counter,需要重试} while (Interlocked.CompareExchange(ref _counter, newValue, original) != original);return true;}
}
3.2 CPU 缓存一致性协议
原子操作依赖于现代 CPU 的 MESI 协议:
class CacheCoherence
{// 当多个CPU核心访问同一内存地址时:void AtomicOperationOnMultiCore(){// CPU0 执行:Interlocked.Increment(ref sharedVar)// CPU1 也执行:Interlocked.Increment(ref sharedVar)// MESI协议确保:// 1. 当一个核心要修改数据时,它先获取该缓存行的独占权// 2. 其他核心的对应缓存行被标记为"无效"// 3. 修改完成后,其他核心需要重新从内存或修改者那里获取新值}
}
4、Interlocked 类的完整使用
4.1 基础原子操作
class BasicInterlockedOperations
{private int _counter = 0;private long _bigCounter = 0;public void DemonstrateBasicOperations(){// 1. 递增操作int afterIncrement = Interlocked.Increment(ref _counter);Console.WriteLine($"递增后: {afterIncrement}"); // 返回递增后的值// 2. 递减操作 int afterDecrement = Interlocked.Decrement(ref _counter);Console.WriteLine($"递减后: {afterDecrement}");// 3. 加法操作int afterAdd = Interlocked.Add(ref _counter, 5);Console.WriteLine($"加5后: {afterAdd}");// 4. 交换操作int oldValue = Interlocked.Exchange(ref _counter, 100);Console.WriteLine($"交换前: {oldValue}, 交换后: {_counter}");// 5. 比较交换(CAS)int expected = 100;int desired = 200;int original = Interlocked.CompareExchange(ref _counter, desired, expected);if (original == expected){Console.WriteLine("CAS成功");}else{Console.WriteLine($"CAS失败,当前值: {original}");}// 6. 64位操作(针对long类型)long afterIncrement64 = Interlocked.Increment(ref _bigCounter);long afterAdd64 = Interlocked.Add(ref _bigCounter, 1000L);// 7. 读取操作(确保读取最新值)long current = Interlocked.Read(ref _bigCounter);// 8. 内存屏障Interlocked.MemoryBarrier(); // 确保屏障前的操作不会重排到屏障后}
}
4.2 无锁计数器实现
class LockFreeCounter
{private int _value = 0;private long _totalOperations = 0;// 简单的递增计数器public void Increment(){Interlocked.Increment(ref _value);Interlocked.Increment(ref _totalOperations);}// 带阈值的递增public bool IncrementWithThreshold(int threshold){int original, newValue;do{original = _value;if (original >= threshold)return false; // 已达到阈值newValue = original + 1;} while (Interlocked.CompareExchange(ref _value, newValue, original) != original);Interlocked.Increment(ref _totalOperations);return true;}// 安全的获取值(防止指令重排)public int GetValue(){// 对于int,直接读取就是原子的,但为了确保看到最新值,可以使用:return Interlocked.CompareExchange(ref _value, 0, 0); // 技巧:比较交换相同的值// 或者使用 Volatile.Read}// 重置计数器public int Reset(){return Interlocked.Exchange(ref _value, 0);}public long TotalOperations => Volatile.Read(ref _totalOperations);
}// 使用示例
class CounterExample
{static void Main(){var counter = new LockFreeCounter();// 启动多个线程并发递增var threads = new List<Thread>();for (int i = 0; i < 10; i++){threads.Add(new Thread(() => {for (int j = 0; j < 10000; j++){counter.Increment();}}));}threads.ForEach(t => t.Start());threads.ForEach(t => t.Join());Console.WriteLine($"最终计数: {counter.GetValue()}"); // 应该是 100000Console.WriteLine($"总操作数: {counter.TotalOperations}"); // 应该是 100000}
}
4.3 无锁栈实现
public class LockFreeStack<T>
{private class Node{public readonly T Value;public Node Next;public Node(T value){Value = value;}}private Node _head;public void Push(T item){var newNode = new Node(item);Node oldHead;do{oldHead = _head;newNode.Next = oldHead;} while (Interlocked.CompareExchange(ref _head, newNode, oldHead) != oldHead);Console.WriteLine($"压入: {item}");}public bool TryPop(out T result){Node oldHead;do{oldHead = _head;if (oldHead == null){result = default(T);return false;}} while (Interlocked.CompareExchange(ref _head, oldHead.Next, oldHead) != oldHead);result = oldHead.Value;Console.WriteLine($"弹出: {result}");return true;}public bool IsEmpty => _head == null;
}// 使用示例
class StackExample
{static void Main(){var stack = new LockFreeStack<int>();// 生产者线程var producer = new Thread(() => {for (int i = 0; i < 10; i++){stack.Push(i);Thread.Sleep(100);}});// 消费者线程var consumer = new Thread(() => {for (int i = 0; i < 10; i++){if (stack.TryPop(out int value)){Console.WriteLine($"消费: {value}");}Thread.Sleep(150);}});producer.Start();consumer.Start();producer.Join();consumer.Join();}
}
4.4 无锁对象池
public class LockFreeObjectPool<T> where T : class, new()
{private class PoolNode{public readonly T Value;public PoolNode Next;public PoolNode(T value){Value = value;}}private PoolNode _head;private int _count = 0;private readonly int _maxSize;public LockFreeObjectPool(int maxSize){_maxSize = maxSize;// 预创建对象for (int i = 0; i < Math.Min(10, maxSize); i++){Return(new T());}}public T Get(){PoolNode oldHead;do{oldHead = _head;if (oldHead == null){// 池为空,创建新对象return new T();}} while (Interlocked.CompareExchange(ref _head, oldHead.Next, oldHead) != oldHead);Interlocked.Decrement(ref _count);Console.WriteLine($"从池中获取对象,剩余: {_count}");return oldHead.Value;}public void Return(T item){if (item == null) throw new ArgumentNullException(nameof(item));if (Volatile.Read(ref _count) >= _maxSize){// 池已满,不回收Console.WriteLine("池已满,不回收对象");return;}var newNode = new PoolNode(item);PoolNode oldHead;do{oldHead = _head;newNode.Next = oldHead;} while (Interlocked.CompareExchange(ref _head, newNode, oldHead) != oldHead);Interlocked.Increment(ref _count);Console.WriteLine($"对象返回到池,总数: {_count}");}public int Count => Volatile.Read(ref _count);
}// 使用示例
class ObjectPoolExample
{static void Main(){var pool = new LockFreeObjectPool<StringBuilder>(5);Parallel.For(0, 20, i => {var sb = pool.Get();try{sb.Clear();sb.Append($"线程 {Thread.CurrentThread.ManagedThreadId} 正在工作");Console.WriteLine(sb.ToString());Thread.Sleep(100);}finally{pool.Return(sb);}});Console.WriteLine($"最终池中对象数: {pool.Count}");}
}
4.5 线程安全的标志位管理
class AtomicFlags
{private int _flags = 0;// 设置标志位public void SetFlag(int flag){int oldFlags, newFlags;do{oldFlags = _flags;newFlags = oldFlags | flag; // 设置指定位} while (Interlocked.CompareExchange(ref _flags, newFlags, oldFlags) != oldFlags);}// 清除标志位public void ClearFlag(int flag){int oldFlags, newFlags;do{oldFlags = _flags;newFlags = oldFlags & ~flag; // 清除指定位} while (Interlocked.CompareExchange(ref _flags, newFlags, oldFlags) != oldFlags);}// 检查标志位public bool HasFlag(int flag){return (Volatile.Read(ref _flags) & flag) != 0;}// 原子性地检查并设置标志位public bool TestAndSetFlag(int flag){int oldFlags, newFlags;do{oldFlags = _flags;if ((oldFlags & flag) != 0){return false; // 标志位已经设置}newFlags = oldFlags | flag;} while (Interlocked.CompareExchange(ref _flags, newFlags, oldFlags) != oldFlags);return true;}
}// 使用示例
class FlagsExample
{static void Main(){var flags = new AtomicFlags();const int FLAG_A = 1 << 0; // 0001const int FLAG_B = 1 << 1; // 0010const int FLAG_C = 1 << 2; // 0100// 多个线程并发设置标志位Parallel.Invoke(() => flags.SetFlag(FLAG_A),() => flags.SetFlag(FLAG_B),() => Console.WriteLine($"测试设置FLAG_C: {flags.TestAndSetFlag(FLAG_C)}"));Console.WriteLine($"FLAG_A: {flags.HasFlag(FLAG_A)}");Console.WriteLine($"FLAG_B: {flags.HasFlag(FLAG_B)}");Console.WriteLine($"FLAG_C: {flags.HasFlag(FLAG_C)}");}
}
5、高级模式和技巧
5.1 无锁队列实现
public class LockFreeQueue<T>
{private class Node{public readonly T Value;public Node Next;public Node(T value){Value = value;}}private Node _head;private Node _tail;public LockFreeQueue(){// 创建哨兵节点var dummy = new Node(default(T));_head = dummy;_tail = dummy;}public void Enqueue(T item){var newNode = new Node(item);while (true){Node oldTail = _tail;Node oldNext = oldTail.Next;// 检查tail是否被其他线程修改if (oldTail == _tail){if (oldNext == null){// 尝试链接新节点if (Interlocked.CompareExchange(ref oldTail.Next, newNode, null) == null){// 成功链接,尝试移动tailInterlocked.CompareExchange(ref _tail, newNode, oldTail);return;}}else{// 帮助其他线程完成tail更新Interlocked.CompareExchange(ref _tail, oldNext, oldTail);}}}}public bool TryDequeue(out T result){while (true){Node oldHead = _head;Node oldTail = _tail;Node oldFirst = oldHead.Next;if (oldHead == _head) // 确保head没有被修改{if (oldHead == oldTail){if (oldFirst == null){result = default(T);return false; // 队列为空}// 帮助移动tailInterlocked.CompareExchange(ref _tail, oldFirst, oldTail);}else{if (oldFirst != null){result = oldFirst.Value;if (Interlocked.CompareExchange(ref _head, oldFirst, oldHead) == oldHead){return true;}}}}}}
}
5.2 性能计数器
public class HighPerformanceCounter
{private long _count = 0;private long _total = 0;private long _min = long.MaxValue;private long _max = long.MinValue;public void AddSample(long value){Interlocked.Increment(ref _count);Interlocked.Add(ref _total, value);// 更新最小值long currentMin;do{currentMin = Volatile.Read(ref _min);if (value >= currentMin) break;} while (Interlocked.CompareExchange(ref _min, value, currentMin) != currentMin);// 更新最大值long currentMax;do{currentMax = Volatile.Read(ref _max);if (value <= currentMax) break;} while (Interlocked.CompareExchange(ref _max, value, currentMax) != currentMax);}public Statistics GetStatistics(){long count = Volatile.Read(ref _count);long total = Volatile.Read(ref _total);long min = Volatile.Read(ref _min);long max = Volatile.Read(ref _max);return new Statistics{Count = count,Average = count > 0 ? (double)total / count : 0,Min = min != long.MaxValue ? min : 0,Max = max != long.MinValue ? max : 0};}public class Statistics{public long Count { get; set; }public double Average { get; set; }public long Min { get; set; }public long Max { get; set; }}
}
6、最佳实践和注意事项
6.1 正确模式
class BestPractices
{// 模式1:简单的原子操作private int _simpleCounter = 0;public void SimpleIncrement(){Interlocked.Increment(ref _simpleCounter);}// 模式2:CAS循环用于复杂操作private int _complexState = 0;public bool TryUpdateState(int newValue){int original, newState;do{original = _complexState;newState = ComputeNewState(original, newValue);// 检查是否可以提前退出if (newState == original)return false;} while (Interlocked.CompareExchange(ref _complexState, newState, original) != original);return true;}private int ComputeNewState(int current, int input){// 复杂的状态计算逻辑return current + input * 2;}// 模式3:使用内存屏障确保可见性private int _data;private bool _ready = false;public void Initialize(){_data = 42;Interlocked.Exchange(ref _ready, true); // 隐含内存屏障}public int GetData(){if (Volatile.Read(ref _ready)){return _data;}throw new InvalidOperationException("Not ready");}
}
6.2 常见陷阱
class CommonMistakes
{// 错误1:误以为原子操作适用于所有场景private struct Point { public int X, Y; }private Point _point;public void Mistake1(){// 不能这样用!Interlocked只能用于基本类型// Interlocked.Exchange(ref _point, new Point()); // 编译错误}// 错误2:ABA问题private int _value = 0;public void Mistake2_ABAProblem(){int original = _value;// 假设其他线程在这里执行:// _value = 1; 然后 _value = 0;// CAS仍然会成功,但这可能不是我们期望的Interlocked.CompareExchange(ref _value, 2, original);}// 错误3:错误的内存访问模式private int _a, _b;public void Mistake3_NoMemoryBarrier(){// 没有内存屏障,编译器或CPU可能重排指令_a = 1;_b = 2; // 可能在实际执行时先于_a=1执行}// 错误4:在64位系统上误用32位操作private long _longValue;public void Mistake4_32BitOn64Bit(){// 在32位系统上,long的读写不是原子的// 需要使用Interlocked.Read和Interlocked.Exchange等long temp = _longValue; // 非原子操作!}
}
7、性能对比
class PerformanceComparison
{private int _counter = 0;private readonly object _lockObject = new object();public void TestInterlocked(int iterations){var stopwatch = Stopwatch.StartNew();Parallel.For(0, iterations, i => {Interlocked.Increment(ref _counter);});Console.WriteLine($"Interlocked: {stopwatch.ElapsedMilliseconds}ms");}public void TestLock(int iterations){var stopwatch = Stopwatch.StartNew();Parallel.For(0, iterations, i => {lock (_lockObject){_counter++;}});Console.WriteLine($"Lock: {stopwatch.ElapsedMilliseconds}ms");}public void TestNoSync(int iterations){var stopwatch = Stopwatch.StartNew();Parallel.For(0, iterations, i => {_counter++; // 线程不安全,结果不正确});Console.WriteLine($"No Sync: {stopwatch.ElapsedMilliseconds}ms (结果不可靠)");}
}// 测试结果通常:
// Interlocked: 最快
// Lock: 较慢
// No Sync: 最快但结果错误
8、总结
原子操作的本质:
-
是硬件级别的不可分割操作
-
基于 CPU 的原子指令(如 CAS、XCHG)
-
隐含内存屏障,保证内存可见性
-
通过缓存一致性协议确保多核一致性
核心优势:
-
无锁:避免线程阻塞和上下文切换
-
高性能:比传统锁快一个数量级
-
可组合:可以通过 CAS 构建复杂的无锁数据结构
-
内存安全:隐含内存屏障,防止指令重排
适用场景:
-
简单计数器:递增、递减、加法
-
标志位管理:布尔状态、位标志
-
无锁数据结构:栈、队列、对象池
-
性能敏感的同步:高并发计数器、统计信息
使用要点:
-
只适用于基本数值类型(int、long 等)
-
复杂操作使用 CAS 循环模式
-
注意 ABA 问题,必要时使用版本号
-
理解内存屏障的影响
