Linux 内核空间 并发竞争处理 共享资源线程同步
一、并发竞争概述
并发与并行是软件工程和操作系统领域中的两个核心概念。
并发是指多个任务在时间上交替执行,虽然宏观上是同时进行,但是微观上同一时刻只有一个线程在进行动作,通过系统调度中断进行切换
并行是指在同一时间内,多个任务在多个处理器(多核CPU)上真正同时执行。
二、并发导致的竞争
由上面可知,同一实际只有一个任务在cpu中进行,但是由于linux系统支持多任务切换和中断,导致会发生资源的竞争。简单说就是任务1在使用资源(某个内存数据),此时发生了任务切换到任务2,任务2同时也操作了同一个内存数据。多个任务同时访问同一片内存区域的情况,这些任务可能会相互覆盖这段内存中的数据,造成内存数据混乱。
并发源 | 产生原因 |
---|---|
多线程并发访问 | |
抢占式并发访问 | 一个任务在内核态执行时,可能被更高优先级的任务抢 |
中断程序并发访问 | 中断程序打断当前正在执行的代码 |
SMP(多核)核间并发访问 | 多个 CPU 核心同时在内核中执行代码 |
三、共享资源的保护
所谓的临界区就是共享数据段(某段内存数据),对于临界区必须保证一次只有一个线程访问,也就是要保证临界区是原子访问的,原子访问就表示这一个访问是一个步骤,不能再进行拆分。
防止并发访问共享资源,linux提供了多种方法和处理机制,下面主要分两部分进行描述,一种是linux提供的基础方法api接口,一种是我们在实现编写驱动需要主动规避(编写程序的思维)
方法 | 描述 |
---|---|
原子锁 | |
自旋锁 | |
信号量 | |
互斥体 | |
RCU | 一种免锁机制 |
中断禁用 | local_irq_disable() / local_irq_enable() local_irq_save(flags) / local_irq_restore(flags) ) 不能防止 SMP 并发。关中断时间必须非常短。 |
内核抢占禁用 | preempt_disable()/preempt_enable() |
Per-CPU Variables | DEFINE_PER_CPU()/get_cpu_var()/put_cpu_var() |
3.1 原子锁
原子操作就是指不能再进一步分割的操作,一般原子操作用于变量或者位操作。假如现在要对无符号整形变量 a 赋值,值为 3,对于 C 语言来讲很简单,直接就是:
原子整形操作 API 函数
用于整型变量和位操作
- 接口位于include/linux/types.h
- /arch/arm/include/asm/atomic.h
接口 | |
---|---|
atomic_t | 变量定义 |
ATOMIC_INIT(int i) | 原子变量初始化。 |
int atomic_read(atomic_t*v) | 读取 v的值,并且返回 |
void atomic_set(atomic_t *v, int i) | 向 v写入 i值。 |
void atomic_add(int i, atomic_t *v) | 给 v加上 i值。 |
void atomic_sub(int i, atomic_t *v) | 从 v减去 i值。 |
void atomic_inc(atomic_t *v) | 给 v加 1,也就是自增。 |
void atomic_dec(atomic_t *v) | 从 v减 1,也就是自减 。 |
int atomic_dec_return(atomic_t *v) | 从 v减 1,并且返回v的值 。 |
int atomic_inc_return(atomic_t *v) | 给 v加 1,并且返回 v的值。 |
int atomic_sub_and_test(int i, atomic_t *v) | 从 v减 i,如果结果为0就返回真,否则就返回假 |
int atomic_dec_and_test(atomic_t *v) | 从 v减 1,如果结果为0就返回真,否则就返回假 |
int atomic_inc_and_test(atomic_t *v) | 给 v加 1,如果结果为0就返回真,否则就返回假 |
int atomic_add_negative(int i, atomic_t *v) | 给 v加 i,如果结果为负就返回真,否则返回假 |
- 64位的整型原子操作将“atomic_”前缀换成“atomic64_”,将int换成long long
位原子操作函数
接口 | 描述 |
---|---|
void set_bit(int nr, void *p) | 将p地址的nr位置1 |
void clear_bit(int nr,void *p) | 将p地址的nr位清零 |
void change_bit(int nr, void *p) | 将p地址的nr位反转 |
int test_bit(int nr, void *p) | 获取p地址的nr位的值 |
int test_and_set_bit(int nr, void *p) | 将p地址的nr位置1,并且返回nr位原来的值 |
int test_and_clear_bit(int nr, void *p) | 将p地址的nr位清0,并且返回nr位原来的值 |
int test_and_change_bit(int nr, void *p) | 将p地址的nr位翻转,并且返回nr位原来的值 |
3.2 自旋锁
/include/linux/spinlock.h
含义
当一个线程要访问某个共享资源的时候首先要先获取相应的锁,锁只能被一个线程持有,
只要此线程不释放持有的锁,那么其他的线程就不能获取此锁。对于自旋锁而言,如果自旋锁
正在被线程 A 持有,线程 B 想要获取自旋锁,那么线程 B 就会处于忙循环-旋转-等待状态,线
程 B 不会进入休眠状态或者说去做其他的处理,而是会一直等待锁可用。
使用范围
- 原子操作只能对整形变量或者位进行保护,自旋锁用于大的资源保护
- 自旋锁API 函数适用于SMP或支持抢占的单CPU下线程之间的并发访问,也就是用于线程与线程之间
- 适用于锁持有时间非常短的场景
注意事项
-
(死锁问题)
- 睡眠阻塞引起的死锁
被自旋锁保护的临界区一定不能调用任何能够引起睡眠和阻塞的API 函数 否则的话会可能会导致死锁现象的发生。自旋锁会自动禁止抢占,也就说当线程 A得到锁以后会暂时禁止内核抢占。如果线程 A 在持有锁期间进入了休眠状态,那么线程 A 会自动放弃 CPU 使用权。线程 B 开始运行,线程 B 也想要获取锁,但是此时锁被 A 线程持有,而且内核抢占还被禁止了!线程 B 无法被调度出去,那么线程 A 就无法运行,锁也就无法释放,死锁发生。 - 中断导致的死锁
线程A先运行,并且获取到了lock这个锁,当线程A运行 functionA函数的时候中断发生了,中断抢走了CPU使用权。中断服务函数也要获取lock这个锁,但是这个锁被线程A占有着,中断就会一直自旋,等待锁有效。但是在中断服务函数执行完之前,线程A是不可能执行的,死锁发生 - 不能递归申请自旋锁
递归的方式申请一个正在持有的锁,那么就必须“自旋”,等待锁被释放,然而正处于“自旋”状态,根本没法释放锁。
- 睡眠阻塞引起的死锁
-
锁的持有时间不能太长
因为在等待自旋锁的时候处于“自旋”状态,因此锁的持有时间不能太长,一定要短,否则的话会降低系统性能。如果临界区比较大,运行时间比较长的话要选择其他的并发处理方式,比如的信号量和互斥体。 -
在编写驱动程序的时候我们必须考虑到驱动的可移植性,因此不管你用的是单核的还是多核的 SOC,都将其当做多核 SOC 来编写驱动程序。
API接口函数
线程间使用自旋锁
函数 | 描述 |
---|---|
spinlock_t | 变量定义 |
DEFINE_SPINLOCK(spinlock_t lock) | 定义并初始化一个自旋变量 |
int spin_lock_init(spinlock_t *lock) | 初始化自旋锁 |
void spin_lock(spinlock_t *lock) | 获取指定的自旋锁,也叫加锁 |
void spin_unlock(spinlock_t *lock) | 释放指定的自旋锁。 |
int spin_trylock(spinlock_t *lock) | 尝试获取指定的锁,如果没有获取到,返回0 |
int spin_is_locked(spinlock_t *lock) | 检查指定的自旋锁是否被获取,如果没有被获取返回非0,否则返回0. |
中断和线程间中使用自旋锁
在中断里面使用自旋锁的时候,在获取锁之前一定要先禁止本地中断(也就是本 CPU 中断,对于多核 SOC来说会有多个 CPU 核)
函数 | 描述 |
---|---|
void spin_lock_irq(spinlock_t *lock) | 禁止本地中断,并获取自旋锁。 |
void spin_unlock_irq(spinlock_t *lock) | 激活本地中断,并释放自旋锁。 |
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags) | 保存中断状态,禁止本地中断,并获取自旋锁。 |
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags) | 将中断状态恢复到以前的状态,并且激活本地中断,释放自旋锁。 |
使用 spin_lock_irq/spin_unlock_irq 的时候需要用户能够确定加锁之前的中断状态,实际上内核很庞大我们是很难确定某个时刻的中断状态,因此不推荐使用spin_lock_irq/spin_unlock_irq。
建议使用 spin_lock_irqsave/spin_unlock_irqrestore,这一组函数会保存中断状态,在释放锁的时候会恢复中断状态。
一般在线程中使用 spin_lock_irqsave/spin_unlock_irqrestore,在中断中使用 spin_lock/spin_unlock
DEFINE_SPINLOCK(lock) /* 定义并初始化一个锁 *//* 线程 A */
void functionA (){
unsigned long flags; /* 中断状态 */
spin_lock_irqsave(&lock, flags) /* 获取锁 *//* 临界区 */
spin_unlock_irqrestore(&lock, flags) /* 释放锁 */
}/* 中断服务函数 */
void irq() {
spin_lock(&lock) /* 获取锁 */
/* 临界区 */
spin_unlock(&lock) /* 释放锁 */
}
下半部(未完)
下半部竞争处理API函数
读写锁
读写自旋锁为读和写操作提供了不同的锁
- 一次只能一个线程持有写锁,而且不能进行读操作。
- 当没有写操作的时候允许一个或多个线程持有读锁,可以进行并发的读操作。
接口函数 | 描述 |
---|---|
rwlock_t | 变量定义 |
DEFINE_RWLOCK(rwlock_t lock) | 定义并初始化读写锁 |
void rwlock_init(rwlock_t *lock) | 初始化读写锁。 |
读锁 | |
---|---|
void read_lock(rwlock_t *lock) | 获取读锁。 |
void read_unlock(rwlock_t *lock) | 释放读锁。 |
void read_lock_irq(rwlock_t *lock) | 禁止本地中断,并且获取读锁。 |
void read_unlock_irq(rwlock_t *lock) | 打开本地中断,并且释放读锁。 |
void read_lock_irqsave(rwlock_t *lock,unsigned long flags) | 保存中断状态,禁止本地中断,并获取读锁。 |
void read_unlock_irqrestore(rwlock_t *lock,unsigned long flags) | 中断状态恢复到以前的状态,激活本地中断,释放读锁。 |
void read_lock_bh(rwlock_t *lock) | 关闭下半部,并获取读锁。 |
void read_unlock_bh(rwlock_t *lock) | 打开下半部,并释放读锁。 |
写锁 | |
---|---|
void write_lock(rwlock_t *lock) | 获取写锁。 |
void write_unlock(rwlock_t *lock) | 释放写锁。 |
void write_lock_irq(rwlock_t *lock) | 禁止本地中断,并且获取写锁。 |
void write_unlock_irq(rwlock_t *lock) | 打开本地中断,并且释放写锁。 |
void write_lock_irqsave(rwlock_t *lock,unsigned long flags) | 保存中断状态,禁止本地中断,并获取写锁。 |
void write_unlock_irqrestore(rwlock_t *lock,unsigned long flags) | 中断状态恢复到以前的状态,激活本地中断,释放读锁。 |
void write_lock_bh(rwlock_t *lock) | 关闭下半部,并获取读锁。 |
void write_unlock_bh(rwlock_t *lock) | 打开下半部,并释放读锁。 |
顺序锁
顺序锁是读写锁的优化版本,读写锁不允许同时读写,而使用顺序锁可以完成同时进行读和写的操作,但并不允许同时的写。虽然顺序锁可以同时进行读写操作,但并不建议这样,读取的过程并不能保证数据的完整性。
- 顺序锁保护的资源不能是指针,因为如果在写操作的时候可能会导致指针无效,而这个时候恰巧有读操作访问指针的话就可能导致意外发生,比如读取野指针导致系统崩溃
函数 | 描述 |
---|---|
seqlock_t | 变量定义 |
DEFINE_SEQLOCK(seqlock_t sl) | 定义并初始化顺序锁 |
void seqlock_ini seqlock_t *sl) | 初始化顺序锁 |
写锁 | 描述 |
---|---|
void write_seqlock(seqlock_t *sl) | 获取写顺序锁 |
void write_sequnlock(seqlock_t *sl) | 释放写顺序锁 |
void write_seqlock_irq(seqlock_t *sl) | 禁止本地中断,并且获取写顺序锁 |
void write_sequnlock_irq(seqlock_t *sl) | 打开本地中断,并且释放写顺序锁 |
void write_seqlock_irqsave(seqlock_t *sl,unsigned long flags) | 保存中断状态,禁止本地中断,并获取写顺序 |
void write_sequnlock_irqrestore(seqlock_t *sl,unsigned long flags) | 将中断状态恢复到以前的状态,并且激活本地中断,释放写顺序锁 |
void write_seqlock_bh(seqlock_t *sl) | 关闭下半部,并获取写读锁 |
void write_sequnlock_bh(seqlock_t *sl) | 打开下半部,并释放写读锁 |
写锁 | 描述 |
---|---|
DEFINE_RWLOCK(rwlock_t lock) | 读单元访问共享资源的时候调用此函数,此函数会返回顺序锁的顺序号 |
unsigned read_seqretry(const seqlock_t *sl,unsigned start) | 读结束以后调用此函数检查在读的过程中有没有对资源进行写操作,如果有的话就要重读 |
3.3 信号量
含义
与ucos类似。信号量本质上是一个计数器,用于进程间对共享数据对象的读取,它和管道有所不同,它不传送数据,它主要是用来保护共享资源(信号量也属于临界资源),使得资源在一个时刻只有一个进程独享。
分为计数型信号量和二值信号量。
这里只说内核空间的信号量,如果是用户空间,信号量接口不一样。
使用范围
- 与自旋锁相比,信号量可以使等待资源线程进入休眠状态,适用于那些占用资源比较久的场合。
- 信号量不能用于中断中,因为信号量会引起休眠,中断不能休眠
- 如果共享资源的持有时间比较短,不适合使用信号量了,因为休眠、切换线程引起较大开销
- 通过信号量控制访问资源的线程数,在初始化的时候将信号量值设置的大于 1,这个信号量就是计数型信号量,计数型信号量不能用于互斥访问,因为它允许多个线程同时访问共享资源。
- 如果要互斥的访问共享资源那么信号量的值就不能大于 1,此时信号量就是一个二值信号量。
API接口函数
接口函数 | 描述 |
---|---|
struct semaphore | 定义一个信号量 |
DEFINE_SEAMPHORE(name) | 定义一个信号量 |
void sema_init(struct semaphore *sem, int val) | 初始化信号量 sem,设置信号量值为 val。 |
void down(struct semaphore *sem) | 获取信号量。 |
int down_trylock(struct semaphore *sem); | 尝试获取信号量,如果能获取到信号量就获取,并且返回 0。如果不能就返回非 0,并且不会进入休眠。 |
int down_interruptible(struct semaphore *sem) | 获取信号量,和 down 类似,只是使用 down 进入休眠状态的线程不能被信号打断。而使用此函数进入休眠以后是可以被信号打断的。 |
void up(struct semaphore *sem) | 释放信号量 |
3.4 互斥体
在 FreeRTOS 和 UCOS 中也有互斥体,将信号量的值设置为 1 就可以使用信号量进行互斥访问了,虽然可以通过信号量实现互斥,但是 Linux 提供了一个比信号量更专业的机制来进行互斥,它就是互斥体—mutex。互斥访问表示一次只有一个线程可以访问共享资源,不能递归申请互斥体。在我们编写 Linux 驱动的时候遇到需要互斥访问的地方建议使用 mutex。
使用范围
- 类似信号量,但是是互斥访问
注意事项
- mutex 可以导致休眠,因此不能在中断中使用 mutex,中断中只能使用自旋锁。
- 和信号量一样,mutex 保护的临界区可以调用引起阻塞的 API 函数。
- 因为一次只有一个线程可以持有 mutex,因此,必须由 mutex 的持有者释放 mutex。并且 mutex 不能递归上锁和解锁。
API函数
函数 | 描述 |
---|---|
struct mutex | 定义mutex变量 |
DEFINE_MUTEX(name) | 定义并初始化一个 mutex变量 |
void mutex_init(mutex *lock) | 初始化 mutex |
void mutex_lock(struct mutex *lock) | 获取 mutex,也就是给 mutex上锁。如果获取不到就进休眠 |
void mutex_unlock(struct mutex *lock) | 释放 mutex,也就给 mutex解锁 |
int mutex_trylock(struct mutex *lock) | 判断 mutex是否被获取,如果是的话就返回,否则返回0 |
int mutex_lock_interruptible(struct mutex *lock) | 使用此函数获取信号量失败进入休眠以后可以被信号打断 |
3.5 RCU(未完)
https://www.cnblogs.com/codestack/p/12447983.html
https://blog.csdn.net/zhoutaopower/article/details/86646688
四、线程间同步机制(未完)
方法 | 描述 |
---|---|
等待队列(Wait Queues) | 任务在某个条件不满足时,睡眠在一个队列上,直到另一个任务(或中断)唤醒它 |
完成量(Completions) | |
内核定时器(Timers) |
五 编程思路
除了使用内核提供的同步原语,良好的设计可以从根本上减少竞争和死锁的风险。
总则: 保持临界区尽可能的短小。在进入临界区之前获取锁,做完对共享资源的最小必要操作后,立即释放锁。不要在临界区内进行耗时的操作(如文件 I/O、网络操作、等待用户输入等)。
- 识别所有共享资源:
在编写代码前,明确哪些是共享资源(全局变量、静态局部变量、硬件状态、链表等)。
为每一个共享资源明确一个锁(或其它同步机制)。不要试图用一个锁保护所有东西(性能差),也不要用过多的锁(复杂度高)。 - 锁的粒度:
粗粒度锁: 一个锁保护很多资源。简单,但并发性差。
细粒度锁: 为不同的资源使用不同的锁。并发性好,但设计复杂,容易引起死锁。
原则: 在满足安全性的前提下,尽可能使用细粒度锁以提高性能。 - 锁的顺序规则
当代码需要同时持有多个锁时,必须定义一个全局的、固定的获取顺序。所有代码都必须按照这个顺序来获取锁。
如果顺序不一致,就可能发生 **A 持有锁1等待锁2,B 持有锁2等待锁1” 的经典死锁场景。 - 避免在持有锁时睡眠:
自旋锁时绝对不能睡眠。
持有互斥体或信号量时,睡眠要非常小心,确保不会导致不可接受的延迟或意想不到的死锁。 - 中断上下文与进程上下文
如果中断处理程序和进程上下文共享数据,必须使用 spin_lock_irqsave() 来保护,以防止中断导致死锁。
考虑使用 tasklet 或 workqueue 将中断中的大部分工作推迟到进程上下文中执行,从而简化同步。 - 使用无锁算法和RCU:
对于读多写少的场景,可以考虑使用读-复制-更新 机制。它允许读者在无锁的情况下访问数据,性能极高。
无锁编程非常复杂,仅在性能瓶颈明确且无法通过其它方式优化时使用。
总结:
场景 | 推荐同步机制 |
---|---|
短临界区,SMP并发,中断并发 | spin_lock_irqsave() / spin_unlock_irqrestore() |
短临界区,仅需SMP安全并发,中断不参与竞争 | spin_lock() / spin_unlock() |
长临界区,中断不参与竞争 | sema_init信号量 |
长临界区,中断不参与竞争,任务间互斥 | mutex_lock() / mutex_unlock() |
简单的计数器/标志位 | atomic_t 操作 |
等待某个事件完成 | completion 机制 |
读操作远多于写操作的共享数据 | 考虑 RCU |