golang-互斥锁-mutex-源码阅读笔记
互斥锁的机制
Go语言中互斥锁(Mutex)的公平性机制。互斥锁是一种用于保护共享资源,防止多个goroutine同时访问的同步原语。这段注释详细说明了互斥锁的两种操作模式:普通模式和饥饿模式。
普通模式(Normal Mode)
在普通模式下,等待获取锁的goroutine会按照先进先出(FIFO)的顺序排队。当一个等待的goroutine被唤醒时,它并不直接拥有锁,而是与新到达的goroutine竞争锁的所有权。新到达的goroutine由于已经在运行,因此在竞争中有优势。如果一个等待的goroutine在竞争中失败,它会被重新排队到等待队列的前面。如果一个goroutine连续1毫秒以上无法获取锁,互斥锁会切换到饥饿模式。
饥饿模式(Starvation Mode)
在饥饿模式下,锁的所有权直接从解锁的goroutine传递给等待队列前面的goroutine。新到达的goroutine即使发现锁处于未锁定状态,也不会尝试获取锁,而是直接排队到等待队列的末尾。这种模式下,新到达的goroutine不会尝试自旋(即不断尝试获取锁)。
模式切换
如果一个goroutine成功获取到锁,并且发现它是队列中的最后一个等待者,或者它等待的时间少于1毫秒,它会将互斥锁切换回普通模式。
性能考虑
普通模式具有更好的性能,因为一个goroutine可以连续多次获取锁,即使有阻塞的等待者。饥饿模式则用于防止极端情况下的尾部延迟问题,确保等待时间较长的goroutine能够及时获取锁。
介绍
数据结构
// A Mutex is a mutual exclusion lock.
type Mutex struct {
state int32
sema uint32
}
包中的常量
const (
// mutex = 1
mutexLocked = 1 << iota // mutex is locked
// mutexWoken = 2
mutexWoken
// mutexStarving = 4
mutexStarving
// mutexWaiterShift = 3
mutexWaiterShift = iota
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
// 100000
starvationThresholdNs = 1e6
)
加锁
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
fast path
如果state为0,也就是没有被加锁,也不是饥饿模式,没有排队等待的goroutine。直接走fast path,通过原子操作给mutex加上锁
如果失败,则进入slow path
slow path
自旋
满足一下三种情况进入自旋模式
- mutex被加锁,也就是mutexLocked标志位为1
- mutex不是饥饿模式
- 满足自旋的条件,如下
进入自旋后,如果无协程正在取锁,但是有协程在阻塞等待,则将mutexWoken标志位置为1
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// mutex已被加锁且不是饥饿模式,并且runtime_canSpin()返回true
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 如果无协程正在取锁,但是有协程在阻塞等待,则将mutexWoken标志位置为1
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
根据状态位设置新的状态位
不满足自旋情况后,也就是三个条件不满足之后进入这一步
- mutex的mutexLocked标志位未被加锁
- mutex处于饥饿模式
- 不满足自旋条件
会一直进行下面三步
- 如果不是饥饿模式,设置加锁标志位,尝试抢锁。饥饿模式的话,需排队抢锁
- 如果状态已被加锁或者饥饿模式,增加state中等待的goroutine数量
- 如果进入饥饿模式,且已被加锁,则将新值设置为饥饿模式
- 如果已经有阻塞的协程被唤醒,需要将mutexWoken标志位置为0;因为接下来要么是加锁成功,要么失败阻塞
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// ... 自旋逻辑
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 不处于饥饿模式,考虑抢锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果被加锁或者饥饿模式,增加state中等待的goroutine数量
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
// 如果进入饥饿模式,且已被加锁,则将新值设置为饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果已经有阻塞的协程被唤醒,需要将mutexWoken标志位置为0
// 因为接下来要么是加锁成功,要么失败阻塞
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
// ...
new &^= mutexWoken
}
替换新值
抢锁
- CAS失败,说明mutex的值已被修改,进入新一轮加锁流程
- CAS成功,旧值未加锁且不是饥饿模式,加锁成功,退出循环
- 未抢到锁或者饥饿模式,如果是第一次阻塞,设置等待起始时间
- 阻塞挂起, 老协程插入队头,新协程插入队尾
唤醒
- 判断是否是饥饿模式
- 如果是饥饿模式,无需竞争加锁,直接成功。更新状态值
- 如果不是饥饿模式,进入新一轮加锁流程
// ... 自旋
// ... 根据状态位设置新值
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 加锁成功且不是饥饿模式,直接退出
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
// 新进入抢锁的协程,未阻塞过,设置等待起始时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞, 老协程插入队头,新协程插入队尾
runtime_SemacquireMutex(&m.sema, queueLifo, 2)
// 被唤醒,设置饥饿模式,原来是饥饿模式或者等待时间超过阈值1ms
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 饥饿模式被唤醒,无需竞争抢锁
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
// ... 检测状态是否正常
// 更新锁的状态,等待的协程个数,是否为饥饿模式
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// 如果不是饥饿模式,进入新的一轮抢锁流程
awoke = true
iter = 0
} else {
// CAS失败,重新进入循环
old = m.state
}
解锁
快速路径
如果发现只有一个goroutine等待解锁,直接返回即可
如果发现锁中还有阻塞协程,则走入unlockSlow分支
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
慢分支
未被加锁
倘若发现mutex此前未被加锁,直接抛出fatal
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
...
}
非饥饿模式
如果不是饥饿模式
且无等待的阻塞协程,或者mutexLocked|mutexWoken|mutexStarving任一标志位不为0,说明已经有其他活跃协程介入,无需关心后续流程
如果不满足,则唤醒一个等待的协程,失败则再次进入循环
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 2)
return
}
old = m.state
}
}
饥饿模式
直接唤醒阻塞协程队列头部的协程
if new&mutexStarving == 0 {
... // 非饥饿模式的操作
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 2)
}