无锁队列--知识分享
目录
无锁队列
无锁队列是什么
为什么需要无锁队列
队列的类型
无锁队列的分类
ringbuffer(SPSC)
ret_ring(MPMC)
无锁队列
无锁队列是什么
- 无锁队列通过原子操作来实现线程安全的队列,属于非阻塞队列
- 有锁队列通过互斥锁或其他同步机制保证线程安全的队列,属于阻塞队列
为什么需要无锁队列
锁的局限:
- 线程阻塞带来的切换
- 死锁风险
- 性能瓶颈,高并发下锁竞争激烈,吞吐量下降
队列的类型
无锁队列
- 无锁(lock-free):保证了至少有一个线程在正常运行,其他可能重试,依赖CAS 等原子操作。
- 无等待(wait-free):所有线程必成功,无重试,依赖exchange等原子操作。
有锁队列
- 阻塞队列(blocking-queue):当队列为空时,从队列中获取元素的操作会被阻塞,直到队列中有新的元素加入;当队列已满时,向队列中添加元素的操作会被阻塞,直到队列中有元素被移除,依赖锁实现。
无锁队列的分类
SPSC(单生产者,单消费者)
含义:单生产者单消费者队列,即队列只有一个生产者线程和一个消费者线程。
特点:不存在多个生产者或消费者之间的竞争,所以可以使用比较简单高效的算法来实现。
MPSC(多生产者,单消费者)
含义:多生产者单消费者队列,多个线程可以同时向队列中添加元素,但只有一个线程可以从队列中取出元素。
特点:实现时需要重点处理多个生产者之间的并发问题,确保多个生产者能够安全地向队列中添加元素。而对于消费者线程,由于是唯一的,处理相对简单。
SPMC(单生产者,多消费者)
含义:单生产者多消费者队列,只有一个线程可以向队列中添加元素,但是有多个线程可以同时从队列中取出元素。
特点:需要处理多个消费者之间的并发问题,保证多个消费者能够安全地从队列中取出元素。同时,要确保生产者在添加元素时不会受到消费者的干扰。
MPMC(多生产者,多消费者)
含义:多生产者多消费者队列,意味着有多个线程可以同时向队列中添加元素(生产者),也有多个线程可以同时从队列中取出元素(消费者)。
特点:由于多个生产者和消费者可能会同时访问队列,因此需要更复杂的并发控制机制来保证线程安全。通常会使用 CAS 等原子操作来处理多个线程对队列的并发访问,避免数据竞争和不一致的问题。
ringbuffer(SPSC)
#pragma once// SPSC
#include <atomic>
#include <cstddef>
#include <type_traits>template<typename T, std::size_t Capacity>
class RingBuffer
{
public:static_assert(Capacity && !(Capacity & (Capacity - 1)), "Capacity must be power of 2");RingBuffer() : read_(0), write_(0) {}~RingBuffer() {std::size_t r = read_.load(std::memory_order_relaxed);std::size_t w = write_.load(std::memory_order_relaxed);while (r != w) {reinterpret_cast<T *>(&buffer_[r])->~T();r = (r + 1) & (Capacity - 1);}}// 这里使用万能引用和完美转发,支持左值和右值template<typename U>bool Push(U && value) {const std::size_t w = write_.load(std::memory_order_relaxed);const std::size_t next_w = (w + 1) & (Capacity - 1);// 检查缓冲区是否满if (next_w == read_.load(std::memory_order_acquire)) {return false;}new (&buffer_[w]) T(std::forward<U>(value));write_.store(next_w, std::memory_order_release);return true;}bool Pop(T & value) {const std::size_t r = read_.load(std::memory_order_relaxed);// 检查缓冲区是否空if (r == write_.load(std::memory_order_acquire)) {return false;}// 取出元素并析构value = std::move(*reinterpret_cast<T *>(&buffer_[r]));reinterpret_cast<T *>(&buffer_[r])->~T();read_.store((r + 1) & (Capacity - 1), std::memory_order_release);return true;}std::size_t Size() const {const std::size_t r = read_.load(std::memory_order_acquire);const std::size_t w = write_.load(std::memory_order_acquire);return (w >= r) ? (w - r) : (Capacity - r + w);}private:
//cache line 64Balignas(64) std::atomic<std::size_t> read_;alignas(64) std::atomic<std::size_t> write_;alignas(64) std::aligned_storage_t<sizeof(T), alignof(T)> buffer_[Capacity]; // 支持 pod 和 非 pod 类型
};
ret_ring(MPMC)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>#include "rte_ring.h"#define RING_SIZE 16<<20typedef struct cc_queue_node {int data;
} cc_queue_node_t;static struct rte_ring *r;typedef unsigned long long ticks;static __inline__ ticks getticks(void)
{u_int32_t a, d;asm volatile("rdtsc" : "=a" (a), "=d" (d));return (((ticks)a) | (((ticks)d) << 32));
}void *enqueue_fun(void *data)
{int n = (int)data;int i = 0;int ret;cc_queue_node_t *p;for (; i < n; i++) {p = (cc_queue_node_t *)malloc(sizeof(cc_queue_node_t));p->data = i;ret = rte_ring_mp_enqueue(r, p);if (ret != 0) {printf("enqueue failed: %d\n", i);}}return NULL;
}void *dequeue_func(void *data)
{int ret;int i = 0;int sum = 0;int n = (int)data;cc_queue_node_t *p;ticks t1, t2, diff;//return;t1 = getticks();while (1) {p = NULL;ret = rte_ring_sc_dequeue(r, (void **)&p);if (ret != 0) {//do something}if (p != NULL) {i++;sum += p->data;free(p);if (i == n) {break;}}}t2 = getticks();diff = t2 - t1;printf("time diff: %llu\n", diff);printf("dequeue total: %d, sum: %d\n", i, sum);return NULL;
}int main(int argc, char *argv[])
{int ret = 0;pthread_t pid1, pid2, pid3, pid4, pid5, pid6;pthread_attr_t pthread_attr;r = rte_ring_create("test", RING_SIZE, 0);if (r == NULL) {return -1;}printf("start enqueue, 5 producer threads, echo thread enqueue 1000 numbers.\n");pthread_attr_init(&pthread_attr);if ((ret = pthread_create(&pid1, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {pthread_detach(pid1);}if ((ret = pthread_create(&pid2, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {pthread_detach(pid2);}if ((ret = pthread_create(&pid3, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {pthread_detach(pid3);}if ((ret = pthread_create(&pid4, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {pthread_detach(pid4);}if ((ret = pthread_create(&pid5, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {pthread_detach(pid5);}printf("start dequeue, 1 consumer thread.\n");if ((ret = pthread_create(&pid6, &pthread_attr, dequeue_func, (void *)5000)) == 0) {//pthread_detach(pid6);}pthread_join(pid6, NULL);rte_ring_free(r);return 0;
}