当前位置: 首页 > news >正文

多线程六脉神剑第五剑:原子操作 (Interlocked)

文章目录

  • 1、举栗子
  • 2、原子操作的核心本质
    • 2.1 CPU 指令级的保证
    • 2.2 内存屏障的作用
  • 3、底层原理深入解析、
    • 3.1 比较并交换(CAS)的核心机制
    • 3.2 CPU 缓存一致性协议
  • 4、Interlocked 类的完整使用
    • 4.1 基础原子操作
    • 4.2 无锁计数器实现
    • 4.3 无锁栈实现
    • 4.4 无锁对象池
    • 4.5 线程安全的标志位管理
  • 5、高级模式和技巧
    • 5.1 无锁队列实现
    • 5.2 性能计数器
  • 6、最佳实践和注意事项
    • 6.1 正确模式
    • 6.2 常见陷阱
  • 7、性能对比
  • 8、总结

1、举栗子

场景:超市的收银台

  • 非原子操作:像传统的收银方式,顾客把钱给收银员,收银员数钱,找零,整个过程可能被打断。

  • 原子操作:像自助收银机,投入钱币 → 自动识别 → 完成交易,整个过程不可分割。

原子操作的本质:一个操作要么完全执行,要么完全不执行,不会出现执行到一半的状态。

2、原子操作的核心本质

2.1 CPU 指令级的保证

原子操作的底层是 CPU 提供的特殊指令:

// 概念性展示 - 原子操作在硬件层面的实现
class CpuAtomicInstructions
{// 1. 原子读-修改-写指令bool Atomic_CompareAndSwap(ref int location, int expected, int newValue){// 这个操作在CPU级别是原子的:// 如果 location == expected,则 location = newValue 并返回 true// 否则返回 false// 整个过程不会被线程调度打断}// 2. 原子加法指令int Atomic_Add(ref int location, int value){// 原子性地执行:location = location + value// 返回修改前的值}// 3. 原子交换指令  int Atomic_Exchange(ref int location, int value){// 原子性地执行:oldValue = location; location = value;// 返回修改前的值}
}

2.2 内存屏障的作用

原子操作还隐含内存屏障(Memory Barrier),确保内存访问的顺序性:

class MemoryBarrierExample
{private int _data;private bool _initialized = false;// 错误示例:没有内存屏障void ThreadA(){_data = 42;_initialized = true; // 编译器或CPU可能重排指令,导致这个先执行}void ThreadB(){if (_initialized){Console.WriteLine(_data); // 可能看到42,也可能看到0!}}// 正确示例:使用原子操作(隐含内存屏障)void ThreadA_Fixed(){_data = 42;Interlocked.Exchange(ref _initialized, true); // 保证指令不会重排}
}

3、底层原理深入解析、

3.1 比较并交换(CAS)的核心机制

CAS 是原子操作的基石:

// 详细展示 CAS 的工作原理
class CompareExchangeMechanism
{public static int CompareExchange(ref int location, int value, int comparand){// 第一步:读取当前值int current = location;// 第二步:比较当前值与期望值if (current == comparand){// 第三步:如果相等,原子性地更新为新值// 这个"比较+更新"在CPU层面是一个不可分割的指令location = value;}// 第四步:返回原始值return current;}
}// 实际使用模式
class CASPattern
{private int _counter = 0;public bool TryIncrement(){int original, newValue;do{original = _counter;                    // 读取当前值newValue = original + 1;                // 计算新值// 尝试原子性地更新:如果_counter还是original,就设置为newValue// 如果返回值和original相等,说明更新成功// 如果不相等,说明其他线程修改了_counter,需要重试} while (Interlocked.CompareExchange(ref _counter, newValue, original) != original);return true;}
}

3.2 CPU 缓存一致性协议

原子操作依赖于现代 CPU 的 MESI 协议:

class CacheCoherence
{// 当多个CPU核心访问同一内存地址时:void AtomicOperationOnMultiCore(){// CPU0 执行:Interlocked.Increment(ref sharedVar)// CPU1 也执行:Interlocked.Increment(ref sharedVar)// MESI协议确保:// 1. 当一个核心要修改数据时,它先获取该缓存行的独占权// 2. 其他核心的对应缓存行被标记为"无效"// 3. 修改完成后,其他核心需要重新从内存或修改者那里获取新值}
}

4、Interlocked 类的完整使用

4.1 基础原子操作

class BasicInterlockedOperations
{private int _counter = 0;private long _bigCounter = 0;public void DemonstrateBasicOperations(){// 1. 递增操作int afterIncrement = Interlocked.Increment(ref _counter);Console.WriteLine($"递增后: {afterIncrement}"); // 返回递增后的值// 2. 递减操作  int afterDecrement = Interlocked.Decrement(ref _counter);Console.WriteLine($"递减后: {afterDecrement}");// 3. 加法操作int afterAdd = Interlocked.Add(ref _counter, 5);Console.WriteLine($"加5后: {afterAdd}");// 4. 交换操作int oldValue = Interlocked.Exchange(ref _counter, 100);Console.WriteLine($"交换前: {oldValue}, 交换后: {_counter}");// 5. 比较交换(CAS)int expected = 100;int desired = 200;int original = Interlocked.CompareExchange(ref _counter, desired, expected);if (original == expected){Console.WriteLine("CAS成功");}else{Console.WriteLine($"CAS失败,当前值: {original}");}// 6. 64位操作(针对long类型)long afterIncrement64 = Interlocked.Increment(ref _bigCounter);long afterAdd64 = Interlocked.Add(ref _bigCounter, 1000L);// 7. 读取操作(确保读取最新值)long current = Interlocked.Read(ref _bigCounter);// 8. 内存屏障Interlocked.MemoryBarrier(); // 确保屏障前的操作不会重排到屏障后}
}

4.2 无锁计数器实现

class LockFreeCounter
{private int _value = 0;private long _totalOperations = 0;// 简单的递增计数器public void Increment(){Interlocked.Increment(ref _value);Interlocked.Increment(ref _totalOperations);}// 带阈值的递增public bool IncrementWithThreshold(int threshold){int original, newValue;do{original = _value;if (original >= threshold)return false; // 已达到阈值newValue = original + 1;} while (Interlocked.CompareExchange(ref _value, newValue, original) != original);Interlocked.Increment(ref _totalOperations);return true;}// 安全的获取值(防止指令重排)public int GetValue(){// 对于int,直接读取就是原子的,但为了确保看到最新值,可以使用:return Interlocked.CompareExchange(ref _value, 0, 0); // 技巧:比较交换相同的值// 或者使用 Volatile.Read}// 重置计数器public int Reset(){return Interlocked.Exchange(ref _value, 0);}public long TotalOperations => Volatile.Read(ref _totalOperations);
}// 使用示例
class CounterExample
{static void Main(){var counter = new LockFreeCounter();// 启动多个线程并发递增var threads = new List<Thread>();for (int i = 0; i < 10; i++){threads.Add(new Thread(() => {for (int j = 0; j < 10000; j++){counter.Increment();}}));}threads.ForEach(t => t.Start());threads.ForEach(t => t.Join());Console.WriteLine($"最终计数: {counter.GetValue()}"); // 应该是 100000Console.WriteLine($"总操作数: {counter.TotalOperations}"); // 应该是 100000}
}

4.3 无锁栈实现

public class LockFreeStack<T>
{private class Node{public readonly T Value;public Node Next;public Node(T value){Value = value;}}private Node _head;public void Push(T item){var newNode = new Node(item);Node oldHead;do{oldHead = _head;newNode.Next = oldHead;} while (Interlocked.CompareExchange(ref _head, newNode, oldHead) != oldHead);Console.WriteLine($"压入: {item}");}public bool TryPop(out T result){Node oldHead;do{oldHead = _head;if (oldHead == null){result = default(T);return false;}} while (Interlocked.CompareExchange(ref _head, oldHead.Next, oldHead) != oldHead);result = oldHead.Value;Console.WriteLine($"弹出: {result}");return true;}public bool IsEmpty => _head == null;
}// 使用示例
class StackExample
{static void Main(){var stack = new LockFreeStack<int>();// 生产者线程var producer = new Thread(() => {for (int i = 0; i < 10; i++){stack.Push(i);Thread.Sleep(100);}});// 消费者线程var consumer = new Thread(() => {for (int i = 0; i < 10; i++){if (stack.TryPop(out int value)){Console.WriteLine($"消费: {value}");}Thread.Sleep(150);}});producer.Start();consumer.Start();producer.Join();consumer.Join();}
}

4.4 无锁对象池

public class LockFreeObjectPool<T> where T : class, new()
{private class PoolNode{public readonly T Value;public PoolNode Next;public PoolNode(T value){Value = value;}}private PoolNode _head;private int _count = 0;private readonly int _maxSize;public LockFreeObjectPool(int maxSize){_maxSize = maxSize;// 预创建对象for (int i = 0; i < Math.Min(10, maxSize); i++){Return(new T());}}public T Get(){PoolNode oldHead;do{oldHead = _head;if (oldHead == null){// 池为空,创建新对象return new T();}} while (Interlocked.CompareExchange(ref _head, oldHead.Next, oldHead) != oldHead);Interlocked.Decrement(ref _count);Console.WriteLine($"从池中获取对象,剩余: {_count}");return oldHead.Value;}public void Return(T item){if (item == null) throw new ArgumentNullException(nameof(item));if (Volatile.Read(ref _count) >= _maxSize){// 池已满,不回收Console.WriteLine("池已满,不回收对象");return;}var newNode = new PoolNode(item);PoolNode oldHead;do{oldHead = _head;newNode.Next = oldHead;} while (Interlocked.CompareExchange(ref _head, newNode, oldHead) != oldHead);Interlocked.Increment(ref _count);Console.WriteLine($"对象返回到池,总数: {_count}");}public int Count => Volatile.Read(ref _count);
}// 使用示例
class ObjectPoolExample
{static void Main(){var pool = new LockFreeObjectPool<StringBuilder>(5);Parallel.For(0, 20, i => {var sb = pool.Get();try{sb.Clear();sb.Append($"线程 {Thread.CurrentThread.ManagedThreadId} 正在工作");Console.WriteLine(sb.ToString());Thread.Sleep(100);}finally{pool.Return(sb);}});Console.WriteLine($"最终池中对象数: {pool.Count}");}
}

4.5 线程安全的标志位管理

class AtomicFlags
{private int _flags = 0;// 设置标志位public void SetFlag(int flag){int oldFlags, newFlags;do{oldFlags = _flags;newFlags = oldFlags | flag; // 设置指定位} while (Interlocked.CompareExchange(ref _flags, newFlags, oldFlags) != oldFlags);}// 清除标志位public void ClearFlag(int flag){int oldFlags, newFlags;do{oldFlags = _flags;newFlags = oldFlags & ~flag; // 清除指定位} while (Interlocked.CompareExchange(ref _flags, newFlags, oldFlags) != oldFlags);}// 检查标志位public bool HasFlag(int flag){return (Volatile.Read(ref _flags) & flag) != 0;}// 原子性地检查并设置标志位public bool TestAndSetFlag(int flag){int oldFlags, newFlags;do{oldFlags = _flags;if ((oldFlags & flag) != 0){return false; // 标志位已经设置}newFlags = oldFlags | flag;} while (Interlocked.CompareExchange(ref _flags, newFlags, oldFlags) != oldFlags);return true;}
}// 使用示例
class FlagsExample
{static void Main(){var flags = new AtomicFlags();const int FLAG_A = 1 << 0; // 0001const int FLAG_B = 1 << 1; // 0010const int FLAG_C = 1 << 2; // 0100// 多个线程并发设置标志位Parallel.Invoke(() => flags.SetFlag(FLAG_A),() => flags.SetFlag(FLAG_B),() => Console.WriteLine($"测试设置FLAG_C: {flags.TestAndSetFlag(FLAG_C)}"));Console.WriteLine($"FLAG_A: {flags.HasFlag(FLAG_A)}");Console.WriteLine($"FLAG_B: {flags.HasFlag(FLAG_B)}");Console.WriteLine($"FLAG_C: {flags.HasFlag(FLAG_C)}");}
}

5、高级模式和技巧

5.1 无锁队列实现

public class LockFreeQueue<T>
{private class Node{public readonly T Value;public Node Next;public Node(T value){Value = value;}}private Node _head;private Node _tail;public LockFreeQueue(){// 创建哨兵节点var dummy = new Node(default(T));_head = dummy;_tail = dummy;}public void Enqueue(T item){var newNode = new Node(item);while (true){Node oldTail = _tail;Node oldNext = oldTail.Next;// 检查tail是否被其他线程修改if (oldTail == _tail){if (oldNext == null){// 尝试链接新节点if (Interlocked.CompareExchange(ref oldTail.Next, newNode, null) == null){// 成功链接,尝试移动tailInterlocked.CompareExchange(ref _tail, newNode, oldTail);return;}}else{// 帮助其他线程完成tail更新Interlocked.CompareExchange(ref _tail, oldNext, oldTail);}}}}public bool TryDequeue(out T result){while (true){Node oldHead = _head;Node oldTail = _tail;Node oldFirst = oldHead.Next;if (oldHead == _head) // 确保head没有被修改{if (oldHead == oldTail){if (oldFirst == null){result = default(T);return false; // 队列为空}// 帮助移动tailInterlocked.CompareExchange(ref _tail, oldFirst, oldTail);}else{if (oldFirst != null){result = oldFirst.Value;if (Interlocked.CompareExchange(ref _head, oldFirst, oldHead) == oldHead){return true;}}}}}}
}

5.2 性能计数器

public class HighPerformanceCounter
{private long _count = 0;private long _total = 0;private long _min = long.MaxValue;private long _max = long.MinValue;public void AddSample(long value){Interlocked.Increment(ref _count);Interlocked.Add(ref _total, value);// 更新最小值long currentMin;do{currentMin = Volatile.Read(ref _min);if (value >= currentMin) break;} while (Interlocked.CompareExchange(ref _min, value, currentMin) != currentMin);// 更新最大值long currentMax;do{currentMax = Volatile.Read(ref _max);if (value <= currentMax) break;} while (Interlocked.CompareExchange(ref _max, value, currentMax) != currentMax);}public Statistics GetStatistics(){long count = Volatile.Read(ref _count);long total = Volatile.Read(ref _total);long min = Volatile.Read(ref _min);long max = Volatile.Read(ref _max);return new Statistics{Count = count,Average = count > 0 ? (double)total / count : 0,Min = min != long.MaxValue ? min : 0,Max = max != long.MinValue ? max : 0};}public class Statistics{public long Count { get; set; }public double Average { get; set; }public long Min { get; set; }public long Max { get; set; }}
}

6、最佳实践和注意事项

6.1 正确模式

class BestPractices
{// 模式1:简单的原子操作private int _simpleCounter = 0;public void SimpleIncrement(){Interlocked.Increment(ref _simpleCounter);}// 模式2:CAS循环用于复杂操作private int _complexState = 0;public bool TryUpdateState(int newValue){int original, newState;do{original = _complexState;newState = ComputeNewState(original, newValue);// 检查是否可以提前退出if (newState == original)return false;} while (Interlocked.CompareExchange(ref _complexState, newState, original) != original);return true;}private int ComputeNewState(int current, int input){// 复杂的状态计算逻辑return current + input * 2;}// 模式3:使用内存屏障确保可见性private int _data;private bool _ready = false;public void Initialize(){_data = 42;Interlocked.Exchange(ref _ready, true); // 隐含内存屏障}public int GetData(){if (Volatile.Read(ref _ready)){return _data;}throw new InvalidOperationException("Not ready");}
}

6.2 常见陷阱

class CommonMistakes
{// 错误1:误以为原子操作适用于所有场景private struct Point { public int X, Y; }private Point _point;public void Mistake1(){// 不能这样用!Interlocked只能用于基本类型// Interlocked.Exchange(ref _point, new Point()); // 编译错误}// 错误2:ABA问题private int _value = 0;public void Mistake2_ABAProblem(){int original = _value;// 假设其他线程在这里执行:// _value = 1; 然后 _value = 0;// CAS仍然会成功,但这可能不是我们期望的Interlocked.CompareExchange(ref _value, 2, original);}// 错误3:错误的内存访问模式private int _a, _b;public void Mistake3_NoMemoryBarrier(){// 没有内存屏障,编译器或CPU可能重排指令_a = 1;_b = 2; // 可能在实际执行时先于_a=1执行}// 错误4:在64位系统上误用32位操作private long _longValue;public void Mistake4_32BitOn64Bit(){// 在32位系统上,long的读写不是原子的// 需要使用Interlocked.Read和Interlocked.Exchange等long temp = _longValue; // 非原子操作!}
}

7、性能对比

class PerformanceComparison
{private int _counter = 0;private readonly object _lockObject = new object();public void TestInterlocked(int iterations){var stopwatch = Stopwatch.StartNew();Parallel.For(0, iterations, i => {Interlocked.Increment(ref _counter);});Console.WriteLine($"Interlocked: {stopwatch.ElapsedMilliseconds}ms");}public void TestLock(int iterations){var stopwatch = Stopwatch.StartNew();Parallel.For(0, iterations, i => {lock (_lockObject){_counter++;}});Console.WriteLine($"Lock: {stopwatch.ElapsedMilliseconds}ms");}public void TestNoSync(int iterations){var stopwatch = Stopwatch.StartNew();Parallel.For(0, iterations, i => {_counter++; // 线程不安全,结果不正确});Console.WriteLine($"No Sync: {stopwatch.ElapsedMilliseconds}ms (结果不可靠)");}
}// 测试结果通常:
// Interlocked: 最快
// Lock: 较慢
// No Sync: 最快但结果错误

8、总结

原子操作的本质:

  • 硬件级别的不可分割操作

  • 基于 CPU 的原子指令(如 CAS、XCHG)

  • 隐含内存屏障,保证内存可见性

  • 通过缓存一致性协议确保多核一致性

核心优势:

  1. 无锁:避免线程阻塞和上下文切换

  2. 高性能:比传统锁快一个数量级

  3. 可组合:可以通过 CAS 构建复杂的无锁数据结构

  4. 内存安全:隐含内存屏障,防止指令重排

适用场景:

  • 简单计数器:递增、递减、加法

  • 标志位管理:布尔状态、位标志

  • 无锁数据结构:栈、队列、对象池

  • 性能敏感的同步:高并发计数器、统计信息

使用要点:

  • 只适用于基本数值类型(int、long 等)

  • 复杂操作使用 CAS 循环模式

  • 注意 ABA 问题,必要时使用版本号

  • 理解内存屏障的影响

http://www.dtcms.com/a/528752.html

相关文章:

  • 在阿里云Debian12搭建WG
  • 深度掌握 Git 分支体系:从基础操作到高级策略与实践案例
  • 如何修改dns 快速使用境外网站西安seo公司哪家好
  • 域名对网站有什么影响爱站网长尾词挖掘工具
  • 解码Linux文件IO之开机动画原理与实现
  • R语言模型分析(一)(1)
  • 成都模板建站代理佛山手工外发加工网
  • 二维差分数组
  • 【Linux网络】定制协议
  • wordpress漫画站wordpress 外部调用
  • python 网站开发流程图微信营销软件破解版
  • HCL-MTC、HiTIN
  • 平方根求解-趋近法步长保守策略数学推导
  • JSP 文件上传
  • 基于深度生成模型的单细胞多时间点数据分析与药物发现
  • FreeRTOS信号量实战:停车场管理
  • 做网站一般不选用的图片格式万能浏览器手机版下载安装
  • Federated Learning-Empowered AI-Generated Content in Wireless Networks
  • 网站建设外包还是自己做乐清网站建设公司
  • 计算机网络自顶向下方法4——详解协议层次及其服务模型
  • 【开题答辩全过程】以 暴肌兔健康饮食推荐系统的设计与实现为例,包含答辩的问题和答案
  • 网站找不到首页网站开发分前台后台
  • 网站微信支付怎么做深圳品牌做网站公司哪家好
  • jEasyUI 创建异步树形菜单
  • fabric.js 中originX originY center设置问题
  • java开发手册与规范
  • 展示网站开发 大概多少钱wordpress+4.2.4中文
  • 深圳建设局官网站对网站建设需求
  • linux:查看某个文件下开启的进程占用的是哪个端口?
  • 【开题答辩过程】以《基于微信小程序的街道水电费缴纳系统》为例,不会开题答辩的可以进来看看