当前位置: 首页 > news >正文

【基础组件 and 网络编程】对 DPDK 的 MPMC 无锁队列 rte-ring 组件的思考分析(同时也是实战原子操作的好机会)

文章目录

  • DPDK 的巨页池管理工具是 `RTE-RING`
  • 找到 `RTE-RING` 的源码
  • `RTE-RING` 的源代码分析
    • `rte_ring` 的原子操作分析、处理多线程竞争
    • 生产者指针入队的宏(将被复用)
    • 多生产者入队函数
    • 单生产者入队函数
    • 消费者指针出队的宏(将被复用)
    • 多消费者出队函数
    • 单消费者出队函数
    • 包装生产入队函数(囊括单生产者与多生产者)
    • 包装消费出队函数(囊括单消费者与多消费者)
    • 环形队列的计数函数
    • 环形队列的创建、初始化、销毁
  • 代码测试

推荐一个零声教育学习教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,点击立即学习: https://github.com/0voice 链接。

DPDK 的巨页池管理工具是 RTE-RING

DPDK 里随处可见的 rte 前缀,其实是 “Run-Time Environment” 的缩写。最早由 Intel 内部项目发起,把内核网络栈搬到用户态跑,需要一个 “运行环境” 来管理大页、PCI 设备、线程亲和、无锁队列等基础设施,于是起了个名字叫 Run-Time Environment,缩写 rte。

我也有一个新的感悟。什么是队列?是注重程序正义的数据结构,是先进先出后进后出,不是一个强查找算法,所谓的指标根本在这里没有用,这是我以前的看法。我现在的看法更进一步,它是异步计算的数据资源同步器,队列的根本作用在于同步!!!!

今天所介绍的 MPMC 环形队列是一个服务器专用的数据模型,他能多线程的执行 I/O 任务。他就是 DPDK 内置的 rte-ring 无锁环形队列(原子操作保证多线程的数据同步性,也正好符合,DPDK 是指挥多个 CPU 读写巨页池的机制)。关于 DPDK,我曾写过一篇文章介绍过他的环境配置 《初识 DPDK 并配置 DPDK 环境》,我在这篇文章指出了 DPDK 的优势:

  1. 减少上下文切换:部分 CPU 特异化为专用的 IO 设备,其余 CPU 绝缘于 CPU,减少 CPU 被打断的清空,因为这会产生上下文切换的成本。(一个在做计算任务的 CPU 被切换出去干什么呢?内核读取 IO 数据。)
  2. 零复制:CPU 提前命令网卡,让其做内存映射,通过 DMA 机制将网卡的缓冲区数据写入内存映射区(物理上位于内存),写完后再通知被 DPDK 绑定的 CPU。
  3. 减少内存搜索:通过巨页池的设置和挂载,使得被 DPDK 绑定的 CPU 的 TLB 内的地址装框得下巨页池的所有块的地址,那就不用因为搜不到而作全局随机搜索。

但其实,那篇文章里面还没提到的一点是,DPDK 的巨页池管理是很有一套的。

  1. 多线程环境下避免锁竞争:采用无锁队列管理多线程对巨页池的内存申请、释放

在这里插入图片描述

并且也给出了 DPDK 是如何管理巨页池的(其实巨页池也是内存池)

在这里插入图片描述
所以内存池是大有门道的,我曾写过一篇文章介绍过内存池设计 《手撕内存池的设计代码(C 实现)》,当时我只是指出了对于控制内存碎片的情形,我们可以把小碎片收集起来装载到大节点之中。其实,内存池也是可以服务于提升服务器吞吐量的,这其中的关键就是我们今天所要提到的 rte-ring 无锁队列。

找到 RTE-RING 的源码

读者可以在 http://core.dpdk.org/download/ 下载 24.11.3 版本的 DPDK (不同版本的区别是很大的,请谨慎选择),具体下载可见 《初识 DPDK 并配置 DPDK 环境》。我们可以翻看文件夹

在这里插入图片描述
找出其内的

在这里插入图片描述
找到 ring 文件夹

在这里插入图片描述
最后找到 rte_ring.h 头文件和其对应的源代码。

在这里插入图片描述

RTE-RING 的源代码分析

高效的无锁队列是通过原子操作保证线程之间的数据同步性,至于原子操作为何可以保证多线程间的数据同步,读者可以自行参考我写过的博客 《一次过讲清 CPU 的三级缓存、MESI 缓存一致性协议和原子操作三者的关系》,这篇文章详细介绍了原子操作的内存序、缓存锁等重要话题。读者也可以参考 DPDK 官方给出的 RTE-RING 的官方介绍文档 https://doc.dpdk.org/guides/prog_guide/ring_lib.html,当然也可以参考 《深入浅出 DPDK》这本书。

rte_ring 的原子操作分析、处理多线程竞争

DPDK 的 rte-ring 的原子操作只涉及 CAS 操作。

static inline int
rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src)
{return __sync_bool_compare_and_swap(dst, exp, src);
}

这是编译器内置的原子操作函数,不是 C++ 语言的 <atomic> 或者 C 语言 <stdatomic.h> 里的 CAS 操作,其函数解析如下

__sync_bool_compare_and_swap(type *ptr, type oldval, type newval)  就是 GCC 提供的“硬件级 CAS”封装:一条指令完成“比较+交换”,type:任意 整型(int, long, long long 及对应无符号)或 指针 类型。返回 true 表示 旧值等于 oldval 且已写入 newval;返回 false 表示 旧值不等于 oldval,什么都没改。

原子操作怎么能少了 “内存序” 呢?此处使用汇编语言,完成指令重排的限制性声明,并且以宏 COMPILER_BARRIER() 的形式被调用。

/* dummy assembly operation to prevent compiler re-ordering of instructions */
#define COMPILER_BARRIER() do { asm volatile("" ::: "memory"); } while(0)

RTE-RING 的生产者入队模式,要么不成功就返回失败,要么不成功就返回成功部分的数量。

enum rte_ring_queue_behavior {/* 尝试入队/出队固定数量 n 个对象,不成功则失败 */RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring *//* 尝试入队/出队最多 n 个对象,返回实际成功的数量 */RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items a possible from ring */
};

rte-ring 无锁环形队列数据结构如下,需要注意到环形队列是有消费者偏移组和生产者偏移组(偏移≠指针),这个队列结构是可以充当多线程异步操作的资源同步器,也可以是一个控制器。void * ring[0] __rte_cache_aligned 是 C/C++ 的柔性数组语法,用来任意补长度的,比如说这里如果不算上柔性数组的话,该结构体共 84 个字节(宏 RTE_RING_NAMESIZE 是 32 ),需要补充字节到 128 个才是 64 的倍数,因为一个缓存行刚好是 64 字节,这样可以减少缓存被无效刷新,而且多余的 44 个字节可以放 5 个指针。不止的,如果指针数量众多,还可以继续填充到 3×64=192 个字节…柔性数组是用来放置具体数据指针的,前两个结构体是这个柔性数组的控制器。

柔性数组还有一个天大的好处,那就是可以任意指定环形数组的长度。rte_ring 生产者结构体中的 watermark(水位线)成员用于实现一种软性的流量控制(Traffic Shaping or Backpressure)机制。在生产速度远大于消费速度时,提前向生产者发出警告,而不是等到队列完全满了才通知。(这也就是说警戒线必须要低于环形数列的预设长度,否则没有意义)

struct rte_ring {char name[RTE_RING_NAMESIZE];    /**< Name of the ring. */int flags;                       /**< Flags supplied at creation. *//** Ring producer status. */struct prod {uint32_t watermark;      /**< Maximum items before EDQUOT. */uint32_t sp_enqueue;     /**< True, if single producer. */uint32_t size;           /**< Size of ring. */uint32_t mask;           /**< Mask (size-1) of ring. */volatile uint32_t head;  /**< Producer head. */volatile uint32_t tail;  /**< Producer tail. */} prod __rte_cache_aligned;/** Ring consumer status. */struct cons {uint32_t sc_dequeue;     /**< True, if single consumer. */uint32_t size;           /**< Size of the ring. */uint32_t mask;           /**< Mask (size-1) of ring. */volatile uint32_t head;  /**< Consumer head. */volatile uint32_t tail;  /**< Consumer tail. */} cons __rte_cache_aligned;;void * ring[0] __rte_cache_aligned; /**< Memory space of ring starts here.* not volatile so need to be careful* about compiler re-ordering */
};

这就是官方手册原文的图示

在这里插入图片描述
其实 RTE-RING 这种数据结构有点像幼儿园的小朋友抢凳子。

在这里插入图片描述

我将分 5 步去介绍多生产者入队的情况,多消费者出队也是类似的,我只给出两个线程的竞争案例,更多线程的竞争也是类似的。首先是,第一步抢占资源。
在这里插入图片描述

第二步,是抢成功的排斥其他线程,并更新全局的 prod_head 指针

在这里插入图片描述
第三步,线程 1 写入数据

在这里插入图片描述
第四步,线程 2 抢占成功,更新全局的 prod_head,并且写入数据
在这里插入图片描述

第五步,多线程任务结束,每个线程都不约而同把尾指针更新为当前 prod_head 指针

在这里插入图片描述

注意到了没有,两个竞争线程都是在抢不到首个资源的时候,会退而求其次抢夺下一个资源,他不会吊死在一棵树之上,他们会尽快的占位子,把坑挖好,占上一个萝卜坑。也有点像 98 陈小春版本的《鹿鼎记》那一句神台词:“人人有份,永不落空。”

在这里插入图片描述

生产者指针入队的宏(将被复用)

r 指代将要在函数内传入的 rte-ring 的指针。idx 是环形队列的相位,prod_head & mask 本质是在关于周期做模变换,是一种求余的运算。其中,mask 将会是 size-1r->ring 就是环形队列里面的那一个柔性数组成员(这个柔性数组有多余的空间,可以放很多个指针),这个柔性数组至多可以放 5 个指针。有趣的是,柔性数组里指针的添加是 4 个 4 个地添加。n & ((~(unsigned)0x3)) 将变量 n 的最低两位清零,目的是避免最后一次遍历时产生 core-dump。当队列往后的 “可用空间” 不足,这个环形队列的柔性数组的指针添加是会折返回去的,否则怎么称之为 ‘“环形” 呢?

obj_table 是将要被传入的指向常量指针数组的指针。

#define ENQUEUE_PTRS() do { \const uint32_t size = r->prod.size; \uint32_t idx = prod_head & mask; \if (likely(idx + n < size)) { \for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \r->ring[idx] = obj_table[i]; \r->ring[idx+1] = obj_table[i+1]; \r->ring[idx+2] = obj_table[i+2]; \r->ring[idx+3] = obj_table[i+3]; \} \switch (n & 0x3) { \case 3: r->ring[idx++] = obj_table[i++]; \case 2: r->ring[idx++] = obj_table[i++]; \case 1: r->ring[idx++] = obj_table[i++]; \} \} else { \for (i = 0; idx < size; i++, idx++)\r->ring[idx] = obj_table[i]; \for (idx = 0; i < n; i++, idx++) \r->ring[idx] = obj_table[i]; \} \
} while(0)

多生产者入队函数

mask = r->prod.mask 是环形队列的相位周期,cons_tail 是用来判断环形队列是否已经满员,free_entries 是环形队列剩余多少的可用空间,n 是插入的数量。原子操作 rte_atomic32_cmpset(&r->prod.head, prod_head, prod_next); 如果不成功,就会反复尝试,重复地获取新的 prod_head,再进行修改。其他线程在失败的时候会在新的循环中获取新的 prod_head,这样的话,其他线程就会互不相犯,因为大家都找到了一块地方去写 n 个指针。最后才更新尾指针。如果没有多余的位置,会看情况看宏定义 RTE_RING_QUEUE_FIXED 是全部都不写,还是能写多少就写多少。

最后当最后把全部数据都写入队列后,如果此时的队列的已用空间超过了警戒线,那就返回警戒标志;如果没超,那就返回成功标志或者已写入 n 个。EDQUOT 是一个标准的 POSIX 错误码,其含义是 “Disk quota exceeded”(磁盘配额超出)。DPDK 巧妙地借用了这个通用的错误码来表示其环形队列的 “配额超出” 状态。ENOBUFS 也是 Linux / POSIX 标准错误码之一,定义在头文件 <errno.h>,“No buffer space available”,通常解释为 “缓冲区不足” 或 “无可用缓冲空间”。

//	多生产者入队
static inline int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,unsigned n, enum rte_ring_queue_behavior behavior)
{uint32_t prod_head, prod_next;uint32_t cons_tail, free_entries;const unsigned max = n;int success;unsigned i;uint32_t mask = r->prod.mask;int ret;/* move prod.head atomically */do {/* Reset n to the initial burst count */n = max;prod_head = r->prod.head;cons_tail = r->cons.tail;/* The subtraction is done between two unsigned 32bits value* (the result is always modulo 32 bits even if we have* prod_head > cons_tail). So 'free_entries' is always between 0* and size(ring)-1. */free_entries = (mask + cons_tail - prod_head);/* check that we have enough room in ring */if (unlikely(n > free_entries)) {if (behavior == RTE_RING_QUEUE_FIXED) {return -ENOBUFS;}else {/* No free entry available */if (unlikely(free_entries == 0)) {return 0;}n = free_entries;}}prod_next = prod_head + n;success = rte_atomic32_cmpset(&r->prod.head, prod_head,prod_next);} while (unlikely(success == 0));/* write entries in ring */ENQUEUE_PTRS();COMPILER_BARRIER();/* if we exceed the watermark */if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :(int)(n | RTE_RING_QUOT_EXCEED);}else {ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;}/** If there are other enqueues in progress that preceeded us,* we need to wait for them to complete*/r->prod.tail = prod_next;return ret;
}

单生产者入队函数

跟上面那个函数差不多,我们可以注意到,这个函数并没有原子操作(因为是单线程操作,故而无需做同步设计,也就没有原子操作失败再重试的戏码),其余都一样。

//	单生产者入队
static inline int __attribute__((always_inline))
__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,unsigned n, enum rte_ring_queue_behavior behavior)
{uint32_t prod_head, cons_tail;uint32_t prod_next, free_entries;unsigned i;uint32_t mask = r->prod.mask;int ret;prod_head = r->prod.head;cons_tail = r->cons.tail;/* The subtraction is done between two unsigned 32bits value* (the result is always modulo 32 bits even if we have* prod_head > cons_tail). So 'free_entries' is always between 0* and size(ring)-1. */free_entries = mask + cons_tail - prod_head;/* check that we have enough room in ring */if (unlikely(n > free_entries)) {if (behavior == RTE_RING_QUEUE_FIXED) {return -ENOBUFS;}else {/* No free entry available */if (unlikely(free_entries == 0)) {return 0;}n = free_entries;}}prod_next = prod_head + n;r->prod.head = prod_next;/* write entries in ring */ENQUEUE_PTRS();COMPILER_BARRIER();/* if we exceed the watermark */if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :(int)(n | RTE_RING_QUOT_EXCEED);}else {ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;}r->prod.tail = prod_next;return ret;
}

消费者指针出队的宏(将被复用)

r 指代将要在函数内传入的 rte-ring 的指针。idx 是环形队列的相位,prod_head & mask 本质是在关于周期做模变换,是一种求余的运算。其中,mask 将会是 size-1r->ring 就是环形队列里面的那一个柔性数组成员(这个柔性数组有多余的空间,可以放很多个指针),这个柔性数组至多可以放 5 个指针。有趣的是,柔性数组里指针的消费也是 4 个 4 个地消费。n & ((~(unsigned)0x3)) 将变量 n 的最低两位清零,目的是避免最后一次遍历时产生 core-dump。当队列往后的 “可用空间” 不足,这个环形队列的柔性数组的指针添加是会折返回去的,否则怎么称之为 ‘“环形” 呢?

obj_table是将要被传入的指向常量指针数组的指针。

#define DEQUEUE_PTRS() do { \uint32_t idx = cons_head & mask; \const uint32_t size = r->cons.size; \if (likely(idx + n < size)) { \for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\obj_table[i] = r->ring[idx]; \obj_table[i+1] = r->ring[idx+1]; \obj_table[i+2] = r->ring[idx+2]; \obj_table[i+3] = r->ring[idx+3]; \} \switch (n & 0x3) { \case 3: obj_table[i++] = r->ring[idx++]; \case 2: obj_table[i++] = r->ring[idx++]; \case 1: obj_table[i++] = r->ring[idx++]; \} \} else { \for (i = 0; idx < size; i++, idx++) \obj_table[i] = r->ring[idx]; \for (idx = 0; i < n; i++, idx++) \obj_table[i] = r->ring[idx]; \} \
} while (0)

多消费者出队函数

prod_tail 的作用是要来计算当前环形队列是否已经满员,剩余空间 entries 也是有可能被计算为负数的,这不会不影响使用,因为我们无非是想看一下是否有足够的空余空间了,负数不更说明队列内容多到爆炸?之后就是多个线程间的 “抢座位” 活动,抢到座位的,就可以立刻写,抢不到的,那就往后抢座位,抢到就再写。最后才更新尾指针。

如果没有充足的消费空间,系统会看情况、看宏定义 RTE_RING_QUEUE_FIXED 来决定是全部都不消费,还是能消费多少就消费多少。

//	多消费者出队
static inline int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,unsigned n, enum rte_ring_queue_behavior behavior)
{uint32_t cons_head, prod_tail;uint32_t cons_next, entries;const unsigned max = n;int success;unsigned i;uint32_t mask = r->prod.mask;/* move cons.head atomically */do {/* Restore n as it may change every loop */n = max;cons_head = r->cons.head;prod_tail = r->prod.tail;/* The subtraction is done between two unsigned 32bits value* (the result is always modulo 32 bits even if we have* cons_head > prod_tail). So 'entries' is always between 0* and size(ring)-1. */entries = (prod_tail - cons_head);/* Set the actual entries for dequeue */if (n > entries) {if (behavior == RTE_RING_QUEUE_FIXED) {return -ENOENT;}else {if (unlikely(entries == 0)){return 0;}n = entries;}}cons_next = cons_head + n;success = rte_atomic32_cmpset(&r->cons.head, cons_head,cons_next);} while (unlikely(success == 0));/* copy in table */DEQUEUE_PTRS();COMPILER_BARRIER();/** If there are other dequeues in progress that preceded us,* we need to wait for them to complete*/r->cons.tail = cons_next;return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}

单消费者出队函数

跟上面的函数差不多,只是没有多线程的争抢,故而不需要用到原子操作函数。

//	单生产者出列
static inline int __attribute__((always_inline))
__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,unsigned n, enum rte_ring_queue_behavior behavior)
{uint32_t cons_head, prod_tail;uint32_t cons_next, entries;unsigned i;uint32_t mask = r->prod.mask;cons_head = r->cons.head;prod_tail = r->prod.tail;/* The subtraction is done between two unsigned 32bits value* (the result is always modulo 32 bits even if we have* cons_head > prod_tail). So 'entries' is always between 0* and size(ring)-1. */entries = prod_tail - cons_head;if (n > entries) {if (behavior == RTE_RING_QUEUE_FIXED) {return -ENOENT;}else {if (unlikely(entries == 0)){return 0;}n = entries;}}cons_next = cons_head + n;r->cons.head = cons_next;/* copy in table */DEQUEUE_PTRS();COMPILER_BARRIER();r->cons.tail = cons_next;return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}

包装生产入队函数(囊括单生产者与多生产者)

每次可以插入固定多个(大于 2)元素。没有多余空间的时候,本线程的 n 个资源都不生产。环形队列的 r->prod.sp_enqueue 就是说这个队列有多少个生产者。多生产者者就作原子操作版本的。

bulk = “整批搬家,全有或全无”

static inline int __attribute__((always_inline))
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,unsigned n)
{return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}static inline int __attribute__((always_inline))
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,unsigned n)
{return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}static inline int __attribute__((always_inline))
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,unsigned n)
{if (r->prod.sp_enqueue)return rte_ring_sp_enqueue_bulk(r, obj_table, n);elsereturn rte_ring_mp_enqueue_bulk(r, obj_table, n);
}

每次仅插入 1 个元素入环形队列。环形队列的 r->prod.sp_enqueue 就是说这个队列有多少个生产者。多生产者者就作原子操作版本的。

static inline int __attribute__((always_inline))
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{return rte_ring_mp_enqueue_bulk(r, &obj, 1);
}static inline int __attribute__((always_inline))
rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
{return rte_ring_sp_enqueue_bulk(r, &obj, 1);
}static inline int __attribute__((always_inline))
rte_ring_enqueue(struct rte_ring *r, void *obj)
{if (r->prod.sp_enqueue)return rte_ring_sp_enqueue(r, obj);elsereturn rte_ring_mp_enqueue(r, obj);
}

每次插入 n 个元素,空闲位置不充足的时候,能插入多少就插入多少的版本。

burst 就是 “能搬多少算多少”

static inline int __attribute__((always_inline))
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,unsigned n)
{return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}static inline int __attribute__((always_inline))
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,unsigned n)
{return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}static inline int __attribute__((always_inline))
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,unsigned n)
{if (r->prod.sp_enqueue)return 	rte_ring_sp_enqueue_burst(r, obj_table, n);elsereturn 	rte_ring_mp_enqueue_burst(r, obj_table, n);
}

包装消费出队函数(囊括单消费者与多消费者)

每次可消费固定 n 个元素。没有多余空间的时候,本线程的 n 个资源都不消费。环形队列的 r->cons.sc_dequeue 就是说这个队列有多少个消费者。多消费者就作原子操作版本的。

bulk = “整批搬家,全有或全无”

static inline int __attribute__((always_inline))
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}static inline int __attribute__((always_inline))
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}static inline int __attribute__((always_inline))
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{if (r->cons.sc_dequeue)return rte_ring_sc_dequeue_bulk(r, obj_table, n);elsereturn rte_ring_mc_dequeue_bulk(r, obj_table, n);
}

每次仅消费 1 个元素入环形队列。环形队列的 r->prod.sp_enqueue 就是说这个队列有多少个生产者。多生产者者就作原子操作版本的。

static inline int __attribute__((always_inline))
rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
{return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
}static inline int __attribute__((always_inline))
rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
{return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
}static inline int __attribute__((always_inline))
rte_ring_dequeue(struct rte_ring *r, void **obj_p)
{if (r->cons.sc_dequeue)return rte_ring_sc_dequeue(r, obj_p);elsereturn rte_ring_mc_dequeue(r, obj_p);
}

固定消费 n 个,元素不充足的时候能消费多少就消费多少的版本。

burst 就是 “能搬多少算多少”

static inline int __attribute__((always_inline))
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}static inline int __attribute__((always_inline))
rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}static inline int __attribute__((always_inline))
rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{if (r->cons.sc_dequeue)return rte_ring_sc_dequeue_burst(r, obj_table, n);elsereturn rte_ring_mc_dequeue_burst(r, obj_table, n);
}

环形队列的计数函数

判断环形队列是否已经满员

static inline int
rte_ring_full(const struct rte_ring *r)
{uint32_t prod_tail = r->prod.tail;uint32_t cons_tail = r->cons.tail;return (((cons_tail - prod_tail - 1) & r->prod.mask) == 0);
}

判断环形队列是否为空

static inline int
rte_ring_empty(const struct rte_ring *r)
{uint32_t prod_tail = r->prod.tail;uint32_t cons_tail = r->cons.tail;return !!(cons_tail == prod_tail);
}

计算当前环形队列还有多少内容没读完

static inline unsigned
rte_ring_count(const struct rte_ring *r)
{uint32_t prod_tail = r->prod.tail;uint32_t cons_tail = r->cons.tail;return ((prod_tail - cons_tail) & r->prod.mask);
}

计算当前环形队列还有多少空闲位置

static inline unsigned
rte_ring_free_count(const struct rte_ring *r)
{uint32_t prod_tail = r->prod.tail;uint32_t cons_tail = r->cons.tail;return ((cons_tail - prod_tail - 1) & r->prod.mask);
}

rte_ring_free_countrte_ring_count 两个函数都涉及了二进制运算中负数关于其相反数的 补码表示法,即 按位取反后加一
−a=(∼a)+1-a = (\sim a) + 1a=(a)+1
原因是生产者偏移减去消费者偏移的值有可能是一个负数。我举个例子(为方便表示,假设现在的数字空间是 8 位的),
0b1的负数==>>先按位取反0b11111110==>>再加10b111111110b 1\; 的负数\;==>> 先按位取反\;\;0b11111110\;\;==>>再加 1\;\;0b111111110b1的负数==>>先按位取反0b11111110==>>再加10b11111111

这两个函数也都涉及了二进制运算的特殊技巧法——“掩码 mask 法”,所谓的掩码指代的是一个二进制数所有二进制位上数字1 是连续存在的,掩码是通过一个 2 的 N 次方减 1 得到的,我做个例子,7 是 8 对应的掩码
0b1000−1=0b01110b1000 -1 = 0b01110b10001=0b0111

掩码的好处是可以通过按位与操作表达所有小于 2 的 N 次方的所有数字。因此,DPDK 的 rte-ring 环形队列的长度被初始化地设定成 2 的 N 次方,因而她才能获取对应的掩码 r->prod.mask

现在这两个函数的问题是,假如有一个 2 的 N 次方正整数 BBB,其对应的掩码是 maskmaskmask,关于一个整数 aaa 的函数 f(a)f(a)f(a) ,其定义如下
f(a)={a,0≤a<B,B+a,−B≤a<0,f(a) = \left\{ \begin{aligned} &a,\;\;0\leq a<B, \\ &B+a,\;\; -B\leq a <0, \end{aligned} \right. f(a)={a,0a<B,B+a,Ba<0,

如何从二进制的位运算理解一般的加减法呢(进而理解上述的模变换)?答案还是只考虑特殊情况,假如 ccc 是一个正整数,其有一个最小的值的形式为 2M2^M2M 的正整数上界 DDD,其对应的掩码为 mask(D)mask(D)mask(D)
D=c+(D−c)=c+[D+(−c)]D=c+(D-c) =c+[D+(-c)] D=c+(Dc)=c+[D+(c)]

而且有分块运算加和律(一个二进制数前后截成两段),读者可以从 “补码表示法” 理解
−c=(−c)&(∼mask(D))+(−c)&mask(D),-c = (-c)\;\&\;(\sim mask(D)) + (-c)\;\&\;mask(D) ,c=(c)&(mask(D))+(c)&mask(D),
而且 c<Dc<Dc<D,说明了 −c-cc 超过 DDD 的最高位后的数字都会是 1(读者也可以从 “补码表示法” 理解),故而
D=(−1)×[(−c)&(∼mask(D))]D= (-1)×[(-c)\; \& \;(\sim mask(D))]D=(1)×[(c)&(mask(D))]

联立式子可得
D=c+(−c)&mask(D)=c&mask(D)+(−c)&mask(D)D= c + (-c)\;\&\;mask(D) \\ =c\;\&\;mask(D) +(-c)\;\&\;mask(D) D=c+(c)&mask(D)=c&mask(D)+(c)&mask(D)
我愿称之为互补公式,进而有

f(a)={a,0≤a<B,mask(B)&a,−B≤a<0,f(a) = \left\{ \begin{aligned} &a,\;\;0\leq a<B, \\ & mask(B)\;\&\;a,\;\; -B\leq a <0, \end{aligned} \right. f(a)={a,0a<B,mask(B)&a,Ba<0,
所以,f(a)=mask(B)&a.f(a)= mask(B)\;\&\;a.f(a)=mask(B)&a.

至此,代码中的数学问题,已经全解释了。

环形队列的创建、初始化、销毁

RTE 环形队列的创建。在 C 编程之中,必须要注意的是位运算远比算术运算要来的快,而高效实现位运算的前提是所操作的数字必须是 2 的整数倍,否则还会是很慢的。

/* true if x is a power of 2 */
#define POWEROF2(x) ((((x)-1) & (x)) == 0)/* create the ring */
struct rte_ring *
rte_ring_create(const char *name, unsigned count, unsigned flags)
{struct rte_ring *r;size_t ring_size;/* count must be a power of 2 */if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {errno = EINVAL;return NULL;}ring_size = count * sizeof(void *) + sizeof(struct rte_ring);r = (struct rte_ring *)malloc(ring_size);if (r != NULL) {/* init the ring structure */memset(r, 0, sizeof(*r));snprintf(r->name, sizeof(r->name), "%s", name);r->flags = flags;r->prod.watermark = count;r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ);r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ);r->prod.size = r->cons.size = count;r->prod.mask = r->cons.mask = count-1;r->prod.head = r->cons.head = 0;r->prod.tail = r->cons.tail = 0;} else {errno = ENOBUFS;}return r;
}

设置生产者的警戒线(不能一次过加太多,否则长度固定的环形队列会因为空间不够而丢失数据)

int
rte_ring_set_water_mark(struct rte_ring *r, unsigned count)
{if (count >= r->prod.size)return -EINVAL;/* if count is 0, disable the watermarking */if (count == 0)count = r->prod.size;r->prod.watermark = count;return 0;
}

在删除之前对环形队列数据结构作出 “侧写”。

/* dump the status of the ring on the console */
void
rte_ring_dump(const struct rte_ring *r)
{printf("ring <%s>@%p\n", r->name, r);printf("  flags=%x\n", r->flags);printf("  size=%"PRIu32"\n", r->prod.size);printf("  ct=%"PRIu32"\n", r->cons.tail);printf("  ch=%"PRIu32"\n", r->cons.head);printf("  pt=%"PRIu32"\n", r->prod.tail);printf("  ph=%"PRIu32"\n", r->prod.head);printf("  used=%u\n", rte_ring_count(r));printf("  avail=%u\n", rte_ring_free_count(r));if (r->prod.watermark == r->prod.size)printf("  watermark=0\n");elseprintf("  watermark=%"PRIu32"\n", r->prod.watermark);/* sum and dump statistics */printf("  no statistics available\n");
}

销毁环形队列

void
rte_ring_free(struct rte_ring *r)
{free(r); 
}

代码测试

代码测试的内容很简单,作加和测试,按顺序添加 0~999 的数字重复 5 次,最终加和的数字就是 (999+0)×1000÷2×5=2497500(999+0)×1000÷2×5=2497500(999+0)×1000÷2×5=2497500。并且计算所用的时间。

重复的 5 次就是由 5 个线程完成,并且每个线程每次就往环形队列添加 1 元,即我们只会用到 rte_ring_sc_dequeue 和 rte_ring_mp_enqueue 函数。

函数 getticks 是一段 “读取 CPU 时间戳计数器(Time-Stamp Counter)” 的内联汇编函数,名字通常叫 getticks / rdtsc / get_cycles,在 DPDK、OS 内核、性能测试代码里随处可见。rdtsc 这条指令硬件级就把 64 位时间戳劈成两半:
低 32 位 → EAX 寄存器
高 32 位 → EDX 寄存器
因此不管运行在 32 位还是 64 位模式,CPU 总是从 两个不同的寄存器 里把值掏出来;
C 代码里看到的 a 和 d 正好对应这两个寄存器,拼接后才能复原成完整的 64 位 TSC。函数返回的数值本身 没有单位,直观理解为 “时钟周期数”,是根据计算机的具体系统设置而来的。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>#include "rte_ring.h"#define RING_SIZE 16<<20    //  2 的 24 次方typedef struct cc_queue_node {int data;
} cc_queue_node_t;static struct rte_ring *r;typedef unsigned long long ticks;//  这是一段 “读取 CPU 时间戳计数器(Time-Stamp Counter)” 的内联汇编函数,
//  名字通常叫 getticks / rdtsc / get_cycles,在 DPDK、OS 内核、性能测试代码里随处可见。
static inline ticks getticks(void)
{u_int32_t a, d;asm __volatile__("rdtsc" : "=a" (a), "=d" (d));return (((ticks)a) | (((ticks)d) << 32));
}void *enqueue_fun(void *data)
{int n = *(int*)data;int i = 0;int ret;cc_queue_node_t *p;for (; i < n; i++) {p = (cc_queue_node_t *)malloc(sizeof(cc_queue_node_t));p->data = i;ret = rte_ring_mp_enqueue(r, p);if (ret != 0) {printf("enqueue failed: %d\n", i);}}return NULL;
}void *dequeue_func(void *data)
{int ret=0;int i = 0;int sum = 0;int n = *(int*)data;cc_queue_node_t *p= NULL;ticks t1, t2, diff;//return;t1 = getticks();while (1) {p = NULL;ret = rte_ring_sc_dequeue(r, (void **)&p);if (ret != 0) {//do something}if (p != NULL) {i++;sum += p->data;free(p);if (i == n) {break;}}}t2 = getticks();diff = t2 - t1;printf("time diff: %llu\n", diff);printf("dequeue total: %d, sum: %d\n", i, sum);return NULL;
}int main(int argc, char *argv[])
{int ret = 0;pthread_t pid1, pid2, pid3, pid4, pid5, pid6;pthread_attr_t pthread_attr;// 使用栈变量而不是指针int producer_count = 1000;int consumer_count = 5000;r = rte_ring_create("test", RING_SIZE, 0);if (r == NULL) {return -1;}printf("start enqueue, 5 producer threads, echo thread enqueue 1000 numbers.\n");//  pthread_attr_init 就是 “给线程属性变量刷一遍出厂设置”,不用它,pthread_create 直接罢工。pthread_attr_init(&pthread_attr);if ((ret = pthread_create(&pid1, &pthread_attr, enqueue_fun, &producer_count)) == 0) {//pthread_detach(pid1);}if ((ret = pthread_create(&pid2, &pthread_attr, enqueue_fun, &producer_count)) == 0) {//pthread_detach(pid2);}if ((ret = pthread_create(&pid3, &pthread_attr, enqueue_fun, &producer_count)) == 0) {//pthread_detach(pid3);}if ((ret = pthread_create(&pid4, &pthread_attr, enqueue_fun, &producer_count)) == 0) {//pthread_detach(pid4);}if ((ret = pthread_create(&pid5, &pthread_attr, enqueue_fun, &producer_count)) == 0) {//pthread_detach(pid5);}printf("start dequeue, 1 consumer thread.\n");if ((ret = pthread_create(&pid6, &pthread_attr, dequeue_func, &consumer_count)) == 0) {//pthread_detach(pid6);}pthread_join(pid1, NULL);pthread_join(pid2, NULL);pthread_join(pid3, NULL);pthread_join(pid4, NULL);pthread_join(pid5, NULL);pthread_join(pid6, NULL);rte_ring_free(r);return 0;
}

编译并执行,情况良好

qiming@qiming:~/share/CTASK/BASIC-COMPONENT/3.2.2-free_queue/code/rte_ring$ gcc -o test rte_ring_main.c rte_ring.c -lpthread -g
qiming@qiming:~/share/CTASK/BASIC-COMPONENT/3.2.2-free_queue/code/rte_ring$ ./test 
start enqueue, 5 producer threads, echo thread enqueue 1000 numbers.
start dequeue, 1 consumer thread.
time diff: 1914869
dequeue total: 5000, sum: 2497500
qiming@qiming:~/share/CTASK/BASIC-COMPONENT/3.2.2-free_queue/code/rte_ring$ 
http://www.dtcms.com/a/385574.html

相关文章:

  • ingress-nginx-controller 414 Request—URI Too Large
  • Java 定时任务与分布式调度工具分析
  • 【热点】最优传输(Optimal Transport)及matlab案例
  • 用 Kotlin 玩转 Protocol Buffers(proto3)
  • leecode73 矩阵置零
  • SELECT INTO 和 INSERT INTO SELECT 区别
  • dhtmlx-gantt
  • Spring如何巧妙解决循环依赖问题
  • 第四章:职业初印象:打造你的个人品牌(1)
  • (九)Python高级应用-文件与IO操作
  • FFmpeg06:SDL渲染
  • javadoc命令 错误: 编码 GBK 的不可映射字符 (0x80)
  • 【面试场景题】自增主键、UUID、雪花算法都有什么问题
  • 数据整理器(Data Collators)总结 (95)
  • 代码评价:std::shared_ptr用法分析
  • 23种设计模式案例
  • AI Agent案例与实践全解析:字节智能运维
  • MyBatis-Plus分页插件实现导致total为0问题
  • S32DS仿真环境问题
  • 黑马JavaWeb+AI笔记 Day07 Web后端实战(部门管理模块)
  • 【AI开发】【前后端全栈】[特殊字符] AI 时代的快速开发思维
  • kimi-k2论文阅读笔记
  • [SC]一个使用前向声明的SystemC项目例子
  • Gunicorn 部署与调优全指南(2025 版)
  • 第二十一篇|新宿平和日本语学校的结构化解读:费用函数、文化网络与AI教育建模
  • 数据结构(C语言篇):(十五)二叉树OJ题
  • RIFE.py代码学习 自学
  • Gateway-路由-规则配置
  • 低端影视官网入口 - 免费看影视资源网站|网页版|电脑版地址
  • 【Python3教程】Python3高级篇之日期与时间