江西南昌疫情防控最新政策重庆seo报价
1 简介
自旋锁是一种典型的无锁算法,在任何时刻,它都最多只能有一个保持者。当有一个线程试图获得一个已经被占用的锁时,该线程就会一直进行循环等待,直到该锁被释放。自旋锁的优点是不会使线程状态发生切换,一直处于用户态,即线程一直都是active的;缺点是如果一直获取不到锁,那线程就一直处于循环等待中,白白浪费CPU资源。自旋锁在某些场景下非常有效,尤其是临界区较小且持有时间短的情况下。然而,在高竞争或临界区较长的情况下,使用自旋锁可能导致性能下降,因此需谨慎选择。
2 不同自旋锁的实现
自旋锁的实现有多种方式,包括:
- 比较并交换(CAS):CAS是一种原子操作,它可以比较一个内存位置的值与一个期望值是否相等,如果相等则将该位置的值更新为一个新值。
- 测试并设置(TAS):TAS是一种原子操作,它可以测试一个内存位置的值是否为0,如果为0则将该位置的值设置为1。
- 互斥量:互斥量是一种同步原语,它可以确保同一时刻只有一个线程可以访问共享资源。
- 信号量:信号量是一种同步原语,它可以确保同一时刻只有一定数量的线程可以访问共享资源。
- 屏障:屏障是一种同步原语,它可以确保同一时刻只有一定数量的线程可以访问共享资源。
2.1 TASLock
TASLock的实现比较简单,只需要一个bool类型的变量flag来表示锁的状态,当flag为false时表示锁可用,当flag为true时表示锁不可用。当一个线程试图获取锁时,它会不断地检查flag的值,如果flag为false,则将flag设置为true,表示锁被占用,否则自旋等待。当一个线程释放锁时,它会将flag设置为false,表示锁可用。TASLock的实现如下所示:
class TASLock {
public:TASLock() : flag(false) {}void lock() {// 自旋等待直到能够获取锁while (flag.exchange(true, std::memory_order_acquire)) {// 自旋,直到锁被释放}}void unlock() {// 释放锁flag.store(false, std::memory_order_release);}private:std::atomic<bool> flag; // 锁的标志
};
TAS实现简单,但是由于它是不断尝试TEST_AND_SET,当多个线程在不同的 CPU 核心上竞争同一个 TAS 锁时,所有线程都会频繁地读取和写入锁的状态。这会导致缓存行的频繁失效,因为每次一个线程更新锁的状态时,其他线程的缓存中相应的缓存行会失效,从而引发更高的内存访问延迟。这不仅增加了内存带宽的消耗,还可能导致内存访问争用,进而影响到系统的整体性能。在高竞争环境下,线程自旋等待锁的状态,频繁地访问同一内存地址,可能导致内存访问延迟显著增加。每次缓存失效都需要访问主内存,从而增加了获取锁的时间。
2.2 TTASLock
TTASLock的实现与TASLock类似,只是在获取锁时先测试锁的状态,如果锁已经被占用,则自旋等待,否则尝试获取锁。TTASLock的实现如下所示:
class TTASLock {
public:TTASLock() : flag(false) {}void lock() {// 自旋等待,直到锁未被占用while (true) {// 先测试锁的状态while (flag.load(std::memory_order_acquire)) {// 自旋等待,直到锁被释放}// 尝试获取锁if (!flag.exchange(true, std::memory_order_acquire)) {// 成功获取锁break;}}}void unlock() {// 释放锁flag.store(false, std::memory_order_release);}private:std::atomic<bool> flag; // 锁的标志
};
TTAS 首先测试锁的状态,而不是立即尝试获取锁。只有在确认锁未被占用时,才会进行写操作(即设置 flag),这减少了对共享内存的写入操作,降低了总线争用的频率。由于 TTAS 在尝试获取锁之前先检查锁的状态,只有在锁未被占用时才会进行 CAS 操作,这可以减少其他线程在尝试获取锁时导致的缓存失效,从而提高性能。在多线程竞争不激烈的情况下,TTAS 锁可以更高效地降低自旋带来的 CPU 资源浪费,尤其是在锁的持有时间短的情况下。尽管 TTAS 减少了总线争用,但它仍然是自旋锁。在线程自旋等待锁时,CPU 资源仍然被消耗,特别是在高竞争情况下,可能导致性能下降。TTAS 的实现依赖于原子操作(如 CAS),在某些硬件架构上,这些操作可能仍然导致较高的延迟,尤其是在较高的竞争情况下。
2.3 指数退避锁
指数退避算法是一种在并发系统中,当多个线程争用同一个锁时,用于减少资源竞争的策略。它通过在每次尝试获取锁失败后,等待一段时间再重试,并且等待的时间长度逐渐增加,从而避免系统频繁尝试而导致过载、资源竞争或者死锁。
- 首次尝试: 线程尝试获取锁。
- 失败后等待: 如果获取锁失败,线程会等待一段时间。
- 指数增加: 等待时间会以指数方式增加,例如,第一次等待1毫秒,第二次等待2毫秒,第三次等待4毫秒,以此类推。
- 重试: 等待时间结束后,线程再次尝试获取锁。
- 达到上限: 为了防止等待时间过长,通常会设置一个最大等待时间。当等待时间达到上限时,就不再增加。
- 加入随机抖动: 为了避免多个请求同时重试导致系统再次过载,可以在指数退避的基础上,加入一些随机性,使得重试的时间并不是固定的,而是随机的。
class ExponentialBackoffTTASLock {
public:
ExponentialBackoffTTASLock() {}void lock() {retries = 0;while (flag.test_and_set(std::memory_order_acquire)) {// spin until the lock is releasedbackoff();retries++;}}void unlock() {flag.clear(std::memory_order_release);}private:void backoff() {const int max_retries = 8;if (retries < max_retries) {std::this_thread::yield();} else {auto delay = std::chrono::microseconds(1 << (retries - max_retries));std::this_thread::sleep_for(delay);}}std::atomic_flag flag = ATOMIC_FLAG_INIT;int retries{0};
};
指数后退的随机抖动可以减少多个线程同时重试的情况,从而减少系统的负载。这有助于避免在高竞争情况下,多个线程同时重试获取锁,从而导致系统过载。但是,指数后退的随机抖动也可能导致某些线程的重试时间过长,从而降低系统的性能。因此,在使用指数后退时,需要根据实际情况进行调整,以达到最佳的性能和资源利用率。
2.4 队列锁
2.4.1 数组队列锁
队列锁是一种基于队列的锁实现,它允许多个线程同时尝试获取锁,但是只有一个线程能够成功获取锁。队列锁的实现如下所示:
class QueueLock {
private:static thread_local int mySlotIndex;std::atomic<int> tail;std::vector<std::atomic<bool>> flag;int size;public:QueueLock(int capacity = 128) : size(capacity), tail(0), flag(capacity) {// 初始化 flag 数组for (int i = 0; i < size; ++i) {flag[i].store(false, std::memory_order_relaxed);}flag[0].store(true, std::memory_order_relaxed); // 初始化第一个标志位为 true}void lock() {int slot = tail.fetch_add(1, std::memory_order_relaxed) % size;mySlotIndex = slot; // 存储当前线程的槽位索引// 自旋等待while (!flag[slot].load(std::memory_order_acquire)) {std::this_thread::yield(); // 让出 CPU 时间片}}void unlock() {int slot = mySlotIndex;flag[slot].store(false, std::memory_order_release); // 释放当前槽位// 允许下一个线程获取锁flag[(slot + 1) % size].store(true, std::memory_order_release);}
};// 初始化 thread_local 变量
thread_local int QueueLock::mySlotIndex = 0;
队列锁通过维护一个请求锁的线程队列,保证了线程按照先来后到的顺序获取锁,解决了传统锁可能导致的线程饥饿问题,确保了公平性。但队列锁需要在初始化时确定最大线程数,限制了其扩展性,并且自旋等待的机制在锁竞争激烈时会浪费 CPU 资源。
2.4.2 CLH队列锁
class CLHQNode {
public:std::atomic<bool> locked;CLHQNode() : locked(false) {}
};class CLHLock {
private:std::atomic<CLHQNode*> tail;static thread_local CLHQNode* myPred;static thread_local CLHQNode* myNode;public:CLHLock() : tail(new CLHQNode()) {// 初始化 thread_local 变量 (C++11 以后可以在类外初始化)}~CLHLock() {// 释放 tail 指向的 CLHQNodedelete tail.load();}void lock() {CLHQNode* node = getMyNode(); // 获取当前线程的 CLHQNodenode->locked.store(true, std::memory_order_relaxed); // 设置为 lockedCLHQNode* pred = tail.exchange(node, std::memory_order_acquire); // 获取前驱节点,并将 tail 指向当前节点myPred = pred; // 保存前驱节点// 自旋等待前驱节点释放锁while (pred->locked.load(std::memory_order_acquire)) {std::this_thread::yield(); // 让出 CPU 时间片}}void unlock() {CLHQNode* node = getMyNode(); // 获取当前线程的 CLHQNodenode->locked.store(false, std::memory_order_release); // 释放锁myNode = myPred; // 将 myNode 指向前驱节点,以便下次使用}private:// 获取当前线程的 CLHQNode,如果不存在则创建CLHQNode* getMyNode() {if (!myNode) {myNode = new CLHQNode();}return myNode;}
};
CLH 队列锁通过链表结构维护等待锁的线程队列,解决了传统锁的公平性问题以及普通队列锁需要预先确定最大线程数的限制,实现了更好的可扩展性。然而,CLH 锁仍然存在自旋等待的问题,这会导致 CPU 资源的浪费,并且每个线程都需要额外的内存来存储队列节点。
2.4.3 MCS锁
MCS锁也是一种基于链表的公平锁,它与 CLH 锁类似,但 MCS 锁的节点直接指向后继节点,而不是前驱节点,这使得释放锁的操作更加简单高效。
#pragma once
#include <atomic>
#include <thread>
#include <iostream>class MCSLock {
private:struct MCSNode {std::atomic<bool> locked;MCSNode* next;MCSNode() : locked(false), next(nullptr) {}};std::atomic<MCSNode*> tail;static thread_local MCSNode* myNode; // 线程局部变量public:MCSLock() : tail(nullptr) {}~MCSLock() {// 需要仔细考虑析构函数中的资源释放,避免内存泄漏// 但是由于多线程环境,简单的 delete tail 可能会导致问题// 所以这里选择不释放,或者使用更复杂的机制来保证安全释放}void lock() {MCSNode* qnode = getMyNode(); // 获取当前线程的 MCSNodeMCSNode* pred = tail.exchange(qnode, std::memory_order_acquire); // 获取前驱节点,并将 tail 指向当前节点if (pred != nullptr) {qnode->locked.store(true, std::memory_order_relaxed); // 设置为 lockedpred->next = qnode; // 将前驱节点的 next 指向当前节点// 等待前驱节点释放锁while (qnode->locked.load(std::memory_order_acquire)) {std::this_thread::yield(); // 让出 CPU 时间片}}}void unlock() {MCSNode* qnode = getMyNode(); // 获取当前线程的 MCSNodeif (qnode->next == nullptr) {// 如果没有后继节点if (tail.compare_exchange_strong(qnode, nullptr, std::memory_order_release, std::memory_order_relaxed)) {// 如果成功将 tail 设置为 null,则直接返回return;}// 等待前驱节点填充 next 字段while (qnode->next == nullptr) {std::this_thread::yield(); // 让出 CPU 时间片}}// 释放后继节点的锁qnode->next->locked.store(false, std::memory_order_release);qnode->next = nullptr; // 避免 double free,虽然这里设置为空,但是qnode->next的内存并没有释放,需要注意内存泄漏的问题}private:// 获取当前线程的 MCSNode,如果不存在则创建MCSNode* getMyNode() {if (!myNode) {myNode = new MCSNode();}return myNode;}
};// 初始化 thread_local 变量
thread_local MCSLock::MCSNode* MCSLock::myNode = nullptr;
MCS 锁和 CLH 锁都是公平队列锁,MCS 锁在释放锁时效率更高,因为直接操作后继节点,但代码稍复杂;而 CLH 锁代码更简单,但释放锁时效率稍逊。选择哪种锁取决于具体应用场景,性能要求高则选 MCS,追求简单则选 CLH。它们都解决了传统锁的公平性问题,并具有良好的可扩展性。MCS更适合多核处理器和NUMA架构,因为它可以更好地利用缓存和内存局部性。
2.4.4 时限队列锁
时限队列锁是一种基于队列的锁实现,它允许线程在一定时间内获取锁,如果超过时限仍未获取到锁,则放弃获取。它的实现如下所示:
class TOLock {
private:struct TONode {TONode* pred;TONode() : pred(nullptr) {}};static TONode* AVAILABLE; // 静态成员变量需要在类外初始化std::atomic<TONode*> tail;static thread_local TONode* myNode; // 线程局部变量public:TOLock() : tail(nullptr) {}~TOLock() {// 同样需要考虑内存管理,这里简单处理,可能存在问题}bool try_lock(long long time, std::chrono::milliseconds unit) {auto startTime = std::chrono::steady_clock::now();auto patience = unit;TONode* qnode = new TONode();myNode = qnode; // 设置当前线程的 TONodeqnode->pred = nullptr;TONode* myPred = tail.exchange(qnode, std::memory_order_acquire); // 获取前驱节点,并将 tail 指向当前节点if (myPred == nullptr || myPred->pred == AVAILABLE) {return true; // 成功获取锁}while (std::chrono::steady_clock::now() - startTime < patience) {TONode* predPred = myPred->pred;if (predPred == AVAILABLE) {return true; // 成功获取锁} else if (predPred != nullptr) {myPred = predPred;if (!tail.compare_exchange_weak(qnode, myPred, std::memory_order_release, std::memory_order_relaxed)) {qnode->pred = myPred;}return false; // 获取锁失败,但仍在等待}std::this_thread::yield(); // 让出 CPU 时间片}// 超时,获取锁失败delete qnode; // 释放申请的 TONode 内存return false;}void unlock() {TONode* qnode = myNode; // 获取当前线程的 TONodeif (!tail.compare_exchange_strong(qnode, nullptr, std::memory_order_release, std::memory_order_relaxed)) {qnode->pred = AVAILABLE;}}
};// 初始化静态成员变量
TOLock::TONode* TOLock::AVAILABLE = new TONode(); // 注意内存管理,需要释放
thread_local TOLock::TONode* TOLock::myNode = nullptr;
时限队列锁是在队列锁的基础上增加了超时机制,允许线程在等待锁时设置一个最大等待时间,超时后放弃等待,避免永久阻塞。相比于普通自旋锁,时限队列锁保证了公平性,避免饥饿;相比于普通队列锁,时限队列锁增加了容错性,防止因死锁等原因造成的无限期等待;但相比于无超时机制的锁,实现更复杂,且超时处理可能引入额外的竞争。
2.5 复合锁
复合锁(Composite Lock)是指将多种锁机制结合在一起,以利用各自的优点,从而在不同场景下实现更好的性能和灵活性。所以实际上的复合锁的实现方案众多,根据具体的场景不同而不同。下面的实现结合了 Test-and-Test-and-Set 自旋锁和指数退避策略,以在锁竞争激烈时减少 CPU 占用并提高性能。
class CompositeLock {
private:static const int SIZE = 8; // 示例值,根据实际情况调整static const int MIN_BACKOFF = 1; // 示例值static const int MAX_BACKOFF = 10; // 示例值enum class State {FREE,WAITING,RELEASED,ABORTED};struct CompositeNode {std::atomic<State> state;CompositeNode* pred;CompositeNode() : state(State::FREE), pred(nullptr) {}};class Backoff {private:const int minDelay;const int maxDelay;int limit;std::random_device rd;std::mt19937 gen;std::uniform_int_distribution<> distrib;public:Backoff(int min, int max) : minDelay(min), maxDelay(max), limit(min), rd(), gen(rd()), distrib(min, max) {}void backoff() {std::this_thread::sleep_for(std::chrono::milliseconds(distrib(gen)));limit = std::min(maxDelay, limit * 2);}};std::atomic<CompositeNode*> tail;CompositeNode waiting[SIZE]{};std::random_device rd;std::mt19937 gen;std::uniform_int_distribution<> distrib;static thread_local CompositeNode* myNode; // 线程局部变量bool timeout(long long patience, std::chrono::steady_clock::time_point startTime) {auto now = std::chrono::steady_clock::now();auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - startTime).count();return elapsed > patience;}public:CompositeLock() : tail(nullptr), distrib(0, SIZE - 1) {}~CompositeLock() {}void unlock() {CompositeNode* acqNode = myNode;if (acqNode != nullptr) {acqNode->state.store(State::RELEASED, std::memory_order_release);myNode = nullptr;}}bool try_lock(long long time, std::chrono::milliseconds unit) {auto startTime = std::chrono::steady_clock::now();long long patience = unit.count();Backoff backoff(MIN_BACKOFF, MAX_BACKOFF);try {CompositeNode* node = acquireCompositeNode(backoff, startTime, patience);CompositeNode* pred = spliceCompositeNode(node, startTime, patience);waitForPredecessor(pred, node, startTime, patience);return true;} catch (const std::runtime_error& e) { // 使用 std::runtime_error 捕获异常return false;}}private:CompositeNode* acquireCompositeNode(Backoff& backoff, std::chrono::steady_clock::time_point startTime, long long patience) {while (true) {int index = distrib(gen);CompositeNode& node = waiting[index];State expected = State::FREE;if (node.state.compare_exchange_strong(expected, State::WAITING)) {return &node;}CompositeNode* currTail = tail.load();State state = node.state.load();if (state == State::ABORTED || state == State::RELEASED) {if (&node == currTail) {CompositeNode* myPred = node.pred;if (tail.compare_exchange_weak(currTail, myPred)) {node.state.store(State::WAITING);return &node;}}}backoff.backoff();if (timeout(patience, startTime)) {throw std::runtime_error("TimeoutException");}}}CompositeNode* spliceCompositeNode(CompositeNode* node, std::chrono::steady_clock::time_point startTime, long long patience) {CompositeNode* currTail;do {currTail = tail.load();if (timeout(patience, startTime)) {node->state.store(State::FREE);throw std::runtime_error("TimeoutException");}} while (!tail.compare_exchange_weak(currTail, node));return currTail;}void waitForPredecessor(CompositeNode* pred, CompositeNode* node, std::chrono::steady_clock::time_point startTime, long long patience) {if (pred == nullptr) {myNode = node;return;}State predState = pred->state.load();while (predState != State::RELEASED) {if (predState == State::ABORTED) {CompositeNode* temp = pred;pred = pred->pred;temp->state.store(State::FREE);if (timeout(patience, startTime)) {node->pred = pred;node->state.store(State::ABORTED);throw std::runtime_error("TimeoutException");}predState = pred->state.load();pred->state.store(State::FREE);myNode = node;return;}}}
};thread_local CompositeLock::CompositeNode* CompositeLock::myNode = nullptr;
2.6 层次锁
层次锁是一种通过构建多层锁结构,由粗到细逐层尝试获取,并结合退避策略来减少锁竞争,优化NUMA架构下并发性能的锁机制。
2.6.1 层次后退锁
层次后退锁(Hierarchical Backoff Lock)是一种结合了层次锁和退避策略的锁,旨在优化在多处理器系统中的性能,尤其是在 NUMA(Non-Uniform Memory Access)架构下。 它的核心思想是,将锁的获取过程分为多个层次,每个层次对应不同的锁粒度和竞争范围,在每个层次上,如果获取锁失败,则采用退避策略,例如指数退避,以减少锁冲突。
#pragma once// 假设 ThreadID 类存在,并且可以获取线程的集群 ID
// 这部分依赖于具体的 NUMA 架构和操作系统 API
class ThreadID {
public:static int getCluster() {// 实际实现需要调用 NUMA API 或其他方式获取线程的集群 ID// 这里只是一个示例,返回一个随机值static std::random_device rd;static std::mt19937 gen(rd());static std::uniform_int_distribution<> distrib(0, 1); // 假设只有两个集群return distrib(gen);}
};class HierarchicalBackoffLock {
private:static const int LOCAL_MIN_DELAY = 1; // 示例值static const int LOCAL_MAX_DELAY = 10; // 示例值static const int REMOTE_MIN_DELAY = 10; // 示例值static const int REMOTE_MAX_DELAY = 100; // 示例值static const int FREE = 0;std::atomic<int> state;class Backoff {private:const int minDelay;const int maxDelay;int limit;std::random_device rd;std::mt19937 gen;std::uniform_int_distribution<> distrib;public:Backoff(int min, int max) : minDelay(min), maxDelay(max), limit(min), rd(), gen(rd()), distrib(min, max) {}void backoff() {std::this_thread::sleep_for(std::chrono::microseconds(distrib(gen))); // 使用 us 级别延迟limit = std::min(maxDelay, limit * 2);}};public:HierarchicalBackoffLock() : state(FREE) {}void lock() {int myCluster = ThreadID::getCluster();Backoff localBackoff(LOCAL_MIN_DELAY, LOCAL_MAX_DELAY);Backoff remoteBackoff(REMOTE_MIN_DELAY, REMOTE_MAX_DELAY);while (true) {auto free = FREE;if (state.compare_exchange_weak(free, myCluster)) {return; // 成功获取锁}int lockState = state.load();if (lockState == myCluster) {localBackoff.backoff(); // 本地退避} else {remoteBackoff.backoff(); // 远程退避}}}void unlock() {state.store(FREE, std::memory_order_release);}
};
2.6.2 层次 CLH 队列锁
层次 CLH 队列锁的基本思想是在 NUMA (Non-Uniform Memory Access) 架构下,构建一个层次化的 CLH 队列锁结构,使得线程尽可能在本地 NUMA 节点上排队等待,减少跨节点访问,从而提高性能,是一种结合了 CLH 队列锁的公平性和层次锁的 NUMA 感知特性的锁。通过维护多个 CLH 队列,每个队列对应一个 NUMA 节点,线程在本地节点上排队,当本地队列为空时,尝试在全局队列上排队。这种分层设计可以减少跨节点访问,提高性能。
// 假设 HCLHThreadID 类存在,并且可以获取线程的集群 ID
// 这部分依赖于具体的 NUMA 架构和操作系统 API
class HCLHThreadID {
public:static int getCluster() {// 实际实现需要调用 NUMA API 或其他方式获取线程的集群 ID// 这里只是一个示例,返回一个随机值static std::random_device rd;static std::mt19937 gen(rd());static std::uniform_int_distribution<> distrib(0, 1); // 假设只有两个集群return distrib(gen);}
};class HierarchicalCLHLock {
private:static const int MAX_CLUSTERS = 2; // 示例值,根据实际情况调整struct QNode {static const long long TWS_MASK = 0x8000000000;static const long long SMW_MASK = 0x4000000000;static const long long CLUSTER_MASK = 0x3FFFFFFFFF;std::atomic<long long> state; // Use long long to store the stateQNode() : state(0) {}void unlock() {int myCluster = HCLHThreadID::getCluster();long long oldState = 0;long long newState = myCluster;newState |= SMW_MASK;newState &= (~TWS_MASK);do {oldState = state.load();} while (!state.compare_exchange_weak(oldState, newState));}int getClusterID() {return state.load() & CLUSTER_MASK;}// Setters and Getters for TWS and SMW (omitted for brevity)void setTailWhenSpliced(bool value) {long long oldState = state.load();long long newState = oldState;if (value) {newState |= TWS_MASK;}else {newState &= ~TWS_MASK;}while (!state.compare_exchange_weak(oldState, newState));}bool isSuccessorMustwait() {return (state.load() & SMW_MASK) != 0;}void setSuccessorMustWait(bool value) {long long oldState = state.load();long long newState = oldState;if (value) {newState |= SMW_MASK;}else {newState &= ~SMW_MASK;}while (!state.compare_exchange_weak(oldState, newState));}bool waitforGrantorClusterMaster() {std::this_thread::sleep_for(std::chrono::microseconds(10)); // 模拟等待return true; // 简化,始终返回 true}};std::vector<std::atomic<QNode*>> localQueues{ MAX_CLUSTERS };std::atomic<QNode*> globalQueue;static thread_local QNode* currNode;static thread_local QNode* predNode;public:HierarchicalCLHLock() {globalQueue.store(new QNode());for (int i = 0; i < MAX_CLUSTERS; i++) {localQueues[i].store(nullptr); // 初始化为 nullptr}}~HierarchicalCLHLock() {// 需要考虑内存管理,这里省略}void lock() {QNode* myNode = getCurrNode(); // 获取当前线程的 QNodeint clusterID = HCLHThreadID::getCluster();std::atomic<QNode*>& localQueue = localQueues[clusterID];QNode* myPred = nullptr;do {myPred = localQueue.load();} while (!localQueue.compare_exchange_weak(myPred, myNode));if (myPred != nullptr) {bool iOwnLock = myPred->waitforGrantorClusterMaster();if (iOwnLock) {predNode = myPred;return;}}// I am the cluster master: splice local queue into global queueQNode* localTail = nullptr;do {myPred = globalQueue.load();localTail = myNode; // 这里直接使用 myNode,因为 localQueue 已经指向 myNode} while (!globalQueue.compare_exchange_weak(myPred, localTail));// inform successor it is the new masterlocalTail->setTailWhenSpliced(true);while (myPred->isSuccessorMustwait()) {};predNode = myPred;return;}void unlock() {QNode* myNode = currNode;if (myNode != nullptr) {myNode->setSuccessorMustWait(false);QNode* node = predNode;if (node != nullptr) {node->unlock();}currNode = node;}}private:// 获取当前线程的 QNode,如果不存在则创建QNode* getCurrNode() {if (!currNode) {currNode = new QNode();}return currNode;}
};thread_local HierarchicalCLHLock::QNode* HierarchicalCLHLock::currNode = nullptr;
thread_local HierarchicalCLHLock::QNode* HierarchicalCLHLock::predNode = nullptr;
3 参考文献
- 现代 C++ 中的自旋锁,具有原子性、内存屏障和指数退避
- 排队自旋锁
- 多处理器编程的艺术