当前位置: 首页 > news >正文

无锁队列--知识分享

目录

无锁队列

        无锁队列是什么

         为什么需要无锁队列

        队列的类型 

         无锁队列的分类        

ringbuffer(SPSC)

ret_ring(MPMC)


无锁队列

        无锁队列是什么
  •         无锁队列通过原子操作来实现线程安全的队列,属于非阻塞队列
  •         有锁队列通过互斥锁或其他同步机制保证线程安全的队列,属于阻塞队列 
         为什么需要无锁队列

        锁的局限:

  •         线程阻塞带来的切换
  •         死锁风险
  •         性能瓶颈,高并发下锁竞争激烈,吞吐量下降

        队列的类型 

        无锁队列

  •         无锁(lock-free):保证了至少有一个线程在正常运行,其他可能重试,依赖CAS 等原子操作。
  •         无等待(wait-free):所有线程必成功,无重试,依赖exchange等原子操作。

         有锁队列

  •          阻塞队列(blocking-queue):当队列为空时,从队列中获取元素的操作会被阻塞,直到队列中有新的元素加入;当队列已满时,向队列中添加元素的操作会被阻塞,直到队列中有元素被移除,依赖锁实现。
         无锁队列的分类        

        SPSC(单生产者,单消费者)

        含义:单生产者单消费者队列,即队列只有一个生产者线程和一个消费者线程。

        特点:不存在多个生产者或消费者之间的竞争,所以可以使用比较简单高效的算法来实现。

        

        MPSC(多生产者,单消费者)

        含义:多生产者单消费者队列,多个线程可以同时向队列中添加元素,但只有一个线程可以从队列中取出元素。

        特点:实现时需要重点处理多个生产者之间的并发问题,确保多个生产者能够安全地向队列中添加元素。而对于消费者线程,由于是唯一的,处理相对简单。

        SPMC(单生产者,多消费者)

        含义:单生产者多消费者队列,只有一个线程可以向队列中添加元素,但是有多个线程可以同时从队列中取出元素。

        特点:需要处理多个消费者之间的并发问题,保证多个消费者能够安全地从队列中取出元素。同时,要确保生产者在添加元素时不会受到消费者的干扰。

        

        MPMC(多生产者,多消费者)

        含义:多生产者多消费者队列,意味着有多个线程可以同时向队列中添加元素(生产者),也有多个线程可以同时从队列中取出元素(消费者)。

        特点:由于多个生产者和消费者可能会同时访问队列,因此需要更复杂的并发控制机制来保证线程安全。通常会使用 CAS 等原子操作来处理多个线程对队列的并发访问,避免数据竞争和不一致的问题。

ringbuffer(SPSC)

#pragma once// SPSC
#include <atomic>
#include <cstddef>
#include <type_traits>template<typename T, std::size_t Capacity>
class RingBuffer
{
public:static_assert(Capacity && !(Capacity & (Capacity - 1)), "Capacity must be power of 2");RingBuffer() : read_(0), write_(0) {}~RingBuffer() {std::size_t r = read_.load(std::memory_order_relaxed);std::size_t w = write_.load(std::memory_order_relaxed);while (r != w) {reinterpret_cast<T *>(&buffer_[r])->~T();r = (r + 1) & (Capacity - 1);}}// 这里使用万能引用和完美转发,支持左值和右值template<typename U>bool Push(U && value) {const std::size_t w = write_.load(std::memory_order_relaxed);const std::size_t next_w = (w + 1) & (Capacity - 1);// 检查缓冲区是否满if (next_w == read_.load(std::memory_order_acquire)) {return false;}new (&buffer_[w]) T(std::forward<U>(value));write_.store(next_w, std::memory_order_release);return true;}bool Pop(T & value) {const std::size_t r = read_.load(std::memory_order_relaxed);// 检查缓冲区是否空if (r == write_.load(std::memory_order_acquire)) {return false;}// 取出元素并析构value = std::move(*reinterpret_cast<T *>(&buffer_[r]));reinterpret_cast<T *>(&buffer_[r])->~T();read_.store((r + 1) & (Capacity - 1), std::memory_order_release);return true;}std::size_t Size() const {const std::size_t r = read_.load(std::memory_order_acquire);const std::size_t w = write_.load(std::memory_order_acquire);return (w >= r) ? (w - r) : (Capacity - r + w);}private:
//cache line  64Balignas(64) std::atomic<std::size_t> read_;alignas(64) std::atomic<std::size_t> write_;alignas(64) std::aligned_storage_t<sizeof(T), alignof(T)> buffer_[Capacity];  // 支持 pod 和 非 pod 类型
};
ret_ring(MPMC)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>#include "rte_ring.h"#define RING_SIZE 16<<20typedef struct cc_queue_node {int data;
} cc_queue_node_t;static struct rte_ring *r;typedef unsigned long long ticks;static __inline__ ticks getticks(void)
{u_int32_t a, d;asm volatile("rdtsc" : "=a" (a), "=d" (d));return (((ticks)a) | (((ticks)d) << 32));
}void *enqueue_fun(void *data)
{int n = (int)data;int i = 0;int ret;cc_queue_node_t *p;for (; i < n; i++) {p = (cc_queue_node_t *)malloc(sizeof(cc_queue_node_t));p->data = i;ret = rte_ring_mp_enqueue(r, p);if (ret != 0) {printf("enqueue failed: %d\n", i);}}return NULL;
}void *dequeue_func(void *data)
{int ret;int i = 0;int sum = 0;int n = (int)data;cc_queue_node_t *p;ticks t1, t2, diff;//return;t1 = getticks();while (1) {p = NULL;ret = rte_ring_sc_dequeue(r, (void **)&p);if (ret != 0) {//do something}if (p != NULL) {i++;sum += p->data;free(p);if (i == n) {break;}}}t2 = getticks();diff = t2 - t1;printf("time diff: %llu\n", diff);printf("dequeue total: %d, sum: %d\n", i, sum);return NULL;
}int main(int argc, char *argv[])
{int ret = 0;pthread_t pid1, pid2, pid3, pid4, pid5, pid6;pthread_attr_t pthread_attr;r = rte_ring_create("test", RING_SIZE, 0);if (r == NULL) {return -1;}printf("start enqueue, 5 producer threads, echo thread enqueue 1000 numbers.\n");pthread_attr_init(&pthread_attr);if ((ret = pthread_create(&pid1, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {pthread_detach(pid1);}if ((ret = pthread_create(&pid2, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {pthread_detach(pid2);}if ((ret = pthread_create(&pid3, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {pthread_detach(pid3);}if ((ret = pthread_create(&pid4, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {pthread_detach(pid4);}if ((ret = pthread_create(&pid5, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {pthread_detach(pid5);}printf("start dequeue, 1 consumer thread.\n");if ((ret = pthread_create(&pid6, &pthread_attr, dequeue_func, (void *)5000)) == 0) {//pthread_detach(pid6);}pthread_join(pid6, NULL);rte_ring_free(r);return 0;
}

相关文章:

  • conda常用命令简解
  • postgres 数据库信息解读 与 sqlshell常用指令介绍
  • 基于STM32+FPGA的地震数据采集器软件设计,支持RK3568+FPGA平台
  • 在PyTorch中,使用不同模型的参数进行模型预热
  • C语言 —— 指尖跃迁 刻印永恒 - 文件操作
  • 序列化 反序列化实例
  • 【软件工程大系】净室软件工程
  • 整活 kotlin + springboot3 + sqlite 配置一个 SQLiteCache
  • 【Spring】DI(依赖注入)详解:属性注入@Autowired(超详细)、构造方法注入、Setter注入
  • 《JVM考古现场(二十一):奇点黎明·在事件视界编译时空曲率》
  • 智能语音识别+1.2用SAPI实现文本转语音(100%教会)
  • 科技项目验收测试报告有哪些作用?需要多长时间和费用?
  • Shell编程之正则表达式与文本处理器
  • AI 对话高效输入指令攻略(一):了解AI对话指令
  • 解决靶机分配的 IP 地址与 Kali 机器静态 IP 地址冲突的方法
  • Langchain Agent封装的工具
  • Unity导出微信小游戏后无法调起移动端输入框
  • window 凭据管理器密码破解
  • 信息科技伦理与道德0:课程安排
  • 如何实现“一机两用” 寻求安全与效率的完美平衡
  • ps快速做网站/seo关键词布局技巧
  • 网站建设发展状况/seo整体优化步骤怎么写
  • 高明网站建设公司/怎么做公众号
  • 动态网站开发第一步/鼓楼网站seo搜索引擎优化
  • 服装网站案例/win7优化工具
  • 网站建设中啥意思/广州网络广告推广公司