当前位置: 首页 > news >正文

笔记:现代操作系统:原理与实现(7)

第八章 同步原语

互斥锁

临界区问题

  • 竞争冒险:多个执行单元(如线程、进程、信号等)在访问共享资源时,最终结果依赖于它们执行的相对时序。
  • 互斥访问:任意时刻只允许至多一个线程访问的方式
  • 临界区:保证互斥访问共享资源的代码区域
  • 临界区问题:如何通过设计协议来保证互斥访问临界区的问题称为临界区问题

解决临界区问题是保证多核中并行程序正确性的关键。然而,即使在单核中,临界区问题同样存在。由于调度器允许多个线程在一个核心中交错执行,正在临界区内执行的线程可能会被打断,然后另一个线程又被调度进临界区,从而造成两个线程同时处于临界区的现象,导致竞争冒险。

使用临界区的程序:

在这里插入图片描述

访问临界区程序的条件:

  • 互斥访问:在同一时刻,最多只有一个线程可以执行临界区。
  • 有限等待:当一个线程申请进入临界区之后,必须在有限的时间内获得许可并进入临界区,不能无限等待。
  • 空闲让进:当没有线程在执行临界区代码时,必须在申请进入临界区的线程中选择一个线程,允许其执行临界区代码,保证程序执行的进展。

硬件实现:关闭中断

单核中关闭中断意味着当前执行的线程不能被其他线程抢占,因此若在进入临界区之前关闭中断,且在离开临界区时再开启中断,便能够避免当前执行临界区代码的线程被打断,保证任意时刻只有一个线程执行临界区。

小思考

在单核环境中,关闭中断能否满足解决临界区问题的三个条件(即互斥访问、有限等待与空闲让进)?

  • 互斥访问:关闭中断可以防止执行临界区的线程被抢占,避免多个线程同时执行临界区,保证了互斥访问
  • 有限等待:有限等待依赖于内核中的调度器,如果能保证在有限时间内调度到该线程,则该线程就可以在有限时间内进入临界区,达成有限等待的要求
  • 空闲让进:每个线程在离开临界区时都开启了中断,允许调度器调度到其他线程并执行。因此某线程在执行时如需要进入临界区(意味着没有其他线程在执行临界区),可以关闭中断并执行临界区代码,从而达成了空闲让进的要求

在多核环境中,关闭中断的方法不再适用。即使同时关闭了所有核心的中断,也不能阻塞其他核心上正在运行的线程继续执行。如果多个同时运行的线程需要执行临界区,关闭中断还是会出现临界区问题。

软件实现:皮特森算法

皮特森算法中有两个重要变量。

  • 第一个是全局数组 flag。它共有两个布尔成员(flag[0] 与 flag[1]),分别代表线程 0 与线程 1 是否尝试进入临界区。
  • 第二个是全局变量 turn。如果两个线程都申请进入临界区,那么 turn 将决定最终能进入临界区的线程编号(因此其值只能为 0 或 1)。

在应用程序开始时,flag 数组中的成员均被设置为 FALSE,表示两个线程都没有申请进入临界区。而变量 turn 无须进行初始化,其会在线程申请进入临界区时被设置。

在皮特森算法中,线程在进入临界区之前必须达成以下两个条件之一。以线程 0 为例,其必须满足 flag[1] == FALSE(即线程 1 没有申请进入临界区)或者 turn ==0(即线程 1 也申请进入临界区,但 turn 决定线程 0 可以进入临界区),方能进入临界区。

皮特森算法:

在这里插入图片描述

线程 0 在进入临界区之前,需要设置 flag[0] 为 TRUE,代表其尝试进入临界区。而在线程 0 检查是否可以进入临界区时(代码第 4、5 行),线程 1 只可能处于以下三种状态之一:

  • 线程 1 在准备进入临界区(执行完第 2 行代码)
    • 这两个线程将依据变量 turn 的值从二者中选出一个先进入临界区,而另一个将继续循环等待(互斥访问)。turn 的值在算法的第 3 行设置,而它的最终值取决于最后对其进行更新的线程(程都会将 turn 设置为对方而不是自己)
  • 线程 1 在临界区内
    • 由于线程 1 在临界区内,它不会再更新 flag[1] 与 turn。而由于线程 0 只能将 turn 值设为 1,满足了线程 0 循环等待的条件(flag[1] 为 TRUE 且 turn 值为 1),保证了同一时刻只有一个线程执行临界区内的代码(互斥访问
  • 线程 1 在执行其他代码(执行完第 6 行代码且在执行第 2 行代码之前)
    • 线程 1 在离开临界区时会设置自己的 flag[1] 为 FALSE,表示不再占有临界区
    • 果线程 1 在线程 0 成功进入临界区之前又再次尝试进入临界区(第 2、3 行),turn 的值会被线程 1 更新为 0,表示线程 0 被允许进入临界区,不会出现无限等待的情况(有限等待

经典的皮特森算法只能应对两个线程的情况,而 Micha Hofri 于 1990 年扩展了皮特森算法,使其能被用于任意数量的线程。需要注意的是,皮特森算法要求访存操作严格按照程序顺序执行。然而现代 CPU 为了达到更好的性能往往允许访存操作乱序执行,因此皮特森算法无法直接在这些 CPU 上正确工作。

由于本章主要关注同步原语,为了方便起见,本章的后续内容均认为 CPU 严格按照程序顺序执行访存操作,且不同的核心也将严格按照程序顺序观察到这些访存操作的执行结果。

软硬件协同:使用原子操作实现互斥锁

除了软件方法,我们还可以利用硬件提供的原子操作(atomic operation)设计新的软件算法来解决临界区问题。

原子操作

原子操作指的是不可被打断的一个或一系列操作。

最常见的原子操作包括

  • 比较与置换(Compare-And-Swap, CAS)
    • CAS 操作会比较地址 addr 上的值与期望值 expected 是否相等,如果相等则将 addr 上的值置换为新的值 new_value,否则不进行置换。最后,CAS 将返回 addr 所存放的旧值
  • 拿取并累加(Fetch-And-Add, FAA)
    • FAA 操作则会读取 addr 上的旧值,将其加上 add_value 后重新存回该地址,最后返回该地址上的旧值。

CAS与FAA操作:

// 代码片段 8-3 CAS 与 FAA 操作int CAS(int *addr, int expected, int new_value)
{int tmp = *addr;if (*addr == expected)*addr = new_value;return tmp;
}int FAA(int *addr, int add_value)
{int tmp = *addr;*addr = tmp + add_value;return tmp;
}

注意代码该片段只是用 C 语言来展现操作逻辑,这段代码本身并不具备原子性。

更新遗失:

  • 在执行FAA时,线程0首先执行int tmp = *addr; ,随后线程1执行完全部的函数,由于线程0中tmp存储了原始的 *addr值,再继续执行剩余代码后,线程1所做的累加操作被覆盖。

为了解决这个问题,我们需要硬件辅助保证上述操作的原子性。

为了在硬件层面支持这些原子操作,不同的硬件平台提供了不同的解决方案。

Intel:

在 Intel 平台,应用程序主要通过使用带 lock 前缀的指令来保证操作的原子性。代码片段 8-4 展示了在 Intel x86-64 平台上使用内联汇编实现原子 CAS 操作的代码。

这里,cmpxchg 指令用于比较与置换指定的地址与值。它先比较地址 addr(对应于 %[ptr])中存储的值与寄存器 %eax 中的值(即 expected 的值,通过 “+a” (expected) 指定),如果相等则将 new_value(对应于 %[new])存入地址 addr 中,否则将 addr 中存储的数据读入寄存器 %eax(即 expected)中。通过给这条指令加上 lock 前缀,Intel 处理器可以保证上述比较与置换操作的原子性。

架构的原子操作实现:

// 代码片段 8-4 Intel x86-64 架构的原子操作实现int atomic_CAS(int *addr, int expected, int new_value)
{asm volatile("lock cmpxchg %[new], %[ptr]": "+a"(expected), "[ptr]" "*m"(*addr): [new] "r"(new_value): "memory");return expected;
}

ARM:

ARM 则采用了 Load-Link/Store-Conditional(LL/SC)的指令组合。在 Load-Link 时,CPU 将使用一个专门的监视器(monitor)记录当前访问的地址,而在 Store-Conditional 时,当且仅当监视的地址没有被其他核修改,才执行存储操作,否则将返回存储失败。

以原子 CAS 为例,在 Load-Link 阶段,该指令将告诉硬件,使用监视器监视 addr 这个地址。而当发现 addr 中存储的值与 expected 相等,因而需要更新 addr 中存储的值时,我们使用 Store-Conditional 来确保此期间没有其他的核心更新了 addr 中存储的值。

// 代码片段 8-5 ARM AArch64 架构的原子操作实现int atomic_CAS(int *addr, int expected, int new_value)
{int oldval, ret;asm volatile("1: ldxr %w0, %2\n""   cmp %w0, %w3\n""   b.ne 2f\n""   stxr %w1, %w4, %2\n""   cbnz %w1, 1b\n""2:": "=r" (oldval), "=r" (ret), "+Q" (*addr): "r" (expected), "r" (new_value): "memory");return oldval;
}

代码片段展示了如何在 ARM AArch64 架构下实现原子 CAS 操作。其中 ldxr 与 stxr 分别对应于 Load-Link 操作与 Store-Conditional 操作。代码中第 5 行利用 ldxr 读取 addr(对应于 %2)的值存入 oldval(对应于 %w0)中,同时监视 addr 这个地址。第 6 行比较 oldval 与 expected(对应于 %w3)是否相等。如果不相等,则在第 7 行跳到标记 2 处(第 10 行)结束该函数,并返回旧值。否则,在第 8 行利用 stxr 操作将 new_value(对应于 %w4)存储到地址 addr 中。如果有其他核心修改 addr 中的值导致存储失败,其会通过设置寄存器 %w1 为 1 来告知。最后,第 9 行通过检查寄存器 %w1 的值来判断是否失败。如果失败,则回到标记 1 处(第 5 行)重新尝试原子 CAS。

互斥锁抽象

利用硬件保证的原子操作,我们可以实现互斥锁(mutex lock)来解决临界区问题。互斥锁的抽象简单易用,一把锁在同一时刻只能被一个线程所拥有。一旦一个线程获取了锁,其他的线程均不能同时拥有该锁,只能等待锁被释放。

互斥锁提供了 lock 与 unlock 操作,分别对应于加锁与放锁。为了保证临界区的互斥访问,我们可以在申请进入临界区时尝试获取共享的一把锁(lock),只有拥有锁的线程才能被允许执行临界区的代码。在退出临界区时,线程将释放锁(unlock),从而允许其他线程拥有锁并进入临界区。

互斥锁的实现种类繁多,不同的互斥锁被用于不同的场景,以达到最好的性能表现。本小节将介绍两种不同的互斥锁,分别为

  • 利用原子 CAS 实现的自旋锁(spin lock)
  • 利用原子 FAA 实现的排号自旋锁(ticket lock)

自旋锁利用一个变量 lock 来表示锁的状态。lock 为 1 表示已经有人拿锁,而为 0 表示该锁空闲。自旋锁的加锁操作是通过使用原子的 CAS 操作实现的。

  • 在加锁时,线程会通过 CAS 判断 lock 是否空闲(是否为 0),如果空闲则上锁(置为 1),否则将一遍一遍重试(第 9 行)
  • 而放锁时,直接将 lock 设置为 0 代表其空闲(第 15 行)

由于大部分 64 位 CPU 对于地址对齐的 64 位单一写操作都是原子的,因此这里不需要使用额外的硬件指令保护写操作的原子性。

由于本章主要关注同步原语为了方便起见,在本章后续内容中均认为对于 64 位及以下数据的单一读写操作为原子的。

代码片段自旋锁:

void lock_init(int *lock)
{// 初始化自旋锁*lock = 0;
}void lock(int *lock)
{while(atomic_CAS(lock, 0, 1) != 0); // 循环忙等
}void unlock(int *lock)
{*lock = 0;
}

分析自旋锁是否满足解决临界区问题的三个条件(即互斥访问、有限等待与空闲让进):

  • 互斥访问:多个竞争者中只会有一个成功完成 CAS 操作并获取锁,进入临界区后未退出临界区前,其他竞争者无法通过获取锁再次进入临界区
  • 空闲让进:当没有竞争者在使用临界区时,竞争者中通过完成 CAS 操作并获取锁并进入临界区
  • 有限等待:(❌)自旋锁并非按照申请的顺序决定下一个获取锁的竞争者,而是让所有的竞争者均同时尝试完成原子操作,而原子操作的成功与否完全取决于硬件特性。
    • 比如,在一些异构环境下(如 ARM 移动 CPU 中的大小核架构),小核由于运行频率低,在与大核竞争时成功完成原子操作的概率远远小于大核。因此运行在小核上的竞争者可能永远也无法获取锁,从而出现不公平的情况。

不过,由于自旋锁实现简单,在竞争程度低时非常高效,因此它依然广泛地应用在各种软件之中。

排号自旋锁

排号自旋锁(ticket lock,下文简称为排号锁)采取了一种更加公平的选择策略:排号锁按照锁竞争者申请锁的顺序传递锁。排号锁的竞争者将依照申请的先后顺序,分得一个排队的序号。排号锁将依照序号,将锁传递给最先到达的竞争者。因此可以认为,锁的竞争者组成了一个先进先出(First In First Out, FIFO)的等待队列。

排号锁的结构体有两个成员,其中

  • owner 表示当前的锁持有者序号
  • next 表示下一个需要分发的序号

获取排号锁需要先通过原子的 atomic_FAA 操作拿到最新的序号并同时增加锁的分发序号(lock函数),来避免其他竞争者拿到相同的序号。

拿到序号后,竞争者将通过判断 owner 的值,等待排到自己的序号(lock函数)。一旦两者相等,竞争者拥有该锁并被允许进入临界区。

而在释放锁时,锁持有者通过更新 owner 的值来将锁传递给下一个竞争者(unlock函数)。

由于锁的传递顺序是依照申请锁( volatile int my_ticket = atomic_FAA(&lock->next, 1);)的顺序而定的,因此该锁保证了获取锁的公平性,且锁的竞争者均会在有限时间内拿到锁并进入临界区(有限等待)。

// 代码片段 8-8 排号锁struct lock
{volatile int owner;volatile int next;
};void lock_init(struct lock *lock)
{// 初始化排号锁lock->owner = 0;lock->next = 0;
}void lock(struct lock *lock)
{// 拿取自己的序号volatile int my_ticket = atomic_FAA(&lock->next, 1);while (lock->owner != my_ticket); // 循环忙等
}void unlock(struct lock *lock)
{// 传递给下一位竞争者lock->owner++;
}

虽然互斥锁能解决竞争冒险,保证在多核环境下对共享资源操作的正确性,但在实际场景中,对于同步的需求多种多样,仅仅依靠互斥锁是远远不够的。后续将介绍一些其他基于互斥锁的同步原语,它们为开发者应对不同场景提供了更加易用的接口。

条件变量

在之前介绍的生产者消费者例子中,当剩余的空位为 0 时,生产者会陷入循环等待(busy looping),也称为循环忙等。因此,我们需要使用条件变量(condition variable)构建一种挂起 / 唤醒的机制来解决循环忙等造成的 CPU 资源浪费问题。

条件变量的使用

条件变量提供了两个接口:

  • cond_wait
    • 用于挂起当前线程
    • 接收两个参数,分别为条件变量与搭配使用的互斥锁
    • 在调用cond_wait时,需要保证当前已经获取搭配的互斥锁
    • cond_wait 在挂起当前线程的同时,该互斥锁会被原子地释放
  • cond_signal
    • 用于唤醒等待在该条件变量上的线程
    • 只接收一个参数:当前使用的条件变量

代码片段展示了如何在生产者与消费者中使用条件变量来避免循环等待。

  • 使用了两个不同的互斥锁 empty_cnt_lockfilled_cnt_lock 来保护对其共享计数器 empty_cntfilled_cnt 的修改。
  • 针对生产者与消费者等待的条件,这里也创建了两个不同的条件变量(empty_condfilled_cond),分别对应于“缓冲区无空数据”和“缓冲区无数据”这两个条件。
  • 当生产者发现没有空闲位置时,就会使用 cond_wait 进入阻塞状态,避免循环等待(第 15 行)。由于生产者需要等待的条件是 empty_slot 不为 0,即缓冲区没满,因此这里需要等待在 empty_cond 上。当有新的空位产生时,消费者会利用 cond_signal 操作唤醒等待在 empty_cond 上的生产者(第 42 行)。
  • 生产者在更新完 filled_slot 计数器之后,会加入一个 cond_signal 操作(第 23 行),用于唤醒等待的消费者。由于消费者等待的条件是 filled_slot 不为 0,即缓冲区不为空,因此这里唤醒的是等待在 filled_cond 上的消费者(第 34 行)。
int empty_slot = 5;
int filled_slot = 0;
struct cond empty_cond;
struct lock empty_cnt_lock;
struct cond filled_cond;
struct lock filled_cnt_lock;void producer(void)
{int new_msg;while(TRUE) {new_msg = produce_new();lock(&empty_cnt_lock);while(empty_slot == 0)cond_wait(&empty_cond, &empty_cnt_lock);  //15行empty_slot --;unlock(&empty_cnt_lock);buffer_add_safe(new_msg);lock(&filled_cnt_lock);filled_slot ++;cond_signal(&filled_cond);  //23行unlock(&filled_cnt_lock);}
}void consumer(void)
{int cur_msg;while(TRUE) {lock(&filled_cnt_lock);while(filled_slot == 0)cond_wait(&filled_cond, &filled_cnt_lock);  //34行filled_slot --;unlock(&filled_cnt_lock);cur_msg = buffer_remove_safe();lock(&empty_cnt_lock);empty_slot ++;cond_signal(&empty_cond);  //42行unlock(&empty_cnt_lock);consume_msg(cur_msg);}
}

条件变量的实现

条件变量的实现需要考虑并发环境下的正确性。代码片段展示了条件变量的一种实现。每个条件变量的结构体中都包含一个 wait_list,用于记录等待在该条件变量上的线程。

// 代码片段 8-10 条件变量的一种实现struct cond {struct thread *wait_list;
};void cond_wait(struct cond *cond, struct lock *mutex)
{list_append(cond->wait_list, thread_self());   //7行atomic_block_unlock(mutex); // 原子挂起并释放锁   //8行lock(mutex); // 重新获得互斥锁                  //9行
}void cond_signal(struct cond *cond)
{if (!list_empty(cond->wait_list))            //14行wakeup(list_remove(cond->wait_list));    //15行
}void cond_broadcast(struct cond *cond)
{while(!list_empty(cond->wait_list))  wakeup(list_remove(cond->wait_list));
}

cond_wait :

  • 将当前线程加入等待队列(第 7 行)
  • 原地挂起当前线程并放锁(第 8 行),这两个步骤中第二步为关键,必须保证挂起与放锁原子地完成。这里使用 atomic_block_unlock 操作来原子地完成挂起与放锁操作。
  • 当该线程被其他线程唤醒之后,其应当再次获取锁进入临界区(第 9 行)并返回

cond_signal:

  • 该函数先判断是否有线程等待在条件变量上(第 14 行)。如果存在这样的线程,则利用操作系统提供的唤醒(wakeup)服务将该线程唤醒(第 15 行)

cond_broadcast:

  • 用于唤醒所有等待在条件变量上的线程

互斥锁与条件变量

互斥锁与条件变量解决的是不同的问题。

  • 互斥锁用于解决临界区问题,保证互斥访问共享资源
  • 条件变量通过提供挂起/唤醒机制来避免循环等待,节省 CPU 资源。条件变量需要和互斥锁搭配使用。

信号量

信号量在不同的线程之间充当信号灯,其根据剩余资源数量控制不同线程的执行或等待。

除了初始化之外,信号量只能通过两个操作来进行更新:

  • P 操作,源自荷兰语 Proberen,即“检验”
  • V 操作,源自荷兰语 Verhogen,即“自增”

正因为如此,信号量又被称为 PV 原语。为了便于理解,一般会使用 wait 和 signal 来表示信号量的 P 和 V 操作。

其中 wait 操作用于等待。当信号量的值(其代表资源数量)小于或等于 0 时进入循环等待(第 3 行)。只有在信号量的值大于 0 时,才停止等待并消耗该资源(第 5 行,减少信号量的值)。

signal 操作则用于通知,其会增加信号量的值供 wait 的线程使用。

代码片段只展示了信号量的语义,如果需要保证多个线程并发执行的正确性,还需要对代码进行很多修改。我们将在本节稍后介绍信号量的具体实现,这里先关注信号量具体的使用场景和使用方法。

// 代码片段 8-11 信号量语义void wait(int *S)
{while(*S <= 0)  //3行; // 循环忙等*S = *S - 1;    //5行
}void signal(int *S)
{*S = *S + 1;
}

信号量的使用

在生产者消费者模型中,两个信号量分别代表两个共享的资源:

  • empty_slot 代表空余的缓冲区资源
  • filled_slot 代表已填入内容的缓冲区资源

生产者在该问题中应先消耗空余的缓冲区资源,再增加已填入的缓冲区资源。消费者则刚好相反。

除了 wait 与 signal 操作,另一个可能会修改信号量的操作是赋予信号量初值。对于 empty_slot 与 filled_slot 的初值,我们应该分别填入缓冲区大小与 0。

// 代码片段 8-12 利用信号量解决生产者消费者问题sem_t empty_slot;
sem_t filled_slot;void producer(void)
{int new_msg;while(TRUE) {new_msg = produce_new();wait(&empty_slot); // P   // 9行buffer_add_safe(new_msg);signal(&filled_slot); // V   //11行}
}void consumer(void)
{int cur_msg;while(TRUE) {wait(&filled_slot); // Pcur_msg = buffer_remove_safe();signal(&empty_slot); // Vconsume_msg(cur_msg);}
}

信号量的实现

前面通过代码片段 8-11 展示了信号量的语义,然而这些代码并非信号量的正确实现,该代码存在两个问题。

问题1:

可能存在多个线程同时更新共享信号量的情况。如果不使用原子操作对其进行更新,就会出现 更新遗失的问题。

问题2:

即使使用原子操作来更新信号量,其仍然存在正确性问题。即,多个消费者同时消耗信号量使信号量的值为负。

代码片段 8-13 展现了信号量的一种实现。信号量结构体包含四个成员,除了条件变量与互斥锁之外,还包含 value 和 wakeup。

  • value 的值既可为正数,也可为负数。当没有线程等待时,value 为正数或零,其表示剩余的资源数量。而当有线程等待时,value 为负数,其绝对值代表正在等待获取资源的线程数量。
  • wakeup 来表示有线程等待时的可用资源数量,其只能为正数或零。该值同时也代表应当唤醒的线程数量,因此取名为 wakeup。

在该实现中,如果想判断当前可用的资源数量,应当首先查看 value,若 value 为正数或零,则取 value 为当前资源数量,否则,取 wakeup 为当前资源数量。

// 代码片段 8-13 信号量的实现struct sem {int value;int wakeup;struct lock sem_lock;struct cond sem_cond;
};void wait(struct sem *S)
{lock(&S->sem_lock);  //10行S->value --;         //11行if (S->value < 0) {do {cond_wait(&S->sem_cond, &S->sem_lock);} while (S->wakeup == 0);S->wakeup --;}unlock(&S->sem_lock);    //18行
}void signal(struct sem *S)
{lock(&S->sem_lock);S->value ++;     //24行if (S->value <= 0) {    //25行S->wakeup ++;cond_signal(&S->sem_cond);     //27行}unlock(&S->sem_lock);
}

首先分析信号量的 wait 操作。当线程在获取该信号量的互斥锁后(第 10 行),先将信号量的 value 减 1(第 11 行)。若其值大于或等于 0,则代表有空闲的资源,wait 操作成功(第 18 行)。如果 value 的值小于 0,那么它不再能表示当前可用的资源数量,我们需要去检查 wakeup 来判断是否还有空闲的资源可以消耗。当 wakeup 大于 0 时,代表还有空闲的资源可以消耗。而如果 wakeup 也为 0,就需要用 8.2 节实现的 cond_wait 将当前线程挂起等待(第 14 行)。而这里使用了 do … while 而非 while,是为了提供有限等待的保证,确保调用了 signal 后立刻调用 wait 的线程不会直接拿走资源,而是将资源交给正在等待的线程。

对应地,在执行 signal 操作时,线程首先将 value 的值加 1(第 24 行),然后判断是否还有线程等待在该信号量上(第 25 行)。如果有,则增加 wakeup(第 26 行),并使用 cond_signal 唤醒其中的一个线程来获取资源(第 27 行)。

互斥锁与信号量

互斥锁与信号量有相似之处。当信号量的初值设置为 1,且只允许其值在 0 和 1 之间变化时,wait 和 signal 操作分别与互斥锁的 lock 和 unlock 操作类似。我们称这种信号量为二元信号量。二元信号量与互斥锁的差别在于:互斥锁有拥有者这一概念,而二元信号量没有。互斥锁往往由同一个线程加锁和放锁,而信号量允许不同线程执行 wait 与 signal 操作。

互斥锁与计数信号量(非二元信号量的其他信号量)区别较大。 计数信号量允许多个线程通过,其数量等于剩余可用资源数量;而互斥锁同一时刻只允许一个线程获取。互斥锁用于保证多个线程对一个共享资源的互斥访问,而信号量则用于协调多个线程对一系列共享资源的有序操作。例如,互斥锁可以用于保护生产者消费者问题中的共享缓冲区,而信号量可以用于协调生产者与消费者何时该等待,何时该放入或拿取缓冲区数据。

条件变量与信号量

条件变量与信号量提供了类似的操作接口(包括 wait 与 signal)。因此很容易弄混。在 8.3 节中,信号量是由条件变量、互斥锁以及计数器实现的。而这个计数器就是信号量的核心,用于表示当前可用资源的数量。因此可以理解为,信号量利用条件变量实现了更高层级的抽象。

读写锁

互斥锁中,所有对共享资源的操作均需要使用互斥锁进行保护。

而当需要读共享数据的线程之间,相互不会产生干扰,我们需要保证的是写共享数据的线程与读共享数据的线程不能同时执行。读写锁(reader writer lock)就是为了解决这个问题而提出的同步原语。

在这种情况下,虽然直接使用互斥锁不会导致任何正确性问题,但是会削减读者之间的并行度,造成一定的性能损失。

读写锁的使用

读写锁使用起来与互斥锁非常类似,不过读写锁针对读者(读共享数据的线程)与写者(写共享数据的线程)分别提供了不同的 lock 与 unlock 操作。

读写锁的规则:

  • 如果此时有写者在同一把读写锁保护的临界区内,读者就必须等待该写者退出临界区后才能进入
  • 而如果只有读者在临界区内,后续的读者就可能可以进入临界区

写者需要使用 lock_writer 与 unlock_writer 来保护写临界区。写临界区既不允许其他读者进入读写锁保护的读临界区,也不允许其他写者进入读写锁保护的写临界区。

// 代码片段 8-14 读写锁使用示例struct rwlock lock;
char data[SIZE];void reader(void)
{lock_reader(&lock);read_data(data); // 读临界区unlock_reader(&lock);
}void writer(void)
{lock_writer(&lock);update_data(data); // 写临界区unlock_writer(&lock);
}

除了最基本的加锁放锁操作,有的读写锁还提供了升级与降级的选择。可以从一个读者锁升级为写者锁,也可以从一个写者锁降级成读者锁。本书不做过多介绍。

读写锁的实现

倾向性问题:

假设在某一个时刻,已经有一些读者在临界区中。此时有一个写者与一个读者同时申请进入临界区,我们是否应该允许读者直接进入读临界区?

  • 偏向读者的读写锁
    • 如果允许读者直接进入,那么写者会被一直阻塞,直到没有任何读者,甚至可能陷入无限等待
  • 偏向写者的读写锁
    • 如果等之前的读者离开临界区后,先允许写者进入,再允许读者进入,这样虽能避免写者陷入无限等待,但同时也减少了读者的并行性

代码片段 8-15 展示了一种偏向读者的读写锁。

  • reader_cnt:用于表示当前读者数目的计数器
  • reader_lock:控制读者对 reader_cnt 进行更新的互斥锁
  • writer_lock:用于保证读者与写者、写者与写者之间互斥的互斥锁
// 代码片段 8-15 偏向读者的读写锁struct rwlock {int reader_cnt;struct lock reader_lock;struct lock writer_lock;
};void lock_reader(struct rwlock *lock)
{lock(&lock->reader_lock);lock->reader_cnt ++;if (lock->reader_cnt == 1) // 第一个读者lock(&lock->writer_lock);unlock(&lock->reader_lock);
}void unlock_reader(struct rwlock *lock)
{lock(&lock->reader_lock);lock->reader_cnt --;if (lock->reader_cnt == 0) // 最后一个读者unlock(&lock->writer_lock);unlock(&lock->reader_lock);
}void lock_writer(struct rwlock *lock)
{lock(&lock->writer_lock);
}void unlock_writer(struct rwlock *lock)
{unlock(&lock->writer_lock);
}

代码片段 8-16 展示了一种偏向写者的读写锁。

  • reader_cnt:用于记录当前读者数量
  • has_writer:表示当前是否有写者到达

当有写者期望获取锁时,即使其还未真正进入临界区,也会将 has_writer 设置为 TRUE。读者将通过判断 has_writer 的值来决定是否需要等待前序的写者。

由于写者也会操作读写锁中的元数据(包括 has_writer 与 reader_cnt),因此偏向写者的读写锁提供了一把读者和写者共享的锁 rwlock->lock,要求读者与写者在操作元数据前获取该锁。

// 代码片段 8-16 偏向写者的读写锁struct rwlock {volatile int reader_cnt;volatile bool has_writer;struct lock lock;struct cond reader_cond;struct cond writer_cond;
};void lock_reader(struct rwlock *rwlock)
{lock(&rwlock->lock);while(rwlock->has_writer == TRUE)  //12行cond_wait(&rwlock->writer_cond, &rwlock->lock);  //13行rwlock->reader_cnt ++;   //14行unlock(&rwlock->lock);   //15行
}void unlock_reader(struct rwlock *rwlock)
{lock(&rwlock->lock);rwlock->reader_cnt --;   //21行if (rwlock->reader_cnt == 0)   //22行cond_signal(&rwlock->reader_cond);   //23行unlock(&rwlock->lock);
}void lock_writer(struct rwlock *rwlock)
{lock(&rwlock->lock);while(rwlock->has_writer == TRUE)     //30行cond_wait(&rwlock->writer_cond, &rwlock->lock);  //31行rwlock->has_writer = TRUE;   //32行while(rwlock->reader_cnt > 0)    //33行cond_wait(&rwlock->reader_cond, &rwlock->lock);  //34行unlock(&rwlock->lock);
}void unlock_writer(struct rwlock *rwlock)
{lock(&rwlock->lock);rwlock->has_writer = FALSE;      //41行cond_broadcast(&rwlock->writer_cond);      //42行unlock(&rwlock->lock);
}
  • 读者在申请进入临界区(lock_reader)
    • 如果观察到 has_writer 被置为 TRUE(第 12 行),则不能进入临界区,需要使用条件变量挂起等待(第 13 行)。当确保没有写者等待时,读者在增加读者计数器 reader_cnt 并放锁后便可进入读临界区(第 14、15 行)
  • 在写者需要进入临界区(lock_writer)
    • 其也需要通过 has_writer 判断是否有其他写者在临界区内(第 30 行)。如果存在其他写者,则需要使用条件变量挂起等待(第 31 行)。当没有其他写者时,当前写者就可以将 has_writer 设置为 TRUE(第 32 行),并使用条件变量等待之前所有的读者离开读临界区(第 33、34 行)。最后,释放保护元数据的锁即可进入写临界区(第 35 行)
  • 读者在放锁(unlock_reader)
    • 除了需要减少读者的计数器 reader_cnt(第 21 行),还需要判断自己是否为最后一个读者(第 22 行)。如果是最后一个读者,还需要使用 cond_signal 来唤醒等待在 reader_cnt 上的写者(第 23 行)
  • 写者在放锁(unlock_writer)
    • 除了设置 has_writer 为 FALSE(第 41 行),还应当使用 cond_broadcast 将等待在 has_writer 上的读者和写者都唤醒(第 42 行)。注意,这里使用 cond_broadcast 是因为读者与写者都有可能等待在 has_writer 上,因此两者都需要被唤醒。如果使用 cond_signal,有可能只唤醒了一个读者,那么等待在 has_writer 上的写者将永远不会被唤醒,从而陷入无限等待

偏向读者的读写锁能大幅提高读者之间的并行度,但是由于写者需要等到某一个时刻完全没有读者时才能更新数据,因而可能面临很高的写延迟。

偏向写者的读写锁虽然避免了这个问题,让写者能够在之前的读者离开临界区后立刻进入临界区,但读者之间的并行度会大幅下降。

因此开发者需要根据具体场景需求,选用合适的读写锁

RCU

RCU (read copy update)允许多个读者同时进入临界区。与此同时,RCU 还希望写者不会阻塞读者,且读者不需要使用额外的同步原语来保护读操作。
我们需要一种原子操作的技术(如arm的单核原子性),让读者在写者更新时,要么读到旧的值,要么读到新的值,而不能观察到任何中间结果。
为此 RCU 引入了一种订阅/发布机制,来利用对 64 位指针(或 32 位机器上的 32 位指针)的读写的原子性,让读者能够原子地更新任意大小的数据。

订阅/发布机制

下面结合一个实际的例子,来展示订阅/发布机制的原理。

下图展示了向一个链表中插入一个新的节点 C 的操作分为两步完成:

  • 第一步先初始化节点 C,即填入节点 C 的数据和指向节点 B 的指针
  • 第二步通过更新节点 A 中的指针,使其指向节点 C

通过这个机制,可以将写者原子地更新任意大小的数据。图中展示的是第一步即是发布操作。写者需要利用发布的接口(如 rcu_assign_pointer(&NodeA->Next, NodeC)),完成指针的更新。

而当读者遍历到节点 A 时,读取 Next 指针寻找下一个节点的操作就是订阅操作(如 next = rcu_dereference(&cur->Next))。读者也必须使用订阅接口来读取指针并解析其指向的内容。

订阅/发布操作本质上就是一次读操作/写操作,为何还需要将它们抽象为订阅和发布的接口呢?这是以为在实际硬件中,为了更好的性能,有的 CPU 会允许访存操作乱序执行。因此普通的读写操作可能造成未写先读的情况,订阅/发布机制将保证顺序的操作打包成两个接口,以此保证第一步与第二步的访存顺序。

利用订阅/发布机制插入新节点:

在这里插入图片描述

  • 如果需要删除节点 C,我们可以原子地更新节点 A 中的 Next 指针,使其指向节点 B。
  • 如果需要修改节点 C,我们可以首先创建节点 C 的拷贝 C’,并在 C’ 上更新数据。之后与插入操作类似,我们会原子地修改节点 A 中的 Next 指针,使其指向节点 C’。

利用订阅/发布机制删除(左)和修改(右)节点:

在这里插入图片描述

下面通过分析分别使用读写锁与 RCU 保护对链表进行修改的操作,进一步展示两者的区别:

场景:系统中存在多个读者需要遍历该链表,同时存在一个写者需要更新节点 C 的内容。

  • 读写锁
    • 若读者先获得了读者锁,则写者必须等待所有读者读完旧的数据(即 Data-C),才能进入临界区将数据更新为 Data-C’
    • 若写者先获得了写者锁,则所有读者都必须等待更新结束后才能开始遍历。这些读者将全部读到 Data-C’
  • RCU
    • 读者与写者可以同时执行临界区
    • 其中一部分读者在写者更新 NodeA->Next 之前便已到达节点 C,则它们将读到旧的数据(即 Data-C)
    • 剩下的读者在写者更新完 NodeA->Next 后到达节点 C’,它们将读到新的数据(即 Data-C’)

虽然订阅/发布机制可以保证读者要么读到旧的数据(还能遍历到节点 C),要么读到新的数据(能遍历到节点 C’),但是我们又面临了一个新的问题:节点 C 何时能够被安全地回收?在更新完指针之后,写者并不能确定是否还有读者正在读取节点 C,此时还不能回收节点 C 占用的内存。

宽限期

RCU 引入了一个叫作宽限期(grace period)的概念。宽限期用于描述从写者更新指针到最后一个可能观察到旧数据的读者离开的这段时间。当宽限期结束之时,没有读者会再观察到旧数据了,此时旧数据可以被回收。

为了实现这个目标,写者必须区分出有可能观察到旧数据的读者,而读者也必须标记自己的读临界区开始与结束的位置。为此,RCU 为读者提供了两个接口(rcu_read_lock 与 rcu_read_unlock)分别用于标记读临界区的开始与结束。

这里虽然有 lock/unlock 关键字,但并非意味着读者需要使用耗时的互斥锁来保护读临界区。这里的两个函数仅用于标示读临界区的开始与结束,其实现可以是对某一时段的读者计数器的增减操作。

RCU 为写者提供了 synchronize_rcu 接口,用于阻塞写者直到宽限期结束。写者需要调用 synchronize_rcu,然后才能回收需要删除的内容。

宽限期:

在这里插入图片描述

通过使用订阅/发布机制来更新数据以及使用宽限期机制来回收失效数据,RCU 允许读者在有写者并发写入共享数据时,也能无须任何耗时的同步操作、不被阻塞地随意读取共享数据。读者只需标明读临界区的开始与结束,以及使用订阅接口读取指针即可。而写者则需要使用互斥锁来保证写者之间的互斥访问。

当然,RCU 并非完美,其最大限制为其仅适用于能够使用订阅/发布机制来更新的对象。

  • 对于双向链表这种更为复杂的数据结构,由于需要同时更新两个指针,RCU 就很难实现
  • RCU 的写者需要一直等到最后一个可能观察到旧数据的读者离开临界区后才能回收旧的数据,这个等待的耗时长短也会充满不确定性

RCU 往往还提供异步的回收机制,将等待与回收的开销从写者的关键路径上剔除。如在 Linux 内核中,提供了 call_rcu 函数。该函数要求写者传入一个回调函数,其会在所有读者离开读临界区时被调用。写者在调用 call_rcu 注册回调函数后会立即返回,无须等待所有读者离开。

读写锁与RCU

这两个同步原语都可用于应对读多写少的场景,通过增加读者之间的并行度来提升应用程序的性能。不同的是,RCU 中的读者不会被写者阻塞且无须使用耗时的同步操作,因此 RCU 中的读者的开销更小。但是读写锁的接口较为方便,可以轻松用于不同的场景。

管程

线程安全(thread-safe)是指某个函数、函数库在多线程环境中被调用时,能够正确地使用同步原语保护多个线程对共享变量的访问与修改。

前文介绍了互斥锁、条件变量、信号量和读写锁这些同步原语来帮助开发者实现线程安全的应用程序。然而,这些工具拥有各不相同的语义和适用场景,开发者在实际应用程序中稍有不慎就会出现死锁等错误。因此,为了减轻开发者的负担,人们提出了管程(Monitor)这一抽象概念。

管程包含两部分内容:

  • 共享的数据
  • 操作共享数据的函数

由于管程自身已经保证了访问共享数据的互斥性,开发者只需要直接调用管程提供的函数即可,无须使用额外的同步原语,从而减轻了开发者的负担。

由于管程的概念既包含了数据,又包含了操作数据的函数,它一般被用于面向对象的程序中。如高级语言 Java 就对管程提供了支持。

Java 中的管程

Java 的管程能够保证:对于其保护对象的所有代码区域,最多只有一个 Java 线程能进入这些区域并处理数据。这一保证大大简化了 Java 中同步代码的设计:如果一部分代码涉及对共享对象的并发修改,可以将它用 synchronized 代码块保护起来;而对于现有的线程不安全的方法,可以在声明时直接加入 synchronized 关键字来保证其线程安全。

同步带来的问题

死锁

当有多个(两个及以上)线程为有限的资源竞争时,有的线程就会因为在某一时刻没有空闲的资源而陷入等待。而死锁(deadlock)就是指这一组中的每一个线程都在等待组内其他线程释放资源从而造成的无限等待。

死锁产生的原因

导致死锁出现一共有四个必要条件:

  • 互斥访问:互斥访问保证一个共享资源在同一时刻只能被至多一个线程访问。在有互斥访问的前提下,线程才会出现等待
  • 持有并等待:线程持有一些资源,并等待一些资源。如,两个线程在各自持有一把互斥锁的同时等待对方放锁
  • 资源非抢占:一旦一个资源被持有,除非持有者主动放弃,否则其他竞争者都得他得不到这个资源
  • 循环等待:循环等待是指存在一系列线程 T₀, T₁, …, Tₙ,其中 T₀ 等待 T₁, T₁ 等待 T₂, …, Tₙ₋₁ 等待 Tₙ,而 Tₙ 等待 T₀。因此形成了一个循环。由于循环中任意一个线程都不能等到资源,因而如果不能释放已经占有的资源,便出现了循环等待

除了多个线程相互阻塞造成死锁之外,还有一种特殊的情况也会导致死锁:

  • 在中断处理流程中使用了互斥锁,如果在获取锁后、放锁前再次出现中断,就会导致死锁
  • 在递归函数中使用互斥锁也会面临同样的问题

这类死锁可以通过使用可重入锁来解决。可重入锁在加锁时会判断锁的持有者是否为线程自己,如果是自己则不会阻塞和等待,而是通过一个计数器来记录加锁次数。这个计数器会在放锁时减少,当为 0 时才真正放锁。

此外,在操作系统中断处理流程中,往往通过在加锁时关中断以及在放锁时开中断来解决这个问题。

死锁检测与恢复

当死锁出现时,我们需要一个第三者来打破僵局,帮助死锁恢复。操作系统往往扮演这个第三者的角色。

结合之前介绍的死锁出现的四个必要条件,我们来分析一下如何检测系统中是否出现了死锁。由于四个必要条件中的前三个都是描述使用场景本身的性质,只有一条(即循环等待)和实际运行状态相关,因此循环等待是检测死锁的关键。

为了确认系统中是否存在循环等待,需要获取系统中资源分配与线程等待的相关信息。其中,资源分配表记录着当前不同资源被占有的情况,而线程等待表记录着线程等待不同资源的情况。

左图中,其中从资源指向线程的实线箭头代表该线程占有该资源,而从线程指向资源的虚线箭头代表该线程等待着该资源。当左图中的存在环形时,就可以判断此时出现了死锁。

死锁检测:

在这里插入图片描述

为了让系统恢复正常运行,我们需要打破循环等待的关系。

  • 最直接的一个方法是,找到这个环中任意的线程作为受害者,直接终止该线程并释放其占有的资源
  • 还可以让环上所有线程回退到之前的某一个状态再次运行。由于死锁的出现往往是由于特定的调度和触发时机而导致的,再次运行很大概率不会触发死锁

那么,操作系统到底应该何时开始做死锁检测呢?

其通常选用的策略有:

  • 定时监测(如每运行 24 小时检测一次)
  • 超时等待检测(如等待在某资源上超过一个限定的时间就检测一次)

研究者们还提出了一系列方法直接避免死锁的出现。这些方法主要分为两类:

  • 死锁预防:是指从源头设计上避免死锁的出现
  • 死锁避免:是在运行时动态检查,避免死锁的出现

死锁预防

死锁预防是通过设计合理的资源分配算法,从源头上预防死锁。为了实现这个目标,只需要保证产生死锁的四个必要条件不被同时满足。

  • 避免互斥访问:(此条件不易被满足)
  • **不允许持有并等待:**要求线程在真正开始操作之前一次性申请所有的资源。
    • 这种方法的问题在于,当资源的竞争程度较高时,一个线程很可能不能一次性拿到所有共享资源的使用权。因此可能会进入申请-释放的循环,造成资源利用率低,甚至出现饥饿(starvation)的情况
  • **允许资源被抢占:**即允许一个线程抢占其他线程已经占有的资源。而这种方法的难点是我们需要保证被抢占的线程能够正确地恢复。这种策略也只适用于易于保存和恢复的场景
  • **避免循环等待:**为了避免出现循环等待,我们可以要求线程必须按照一定顺序来获取资源。在之前产生死锁的例子中,我们可以对锁 A 与锁 B 进行编号,要求所有线程按照顺序获取锁 A 与锁 B。这样就可以避免这两个线程发生死锁。

死锁避免

死锁避免是通过在系统运行时跟踪资源分配过程来避免出现死锁。在系统运行时,任意线程需要新的资源都必须向系统提出申请。而系统将根据其所处状态,判断是否能够将资源分配给线程。系统存在以下两种状态:安全状态与非安全状态。

  • 安全状态是指系统中存在至少一个安全序列 {T₁, T₂, …, Tₙ}。如果系统按照这个序列调度线程执行,即可避免资源不足的情况发生。
  • 如果系统中不存在这样一个安全序列,则称之为非安全状态。在非安全状态下,系统一定找不到一种分配资源的顺序,让所有线程都得到满足,因此必定会产生死锁。

死锁避免算法就是让系统在每一次分配资源后都处于安全状态。如果分配之后不能处于安全状态,则不分配资源给线程。

我们将详细介绍一种具体的死锁避免算法:银行家算法

为了描述系统中的供给/需求关系,银行家算法使用了一系列数据结构来保存这些关系。假设我们的系统中存在 M 类资源,而线程有 N 个,则这些数据结构为以下四种。

  • 全局可利用资源:Available[M]。该数组代表某一时刻系统中每一类元素的可用个数。这个数组在初始化时被设置为系统中拥有的 M 类资源的总量。
  • 每个线程的最大需求量:Max[N][M]。该矩阵包含所有 N 个线程对 M 类资源的最大需求量。
  • 已分配资源数量:Allocation[N][M]。该矩阵包含已经分配给所有 N 个线程的 M 类资源的数量。
  • 还需要的资源数量:Need[N][M]。该矩阵包含所有 N 个线程对 M 类资源还需要的资源数量

此外,我们还需要保证以下前提,银行家算法才能正确工作。

  • 系统中的供给关系必须是固定的。比如系统中的资源类别数 M、线程总数 N、整个系统中 M 类资源的可用数量,以及每个线程所需要的不同种类资源的数量都是固定的。
  • 任意线程对任意资源的最大需求量不能超过系统的资源总量。
  • 对于任意一类资源,任意线程已分配的资源加上该线程还需要的资源,必须要小于或等于该线程对这一类资源的最大需求量。
  • 线程在获得了其需要的所有资源之后,能够在有限时间内完成工作,并释放已经获得的资源。

对于满足这些前提的系统,我们可以利用银行家算法保证该系统一直处于安全状态。正如之前所述,银行家算法的核心是模拟分配某个资源之后,检查系统是否还处于安全状态,从而保证系统能够始终处于安全状态。这个检查算法被称为安全性检查算法。安全性检查算法的执行流程如下:

  1. 创建一个临时数组 Available_sim,其初始值与数组 Available 一致,用于记录在接下来的模拟执行中系统可用资源数量。
  2. 找到系统当前剩余资源能够满足的线程。即找到一个线程 x,对于 Available_sim 中任意成员 m,都满足 Available[m] >= Need[x][m]。如果无法找到这样的线程,则代表系统处于非安全状态。
  3. 假设将资源全部分配给该线程,且它执行完后会释放所有的资源。即对于 Available_sim 中所有成员 m,都执行 Available_sim[m] += Allocation[x][m]。同时标记线程 x 执行结束。
  4. 遍历所有线程,查看是否还有未被标记执行结束的线程。如果有则回到第二步继续执行。否则,代表系统处于安全状态。

活锁

同死锁类似,出现活锁时,锁的竞争者很长一段时间都无法获取锁进入临界区。下面将以之前在死锁预防中介绍的不允许持有并等待的方法为例,详细介绍活锁。

“不允许持有并等待策略要求一次性申请所有的资源,一旦任意需要获取的资源不可用,则需要放弃所有已经占有的资源并尝试重新获取”。代码片段 8-19 展示了如何采用这种方法改写 proc_A。

// 代码片段 8-19 避免持有并等待示例while (TRUE) {if(trylock(A) == SUCC) {if(trylock(B) == SUCC) {// 临界区unlock(B);unlock(A);break;} else {unlock(A);}}
}

虽然这样能够避免死锁,但是会导致在特殊情况下没有线程能够同时获取锁 A 与锁 B。

假设存在另外一个线程,其运行代码与代码片段 8-19 类似,不同的是,其先获取锁 B 再尝试获取锁 A。当这两个线程同时执行时,便有可能出现:一个线程获取了锁 A 并尝试获取锁 B 时,另一个线程获取了锁 B 并尝试获取锁 A。此时,两个线程都无法成功获取锁 A 与锁 B,因此只能释放自己已经获取的锁并重试。而若两个线程执行速度相似,则很有可能下一次又出现同样的情况,再次占有对方尝试获取的锁,但又无法获取全部的资源,最终只能再次释放。

如此往复,这两个线程都无法成功地同时获取两把锁并进入临界区。这种现象就称为活锁。

活锁并没有发生阻塞,且活锁可能会自行解开。

活锁由于产生的条件比较特殊,并没有一种统一的方法来避免,需要结合具体问题来分析和提出避免方案。如在刚才的例子中,我们可以采取“让线程在获取失败后等待随机的时间再开始下次尝试”的策略来减少出现活锁的概率,或者要求所有的线程都按照一个统一的顺序来获取锁,从而避免出现活锁。

优先级反转

先级反转(priority inversion)是由于同步导致线程执行顺序违反预设优先级的问题。下面我们通过一个例子来介绍优先级反转出现的场景与原因。假设在一个 CPU 核心上有高、中、低三个优先级的线程 T₁、T₂ 与 T₃。调度器依照优先级高低进行调度。其中高优先级线程 T₁ 与低优先级线程 T₃ 竞争同一把锁。图 展现了出现优先级反转的一种场景。

优先级反转:

在这里插入图片描述

此时由于T3首先获得了锁,T1进行等待。由于调度器会选择当前优先级最高的线程执行,在 t₁ 时刻,中等优先级的线程 T₂ 被创建并开始执行。导致优先级比T3低的T2先执行。

在实时操作系统中,优先级反转问题造成的后果十分严重。高优先级任务对实时性要求高,而优先级反转会导致高优先级任务被低优先级任务阻塞,甚至错过其预定的截止时间(deadline)。

为了避免优先级反转,最直接的方法是使用不可抢占临界区协议(Non-preemptive Critical section Protocol, NCP)。这个协议的核心是避免线程在临界区中被抢占。在上面的例子中,T₃ 一旦获取锁,便不允许任何其他线程抢占。这能有效地避免 T₂ 在 T₃ 执行临界区时抢占 CPU 执行,从而避免了优先级反转问题。但 NCP 并非完美,使用该协议意味着所有的线程都必须等待前序线程的临界区执行完毕,这也包括一些没有竞争关系且优先级高的线程。阻塞这些线程同样可能会导致其错过截止时间。

在现代操作系统中,更常采用的方法是使用优先级继承协议(Priority Inheritance Protocol, PIP)。这个协议的核心是通过继承优先级来避免出现优先级反转。在优先级继承协议中,当高优先级线程等待锁时,会使锁的持有者继承其优先级,从而避免该锁的临界区被低优先级的任务打断。

使用优先级继承协议避免优先级反转:

在这里插入图片描述

除了上述两种协议之外,还有一种优先级置顶协议(Priority Ceiling Protocol, PCP)。顾名思义,优先级置顶是将获取锁的线程的优先级置为可能竞争该锁的线程中的最高优先级(需要由线程提前告知操作系统)。根据置顶的时机的不同,该协议又分为

  • 即时优先级置顶协议(Immediate Priority Ceiling Protocol, IPCP)
    • 即时优先级置顶协议要求线程在获取锁的时刻,将自己的优先级提升到可能会竞争该锁的线程中的最高优先级
  • 原生优先级置顶协议(Original Priority Ceiling Protocol, OPCP)
    • 原生优先级置顶协议则是在真的有其他的线程竞争该资源时提升优先级

在这里插入图片描述

原生优先级置顶协议的执行流程与优先级继承协议的执行流程一致,它们的区别在哪里?

原生优先级置顶协议与优先级继承协议的主要区别在于原生优先级置顶协议会一次性提升 T₃ 的优先级至可能会竞争该锁的线程中的最高优先级。如果未来还有一个更高优先级的线程 T₀ 需要获取同一把锁,那么这里 T₃ 就会直接提升到 T₀ 的优先级。如果采用优先级继承,当 T₀ 到来时,还需要切换到线程 T₀ 执行,并等到其获取锁时再执行一次优先级继承流程。

从实现难易度来说,NCP 与 IPCP 的实现难度较小,它们分别只需要禁止抢占和在获取锁时提升优先级。PIP 与 OPCP 的实现难度较大,它们需要监视已经被获取的锁,在其他线程尝试获取该锁时提升拥有者的优先级。

案例分析:Linux 中的 futex

Linux 提供了 futex(Fast User-space muTEX)机制。使用 futex 机制实现的互斥锁能在竞争程度较低时,直接使用原子操作完成加锁。而在竞争程度较高时,应用程序能够通过系统调用挂起并等待被后续锁持有者唤醒。futex 机制不仅可以用于避免互斥锁中的循环等待,还能用于实现条件变量等各类同步原语。

Linux 内核中的 futex 功能完善且繁多,本节介绍简化后的 futex 接口:

包括 futex_wait 和 futex_wake,分别用于等待和唤醒。

  • futex_wait 接收两个参数:uaddr 和 val。futex_wait 操作会判断地址 uaddr 上的值是否与 val 相等。如果相等则挂起该线程,否则直接返回
  • futex_wake 只接收一个参数 uaddr,其会尝试唤醒之前通过 futex_wait 等待在相同 uaddr 上的线程。futex 提供的这两个操作是互斥(即线程安全)的

代码片段 8-20 利用简化版的 futex 接口实现了 lock 和 unlock 功能。在 lock 操作尝试 atomic_CAS 失败(第 11 行)后,会先使用原子操作增加等待者计数器 waiters(第 12 行),然后调用 futex_wait 进入内核尝试挂起(第 13 行)。由于用户态的 atomic_CAS 执行和最终挂起之间存在时间差,在这段时间内锁可能已经被释放,因此在内核中会检查传入的 uaddr(即 lock->val)上的值是否等于 val(即 1)。如果不相等,即 lock->val 的值为 0,说明锁已经被释放,此时会直接返回用户态,并再次尝试 atomic_CAS;相反,如果 lock->val 的值为 1,则说明该线程需要挂起。操作系统内核会为每个等待的 uaddr 维护一个 FIFO 队列,当不同线程调用 futex_wait 等待在相同的 uaddr 上时,线程会被加入该队列中,等待接下来被 futex_wake 依次唤醒。等待者计数器 waiters 记录了当前有多少线程调用 futex_wait,如果在unlock时waiters的值不为0(第21行),则使用futex_wake尝试唤醒正在等待lock的线程(第22行)。

// 代码片段 8-20 简化的 futex 接口和使用 futex 的互斥锁void futex_wait(int *uaddr, int val);
void futex_wake(int *uaddr);struct lock {int val;int waiters;
};void lock(struct lock *lock)
{while(atomic_CAS(&lock->val, 0, 1) != 0) {atomic_FAA(&lock->waiters, 1);futex_wait(&lock->val, 1);atomic_FAA(&lock->waiters, -1);}
}void unlock(struct lock *lock)
{lock->val = 0;if (lock->waiters != 0)futex_wake(&lock->val);
}

与条件变量类似,futex 的使用也需要考虑一些特殊情况。比如当有线程在调用 futex_wait,但还未将该线程插入内核的等待队列时,另一个线程同时调用 futex_wake,发现等待队列为空,这就有可能造成无法唤醒该线程的情况。为了避免这种情况,Linux 内核保证了不同 futex 操作的互斥性,多个 futex 操作之间只可能先后发生,不可能同时发生。

http://www.dtcms.com/a/553092.html

相关文章:

  • Java-164 MongoDB 认证与权限实战:单实例与分片集群 整体认证配置实战 最小化授权/错误速查/回滚剧本
  • 北京公司的网站建设きょこんきょうしゃ在线
  • 第4讲:理解Flutter的灵魂 - “Everything is a Widget”
  • 驱动精灵、驱动人生、NVIDIA专业显卡驱动、360驱动大师、联想乐驱动,电脑驱动修复工具大全
  • Spring Boot 4与Spring Framework 7:云原生Java的全新革命与企业级实战
  • 虚拟机在云原生与智能时代的未来应用场景探析
  • 电脑如何设置wifi密码,详细步骤教程指南
  • C#面试题及详细答案120道(51-60)-- LINQ与Lambda
  • 北京网站备案的地址ps怎么做网站分隔线
  • DLSS是什么
  • web网页开发,旧版在线%考试,判题%系统demo,基于python+flask+随机分配考试题目,基于开发语言python,数据库mysql
  • 【C++】哈希表封装实现 unordered_map 和 unordered_set
  • 353-Spring AI Alibaba ARK 多模型示例
  • 安徽海绵城市建设协会网站ip查询网站备案查询系统
  • MVVM架构与ICommand核心笔记
  • Web后端开发学习总结
  • 萍乡做网站的公司有哪些门户网站建设方案ppt 百度文库
  • Wireshark抓包教程:获取网站登录凭证
  • 销售驱动的黄昏:医药商业化模式的效率悖论与转型必然
  • 【mysql】锁机制 - 2.行锁间隙锁临键锁
  • 做网站制作需要多少钱网络设计公司有哪些
  • 外卖骑手的Python转型指南:从送餐到编程的实战路径
  • 一款端侧TTS:NeuTTS-Air,3秒语音克隆,声音听起来没有生硬感,语气和节奏感相对自然
  • 网站建设网站软件页面设计属于什么知识产权
  • 网站管理的含义长春做网站哪家好
  • Nacos和Nginx集群,项目启动失败问题
  • Opencv(五): 腐蚀和膨胀
  • 17.React获取DOM的方式
  • 编码器读写操作方式
  • WEB服务