[Linux]学习笔记系列 -- [kernel][lock]mutex
title: mutex
categories:
- linux
- kernel
- lock
tags: - linux
- kernel
- lock
abbrlink: eecdb22f
date: 2025-10-03 09:01:49
文章目录
- kernel/locking/mutex.c 互斥锁(Mutex) 内核中基本的睡眠锁实现
- 历史与背景
- 这项技术是为了解决什么特定问题而诞生的?
- 它的发展经历了哪些重要的里程碑或版本迭代?
- 目前该技术的社区活跃度和主流应用情况如何?
- 核心原理与设计
- 它的核心工作原理是什么?
- 它的主要优势体现在哪些方面?
- 它存在哪些已知的劣势、局限性或在特定场景下的不适用性?
- 使用场景
- 在哪些具体的业务或技术场景下,它是首选解决方案?
- 是否有不推荐使用该技术的场景?为什么?
- 对比分析
- 请将其 与 其他相似技术 进行详细对比。
- include/linux/mutex_types.h
- include/linux/mutex.h
- DEFINE_MUTEX
- mutex_init
- kernel/locking/mutex.c
- __mutex_trylock_fast __mutex_unlock_fast
- mutex_lock
- mutex_unlock
- __mutex_init
- kernel/locking/rtmutex.c 实时互斥锁(Real-Time Mutex) 支持优先级继承的内核锁
- 历史与背景
- 这项技术是为了解决什么特定问题而诞生的?
- 它的发展经历了哪些重要的里程碑或版本迭代?
- 目前该技术的社区活跃度和主流应用情况如何?
- 核心原理与设计
- 它的核心工作原理是什么?
- 它的主要优势体现在哪些方面?
- 它存在哪些已知的劣势、局限性或在特定场景下的不适用性?
- 使用场景
- 在哪些具体的业务或技术场景下,它是首选解决方案?
- 是否有不推荐使用该技术的场景?为什么?
- 对比分析
- 请将其 与 标准`mutex`进行详细对比。
- kernel/locking/rtmutex.c
- 优先级继承
- **优先级继承(Priority Inheritance)问题与解决流程图**
- **1. 问题场景:优先级反转 (Priority Inversion)**
- **2. 解决方案:优先级继承 (Priority Inheritance)**
- **内核实现流程 (`rt_mutex_adjust_prio_chain`的角色)**
- rt_mutex_adjust_prio_chain 调整优先级链的实时互斥锁
- kernel/locking/rtmutex_api.c
- rt_mutex_adjust_pi 重新检查优先级继承(pi)链

https://github.com/wdfk-prog/linux-study
kernel/locking/mutex.c 互斥锁(Mutex) 内核中基本的睡眠锁实现
历史与背景
这项技术是为了解决什么特定问题而诞生的?
kernel/locking/mutex.c
实现的互斥锁(Mutex,Mutual Exclusion)是为了解决多任务内核中一个最基础、最普遍的并发控制问题:如何确保一段代码(临界区)在任何时刻最多只能被一个执行绪(线程或进程)执行,从而保护共享数据的完整性。
在没有互斥机制的情况下,如果多个CPU上的任务同时访问和修改同一个共享数据(如一个链表),就会导致数据损坏、状态不一致、内存泄漏等各种灾难性的后果。
mutex
被设计为一种睡眠锁(Sleeping Lock),专门用于解决**进程上下文(Process Context)**中的互斥问题。它的诞生是为了与另一种锁——**自旋锁(Spinlock)**形成互补:
- 自旋锁适用于中断上下文或保护极短的、不容许睡眠的临界区。获取不到锁的任务会“自旋”(忙等待),浪费CPU。
- 如果临界区比较长,或者在临界区内需要调用可能导致睡眠的函数(如
kmalloc(GFP_KERNEL)
、copy_from_user
、等待I/O),使用自旋锁会造成长时间的CPU空转,甚至因在持有自旋锁时睡眠而导致系统死锁。
mutex
正是为了这种临界区可睡眠的场景而设计的。当一个任务试图获取一个已经被占用的mutex
时,它不会忙等待,而是会被加入到一个等待队列中并进入睡眠状态,将CPU让给其他任务。
它的发展经历了哪些重要的里程碑或版本迭代?
Linux内核的mutex
实现经过了多次重要的优化演进:
- 从信号量(Semaphore)演变而来:在
mutex
正式出现之前,内核使用计数为1的信号量来模拟互斥锁。但信号量是一个更通用的同步原语,其实现比专门的互斥锁要复杂和低效。 - 通用
mutex
子系统的建立:一个重要的里程碑是创建一个独立的、高度优化的mutex
子系统。它提供了mutex_init
,mutex_lock
,mutex_unlock
等清晰的API。 - 性能优化——快速路径(Fastpath):现代
mutex
的实现核心是其性能优化。它区分了快速路径和慢速路径。- 快速路径:用于无竞争或低竞争的场景。通过一次原子的比较并交换(Compare-and-Swap)操作就能成功获取或释放锁,几乎没有额外开销。
- 慢速路径:当锁存在竞争时,代码会跳转到慢速路径。在这里,任务会被加入等待队列并进入睡眠。
- 公平性与优化:引入了不同的实现变体,如公平锁(确保等待者按顺序获取锁)和为了性能而进行优化的非公平锁。
- 调试支持(
CONFIG_DEBUG_MUTEXES
):增加了强大的调试功能,如死锁检测、锁所有权检查、在中断上下文中使用mutex
的错误检测等,极大地帮助了内核开发者发现和修复锁相关的bug。
目前该技术的社区活跃度和主流应用情况如何?
mutex
是Linux内核中最基础、最广泛使用的同步原语之一。其代码非常核心和稳定。社区的活跃度主要集中在:
- 对锁的性能进行持续的微调和优化,以适应新的处理器架构和工作负载。
- 增强其调试功能。
mutex
被内核中几乎所有可以睡眠的、需要进行互斥访问的场景所使用,特别是:
- 设备驱动程序:用于保护设备的状态数据结构,防止来自
open
,read
,write
,ioctl
等系统调用的并发访问。 - 文件系统:用于保护inode、superblock等核心数据结构。
- 内核核心子系统:在各种需要保护可睡眠路径中的共享数据时使用。
核心原理与设计
它的核心工作原理是什么?
现代Linux mutex
的核心是一个原子计数器、一个自旋锁和一个等待队列的巧妙结合。
-
数据结构 (
struct mutex
):owner
: 一个原子变量(atomic_long_t
),用于存储持有锁的task_struct
的指针。它同时也被用来编码锁的状态(是否被锁定)。wait_lock
: 一个raw_spinlock_t
,用于保护等待队列本身,确保并发地加入/移出等待队列的操作是安全的。wait_list
: 一个链表头,即等待队列。
-
加锁 (
mutex_lock
) 的流程:- 快速路径:首先,它会尝试一次原子的
cmpxchg
(比较并交换)操作。如果owner
字段是NULL
(表示锁未被持有),就原子地将其设置为当前任务的指针。如果这个操作成功,锁就获取到了,函数立即返回。这是无竞争下的情况,非常快。 - 慢速路径:如果快速路径失败(说明锁已被其他任务持有),代码会跳转到一个更复杂的函数(
__mutex_lock_slowpath
)。- 获取
wait_lock
自旋锁。 - 将当前任务添加到一个等待队列(
wait_list
)中。 - 释放
wait_lock
自旋锁。 - 在一个循环中,将当前任务的状态设置为
TASK_UNINTERRUPTIBLE
,然后调用schedule()
进入睡眠。 - 当被唤醒后,循环会检查自己是否成为了新的
owner
,如果是,则退出循环。
- 获取
- 快速路径:首先,它会尝试一次原子的
-
解锁 (
mutex_unlock
) 的流程:- 快速路径:它会尝试一次原子的
cmpxchg
操作,将owner
字段从当前任务的指针原子地设置为NULL
。如果成功(这意味着没有其他任务在等待队列中),函数立即返回。 - 慢速路径:如果快速路径失败(通常意味着
owner
字段被设置了特殊标志,表明有等待者),代码会跳转到__mutex_unlock_slowpath
。- 获取
wait_lock
自旋锁。 - 从等待队列中取出一个等待者。
- 将这个等待者的任务指针设置为锁的新
owner
。 - 调用
wake_up_process()
唤醒这个等待者。 - 释放
wait_lock
自旋锁。
- 获取
- 快速路径:它会尝试一次原子的
它的主要优势体现在哪些方面?
- 高效性:在无竞争或低竞争的情况下,通过原子操作实现的快速路径开销极低。在有竞争的情况下,通过让任务睡眠来避免CPU资源的浪费。
- 安全性:提供了严格的“一个所有者”语义。只有持有锁的任务才能释放它。
- 强大的调试支持:内核的调试选项可以帮助发现大量的常见锁使用错误。
- 简单易用的API:
mutex_lock/unlock
的接口非常直观。
它存在哪些已知的劣势、局限性或在特定场景下的不适用性?
- 不能在中断上下文中使用:这是
mutex
最根本的限制。中断上下文不能睡眠,而mutex
在有竞争时会导致睡眠。在中断上下文(硬中断、软中断、tasklet等)中尝试获取mutex
会触发内核错误。 - 性能开销:虽然经过高度优化,但在高竞争下,进入慢速路径、进行上下文切换的开销仍然远大于自旋锁。
- 只支持单一所有者:它不能像读写锁那样允许多个读者并发访问。
使用场景
在哪些具体的业务或技术场景下,它是首选解决方案?
mutex
是进程上下文中进行互斥保护的默认和首选工具,特别是当:
- 临界区内可能睡眠:这是使用
mutex
而非spinlock
的决定性因素。例如,驱动的ioctl
处理函数中需要调用copy_from_user()
,这个函数可能会因为缺页异常而睡眠。 - 临界区执行时间较长:即使临界区内不睡眠,但如果执行时间超过几个微秒,使用
mutex
也比spinlock
更合适,因为它避免了其他CPU长时间的自旋等待。 - 保护的数据结构被用户上下文访问:所有通过系统调用进入的内核路径都属于进程上下文,因此在这些路径中保护共享数据,
mutex
是标准选择。
是否有不推荐使用该技术的场景?为什么?
- 中断上下文:绝对禁止。必须使用
spinlock
。 - 需要保护极短的、不睡眠的临界区:如果临界区非常短(例如,只是修改几个整数),即使在进程上下文中,使用
spinlock
也可能比mutex
更快,因为它避免了进入慢速路径的函数调用开销。 - 需要允许多个读者并发访问:在这种“读多写少”的场景下,应该使用
rw_lock
(读写锁)或seqlock
来获得更好的性能。
对比分析
请将其 与 其他相似技术 进行详细对比。
特性 | 互斥锁 (Mutex) | 自旋锁 (Spinlock) | 读写锁 (RW_Lock) |
---|---|---|---|
基本行为 | 睡眠 (放弃CPU)。 | 自旋 (忙等待,占用CPU)。 | 睡眠或自旋 (取决于实现和竞争方)。 |
使用上下文 | 进程上下文 (可以睡眠)。 | 任何上下文 (中断、进程)。 | 进程上下文 (读写信号量) 或 任何上下文 (读写自旋锁)。 |
保护粒度 | 完全互斥。任何时候只有一个持有者。 | 完全互斥。 | 读者共享,写者互斥。允许多个读者或一个写者。 |
临界区约束 | 可以睡眠,可以较长。 | 绝对不能睡眠,必须极短。 | 读者不能阻塞写者太久。 |
性能(无竞争) | 非常高 (快速路径)。 | 非常高 (原子操作)。 | 非常高。 |
性能(有竞争) | 中等 (上下文切换开销)。 | 低 (CPU空转浪费资源)。 | 取决于读写比例。 |
死锁风险 | 相对较低 (调试机制可检测)。 | 较高 (在中断中获取已持有的锁会立即死锁)。 | 与mutex /spinlock 类似。 |
典型场景 | 驱动ioctl ,文件系统操作。 | 中断处理程序,调度器内部。 | 保护频繁被读、偶尔被写的配置数据。 |
include/linux/mutex_types.h
/** 具有严格语义的简单、直接的互斥锁:** - 一次只有一个任务可以持有互斥锁* - 只有所有者可以解锁互斥锁* - 不允许多次解锁* - 不允许递归锁定* - 互斥锁对象必须通过 API 初始化* - 互斥锁对象不得通过 memset 或复制进行初始化* - 任务不能在持有互斥锁的情况下退出* - 不得释放持有的锁所在的内存区域* - 持有的互斥锁不得重新初始化* - 互斥锁不能用于硬件或软件中断上下文,例如 tasklet 和 timers** 启用 DEBUG_MUTEXES 时,将完全强制实施这些语义。此外,除了强制执行上述规则外,互斥锁调试代码还实现了许多附加功能,使锁调试更容易、更快捷:** - 每当互斥锁在调试输出中打印时,都使用互斥锁的符号名称* - 获取点跟踪,函数名称的符号查找* - 系统中持有的所有锁的列表,它们的打印输出* - 所有者跟踪* - 检测自递归锁并打印出所有相关信息* - 检测多任务循环死锁并打印出所有受影响的锁和任务(并且只打印那些任务)*/
struct mutex {atomic_long_t owner;raw_spinlock_t wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNERstruct optimistic_spin_queue osq; /* Spinner MCS lock */
#endifstruct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXESvoid *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOCstruct lockdep_map dep_map;
#endif
};
include/linux/mutex.h
DEFINE_MUTEX
#define __MUTEX_INITIALIZER(lockname) \{ .owner = ATOMIC_LONG_INIT(0) \, .wait_lock = __RAW_SPIN_LOCK_UNLOCKED(lockname.wait_lock) \, .wait_list = LIST_HEAD_INIT(lockname.wait_list) \__DEBUG_MUTEX_INITIALIZER(lockname) \__DEP_MAP_MUTEX_INITIALIZER(lockname) }#define DEFINE_MUTEX(mutexname) \struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)
mutex_init
/*** mutex_init - 初始化互斥锁* @mutex:需要初始化的互斥锁** 将互斥锁初始化为解锁状态。** 不允许初始化已锁定的互斥锁。*/
#define mutex_init(mutex) \
do { \static struct lock_class_key __key; \\__mutex_init((mutex), #mutex, &__key); \
} while (0)
kernel/locking/mutex.c
__mutex_trylock_fast __mutex_unlock_fast
#ifndef CONFIG_DEBUG_LOCK_ALLOC
/** Lockdep annotations are contained to the slow paths for simplicity.* There is nothing that would stop spreading the lockdep annotations outwards* except more code.*//** Optimistic trylock that only works in the uncontended case. Make sure to* follow with a __mutex_trylock() before failing.*/
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{unsigned long curr = (unsigned long)current;unsigned long zero = 0UL;MUTEX_WARN_ON(lock->magic != lock);if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))return true;return false;
}static __always_inline bool __mutex_unlock_fast(struct mutex *lock)
{unsigned long curr = (unsigned long)current;return atomic_long_try_cmpxchg_release(&lock->owner, &curr, 0UL);
}
#endif
mutex_lock
/*** mutex_lock - 获取互斥锁* @lock:需要获取的互斥量** 专门为此任务锁定互斥锁。如果互斥锁现在不可用,它将休眠,直到可以获取它。** 互斥锁稍后必须由获取它的同一任务释放。不允许递归锁定。如果不先解锁互斥锁,任务可能无法退出。此外,在互斥锁仍处于锁定状态的情况下,不得释放互斥锁所在的内核内存。必须先初始化 (或静态定义) mutex ,然后才能锁定它。memset()将互斥锁设置为 0 是不允许的。** (CONFIG_DEBUG_MUTEXES .config 选项打开调试检查,这些检查将强制实施限制,并且还将执行死锁调试)** 此函数类似于(但不等效于)down()。*/
void __sched mutex_lock(struct mutex *lock)
{might_sleep();if (!__mutex_trylock_fast(lock))__mutex_lock_slowpath(lock);
}
EXPORT_SYMBOL(mutex_lock);
mutex_unlock
/*** mutex_unlock - 释放互斥锁* @lock:需要释放的互斥锁** 解锁之前被此任务锁定的互斥锁。** 此函数不得在中断上下文中使用。不允许解锁未锁定的互斥锁。** 调用方必须确保互斥锁在此函数返回之前保持活动状态 - mutex_unlock() 不能直接用于释放对象,以便另一个并发任务可以释放它。在这个方面,互斥锁与自旋锁和引用计数不同。** 此函数类似于(但不等效于)up()。*/
void __sched mutex_unlock(struct mutex *lock)
{
#ifndef CONFIG_DEBUG_LOCK_ALLOCif (__mutex_unlock_fast(lock))return;
#endif__mutex_unlock_slowpath(lock, _RET_IP_);
}
EXPORT_SYMBOL(mutex_unlock);
__mutex_init
void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{atomic_long_set(&lock->owner, 0);raw_spin_lock_init(&lock->wait_lock);INIT_LIST_HEAD(&lock->wait_list);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNERosq_lock_init(&lock->osq);
#endifdebug_mutex_init(lock, name, key);
}
EXPORT_SYMBOL(__mutex_init);
kernel/locking/rtmutex.c 实时互斥锁(Real-Time Mutex) 支持优先级继承的内核锁
历史与背景
这项技术是为了解决什么特定问题而诞生的?
kernel/locking/rtmutex.c
实现的实时互斥锁(RT-Mutex)是为了解决在实时(Real-Time)系统中,标准mutex
所面临的一个致命问题:无界优先级反转(Unbounded Priority Inversion)。
优先级反转是一个经典的多任务调度问题:
- 一个低优先级任务(L)获取了一个
mutex
锁。 - 一个高优先级任务(H)试图获取同一个锁,但因为锁被L持有,H被迫进入睡眠等待。
- 此时,一个中优先级任务(M)就绪并开始运行。由于M的优先级高于L,它会抢占L。
- 结果:高优先级任务H不仅要等待低优先级任务L释放锁,还要等待与该锁完全无关的中优先级任务M执行完毕。H的等待时间变得不可预测,取决于M的执行时间,这就是“无界优先级反转”。
在通用操作系统中,这可能只会导致性能下降。但在硬实时系统中(如航空、工业控制),任务必须在严格的截止时间(Deadline)内完成,无界的优先级反转是不可接受的。
rtmutex
通过实现**优先级继承协议(Priority Inheritance Protocol)**来解决这个问题:
- 当高优先级任务H因等待低优先级任务L持有的锁而阻塞时,系统会临时地将L的优先级提升到与H相同。
- 这样,中优先级任务M就无法再抢占L了。
- L会以高优先级继续执行,尽快完成临界区并释放锁。
- 一旦L释放锁,它的优先级会恢复到原来的水平,而H则可以立即获取锁并继续执行。
它的发展经历了哪些重要的里程碑或版本迭代?
RT-Mutex的引入是Linux内核向实时化演进的一个关键里程碑,与PREEMPT_RT
(实时抢占)补丁集的整合过程紧密相关。
- 作为
PREEMPT_RT
的核心组件:RT-Mutex最初是在Ingo Molnar领导的PREEMPT_RT
补丁集中被开发和完善的。它是将Linux从一个通用系统转变为一个功能齐全的实时操作系统的核心技术之一。 - 逐渐并入主线内核:随着
PREEMPT_RT
的成熟,其核心组件被逐步合并到主线内核中。rtmutex.c
的并入使得主线内核也具备了支持优先级继承的能力,尽管默认情况下可能并未对所有mutex
启用。 - 替换标准
mutex
:在完全启用PREEMPT_RT
的内核配置中,rtmutex
的实现会完全取代标准的mutex
实现。这意味着,当CONFIG_PREEMPT_RT
被选中时,内核中所有的mutex_lock()
调用实际上都会路由到rtmutex
的加锁逻辑。 - PI-Futex:将优先级继承的能力扩展到了用户空间,通过
FUTEX_LOCK_PI
等操作,使得用户空间的多线程程序也能利用内核的rtmutex
机制来避免优先级反-转。
目前该技术的社区活跃度和主流应用情况如何?
rtmutex.c
是Linux实时内核的核心,其代码非常稳定和关键。社区的活动主要由实时Linux社区驱动,集中在:
- 确保其在各种复杂场景下的正确性和性能。
- 修复与优先级继承相关的、非常微妙的边界情况bug。
- 持续将其与主线内核的最新变化进行同步和整合。
rtmutex
是所有需要确定性(deterministic)行为的Linux系统的基础,被广泛应用于:
- 工业自动化和机器人
- 电信基础设施
- 航空航天和汽车电子(特别是自动驾驶)
- 专业音频和视频处理
核心原理与设计
它的核心工作原理是什么?
rtmutex
的核心是在标准mutex
的“睡眠-唤醒”机制之上,增加了一套复杂的优先级跟踪和动态调整逻辑。
-
数据结构 (
struct rt_mutex
):owner
: 不再只是一个简单的指针,而是一个经过编码的字段,包含了任务指针和一些状态位。wait_lock
: 同样是一个raw_spinlock_t
,用于保护等待队列。wait_list
: 这是关键区别。它不再是一个简单的链表,而是一个按任务优先级排序的红黑树(Red-Black Tree)。这使得查找最高优先级的等待者等操作非常高效。
-
加锁 (
rt_mutex_lock
) 的流程:- 快速路径:与标准
mutex
类似,尝试一次原子的cmpxchg
来获取锁。 - 慢速路径(有竞争时):
- 进入慢速路径处理函数 (
rt_mutex_slowlock
)。 - 优先级继承:这是核心。函数会检查当前锁的持有者(owner)的优先级是否低于当前任务(waiter)的优先级。如果是,它会启动一个优先级提升(boosting)的过程,将owner的优先级临时提升到与waiter相同。这个过程可能是递归的,如果owner也在等待另一个锁,优先级提升会沿着这个“等待链”传播下去,这被称为死锁检测与处理的一部分。
- 加入等待树:将当前任务插入到
wait_list
这个按优先级排序的红黑树中。 - 睡眠:将当前任务设置为睡眠状态并调用
schedule()
。
- 进入慢速路径处理函数 (
- 快速路径:与标准
-
解锁 (
rt_mutex_unlock
) 的流程:- 慢速路径 (
rt_mutex_slowunlock
):- 恢复优先级:首先,检查当前任务(即将释放锁的owner)是否被提升了优先级。如果是,需要将其优先级恢复到原来的水平。这个过程同样复杂,需要考虑它是否还在等待其他锁。
- 选择下一个所有者:从
wait_list
红黑树中找到并移除优先级最高的那个等待任务。 - 交接所有权:将锁的
owner
设置为这个新的最高优先级任务。 - 唤醒:唤醒这个新owner。
- 慢速路径 (
它的主要优势体现在哪些方面?
- 解决优先级反转:这是其最核心的优势,为实时任务提供了可预测的执行时间保证。
- 死锁检测:其复杂的等待链跟踪机制,天然地集成了一个运行时的死锁检测器。如果加锁操作形成了一个依赖环路,
rt_mutex_lock
会检测到并返回-EDEADLK
错误。 - 性能:尽管比标准
mutex
复杂,但其实现经过高度优化,在无竞争时性能与标准mutex
相当。
它存在哪些已知的劣势、局限性或在特定场景下的不适用性?
- 复杂性:
rtmutex
的实现非常复杂,理解其完整的优先级继承和死锁检测逻辑需要深入的专业知识。 - 开销:在有竞争的情况下,维护优先级排序的红黑树、执行优先级提升/恢复等操作,其开销比标准
mutex
的慢速路径要高。 - 与
PREEMPT_RT
的强关联:虽然主线内核中存在rtmutex
的代码,但其全部威力只有在完全的PREEMPT_RT
配置下才能发挥(例如,将自旋锁也替换为可抢占的睡眠锁)。
使用场景
在哪些具体的业务或技术场景下,它是首选解决方案?
rtmutex
是任何需要实时确定性的内核同步场景下的唯一正确选择。
PREEMPT_RT
内核:在配置了PREEMPT_RT
的内核中,它就是标准的mutex
,被用于所有需要睡眠锁的地方。- PI-Futex:用户空间实时应用程序通过
FUTEX_LOCK_PI
来利用内核的rtmutex
,以避免用户空间线程间的优先级反转。这是构建用户空间实时同步原语(如实时pthread_mutex
)的基础。
是否有不推荐使用该技术的场景?为什么?
- 非实时通用系统:在标准的、非实时的Linux内核配置中,没有必要刻意去使用
rtmutex
的API(除非是为了实现PI-Futex)。标准的mutex
实现更简单、在竞争下的开销略低,并且已经能满足通用系统的需求。 - 中断上下文:与标准
mutex
一样,rtmutex
是睡眠锁,绝对不能在中断上下文中使用。
对比分析
请将其 与 标准mutex
进行详细对比。
特性 | 实时互斥锁 (RT-Mutex) | 标准互斥锁 (Mutex) |
---|---|---|
核心功能 | 优先级继承,解决优先级反转。 | 提供基本的互斥访问。 |
等待队列 | 按优先级排序的红黑树。 | 简单的FIFO链表。 |
性能(无竞争) | 非常高 (快速路径),与标准mutex 几乎相同。 | 非常高 (快速路径)。 |
性能(有竞争) | 较高,但有维护红黑树和优先级调整的额外开销。 | 较高,但慢速路径逻辑比rtmutex 简单。 |
死锁检测 | 是,内置于优先级继承的等待链跟踪中。 | 否 (调试版本DEBUG_MUTEXES 可以提供一些检测)。 |
复杂性 | 非常高。 | 中等。 |
主要目标 | 可预测性 (Predictability) 和 确定性 (Determinism)。 | 吞吐量 (Throughput) 和 公平性 (Fairness)。 |
典型应用 | 实时系统 (PREEMPT_RT ),PI-Futex。 | 通用Linux内核中的所有可睡眠临界区。 |
kernel/locking/rtmutex.c
优先级继承
优先级继承(Priority Inheritance)问题与解决流程图
1. 问题场景:优先级反转 (Priority Inversion)
首先,我们需要定义问题。优先级继承机制是为了解决“优先级反转”而生的。
参与者:
- H: 高优先级任务 (High Priority Task)
- M: 中等优先级任务 (Medium Priority Task)
- L: 低优先级任务 (Low Priority Task)
- Lock: 一个实时互斥锁 (RT-Mutex)
流程图: 优先级反转的发生
+---------------------------------+ +--------------------------------+ +---------------------------------+
| 任务 L (低优) | | 任务 M (中优) | | 任务 H (高优) |
+---------------------------------+ +--------------------------------+ +---------------------------------+| |v |
[1] L 开始运行 || |v |
[2] L 获取 Lock || || v
[3] H 变为就绪态,优先级高于L,抢占L ---------------------------------------------------> [4] H 开始运行| || v| [5] H 尝试获取 Lock| | (发现被L持有)| v| [6] H 进入睡眠,等待Lock| || |
[7.1] L 被调度器选中,恢复运行 (因为H在睡眠) <------------------------------------------------------||v
[7.2] M 变为就绪态,优先级高于L,抢占L ------> [8] M 开始运行| (M 与 Lock 无关)| |v v
[9] **问题发生**: M正在运行, [10] M 持续运行,消耗CPU时间...L 无法运行,因此无法释放Lock。|v[11] M 运行结束或睡眠|
[12] L 终于有机会再次运行 <------------------------------|v
[13] L 释放 Lock||
[14] 内核唤醒 H <---------------------------------------------------------------------------> [15] H 终于被唤醒并运行
问题分析: 在步骤,最高优先级的任务H在等待,但CPU时间却被一个不相关的中等优先级任务M“偷走”了。这就是优先级反转。H的有效优先级,实际上降到了比M还低的水平。
2. 解决方案:优先级继承 (Priority Inheritance)
现在,我们来看引入了优先级继承机制后,流程是如何变化的。
流程图: 优先级继承的解决过程
+---------------------------------+ +--------------------------------+ +---------------------------------+
| 任务 L (低优) | | 任务 M (中优) | | 任务 H (高优) |
+---------------------------------+ +--------------------------------+ +---------------------------------+| |v |
[1] L 开始运行 || |v |
[2] L 获取 Lock || || v
[3] H 变为就绪态,抢占L -----------------------------------------------------------------> [4] H 开始运行| |v v| [5] H 尝试获取 Lock| | (发现被L持有)| v
[6] **PI启动**: 内核检测到H在等待L, <----------------------------------------------------- [7] H 进入睡眠,等待Lock将 L 的有效优先级**提升**至与 H 相同。**L.eff_prio = H.prio**|v
[8] M 变为就绪态。此时 M.prio < L.eff_prio。M **无法**抢占 L。|v
[9] **问题解决**: L 继续以H的高优先级运行,没有被M打断。|v
[10] L 快速执行其临界区代码...|v
[11] L 释放 Lock。内核**立即**将 L 的优先级**恢复**到其原始的低优先级。||
[12] 内核唤醒 H <---------------------------------------------------------------------------> [13] H 被唤醒,由于其优先级最高,立即抢占L并运行。
解决方案分析:
- 核心步骤: 当H阻塞在L持有的锁上时,内核执行了优先级捐赠(Priority Donation)。L的有效优先级被临时提升,继承了H的优先级。
- 关键结果: 由于L现在具有高优先级,中等优先级的M无法再抢占它。这保证了持有锁的L能够尽快运行,从而尽快释放锁。
- 优先级恢复: 一旦锁被释放,继承关系就解除了,L的优先级必须立即恢复原状,防止它继续“滥用”不属于它的高优先级。
- 及时响应: H一旦被唤醒,就能立刻获得CPU,其等待时间被显著缩短,只取决于L执行其临界区的必要时间。
内核实现流程 (rt_mutex_adjust_prio_chain
的角色)
上面的流程图是宏观逻辑,而rt_mutex_adjust_prio_chain
则是实现这个逻辑的微观算法。
- 当步骤 **** 发生时,H调用
rt_mutex_lock()
,发现锁被持有,此时就会触发一次对rt_mutex_adjust_prio_chain
的调用,将H的优先级沿着依赖链向上“推(push)”给L。 - 当步骤 **** 发生时,L调用
rt_mutex_unlock()
,释放锁。解锁操作会再次触发对rt_mutex_adjust_prio_chain
的调用,但这次的目的是“去继承(deboosting)”,即检查L是否还需要保持高优先级。由于H已经被唤醒,不再是等待者,L的优先级就会被重新计算并恢复到其基础值。 - 如果在
sched_setscheduler
中改变了H或L的基础优先级,也会调用rt_mutex_adjust_prio_chain
来重新计算和平衡整条链的优先级。
这个宏观流程图清晰地展示了优先级继承机制所要解决的核心问题,以及它通过动态调整任务优先级来保证高优先级任务响应性的基本原理。
rt_mutex_adjust_prio_chain 调整优先级链的实时互斥锁
/** 调整优先级链。也用于死 deadlock 检测。* 将任务的引用计数减少一 —— 因此可能会释放该任务。** @task: 拥有该互斥锁的任务(owner),可能需要对其进行链遍历。* @chwalk: 我们是否必须执行 deadlock 检测?* @orig_lock: 该互斥锁 (如果我们是为了一个刚刚调整了优先级且正在等待互斥锁的* 任务而遍历链以重新检查事物,可以为 NULL)* @next_lock: 在我们在我们丢弃其 pi_lock 之前,@orig_lock 的持有者所阻塞* 的那个互斥锁。永远不会被解引用,仅用于比较以检测锁链的变化。* @orig_waiter: 刚刚将其优先级“捐赠”给互斥锁持有者的任务所对应的* rt_mutex_waiter 结构体 (在上面描绘的情况下,或者如果* 顶层等待者已经离开并且我们实际上在为持有者降级优先级时,* 可以为 NULL)* @top_task: 当前的顶层等待者任务。** 返回 0 或 -EDEADLK。** 链遍历基础和保护范围** [R] 任务的引用计数* [Pn] task->pi_lock 被持有* [L] rtmutex->wait_lock 被持有** 正常的加锁顺序:** rtmutex->wait_lock* task->pi_lock** 步骤 描述 受...保护* 函数参数:* @task [R]* @orig_lock if != NULL @top_task 正阻塞于其上* @next_lock 无保护。不能被解引用。仅用于比较。* @orig_waiter if != NULL @top_task 正阻塞于其上* @top_task 当前任务,或在代理锁定的情况下由调用代码保护** again:* loop_sanity_check();* retry:* [1] lock(task->pi_lock); [R] 获取 [P1]* [2] waiter = task->pi_blocked_on; [P1]* [3] check_exit_conditions_1(); [P1]* [4] lock = waiter->lock; [P1]* [5] if (!try_lock(lock->wait_lock)) { [P1] 尝试获取 [L]* unlock(task->pi_lock); 释放 [P1]* goto retry;* }* [6] check_exit_conditions_2(); [P1] + [L]* [7] requeue_lock_waiter(lock, waiter); [P1] + [L]* [8] unlock(task->pi_lock); 释放 [P1]* put_task_struct(task); 释放 [R]* [9] check_exit_conditions_3(); [L]* [10] task = owner(lock); [L]* get_task_struct(task); [L] 获取 [R]* lock(task->pi_lock); [L] 获取 [P2]* [11] requeue_pi_waiter(tsk, waiters(lock));[P2] + [L]* [12] check_exit_conditions_4(); [P2] + [L]* [13] unlock(task->pi_lock); 释放 [P2]* unlock(lock->wait_lock); 释放 [L]* goto again;** 其中 P1 是阻塞的任务,P2 是锁的持有者;向上走一步,* 持有者就成为下一个阻塞的任务,以此类推...**/
static int __sched rt_mutex_adjust_prio_chain(struct task_struct *task,enum rtmutex_chainwalk chwalk,struct rt_mutex_base *orig_lock,struct rt_mutex_base *next_lock,struct rt_mutex_waiter *orig_waiter,struct task_struct *top_task)
{/** 局部变量声明与注释*//* @waiter: 指向当前迭代中,'task'正阻塞于其上的rt_mutex_waiter结构体。*/struct rt_mutex_waiter *waiter;/* @top_waiter: 跟踪整个依赖链中,当前已知的最高优先级等待者。初始值为orig_waiter。*/struct rt_mutex_waiter *top_waiter = orig_waiter;/* @prerequeue_top_waiter: 在对锁的等待者树进行修改“之前”,临时保存当时的最高优先级等待者,用于事后比较。*/struct rt_mutex_waiter *prerequeue_top_waiter;/* @ret: 函数的返回值。默认为0(成功),如果检测到死锁,则设为-EDEADLK。*/int ret = 0;/* @depth: 迭代深度计数器。用于防止因过长的锁链或未检测到的循环导致的无限递归,是一种安全保护机制。*/int depth = 0;/* @lock: 指向当前迭代中,'task'正阻塞于其上的那个rt_mutex_base锁。*/struct rt_mutex_base *lock;/* @detect_deadlock: 布尔标志。根据输入参数chwalk决定本次遍历是否需要执行死锁检测逻辑。*/bool detect_deadlock;/* @requeue: 布尔标志。决定是否需要执行修改优先级和重新入队的核心逻辑。在某些“只看不动”的死锁检测路径上,会被设为false。*/bool requeue = true;// 获取当前任务的pi_lock引用计数detect_deadlock = rt_mutex_cond_detect_deadlock(orig_waiter, chwalk);/** 提升或降低优先级是一个充满陷阱的分步过程。我们希望它是可抢占的,* 并且每步最多持有两个锁。所以我们必须小心地检查我们操作期间* 是否有东西发生了改变。*/
again:/** 我们为每次调用限制锁链的长度。*/if (++depth > max_lock_depth) {static int prev_max;/** 只打印一次。如果管理员改变了限制,当再次达到限制时打印一条新消息。*/if (prev_max != max_lock_depth) {prev_max = max_lock_depth;printk(KERN_WARNING "Maximum lock depth %d reached ""task: %s (%d)\n", max_lock_depth,top_task->comm, task_pid_nr(top_task));}// 如果超过了最大锁深度,释放task的引用计数并返回死锁错误。put_task_struct(task);return -EDEADLK;}/** 我们在这里是完全可抢占的,只持有对@task的引用计数。所以* 自从调用者或我们自己的代码下面(goto retry/again)释放了所有* 锁之后,任何事情都可能已经改变了。*/
retry:/** [1] 任务不会消失,因为我们之前做了 get_task()!*/raw_spin_lock_irq(&task->pi_lock);/** [2] 获取@task所阻塞于其上的waiter。*/waiter = task->pi_blocked_on;/** [3] 在task->pi_lock保护下检查退出条件1。*//** 检查增强链的末端是否已达到,或者在我们释放锁期间链的状态是否已改变。*/if (!waiter)goto out_unlock_pi;/** 检查orig_waiter的状态。在我们释放锁后,锁的前一个持有者可能已经释放了锁。*/if (orig_waiter && !rt_mutex_owner(orig_lock))goto out_unlock_pi;/** 在我们获取对@task的引用计数后,我们释放了所有锁,所以该任务可能* 已经在锁链中前进了,甚至完全离开了锁链,现在阻塞在一个不相关的锁* 或@orig_lock上。** 我们将@task所阻塞的锁存储在@next_lock中,所以我们可以检测到链的变化。*/if (next_lock != waiter->lock)goto out_unlock_pi;/** 由于ww_mutex,锁图中可能会有‘虚假的’循环,考虑:** P1: A, ww_A, ww_B* P2: ww_B, ww_A* P3: A** P3不应该返回-EDEADLK,因为它被P1和P2创建的循环困住了(这个循环会解决* —— 并会触发上面的max_lock_depth)。因此,禁用detect_deadlock,* 以便在所有相关任务都被增强后,下面的终止条件可以触发。** 即使我们从ww_mutex开始,我们也可以禁用死锁检测,因为我们无论如何都* 会在[6]处抑制一个由ww_mutex引发的死锁。然而,在这里抑制它是不够的,* 因为我们仍可能因为调整驱动的迭代而碰到[6]。** 注意:如果有人在两个ww_class之间创建了死锁,我们完全无法报告它;* lockdep应该能做到。*/if (IS_ENABLED(CONFIG_PREEMPT_RT) && waiter->ww_ctx && detect_deadlock)detect_deadlock = false;/** 当任务没有等待者时退出。注意,top_waiter可以为NULL,* 当我们处于降级增强(deboosting)模式时!*/if (top_waiter) {if (!task_has_pi_waiters(task))goto out_unlock_pi;/** 如果关闭了死锁检测,当我们不是任务的最高pi等待者时,我们在这里停止。* 如果开启了死锁检测,我们继续,但在链遍历中停止重新排队。*/if (top_waiter != task_top_pi_waiter(task)) {if (!detect_deadlock)goto out_unlock_pi;elserequeue = false;}}/** 如果waiter的优先级与任务的优先级相同,那么就不需要进一步的优先级* 调整了。如果关闭了死锁检测,我们停止链遍历。如果开启了,我们继续,* 但在链遍历中停止重新排队。*/if (rt_waiter_node_equal(&waiter->tree, task_to_waiter_node(task))) {if (!detect_deadlock)goto out_unlock_pi;elserequeue = false;}/** [4] 获取下一个锁;在持有task->pi_lock期间,我们无法解锁,并能保证@lock的存在。*/lock = waiter->lock;/** [5] 我们在这里需要使用trylock,因为我们正持有task->pi_lock,* 这与其他的rtmutex操作是相反的加锁顺序。** 根据以上所述,持有task->pi_lock保证了lock的存在,所以* 从生命周期的角度来看,反转这个加锁顺序是不可行的。*/if (!raw_spin_trylock(&lock->wait_lock)) {raw_spin_unlock_irq(&task->pi_lock);cpu_relax();goto retry;}/** [6] 在task->pi_lock和lock->wait_lock保护下检查退出条件2。** 死锁检测。如果这个锁与最初导致我们遍历锁链的原始锁相同,* 或者如果当前锁的持有者是发起链遍历的任务,我们就检测到了死锁。*/if (lock == orig_lock || rt_mutex_owner(lock) == top_task) {ret = -EDEADLK;/** 当死锁是由于ww_mutex引起的;另见上文。不报告死锁,* 而是让ww_mutex的wound/die逻辑来选择哪个竞争线程获取-EDEADLK。** 注意:假设该循环只包含一个ww_class;任何其他配置我们都无法报告;* 另见lockdep。*/if (IS_ENABLED(CONFIG_PREEMPT_RT) && orig_waiter && orig_waiter->ww_ctx)ret = 0;raw_spin_unlock(&lock->wait_lock);goto out_unlock_pi;}/** 如果我们只是为了死锁检测而跟随锁链,就不需要做所有重新排队的操作。* 为了避免在下面各个地方使用大量的条件判断,只做最小的链遍历检查。*/if (!requeue) {/** 这里没有重新排队[7]。只释放@task [8]*/raw_spin_unlock(&task->pi_lock);put_task_struct(task);/** [9] 在lock->wait_lock保护下检查退出条件3。* 如果锁没有持有者,则链结束。*/if (!rt_mutex_owner(lock)) {raw_spin_unlock_irq(&lock->wait_lock);return 0;}/* [10] 获取下一个任务,即@lock的持有者 */task = get_task_struct(rt_mutex_owner(lock));raw_spin_lock(&task->pi_lock);/** 这里没有重新排队[11]。我们只做死锁检测。** [12] 存储持有者是否自己也被阻塞。在释放锁后做出决定。*/next_lock = task_blocked_on_lock(task);/** 获取下一次迭代的最高等待者。*/top_waiter = rt_mutex_top_waiter(lock);/* [13] 释放锁 */raw_spin_unlock(&task->pi_lock);raw_spin_unlock_irq(&lock->wait_lock);/* 如果持有者没有被阻塞,则链结束。*/if (!next_lock)goto out_put_task;goto again;}/** 在对@lock执行重新排队操作前,存储当前的最高等待者。* 我们在下面做增强/降级(boost/deboost)决策时需要它。*/prerequeue_top_waiter = rt_mutex_top_waiter(lock);/* [7] 在锁的等待者树中重新排队该waiter。*/rt_mutex_dequeue(lock, waiter);/** 既然我们已经出队了,现在更新waiter的优先级字段。** 这些值可能通过以下方式改变:** sys_sched_set_scheduler() / sys_sched_setattr()** 或者** DL CBS强制执行,推进了有效截止时间。*/waiter_update_prio(waiter, task);rt_mutex_enqueue(lock, waiter);/** [8] 释放(阻塞的)任务,为在[10]中获取持有者任务做准备。** 因为我们持有lock->wait_lock,所以即使我们释放了task->pi_lock,* task也无法解除阻塞。*/raw_spin_unlock(&task->pi_lock);put_task_struct(task);/** [9] 在lock->wait_lock保护下检查退出条件3。** 即使在死锁检测的情况下,如果锁没有持有者,我们也必须中止链遍历,* 因为这里我们无处可循。这是我们正在遍历的链的末端。*/if (!rt_mutex_owner(lock)) {/** 如果上面的重新排队[7]改变了最高等待者,* 那么我们需要唤醒新的最高等待者来尝试获取锁。*/top_waiter = rt_mutex_top_waiter(lock);if (prerequeue_top_waiter != top_waiter)wake_up_state(top_waiter->task, top_waiter->wake_state);raw_spin_unlock_irq(&lock->wait_lock);return 0;}/** [10] 获取下一个任务,即@lock的持有者。** 在持有lock->wait_lock并检查了!owner之后,必然有一个持有者,* 并且它不会消失。*/task = get_task_struct(rt_mutex_owner(lock));raw_spin_lock(&task->pi_lock);/* [11] 如果必要,重新排队pi等待者 */if (waiter == rt_mutex_top_waiter(lock)) {/** 该waiter成为了锁上新的最高(优先级)等待者。* 用这个waiter替换持有者任务的pi等待者树中前一个最高等待者,* 并调整持有者的优先级。*/rt_mutex_dequeue_pi(task, prerequeue_top_waiter);waiter_clone_prio(waiter, task);rt_mutex_enqueue_pi(task, waiter);rt_mutex_adjust_prio(lock, task);} else if (prerequeue_top_waiter == waiter) {/** 该waiter曾是锁上的最高等待者,但不再是最高优先级等待者了。* 用新的最高(优先级)等待者替换持有者任务的pi等待者树中的waiter,* 并调整持有者的优先级。* 新的最高等待者被存储在@waiter中,以便下面的@waiter == @top_waiter* 评估为真,我们继续为链的其余部分降级优先级。*/rt_mutex_dequeue_pi(task, waiter);waiter = rt_mutex_top_waiter(lock);waiter_clone_prio(waiter, task);rt_mutex_enqueue_pi(task, waiter);rt_mutex_adjust_prio(lock, task);} else {/** 没有任何改变。不需要做任何优先级调整。*/}/** [12] 在task->pi_lock和lock->wait_lock保护下检查退出条件4。* 实际的决定在我们释放锁后做出。** 检查持有当前锁的任务是否自己也被pi阻塞了。如果是,我们* 存储一个指向该锁的指针,用于上面的锁链变化检测。在我们释放* task->pi_lock之后,next_lock就不能再被解引用了。*/next_lock = task_blocked_on_lock(task);/** 存储@lock的最高等待者,用于下面链遍历结束的决策。*/top_waiter = rt_mutex_top_waiter(lock);/* [13] 释放锁 */raw_spin_unlock(&task->pi_lock);raw_spin_unlock_irq(&lock->wait_lock);/** 基于存储的值,做出实际的退出决定[12]。** 我们到达了锁链的末端。就在这里停止。没必要再回去才发现这一点。*/if (!next_lock)goto out_put_task;/** 如果当前waiter不是锁上的最高等待者,那么如果我们不是在* 完全死锁检测模式下,我们可以在这里停止链遍历。*/if (!detect_deadlock && waiter != top_waiter)goto out_put_task;goto again;out_unlock_pi:raw_spin_unlock_irq(&task->pi_lock);
out_put_task:put_task_struct(task);return ret;
}
kernel/locking/rtmutex_api.c
rt_mutex_adjust_pi 重新检查优先级继承(pi)链
/** 注释:重新检查优先级继承(pi)链,以防我们收到了一个优先级设置请求。** 注释:从sched_setscheduler中调用。** @task: 指向其调度属性刚刚被修改的那个任务。*/
void __sched rt_mutex_adjust_pi(struct task_struct *task)
{struct rt_mutex_waiter *waiter;struct rt_mutex_base *next_lock;unsigned long flags;/** 获取保护任务PI相关字段(如pi_blocked_on)的自旋锁,并保存中断状态。*/raw_spin_lock_irqsave(&task->pi_lock, flags);/** 获取任务当前正在等待的rt_mutex_waiter结构体。* 如果为NULL,表示任务没有在等待任何RT-Mutex。*/waiter = task->pi_blocked_on;/** 如果waiter为NULL,或者waiter的树节点与任务的等待节点相等* (这可能表示一种稳定的、已处理过的状态),则无需做任何事。* 直接解锁并返回。*/if (!waiter || rt_waiter_node_equal(&waiter->tree, task_to_waiter_node(task))) {raw_spin_unlock_irqrestore(&task->pi_lock, flags);return;}/* 如果任务确实在等待,获取它正在等待的那个锁的指针。*/next_lock = waiter->lock;/** 我们已经获取了所有需要的信息,可以释放pi_lock了,因为* rt_mutex_adjust_prio_chain内部会处理更复杂的加锁。*/raw_spin_unlock_irqrestore(&task->pi_lock, flags);/** 注释:这个引用计数会在rt_mutex_adjust_prio_chain()中被减少!* 增加task的引用计数,防止在接下来的链调整过程中,该task被释放。*/get_task_struct(task);/** 调用核心的优先级链调整函数。* @task: 作为本次调整的起点任务。* @RT_MUTEX_MIN_CHAINWALK: 一个标志,指示链调整的深度或行为。* @NULL, @next_lock, @NULL: 其他参数,用于指定旧锁、新锁等,* 在这里的上下文中,主要关注next_lock。* @task: 将task自身作为顶级所有者传入。*/rt_mutex_adjust_prio_chain(task, RT_MUTEX_MIN_CHAINWALK, NULL,next_lock, NULL, task);
}