glibc pthread_mutex_lock/unlock futex 互斥锁的实现
起因
V站一篇面试题提到futexfd
这个东东,搜了下和锁有关,故 look look
文章链接:v2ex gullitintanni: 骑驴找马, Linux 面试凉经分享
开始
简单示例,两个任务争锁
#include <unistd.h>
#include <cstdio>
#include <iostream>
#include <pthread.h>

// The single mutex that the main thread and the worker thread fight over.
pthread_mutex_t mutex;

// Thread body: forever take the mutex, print, hold it for one second, release.
// It has the exact `void *(*)(void *)` signature pthread_create expects, so no
// function-pointer cast is needed (calling through a cast from a function
// returning void is undefined behaviour).
void *thread_func(void *)
{
    while (true) {
        pthread_mutex_lock(&mutex);
        std::cout << "thread_func" << std::endl;
        sleep(1);
        pthread_mutex_unlock(&mutex);
    }
    return NULL; // not reached
}

int main(void)
{
    pthread_mutex_init(&mutex, NULL);

    pthread_t thread;
    // Without a second thread there is no contention and nothing to observe.
    if (pthread_create(&thread, NULL, thread_func, NULL) != 0) {
        std::cerr << "pthread_create failed" << std::endl;
        return 1;
    }

    // The main thread runs the same loop, so both threads contend for `mutex`.
    thread_func(NULL);
}
用户层
pthread_mutex_lock
gdb,启动!
# g++ main.cpp -lpthread -g
# gdb a.out
(gdb) b pthread_mutex_lock
(gdb) r
跳转到LLL_MUTEX_LOCK_OPTIMIZED (mutex)
/root/mine/root/rpmbuild/BUILD/glibc-2.34/nptl/pthread_mutex_lock.c: 71int
PTHREAD_MUTEX_LOCK (pthread_mutex_t *mutex)
{else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)== PTHREAD_MUTEX_RECURSIVE_NP, 1)) // 可递归锁,即该锁本线程获取后可重复获取,上述示例即走的该路径{/* We have to get the mutex. */LLL_MUTEX_LOCK_OPTIMIZED (mutex);
LLL_MUTEX_LOCK_OPTIMIZED
实际走到lll_mutex_lock_optimized
/* /root/mine/root/rpmbuild/BUILD/glibc-2.34/nptl/pthread_mutex_lock.c: 54 */
/* The lock macro simply forwards to the inline helper of the same name.  */
# define LLL_MUTEX_LOCK_OPTIMIZED(mutex) lll_mutex_lock_optimized (mutex)
pthread_create 创建线程成功之前进程还是单线程的,会直接走第一个 if 分支,不需要复杂逻辑,直接判断并赋值后返回即可;只有在多线程的情况下才会走 lll_lock。
/root/mine/root/rpmbuild/BUILD/glibc-2.34/nptl/pthread_mutex_lock.c: 35static inline void
lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{/* The single-threaded optimization is only valid for privatemutexes. For process-shared mutexes, the mutex could be in ashared mapping, so synchronization with another process is neededeven without any threads. If the lock is already marked asacquired, POSIX requires that pthread_mutex_lock deadlocks fornormal mutexes, so skip the optimization in that case aswell. */int private = PTHREAD_MUTEX_PSHARED (mutex);if (private == LLL_PRIVATE && SINGLE_THREAD_P && mutex->__data.__lock == 0)mutex->__data.__lock = 1;elselll_lock (mutex->__data.__lock, private);
}
lll_lock
在 lll_lock 处用 gdb 的 s(step)命令单步,进入的是 __lll_lock_wait_private。
内容看起来很简单:判断 futex 内存里的值是不是 2,期望把它从 0(未被占用)改成 2(已占用,且可能有等待者)。
atomic_exchange_acquire 是原子操作,使用架构提供的汇编指令,一条指令同时读取原值并写入 2。
/root/mine/root/rpmbuild/BUILD/glibc-2.34/nptl/lowlevellock.c: 25void
__lll_lock_wait_private (int *futex)
{if (atomic_load_relaxed (futex) == 2) // 是 2,已经被占用,直接进入循环goto futex;/* 原子指令,读取原来的值,并写入 2,如果原来的不是 0,说并状态还是不对,继续循环从循环体 != 2 结束 futex 后也要原子指令判断一次,防止中间被其他线程占用 */while (atomic_exchange_acquire (futex, 2) != 0) {futex:futex_wait ((unsigned int *) futex, 2, LLL_PRIVATE); /* Wait if *futex == 2. */}
}
pthread_mutex_unlock
对应的 pthread_mutex_unlock 和上面的调用方式类似,最终执行到这里:
/root/mine/root/rpmbuild/BUILD/glibc-2.34/nptl/pthread_mutex_unlock.c: 32/* lll_lock with single-thread optimization. */
static inline void
lll_mutex_unlock_optimized (pthread_mutex_t *mutex)
{/* The single-threaded optimization is only valid for privatemutexes. For process-shared mutexes, the mutex could be in ashared mapping, so synchronization with another process is neededeven without any threads. */int private = PTHREAD_MUTEX_PSHARED (mutex);if (private == LLL_PRIVATE && SINGLE_THREAD_P)mutex->__data.__lock = 0;elselll_unlock (mutex->__data.__lock, private); // 多线程时候执行到这里
}
lll_unlock
跳转到的是 __lll_lock_wake,实际执行 futex 系统调用。
/root/mine/root/rpmbuild/BUILD/glibc-2.34/nptl/lowlevellock.c: 62void
__lll_lock_wake (int *futex, int private)
{lll_futex_wake (futex, 1, private); // 这里的 1,指的是唤醒数量为 1
/* /root/mine/root/rpmbuild/BUILD/glibc-2.34/sysdeps/nptl/lowlevellock-futex.h: 85 */
/* Wake up up to NR waiters on FUTEXP.  */
# define lll_futex_wake(futexp, nr, private)                             \
  lll_futex_syscall (4, futexp,                                         \
		     __lll_private_flag (FUTEX_WAKE, private), nr, 0)
内核层
futex
进入系统调用后根据用户层的要求,对于上面的简单测试用例,分别走下面两个函数
/root/qemu/linux-5.10.202/kernel/futex/core.c: 3723long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,u32 __user *uaddr2, u32 val2, u32 val3)
{switch (cmd) {case FUTEX_WAIT_BITSET:return futex_wait(uaddr, flags, val, timeout, val3);case FUTEX_WAKE_BITSET:return futex_wake(uaddr, flags, val, val3);
#1 0xffffffff81190b44 in do_futex (uaddr=uaddr@entry=0x4041a0, op=op@entry=128, val=val@entry=2, timeout=0x0 <fixed_percpu_data>, uaddr2=uaddr2@entry=0x0 <fixed_percpu_data>, val2=<optimized out>, val3=<optimized out>) at kernel/futex/core.c:3753
#2 0xffffffff811910f6 in __do_sys_futex (val3=<optimized out>, uaddr2=0x0 <fixed_percpu_data>, utime=0x0 <fixed_percpu_data>, val=2, op=128, uaddr=0x4041a0) at kernel/futex/core.c:3816
#3 __se_sys_futex (val3=<optimized out>, uaddr2=0, utime=0, val=2, op=128, uaddr=4211104) at kernel/futex/core.c:3782
#4 __x64_sys_futex (regs=<optimized out>) at kernel/futex/core.c:3782
#5 0xffffffff81aef243 in do_syscall_64 (nr=<optimized out>, regs=0xffffc9000023bf58) at arch/x86/entry/common.c:46
#6 0xffffffff81c000da in entry_SYSCALL_64 () at arch/x86/entry/entry_64.S:125
futex_wait
/root/qemu/linux-5.10.202/kernel/futex/core.c: 2694static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,ktime_t *abs_time, u32 bitset)
{struct hrtimer_sleeper timeout, *to;struct restart_block *restart;struct futex_hash_bucket *hb;struct futex_q q = futex_q_init;retry:/** Prepare to wait on uaddr. On success, holds hb lock and increments* q.key refs.*/ret = futex_wait_setup(uaddr, val, flags, &q, &hb); // 填充 key,根据 key 计算 hash,返回 futex_queues 桶位置到 hbif (ret)goto out;/* queue_me and wait for wakeup, timeout, or a signal. */futex_wait_queue_me(hb, &q, to); // 添加到队列,如果有定时开启定时,阻塞自己,被唤醒后也从这里开始/* unqueue_me() drops q.key ref */if (!unqueue_me(&q)) // 从队列中删除,取消定时,成功返回0,结束系统调用goto out;
futex_wait_setup
会读取地址中的值,如果不等于传入的期望值(这里是 2)就返回 -EWOULDBLOCK,不做后续操作;也就是说,只有值仍然是 2 的时候才会阻塞自己、等待唤醒。值是 2 代表锁正被其他线程占用。
/root/qemu/linux-5.10.202/kernel/futex/core.c: 2639static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,struct futex_q *q, struct futex_hash_bucket **hb)
{u32 uval;ret = get_futex_value_locked(&uval, uaddr); // 从 uaddr 读取值放到 uval 中if (uval != val) {queue_unlock(*hb);ret = -EWOULDBLOCK;}return ret;
把 futex 请求的参数和 current 中的一些信息填充到这个 key:
/root/qemu/linux-5.10.202/include/linux/futex.h: 32union futex_key {struct {u64 i_seq;unsigned long pgoff;unsigned int offset;} shared;struct {union {struct mm_struct *mm; // current->mmu64 __tmp;};unsigned long address; // address - (address % PAGE_SIZE),即 address 对齐 PAGE_SIZEunsigned int offset;} private;struct {u64 ptr;unsigned long word;unsigned int offset; // address % PAGE_SIZE} both;
};
根据 key 计算 hash,返回全局的 futex_queues 中对应的 hash 桶位置。
/root/qemu/linux-5.10.202/kernel/futex/core.c: 361static struct futex_hash_bucket *hash_futex(union futex_key *key) // 根据 key 计算 hash,返回 futex_queues 桶位置
{u32 hash = jhash2((u32 *)key, offsetof(typeof(*key), both.offset) / 4,key->both.offset);return &futex_queues[hash & (futex_hashsize - 1)];
也就是说,futex_wait 的时候,根据请求地址(以及 current->mm)填充 key 并计算 hash,把当前任务保存到内核全局队列 futex_queues 对应的桶中,等待唤醒;期望的值只用于入队前与内存中的值做比较,并不放进 key。
对应 pthread_mutex_lock。
futex_wake
futex_wake 的逻辑与 futex_wait 相对应:根据请求地址填充 key、计算 hash,在内核全局队列 futex_queues 中查找有没有阻塞在同一个 key 上的任务,如果有就唤醒(最多唤醒 nr_wake 个),如果没有,直接返回即可。
对应 pthread_mutex_unlock。
/root/qemu/linux-5.10.202/kernel/futex/core.c: 1596/** Wake up waiters matching bitset queued on this futex (uaddr).*/
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{struct futex_hash_bucket *hb;struct futex_q *this, *next;union futex_key key = FUTEX_KEY_INIT;int ret;DEFINE_WAKE_Q(wake_q);if (!bitset)return -EINVAL;ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, FUTEX_READ); // 填充一些 key 里的值if (unlikely(ret != 0))return ret;hb = hash_futex(&key); // 根据 key 计算 hash,返回 hash 桶位置:futex_queues[hash & (futex_hashsize - 1)]/* Make sure we really have tasks to wakeup */if (!hb_waiters_pending(hb)) // return atomic_read(&hb->waiters); 有等待者return ret;spin_lock(&hb->lock);plist_for_each_entry_safe(this, next, &hb->chain, list) {if (match_futex (&this->key, &key)) {if (this->pi_state || this->rt_waiter) {ret = -EINVAL;break;}/* Check if one of the bits is set in both bitsets */if (!(this->bitset & bitset))continue;mark_wake_futex(&wake_q, this); // 从 futex hash 去除 this,this 加入到 wake_qif (++ret >= nr_wake)break;}}spin_unlock(&hb->lock);wake_up_q(&wake_q); // 从 futex hash 中匹配到的可唤醒任务,唤醒return ret;
}
总结
pthread_mutex_lock / pthread_mutex_unlock 在单线程环境中不会走 futex,更简单:直接对 pthread_mutex_t 中的值(mutex->__data.__lock)判断、赋值即可。
多线程时:
mutex->__data.__lock 里的值:
- 0:该锁没有被占用
- 1:被占用,且没有等待者(上面的单线程快速路径写入的就是 1)
- 2:被占用,且可能有其他线程在等待
pthread_mutex_lock(进入 __lll_lock_wait_private 之后):如果 mutex->__data.__lock 已经是 2,直接发起 futex 系统调用挂起自己;否则用原子交换写入 2 并取回旧值,旧值是 0 表示抢到了锁,旧值不是 0 则同样通过 futex 挂起自己。被唤醒后重复这次原子交换,循环直到取回的旧值是 0,即正常抢到锁。
futex(等待):内核中会再次读取该地址中的值进行判断,如果仍是 2,则根据该地址计算 hash,把当前任务存储到内核全局的 futex_queues 中并挂起自己;否则说明值已经变化(锁可能已被释放),返回 -EWOULDBLOCK,由用户态重新尝试抢锁。
pthread_mutex_unlock:多线程时走 lll_unlock,先把 mutex->__data.__lock 原子地置回 0;如果之前的值大于 1(可能有等待者),再通过 __lll_lock_wake 进入 futex 系统调用。内核根据 mutex->__data.__lock 的地址计算 hash,如果内核全局的 futex_queues 中有阻塞在该地址上的任务,唤醒一个即可。
互斥锁靠内存地址中的值来抢占并判断是否已被占用;阻塞时只是把当前任务的信息存储到内核全局的 futex_queues 中。唤醒并不是靠硬件机制(比如缺页异常)检测内存中值的变化,而是释放锁的一方再次把锁的地址传入内核,内核据此从全局队列中找到阻塞的任务并唤醒。