当前位置: 首页 > news >正文

JCTools Spmc 单生产者-多消费者的无锁并发有界队列

SpmcArrayQueue 是 JCTools 中为 单生产者-多消费者(Single-Producer-Multi-Consumer) 场景设计的有界队列。与 SPSC 模型相比,SPMC 的复杂性主要体现在消费者侧,因为多个消费者线程需要以线程安全的方式竞争消费同一个队列中的元素。

单生产者-单消费者数组队列 分析见:JCTools Spsc:单生产者-单消费者无锁队列

SpmcArrayQueue 的继承链同样是为了精确控制内存布局,但其侧重点与 SpscArrayQueue 有所不同,它需要处理多消费者对 consumerIndex 的争用。

  1. SpmcArrayQueueL1Pad & SpmcArrayQueueProducerIndexField:

    • 与 SPSC 类似,这部分定义了生产者索引 producerIndex,并用 L1Pad 将其与上游的“冷”字段(如 buffermask)隔离开。
    • 由于只有一个生产者,producerIndex 的更新逻辑相对简单,不需要 CAS 操作,使用 putOrderedLong 即可。
  2. SpmcArrayQueueL2Pad & SpmcArrayQueueConsumerIndexField:

    • 核心变化点SpmcArrayQueueConsumerIndexField 中不再有 soConsumerIndex 方法,取而代之的是 casConsumerIndex
       
      // ... existing code ...
      //$gen:ordered-fields
      abstract class SpmcArrayQueueConsumerIndexField<E> extends SpmcArrayQueueL2Pad<E>
      {protected final static long C_INDEX_OFFSET = fieldOffset(SpmcArrayQueueConsumerIndexField.class, "consumerIndex");private volatile long consumerIndex;// ... existing code ...@Overridepublic final long lvConsumerIndex(){return consumerIndex;}final boolean casConsumerIndex(long expect, long newValue){return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);}
      }
      // ... existing code ...
      
    • 原因:因为有多个消费者,它们必须通过 CAS(Compare-And-Swap) 操作来原子性地更新 consumerIndex,以确保只有一个消费者能成功获取并消费一个元素。L2Pad 在此的作用依然是隔离生产者和消费者的热点字段。
  3. SpmcArrayQueueMidPad & SpmcArrayQueueProducerIndexCacheField:

    • SPMC 特有的优化:这里引入了一个新的字段 producerIndexCache
      // ... existing code ...
      //$gen:ordered-fields
      abstract class SpmcArrayQueueProducerIndexCacheField<E> extends SpmcArrayQueueMidPad<E>
      {// This is separated from the consumerIndex which will be highly contended in the hope that this value spends most// of it's time in a cache line that is Shared(and rarely invalidated)private volatile long producerIndexCache;// ... existing code ...
      }
      // ... existing code ...
      
    • producerIndex 会被生产者频繁更新(每次 offer 都会更新)
    • producerIndexCache 只在消费者发现缓存过期时才更新,因此减少了对 producerIndex 的 volatile 读取,降低了缓存一致性流量的争用
    • MidPad 的作用就是将这个消费者侧的缓存producerIndexCache)与消费者侧的争用点consumerIndex)分离开,避免它们互相干扰。
  4. SpmcArrayQueueL3Pad: 最后的填充,隔离 producerIndexCache 和 SpmcArrayQueue 自身的字段。


offer(E e):简单而直接

由于只有一个生产者,offer 的逻辑比 SPSC 还要简单,因为它不需要“前瞻优化”(producerLimit)。生产者只需要检查目标槽位是否为空即可。

// ... existing code ...@Overridepublic boolean offer(final E e){if (null == e){throw new NullPointerException();}final E[] buffer = this.buffer;final long mask = this.mask;final long currProducerIndex = lvProducerIndex(); // 1. 获取当前生产者索引final long offset = calcCircularRefElementOffset(currProducerIndex, mask);// 2. 检查槽位是否被消费者释放if (null != lvRefElement(buffer, offset)){// 如果槽位不为空,说明队列满了long size = currProducerIndex - lvConsumerIndex();if (size > mask){return false;}else{// 等待消费者释放该槽位 (这会破坏无等待性)while (null != lvRefElement(buffer, offset)){// BURN}}}// 3. 放置元素并更新索引soRefElement(buffer, offset, e);soProducerIndex(currProducerIndex + 1); // 使用 store-ordered 更新return true;}
// ... existing code ...

SPMC 特性分析

  1. 单生产者权威:生产者是唯一能推进 producerIndex 的线程,所以它只需 lvProducerIndex() 读取自己的进度,然后用 soProducerIndex() 更新即可,无需 CAS。
  2. 依赖消费者:生产者通过 lvRefElement 检查目标槽位是否为 null 来判断队列是否已满。这个 null 是由消费者在消费后写入的。
  3. 潜在的自旋等待:代码中有一段 while 循环等待。这通常发生在消费者进度稍稍落后于生产者进度,但队列并未完全满的情况下。生产者会在此“自旋”等待消费者完成对该槽位的消费和清理。这是 SpmcArrayQueue 的一个关键特性,它牺牲了一定的无等待性(Wait-Free)来简化设计

poll():竞争与缓存的艺术

poll 方法是 SPMC 模型的核心,完美展现了多消费者如何通过 CAS 和本地缓存来协同工作。

// ... existing code ...@Overridepublic E poll(){long currentConsumerIndex;// 1. 读取本地的生产者进度缓存long currProducerIndexCache = lvProducerIndexCache();do{// 2. 读取全局的消费者进度currentConsumerIndex = lvConsumerIndex();// 3. 快路径判断:使用本地缓存判断队列是否为空if (currentConsumerIndex >= currProducerIndexCache){// 4. 慢路径:本地缓存表明队列为空,需同步最新的生产者进度long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex){return null; // 队列确实为空}else{// 更新本地缓存currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex);}}}// 5. CAS 竞争:尝试原子性地将 consumerIndex 加一while (!casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1));// 6. 成功获取元素return removeElement(buffer, currentConsumerIndex, mask);}
// ... existing code ...

SPMC 特性分析

  1. 生产者进度缓存:每个消费者线程开始时都会读取 producerIndexCache。这是一个本地快照,避免了每次都去访问真正的 producerIndex
  2. 快慢路径分离
    • 快路径:只要 currentConsumerIndex < currProducerIndexCache,消费者就认为队列中有元素,直接进入第5步的 CAS 竞争。这是绝大多数情况。
    • 慢路径:当快路径条件不满足时,消费者必须通过 lvProducerIndex() 读取最新的生产者进度,并更新自己的本地缓存 producerIndexCache
  3. CAS 争用casConsumerIndex 是多消费者协调的核心。多个消费者线程可能同时读取到相同的 currentConsumerIndex,但只有一个能通过 CAS 操作成功地将其加一,从而“赢得”消费该位置元素的权利。失败的线程则会重新循环,读取新的 consumerIndex 再次尝试。
  4. 无竞争取出:一旦一个消费者通过 CAS 成功预定了位置,它就可以安全地调用 removeElement 来取出元素。因为 removeElement 操作的是一个已经被它“私有化”的索引,不会有其他消费者来干扰。

SpmcArrayQueue.peek() 方法详细分析

peek() 方法是队列中的一个重要操作,它允许查看队列头部元素但不移除该元素。

public E peek() {final E[] buffer = this.buffer;final long mask = this.mask;long currProducerIndexCache = lvProducerIndexCache();long currentConsumerIndex;long nextConsumerIndex = lvConsumerIndex();E e;do {currentConsumerIndex = nextConsumerIndex;if (currentConsumerIndex >= currProducerIndexCache) {long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex) {return null;} else {currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex);}}e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));// sandwich the element load between 2 consumer index loadsnextConsumerIndex = lvConsumerIndex();} while (null == e || nextConsumerIndex != currentConsumerIndex);return e;
}

方法开始首先获取几个关键变量:

  • buffer: 队列的底层数组,存储实际元素
  • mask: 用于计算循环队列位置的掩码值,通常是 capacity-1
  • currProducerIndexCache: 生产者索引的缓存值,这是一个重要的优化变量
  • currentConsumerIndex 和 nextConsumerIndex: 消费者索引的当前值和下一个值

  1. 在 peek() 方法中,消费者首先检查 producerIndexCache
  2. 只有当 currentConsumerIndex >= currProducerIndexCache 时才需要读取真实的 producerIndex
  3. 大多数情况下,队列不为空时,消费者可以直接使用缓存值,避免读取主 producerIndex
  4. 性能影响

    • 直接读取 producerIndex:每次都要从主内存读取,可能触发缓存失效
    • 使用 producerIndexCache:大部分时间从本地缓存读取,减少内存屏障和缓存一致性流量
  5. 正确性保证

    • 虽然使用了缓存,但通过 lvProducerIndex() 的检查确保了最终一致性
    • 当缓存可能过期时(currentConsumerIndex >= currProducerIndexCache),会重新读取真实值

缓存更新逻辑

if (currentConsumerIndex >= currProducerIndexCache) {long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex) {return null; // 队列为空} else {currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex); // 更新缓存}
}

这段代码处理了两种情况:

  1. 队列为空:当消费者索引已经赶上或超过生产者索引时,返回 null
  2. 缓存过期:当缓存值小于实际生产者索引时,更新缓存值

元素加载

e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));

这行代码从缓冲区中加载元素:

  • calcCircularRefElementOffset 计算循环队列中的实际位置
  • lvRefElement 是一个 volatile 加载操作,确保内存可见性

一致性检查

// sandwich the element load between 2 consumer index loads
nextConsumerIndex = lvConsumerIndex();
} while (null == e || nextConsumerIndex != currentConsumerIndex);

这是一个重要的并发控制机制,被称为"三明治加载":

  1. 在加载元素前获取消费者索引(currentConsumerIndex
  2. 加载元素本身
  3. 再次获取消费者索引(nextConsumerIndex

通过比较两次获取的消费者索引是否相同,可以确保在加载元素过程中没有其他消费者修改了队列状态。如果不同,说明有其他消费者已经修改了队列,需要重试。

循环条件分析

while (null == e || nextConsumerIndex != currentConsumerIndex)

循环继续的条件有两个:

  1. null == e: 加载的元素为 null,可能是因为:

    • 队列确实为空
    • 生产者正在写入元素但尚未完成
    • 其他消费者已经取走了该元素
  2. nextConsumerIndex != currentConsumerIndex: 消费者索引在加载元素过程中被修改,说明有并发操作干扰

内存访问顺序

方法中的内存访问遵循特定顺序,确保正确的并发语义:

  1. 首先读取生产者索引缓存(lvProducerIndexCache()
  2. 读取消费者索引(lvConsumerIndex()
  3. 如果需要,读取实际生产者索引(lvProducerIndex()
  4. 读取队列元素(lvRefElement()
  5. 再次读取消费者索引(lvConsumerIndex()

这种顺序确保了在多线程环境下能正确检测队列状态变化。

volatile 操作的作用

  • lvProducerIndexCache(): volatile 读取,确保获取最新的缓存值
  • lvConsumerIndex(): volatile 读取,确保获取最新的消费者位置
  • lvProducerIndex(): volatile 读取,确保获取最新的生产者位置
  • svProducerIndexCache(): volatile 写入,确保缓存更新对所有线程可见

这些 volatile 操作确保了多线程间的内存可见性,防止出现不一致的视图。

循环优化

虽然方法中包含一个 do-while 循环,但在正常情况下(队列不为空且没有并发干扰),循环只会执行一次。只有在以下情况下才会多次循环:

  1. 队列为空
  2. 有并发消费者干扰
  3. 生产者正在写入元素但尚未完成

与其他方法的比较

peek() vs poll()

  • peek() 只查看元素但不移除
  • poll() 查看并移除元素
  • peek() 不需要修改消费者索引,而 poll() 需要通过 CAS 操作更新消费者索引

peek() vs relaxedPeek()

@Override
public E relaxedPeek() {final E[] buffer = this.buffer;final long mask = this.mask;long currentConsumerIndex;long nextConsumerIndex = lvConsumerIndex();E e;do {currentConsumerIndex = nextConsumerIndex;e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));// sandwich the element load between 2 consumer index loadsnextConsumerIndex = lvConsumerIndex();}while (nextConsumerIndex != currentConsumerIndex);return e;
}

relaxedPeek() 是 peek() 的"宽松"版本:

  • 不检查队列是否为空
  • 不使用生产者索引缓存
  • 只确保在加载元素过程中消费者索引没有变化

这使得 relaxedPeek() 性能更好,但可能在某些边界情况下行为不同(例如当队列为空时)。

适用场景

peek() 方法特别适用于以下场景:

  1. 检查队列内容:在不修改队列状态的情况下查看头部元素
  2. 多消费者环境:在有多个消费者线程的情况下,确保正确处理并发访问
  3. 性能敏感场景:通过缓存机制减少对共享变量的访问,提高性能
  4. 需要强一致性保证:相比 relaxedPeek(),提供更强的一致性保证

 

总结 

SpmcArrayQueue 相比 SpscArrayQueue 的核心特性和设计权衡在于:

  • 多消费者协调:引入了 CAS 操作 (casConsumerIndex) 来解决多个消费者对 consumerIndex 的争用问题,这是从 SPSC 到 SPMC 的根本性变化。
  • 消费者侧缓存:增加了 producerIndexCache 字段,让每个消费者可以缓存生产者进度,大大减少了对 producerIndex 的 volatile 读,降低了缓存一致性流量,提升了消费者侧的性能。
  • 内存布局的进一步细分:通过 MidPad 将 consumerIndex(极热争用点)和 producerIndexCache(消费者本地缓存)进行隔离,进一步优化了缓存性能。
  • 性能权衡:在 offer 方法中,允许了短暂的自旋等待,牺牲了严格的无等待性,以换取更简单的生产者逻辑。

总而言之,SpmcArrayQueue 是一个通过精巧的内存布局、CAS竞争和消费者侧缓存机制,高效解决了单生产者、多消费者并发难题的优秀实现。

http://www.dtcms.com/a/344738.html

相关文章:

  • 支持轻量化部署的混元3D世界模型Lite版本上线魔乐社区,昇腾部署实践来啦
  • FCT/ATE/ICT通用测试上位机软件
  • Leetcode—595. 大的国家【简单】
  • JUC之Fork/Join
  • WindowsAPI|每天了解几个winAPI接口之网络配置相关文档Iphlpapi.h详细分析9
  • 2-3.Python 编码基础 - 类型检测与类型转换
  • Vue 实现可拖拽分割布局(支持左右、上下拖拽调整)
  • Java 学习笔记(基础篇7)
  • 2025年游戏盾SDK动态加密技术全景解析:从防御破解到重塑游戏安全基石
  • CSM5110 5V/1A降压芯片 SOT23-5封装 可替代RY3408 带OVP保护
  • vim的使用
  • 牛客面经1 滴滴社招-002
  • JAVA国际版多商户运营版商城系统源码多商户社交电商系统源码支持Android+IOS+H5
  • 哈希和字符串哈希
  • STM32 外设驱动模块七:红外反射式光电模块
  • Centos 8 管理防火墙
  • 安装Tailscale
  • Maven初识到应用
  • 【AI应用】向量数据库Milvus详细命令
  • Jenkins + SonarQube 从原理到实战四:Jenkins 与 Gerrit 集成并实现自动任务
  • Linux爆音问题解决方法(隔一会会有奇怪噪音)
  • Go 基础解析
  • 逛越南本地菜市场学英语
  • 异质结3.0时代的降本提效革命:捷造科技设备技术创新与产业拐点分析
  • DSPy框架:从提示工程到声明式编程的革命性转变
  • go 常见面试题
  • 番茄(西红柿)叶片病害检测数据集:12k+图像,10类,yolo标注
  • RAG中稠密向量和稀疏向量
  • 基于抗辐照性能的ASP4644S电源芯片特性分析与多领域应用验证
  • show-overflow-tooltip使用当内容过多不展示...