当前位置: 首页 > news >正文

无锁队列简介与实现示例

1. 简介

无锁队列是一种数据结构,旨在在多线程环境中实现高效的并发访问,而无需使用传统的锁机制(如互斥锁)。无锁队列通过使用原子操作(如CAS,Compare-And-Swap)来确保线程安全,从而避免了锁带来的开销和潜在的死锁问题。

1.1 无锁队列的特点

  1. 高并发性:无锁队列允许多个线程同时进行入队和出队操作,而不会相互阻塞,从而提高了系统的并发性能。
  2. 避免死锁:由于不使用锁机制,无锁队列天然避免了死锁问题。
  3. 低延迟:无锁队列的操作通常比使用锁的队列操作更快,因为它们避免了上下文切换和锁竞争。

1.2 实现原理

无锁队列通常基于以下原理实现:

  1. 原子操作:使用原子操作(如CAS)来确保对共享数据的修改是线程安全的。
  2. 链表结构:无锁队列通常使用链表结构,其中每个节点包含一个值和一个指向下一个节点的指针。
  3. 头尾指针:队列维护两个原子指针,分别指向队列的头部和尾部,用于支持并发的入队和出队操作。

1.3 常见应用

无锁队列广泛应用于需要高并发和低延迟的场景,如:

  • 多线程任务调度
  • 并发数据处理
  • 网络服务器请求队列

2. Python 无锁队列实现

import threading
import queue
import time

class LockFreeQueue:
    def __init__(self):
        self.queue = queue.Queue()

    def enqueue(self, value):
        self.queue.put(value)
        print(f"Enqueued: {value}")

    def dequeue(self):
        if not self.queue.empty():
            value = self.queue.get()
            print(f"Dequeued: {value}")
            return value
        return None

def producer(queue, values):
    for value in values:
        queue.enqueue(value)
        time.sleep(0.1)

def consumer(queue, count):
    for _ in range(count):
        queue.dequeue()
        time.sleep(0.1)

if __name__ == "__main__":
    queue = LockFreeQueue()
    values_to_enqueue = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    producer_thread = threading.Thread(target=producer, args=(queue, values_to_enqueue))
    consumer_thread = threading.Thread(target=consumer, args=(queue, len(values_to_enqueue)))

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()

代码解析

  1. LockFreeQueue类

    • 使用queue.Queue类来实现线程安全的队列。
    • enqueue方法:将元素添加到队列中,并打印出添加的元素。
    • dequeue方法:从队列中取出元素,并打印出取出的元素。
  2. 生产者和消费者函数

    • producer函数:模拟生产者线程,向队列中添加元素。
    • consumer函数:模拟消费者线程,从队列中取出元素。
  3. 主程序

    • 创建并启动生产者和消费者线程,并等待它们完成。

2. C++ 无锁队列实现

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>

template<typename T>
class LockFreeQueue {
private:
    struct Node {
        T value;
        std::atomic<Node*> next;
        Node(T val) : value(val), next(nullptr) {}
    };

    std::atomic<Node*> head;
    std::atomic<Node*> tail;

public:
    LockFreeQueue() {
        Node* dummy = new Node(T());
        head.store(dummy);
        tail.store(dummy);
    }

    ~LockFreeQueue() {
        while (Node* node = head.load()) {
            head.store(node->next.load());
            delete node;
        }
    }

    void enqueue(T value) {
        Node* new_node = new Node(value);
        Node* old_tail = nullptr;

        while (true) {
            old_tail = tail.load();
            Node* next = old_tail->next.load();

            if (old_tail == tail.load()) {
                if (next == nullptr) {
                    if (old_tail->next.compare_exchange_weak(next, new_node)) {
                        break;
                    }
                } else {
                    tail.compare_exchange_weak(old_tail, next);
                }
            }
        }
        tail.compare_exchange_weak(old_tail, new_node);
    }

    bool dequeue(T& result) {
        Node* old_head = nullptr;

        while (true) {
            old_head = head.load();
            Node* old_tail = tail.load();
            Node* next = old_head->next.load();

            if (old_head == head.load()) {
                if (old_head == old_tail) {
                    if (next == nullptr) {
                        return false;
                    }
                    tail.compare_exchange_weak(old_tail, next);
                } else {
                    result = next->value;
                    if (head.compare_exchange_weak(old_head, next)) {
                        break;
                    }
                }
            }
        }
        delete old_head;
        return true;
    }
};

void producer(LockFreeQueue<int>& queue, const std::vector<int>& values) {
    for (int value : values) {
        queue.enqueue(value);
        std::cout << "Enqueued: " << value << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

void consumer(LockFreeQueue<int>& queue, int count) {
    for (int i = 0; i < count; ++i) {
        int value;
        if (queue.dequeue(value)) {
            std::cout << "Dequeued: " << value << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main() {
    LockFreeQueue<int> queue;
    std::vector<int> values_to_enqueue = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

    std::thread producer_thread(producer, std::ref(queue), std::ref(values_to_enqueue));
    std::thread consumer_thread(consumer, std::ref(queue), values_to_enqueue.size());

    producer_thread.join();
    consumer_thread.join();

    return 0;
}

编译和运行

使用g++编译并运行代码:

g++ -std=c++11 -o lock_free_queue lock_free_queue.cpp -pthread
./lock_free_queue

代码解析

  1. Node结构体

    • Node结构体表示队列中的节点,每个节点包含一个值和一个原子指针next
  2. LockFreeQueue类

    • headtail是原子指针,分别指向队列的头部和尾部。
    • enqueue方法:将新节点添加到队列的尾部,使用CAS操作保证线程安全。
    • dequeue方法:从队列的头部移除节点,使用CAS操作保证线程安全。
  3. 生产者和消费者函数

    • producer函数:模拟生产者线程,向队列中添加元素。
    • consumer函数:模拟消费者线程,从队列中取出元素。
  4. 主程序

    • 创建并启动生产者和消费者线程,并等待它们完成。

这个示例展示了如何使用C++11中的原子操作实现一个无锁队列,并使用多线程进行测试。

相关文章:

  • # 基于人脸关键点的多表情实时检测系统
  • 4月7号.
  • 【开源宝藏】30天学会CSS - DAY12 第十二课 从左向右填充的文字标题动画
  • spring-cloud-alibaba-nacos-discovery使用说明
  • 超大规模数据场景(思路)——面试高频算法题目
  • 进程和线程的区别和联系
  • 【Java面试系列】Spring Boot应用中的事务传播机制与分布式事务实践详解 - 3-5年Java开发必备知识
  • 【软件】在 macOS 上安装和配置 Apache HTTP 服务器
  • React-narice安卓打包流程
  • ifconfig 使用详解
  • animals_classification动物分类
  • 子类是否能继承
  • 解决windows下删除文件提示该项目不存在
  • 设计模式简述(七)原型模式
  • Qt音频采集:QAudioInput详解与示例
  • Android打包及上架应用市场问题处理
  • vue-axios跨域问题
  • 利用CNN卷积神经网络进行声呐图像分类
  • STM32常用外设寄存器介绍
  • golang并发原语groutine channel select sync包
  • 房管局网站做房查/app广告投放价格表
  • 营销型网站建设eyouc/深圳seo优化外包公司
  • 做非经营网站需要营业执照/信息流广告模板
  • 可以网站可以做免费的文案广告/如何接广告赚钱
  • 在百度搜索到自己的网站/如何推广网上国网
  • 怎么写网站建设推广/杭州seo排名优化