Golang Mutex 原理及实现

Go Mutex

Mutex 是 Go 官方提供的一个互斥锁,在各种并发场景中使用广泛。

本文介绍 Mutex 的原理以及实现。

狠狠被位运算绕晕

实现模式

Barging

当锁被释放时,唤醒第一个等待者,然后把锁交给第一个等待者或者第一个请求锁的程序。

这种模式提高了吞吐量。

Handoff

当锁被释放时,锁会一直持有,直到第一个等待者准备好获取锁。

这种模式降低了吞吐量,因为这个锁是被持有的,即使另一个 goroutine 准备获取它,它也不会将这个锁给它。换句话说,这个 Handoff 的模式与 Linux 的 mutex lock 的一个模式相似,它只会将锁给给定的程序。

这种模式可以解决公平性的问题,因为在 Barging 模式下可能存在一种情况是:goroutine 被唤醒了,但获取不到锁。为什么呢?因为一直在 cpu 上跑的 goroutine 没有上下文切换,它相对于要切换的可能更快。唤醒的 goroutine 需要上下文切换,而 cpu 上跑的不用,导致已经唤醒的 goroutine 拿不到锁。

也因为持有锁的原因,这种模式缺点是性能会相对差一点。

Spining

Spining(自旋)。自旋在等待队列为空或者应用程序重度使用锁时效果更好。

这里简单说一下自旋锁的定义:当一个线程尝试获取某一把锁时,如果锁已经被占用了,自旋锁的做法是等待,间隔一段时间后再次尝试获取。这种 循环加锁 -> 等待 的机制被称为自旋锁。跟轮询很像。

根据上面的定义,为什么自旋这种模式在等待队列为空或者应用程序重度使用锁时效果更好是显而易见的。但是自旋的成本是很高的,所以在 go 的实现中进入自旋的条件十分苛刻。

Mutex 基本结构

下面来看看 Mutex 的结构体以及有关常量

1
2
3
4
5
6
7
8
9
10
11
12
type Mutex struct {
state int32
sema uint32
}

const (
mutexLocked = 1 << iota // mutex 是否锁定
mutexWoken
mutexStarving
mutexWaiterShift = iota // 此时 iota = 3
starvationThresholdNs = 1e6
)
  • state 是一个 int32 类型,它表示了 Mutex 的一些状态。它:
    • 低 1 位:mutexLocked,Mutex 是否锁定,也就是是否上锁了。1 为已上锁。
    • 低 2 位:mutexWoken,Mutex 上是否有 goroutine 处于唤醒状态。1 为已唤醒。
    • 低 3 位:mutexStarving,Mutex 是否是饥饿状态的。
    • 剩余的 29 位则表示 waiter 队列中等待的 goroutine 数量,可见 mutexWaiterShift = 3,将 state 右移 3 位可以得到 waiter 队列中等待的 goroutine 数量。
  • sema 是信号量,用于控制 goroutine 的睡眠和唤醒。

一点补充

这里解释以下常量 mutexLocked 和 mutexStarving 为啥这么定义。

我们知道按位与(位值相等则为 1)和按位或(同为 0 才为 0)的运算规则。

  • mutexLocked = 1,那么任何数与它进行按位或运算时,最低位都会是 1,就代表已上锁。
  • mutexStarving = 4,二进制表示为 100,任何数与它进行按位与运算时,如果得到的结果为 0,那么证明 waiter 队列中无 goroutine,当前也未上锁。

Mutex 原理及代码实现解析

而 Go 中锁的实现是结合上面提到的三种模式,提供了两种模式。

  • 正常模式:使用 Barging 模式进行锁的抢占和释放。在这个模式下,可以进入自旋状态。在多核的 CPU 上,自旋可以避免 goroutine 的切换,使用恰当会对性能带来很大的增益。
    但是随意使用自旋也有可能拖累 CPU,Golang 作者认为应该保守的引入自旋,所以 goroutine 进入自旋的条件非常苛刻:持有锁的goroutine能在较短的时间内归还锁时,才允许自旋。
    • 自旋次数 < 4
    • 必须是多核CPU 且 GOMAXPROCS>1
    • 至少有一个其他的正在运行的 P 并且本地运行队列为空
  • 饥饿模式:上面说了,Barging 模式会出现不公平的现象,有的 goroutine 可能一直都拿不到锁。为了公平性,golang 的 mutex 有饥饿模式。
    • 当 waiter(也就是等待锁的 goroutine 队列)中的 goroutine 等待超过 1ms 时,就会从正常模式进入到饥饿模式。饥饿模式使用 Handoff 模式进行锁的交接。
    • 当当前 goroutine 获取到了锁后,它是 waiter 队列中的最后一个 goroutine,且等待锁的时间 < 1ms 时,退出饥饿模式。

补充:

  • 饥饿模式是 Go 1.9 版本后引入的优化,用于解决公平性的问题。

Lock 加锁

加锁原理

我们调用 Lock 方法时,

Fast Path:

  • 如果当前锁处于初始化状态,调用 atomic 包的 CAS 方法尝试获取锁。
  • 如果获取锁的时候,失败了,进入 Slow Path。

Slow Path:

  • 首先判断当前状态是否能进入自旋状态。如果可以,进入自旋,并且最多只能自旋 4 次。
  • 自旋完成后,计算当前锁的状态。
  • 再次尝试 CAS 获取锁。
  • 如果上一步没有获取到锁,调用 runtime_SemacquireMutex 方法,休眠当前 goroutine 并尝试获取信号量。
  • 当 goroutine 被唤醒后,判断是否处于饥饿状态:
    • 进入饥饿状态的条件是:当前 goroutine 超过 1ms 都没获取到锁。
    • 如果处在饥饿状态,获得互斥锁。此时,如果等待队列中只存在当前 goroutine,互斥锁会结束饥饿状态。
    • 如果不处于饥饿状态,设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环。

补充:

  • CAS 指 atomic.CompareAndSwapInt32(addr, old, new) bool,这个方法先比较传入的地址的值是否是 old,如果是就将 new 赋给 addr,不是就返回 false。

整个过程的流程图如下:

![lock](../img/lock 流程图.webp)

源码

看完了大概原理,那么开始上源码。

1
2
3
4
5
6
7
8
9
10
11
12
func (m *Mutex) Lock() {
// Fast path: 理想状态,CAS 直接锁上
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
// 这是竟态检测,普通运行没影响
race.Acquire(unsafe.Pointer(m))
}
return
}
// 不行进入 Slow Path
m.lockSlow()
}

重点看 lockSlow,它是一个大循环,我去掉了一些没有啥关系的代码(比如竟态检测?)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 用来计算等待时间
starving := false // 是否饥饿
awoke := false // 是否被唤醒
iter := 0 // 自旋次数
old := m.state // 当前状态
for {
// 这里是判断是否进入自旋状态
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 这里进入了自旋状态
// 那么开始尝试抢占锁
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 抢到了,标志为唤醒
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}

// 这里没进入自旋
// new 为期望的状态
new := old
// 当前不是饥饿模式
if old&mutexStarving == 0 {
// 设为上锁状态
new |= mutexLocked
}
// 当前锁已经被抢占了,或者为饥饿状态
if old&(mutexLocked|mutexStarving) != 0 {
// waiter + 1,也就是申请上锁需要到 waiter 里排队
new += 1 << mutexWaiterShift
}
// 当前是饥饿模式,且锁已经被抢占
if starving && old&mutexLocked != 0 {
// 标记为饥饿模式
new |= mutexStarving
}

if awoke {
// 当前 goroutine 为唤醒状态
// 如果两者的标志位不一致,报错
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 将 awoke 位设为 1,标记为唤醒
new &^= mutexWoken
}
// 这里是尝试 CAS 上锁
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
// 如果上锁成功了,直接退出循环返回结果
break
}

// 没成功
// 如果到这了,等待时间不是 0(证明它不是第一次来了)
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
// 获取当前的时间,单位纳秒
waitStartTime = runtime_nanotime()
}
// 这个函数可以暂时这样理解
// 如果 queueLifo 为 true,放入队头等待
// 如果 queueLifo 为 false,排队
// 然后进入 waiter 等待
// 执行到这里会把 goroutine 睡眠,直到将其唤醒
runtime_SemacquireMutex(&m.sema, queueLifo, 1)

// 上面说的,如果等待时间大于 1ms,进入饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 当前为饥饿模式,应该直接获取到锁
// 因为执行到这里,应该是程序将这个 goroutine 唤醒了。
// 为饥饿模式那么就是 Handoff 方式
if old&mutexStarving != 0 {
// 两者有一个是 true 就代表状态不一致。
// 前者代表锁已经被抢占了(那么不符合饥饿模式下 Handoff 的交接)
// 后者代表当前已经没有 goroutine 在等待锁了(不符合饥饿模式)
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// delta 为 -7 -> 1001
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式
// 当前 goroutine 是 waiter 中最后一个 goroutine 了
delta -= mutexStarving
}
// 使用这个进行加锁
atomic.AddInt32(&m.state, delta)
break
}
// 重置状态
awoke = true
iter = 0
} else {
// 没加锁成功
old = m.state
}
}
}

可以慢慢看注释理解一下。

UnLock 解锁

解锁原理

解锁也有 Fast path 和 Slow Path。

Fast Path:

  • 调用 AddInt32 直接进行解锁。
  • 如果 waiter 中有其他的 groutine,则进入 Slow Path 分配锁的归属。

Slow Path:

源码

1
2
3
4
5
6
7
8
9
func (m *Mutex) Unlock() {
// Fast path: 直接解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 这里代表还有 goroutine 等待使用这个锁
// slow 则是分配这个锁的归属
m.unlockSlow(new)
}
}

看注释吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
// 这里是解锁了一个没上锁的锁
// 因为解锁了没上锁的锁,new 为 -1, (-1 + 1) & 1 = 0
fatal("sync: unlock of unlocked mutex")
}
// 如果此时不是饥饿模式
if new&mutexStarving == 0 {
old := new
for {
// 满足下面几个调教可以直接返回
// 1. 队列内无等待的 goroutine(没得分了
// 2. 锁已经被抢占(不用分配了
// 3. goroutine 处于唤醒状态(不用分配了
// 4. 处于饥饿模式(直接交给队头
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 这里就是锁是空闲的,谁手快锁给谁,然后将 waiter - 1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 否则,使用 Handoff 方式一个个给锁
runtime_Semrelease(&m.sema, true, 1)
}
}

Reference

Week03: Go并发编程(四) 深入理解 Mutex - 知乎 (zhihu.com)

深入分析Golang的Mutex - 知乎 (zhihu.com)