当前位置: 首页 > news >正文

c++无锁队列moodycamel::ConcurrentQueue测试结果

1moodycamel::ConcurrentQueue测试结果

 在c++开发中,无锁队列moodycamel::ConcurrentQueue使用较多,示意图如下:

从宏观来看moodycamel::ConcurrentQueue是MPMC队列,即多生产者,多消费者队列;从微观来看,是SPMC队列,即单生产者,多消费者队列。在ConcurrentQueue中,没个生产者都有自己对应的队列。这也引入了ConcurrentQueue的一个特点:

ConcurrentQueue只能保证单个生产者的FIFO特性,即对于一个生产者来说,元素出队的顺序和入队的顺序是一样的。如果考虑多个生产者,那么出队顺序和入队顺序可能是不一样的。这个特点也限制了ConcurrentQueue的应用场景。

moodycamel::ConcurrentQueue提供了两种无锁队列:ConcurrentQueue和BlockingConcurrentQueue,两者的区别是:后者的消费者和生产者事件有等待唤醒机制,前者没有。使用BlockingConcurrentQueue时 ,如果队列为空的时候,没有元素可以消费,那么消费者线程会等待,不会占用cpu,生产者生产元素的时候,会唤醒消费者;使用ConcurrentQueue,当队列为空的时候,消费者线程会一直轮询,不会让出cpu。

在实时linux系统中测试了多线程情况下,入队的最大延时,结果如下:

BlockingConcurrentQueue70μs
ConcurrentQueue55μs
ConcurrentQueue,固定大小35μs

BlockingConcurrentQueue:默认情况下,可以动态申请内存。生产者入队时,还需要通过信号量唤醒消费者。

ConcurrentQueue:入队时,不需要通过信号量唤醒消费者。

默认情况下,队列支持动态扩展,动态扩展时会动态申请内存;也可以不支持静态扩展,在构造时,指定队列的大小。动态扩展比固定大小的最大延时大20μs。

2无锁队列实现

在实际开发中,使用较多的是有锁队列,有锁队列使用简单;当业务对实时性有要求较高的时候,才会考虑使用无锁队列。无锁队列并不是真正的无锁,在操作队列的过程中也需要同步机制,只不过同步的粒度变小了。在linux实时系统中进行测试,同样的用户代码,使用有锁队列的最大延时比无锁队列大一个数量级。

有锁队列的操作步骤如下,使用简单。

加锁

操作队列

无锁队列的操作步骤,是典型的三段式,分三个步骤,无锁队列减小的同步的粒度,如下3给 步骤中,只有第一步和第3部的时候,需要使用原子操作进行同步。

占位置

入队,或者出队

更新位置

2.1moodycamel::ConcurrentQueue实现

入队:

moodycamel::ConcurrentQueue本质上是一个SPMC类型的队列,每个生产者都有自己的对应的队列。

        template<AllocationMode allocMode, typename U>

        inline bool enqueue(U&& element)

        {

            //第一步:占位置

            index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);

            index_t newTailIndex = 1 + currentTailIndex;

            ...

            //第二步:Enqueue

            new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));

            

            //第三步:更新索引

            this->tailIndex.store(newTailIndex, std::memory_order_release);

            return true;

        }

出队:

出队是多线程,要考虑多线程之间的同步。在moodycamel::ConcurrentQueue的实现中,多线程的同步,并没有使用cas类似的操作,而是使用了多个原子变量,这样减少了线程间的同步,可以提高性能。

出队实现比较复杂,可以参考坐着的博客,出队时要增加了两个计数dequeueOptimisticCount和dequeueOvercommit。

Detailed Design of a Lock-Free Queue

2.2dpdk无锁队列实现

dpdk中无锁队列的实现是典型的三段式。

以下是入队代码,出队代码和入队代码是类似的。

static __rte_always_inline unsigned int

__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,

        unsigned int esize, unsigned int n,

        enum rte_ring_queue_behavior behavior, unsigned int is_sp,

        unsigned int *free_space)

{

    uint32_t prod_head, prod_next;

    uint32_t free_entries;

   

    n = __rte_ring_move_prod_head(r, is_sp, n, behavior,

            &prod_head, &prod_next, &free_entries);

    if (n == 0)

        goto end;

    __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);

    __rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp, 1);

end:

    if (free_space != NULL)

        *free_space = free_entries - n;

    return n;

}

无锁队列的生产者索引和消费者索引,均有两个索引来维护。如下是生产者索引的变化过程:

初始状态:生产者索引的head和tail指向同一个位置。

第一步:占位置,占位置的时候,只更新head,不更新tail。因为更新tail的时候,说明䛾已经入队成功,元素是可以消费的,这个时候不更新tail,是因为元素还没有入队。

第二步:元素入队

第三步:更新tail 

第一步:占位置,更新head

最终调用到如下函数,通过函数rte_atomic32_cmpset来进行多线程间的同步。

static __rte_always_inline unsigned int

__rte_ring_headtail_move_head(struct rte_ring_headtail *d,

        const struct rte_ring_headtail *s, uint32_t capacity,

        unsigned int is_st, unsigned int n,

        enum rte_ring_queue_behavior behavior,

        uint32_t *old_head, uint32_t *new_head, uint32_t *entries)

{

    unsigned int max = n;

    int success;

    do {

        /* Reset n to the initial burst count */

        n = max;

        *old_head = d->head;

        /* add rmb barrier to avoid load/load reorder in weak

         * memory model. It is noop on x86

         */

        rte_smp_rmb();

        /*

         *  The subtraction is done between two unsigned 32bits value

         * (the result is always modulo 32 bits even if we have

         * *old_head > s->tail). So 'entries' is always between 0

         * and capacity (which is < size).

         */

        *entries = (capacity + s->tail - *old_head);

        /* check that we have enough room in ring */

        if (unlikely(n > *entries))

            n = (behavior == RTE_RING_QUEUE_FIXED) ?

                    0 : *entries;

        if (n == 0)

            return 0;

        *new_head = *old_head + n;

        if (is_st) {

            d->head = *new_head;

            success = 1;

        } else

            success = rte_atomic32_cmpset(

                    (uint32_t *)(uintptr_t)&d->head,

                    *old_head, *new_head);

    } while (unlikely(success == 0));

    return n;

}

第二步:元素入队,在这一步不需要原子操作。

第三步:更新tail

static __rte_always_inline void

__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,

        uint32_t new_val, uint32_t single, uint32_t enqueue)

{

    if (enqueue)

        rte_smp_wmb();

    else

        rte_smp_rmb();

    /*

     * If there are other enqueues/dequeues in progress that preceded us,

     * we need to wait for them to complete

     */

    if (!single)

        rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,

            rte_memory_order_relaxed);

    ht->tail = new_val;

}

2.3SPSC和MPMC无锁队列实现的差异

单生产者单消费者,在一开始占位置的时候,更新head的时候不需要通过cas进行线程间同步,直接更新即可;在最后更新tail的时候,也不需要cas进行线程间同步,直接更新tail即可。

http://www.dtcms.com/a/278046.html

相关文章:

  • 在高并发场景下,仅依赖数据库机制(如行锁、版本控制)无法完全避免数据异常的问题
  • Sping AI Alibaba
  • 第11章 AB实验评估指标体系
  • Soul方程式:Z世代背景下兴趣社交平台的商业模式解析
  • Java行业前景如何?零基础又该如何去学Java?
  • 深入理解 RocketMQ:生产者详解
  • 并行并发丨C++ 协程、现场池 学习笔记
  • 闲庭信步使用图像验证平台加速FPGA的开发:第十三课——图像浮雕效果的FPGA实现
  • 语言模型常用的激活函数(Sigmoid ,GeLU ,SwiGLU,GLU,SiLU,Swish)
  • 算法-汽水瓶兑换
  • Spring AI 项目实战(十七):Spring Boot + AI + 通义千问星辰航空智能机票预订系统(附完整源码)
  • 【webrtc】gcc当前可用码率3:x264响应码率改变
  • 系规备考论文:论IT服务部署实施方法
  • 西藏氆氇新生:牦牛绒混搭液态金属的先锋尝试
  • 分布式锁踩坑记:当“防重“变成了“重复“
  • JAVA并发——什么是Java的原子性、可见性和有序性
  • Redis缓存设计与性能优化指南
  • 使用Starrocks替换Clickhouse的理由
  • C++封装、多态、继承
  • 在 Ubuntu 下安装 MySQL 数据库
  • 从文本中 “提取” 商业洞察“DatawhaleAI夏令营”
  • 电路分析基础(02)-电阻电路的等效变换
  • Matlab批量转换1km降水数据为tiff格式
  • 【LeetCode100】--- 5.盛水最多的容器【复习回顾】
  • ssm学习笔记day05
  • QT 多线程 管理串口
  • 《[系统底层攻坚] 张冬〈大话存储终极版〉精读计划启动——存储架构原理深度拆解之旅》-系统性学习笔记(适合小白与IT工作人员)
  • springboot高校竞赛赛事管理系统 计算机毕业设计源码23756
  • Java行为型模式---策略模式
  • 第1章 概 述