c++无锁队列moodycamel::ConcurrentQueue测试结果
1moodycamel::ConcurrentQueue测试结果
在c++开发中,无锁队列moodycamel::ConcurrentQueue使用较多,示意图如下:
从宏观来看moodycamel::ConcurrentQueue是MPMC队列,即多生产者,多消费者队列;从微观来看,是SPMC队列,即单生产者,多消费者队列。在ConcurrentQueue中,没个生产者都有自己对应的队列。这也引入了ConcurrentQueue的一个特点:
ConcurrentQueue只能保证单个生产者的FIFO特性,即对于一个生产者来说,元素出队的顺序和入队的顺序是一样的。如果考虑多个生产者,那么出队顺序和入队顺序可能是不一样的。这个特点也限制了ConcurrentQueue的应用场景。
moodycamel::ConcurrentQueue提供了两种无锁队列:ConcurrentQueue和BlockingConcurrentQueue,两者的区别是:后者的消费者和生产者事件有等待唤醒机制,前者没有。使用BlockingConcurrentQueue时 ,如果队列为空的时候,没有元素可以消费,那么消费者线程会等待,不会占用cpu,生产者生产元素的时候,会唤醒消费者;使用ConcurrentQueue,当队列为空的时候,消费者线程会一直轮询,不会让出cpu。
在实时linux系统中测试了多线程情况下,入队的最大延时,结果如下:
BlockingConcurrentQueue | 70μs |
ConcurrentQueue | 55μs |
ConcurrentQueue,固定大小 | 35μs |
BlockingConcurrentQueue:默认情况下,可以动态申请内存。生产者入队时,还需要通过信号量唤醒消费者。
ConcurrentQueue:入队时,不需要通过信号量唤醒消费者。
默认情况下,队列支持动态扩展,动态扩展时会动态申请内存;也可以不支持静态扩展,在构造时,指定队列的大小。动态扩展比固定大小的最大延时大20μs。
2无锁队列实现
在实际开发中,使用较多的是有锁队列,有锁队列使用简单;当业务对实时性有要求较高的时候,才会考虑使用无锁队列。无锁队列并不是真正的无锁,在操作队列的过程中也需要同步机制,只不过同步的粒度变小了。在linux实时系统中进行测试,同样的用户代码,使用有锁队列的最大延时比无锁队列大一个数量级。
有锁队列的操作步骤如下,使用简单。
加锁
操作队列
无锁队列的操作步骤,是典型的三段式,分三个步骤,无锁队列减小的同步的粒度,如下3给 步骤中,只有第一步和第3部的时候,需要使用原子操作进行同步。
占位置
入队,或者出队
更新位置
2.1moodycamel::ConcurrentQueue实现
入队:
moodycamel::ConcurrentQueue本质上是一个SPMC类型的队列,每个生产者都有自己的对应的队列。
template<AllocationMode allocMode, typename U>
inline bool enqueue(U&& element)
{
//第一步:占位置
index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
index_t newTailIndex = 1 + currentTailIndex;
...
//第二步:Enqueue
new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
//第三步:更新索引
this->tailIndex.store(newTailIndex, std::memory_order_release);
return true;
}
出队:
出队是多线程,要考虑多线程之间的同步。在moodycamel::ConcurrentQueue的实现中,多线程的同步,并没有使用cas类似的操作,而是使用了多个原子变量,这样减少了线程间的同步,可以提高性能。
出队实现比较复杂,可以参考坐着的博客,出队时要增加了两个计数dequeueOptimisticCount和dequeueOvercommit。
Detailed Design of a Lock-Free Queue
2.2dpdk无锁队列实现
dpdk中无锁队列的实现是典型的三段式。
以下是入队代码,出队代码和入队代码是类似的。
static __rte_always_inline unsigned int
__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
unsigned int esize, unsigned int n,
enum rte_ring_queue_behavior behavior, unsigned int is_sp,
unsigned int *free_space)
{
uint32_t prod_head, prod_next;
uint32_t free_entries;
n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
&prod_head, &prod_next, &free_entries);
if (n == 0)
goto end;
__rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);
__rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
if (free_space != NULL)
*free_space = free_entries - n;
return n;
}
无锁队列的生产者索引和消费者索引,均有两个索引来维护。如下是生产者索引的变化过程:
初始状态:生产者索引的head和tail指向同一个位置。
第一步:占位置,占位置的时候,只更新head,不更新tail。因为更新tail的时候,说明䛾已经入队成功,元素是可以消费的,这个时候不更新tail,是因为元素还没有入队。
第二步:元素入队
第三步:更新tail
第一步:占位置,更新head
最终调用到如下函数,通过函数rte_atomic32_cmpset来进行多线程间的同步。
static __rte_always_inline unsigned int
__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
const struct rte_ring_headtail *s, uint32_t capacity,
unsigned int is_st, unsigned int n,
enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
unsigned int max = n;
int success;
do {
/* Reset n to the initial burst count */
n = max;
*old_head = d->head;
/* add rmb barrier to avoid load/load reorder in weak
* memory model. It is noop on x86
*/
rte_smp_rmb();
/*
* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* *old_head > s->tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
*entries = (capacity + s->tail - *old_head);
/* check that we have enough room in ring */
if (unlikely(n > *entries))
n = (behavior == RTE_RING_QUEUE_FIXED) ?
0 : *entries;
if (n == 0)
return 0;
*new_head = *old_head + n;
if (is_st) {
d->head = *new_head;
success = 1;
} else
success = rte_atomic32_cmpset(
(uint32_t *)(uintptr_t)&d->head,
*old_head, *new_head);
} while (unlikely(success == 0));
return n;
}
第二步:元素入队,在这一步不需要原子操作。
第三步:更新tail
static __rte_always_inline void
__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
uint32_t new_val, uint32_t single, uint32_t enqueue)
{
if (enqueue)
rte_smp_wmb();
else
rte_smp_rmb();
/*
* If there are other enqueues/dequeues in progress that preceded us,
* we need to wait for them to complete
*/
if (!single)
rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
rte_memory_order_relaxed);
ht->tail = new_val;
}
2.3SPSC和MPMC无锁队列实现的差异
单生产者单消费者,在一开始占位置的时候,更新head的时候不需要通过cas进行线程间同步,直接更新即可;在最后更新tail的时候,也不需要cas进行线程间同步,直接更新tail即可。