锁实现

代码注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package sync provides basic synchronization primitives such as mutual
// exclusion locks. Other than the Once and WaitGroup types, most are intended
// for use by low-level library routines. Higher-level synchronization is
// better done via channels and communication.
//
// Values containing the types defined in this package should not be copied.
package sync

import (
"internal/race"
"sync/atomic"
"unsafe"
)

func throw(string) // provided by runtime

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
//三十二位数字
// ... 2 1 0
// ^ ^ ^ ^
// | | | |
// 等 是 是 锁
// 待 否 否 状
// 数 饥 唤 态
// 量 饿 醒
// g
state int32

sema uint32
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}

const (
mutexLocked = 1 << iota // mutex is locked 锁状态
mutexWoken // 是否有人被唤醒
mutexStarving // 是否处于饥饿状态
mutexWaiterShift = iota

// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
starvationThresholdNs = 1e6
)

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.//使用sas加锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
// 第二种情况:慢上锁,即此刻有竞争对手
m.lockSlow()
}

func (m *Mutex) lockSlow() {
var waitStartTime int64 //等待时间
starving := false //我的饥饿状态
awoke := false //我的是否唤醒态
iter := 0 //我的自旋次数
old := m.state //当前锁的信息
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
//
//正常模式 并且允许自旋 ==> 看不太懂可以看看 & 和| 操作
//runtime_canSpin为true的条件
//1运行在多核 CPU 的机器上
//2当前 Goroutine 为了获取该锁进入自旋的次数小于四次
//3当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
//如果我没有被唤醒 且没有人被唤醒 并且等等的g数量不为0 那么我尝试将自己设置为唤醒态
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true //成功则设置自己为唤醒态
}
runtime_doSpin() //自旋一次
iter++ //自旋次数加一
old = m.state //更新当前锁的状态
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
//非饥饿模式加锁 确保锁定标志为1
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果锁被占了或者处于饥饿状态 那么就去排队
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift //等待的g加1
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.

// 如果我已经饥饿 并且锁已经被占用
if starving && old&mutexLocked != 0 {
//那么锁也随之变为饥饿态
new |= mutexStarving
}
//如果我是唤醒态
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
//但是new不是唤醒态 那就抛出错误
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
//重置唤醒标志位
new &^= mutexWoken
}
//尝试更新锁的状态为 new 即尝试上面设置的一系列状态 抢锁 排队 饥饿 重置唤醒
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
//抢锁成功 即不在饥饿状态 也没有锁定
}
// If we were already waiting before, queue at the front of the queue.
//判断我的等待时间
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime() //设置开始等待时间
}
//原语:如果我之前排过队,这次就把我放到等待队列队首,否则把我放到队尾,并将我挂起
// 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine
// 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待
// 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//判断我的饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state //更新锁的状态
//如果锁处于饥饿状态
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
//如果锁没有等待的g 或者 锁被占用 或者锁有g被唤醒 那么互斥状态不一致
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// delta是一个中间状态,atomic.AddInt32方法将给锁落实这个状态
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 如果现在我不饥饿或者等待锁的就我一个,那么就将锁解除饥饿切换到正常状态。
// 饥饿模式效率很低,而且一旦有两个g把mutex切换为饥饿模式,那就会死锁。
delta -= mutexStarving
}
// 原语:给锁落实delta的状态。
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state //不成功 更新当前锁状态
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}

// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
//检测状态是否正常
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
//正常状态下
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果锁正处在正常模式下,同时 没有等待锁的g 或者 已经有g被唤醒了 或者 锁已经被抢了,就什么也不用做直接返回
// 如果锁正处在饥饿模式下,也是什么也不用做直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
// 给锁的唤醒标志位置1,表示已经有g被唤醒了,Mutex.state后三位010
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 唤醒锁的等待队列头部的一个g
// 并把g放到p的funq尾部
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 锁处在饥饿模式下,直接唤醒锁的等待队列头部的一个g
//因为在饥饿模式下没人跟刚被唤醒的g抢锁,所以不用设置锁的唤醒标志位
runtime_Semrelease(&m.sema, true, 1)
}
}

CompareAndSwapInt32

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// bool Casp1(void **val, void *old, void *new)
// Atomically:
// if(*val == old){
// *val = new;
// return 1;
// } else
// return 0;
TEXT runtime∕internal∕atomic·Casp1(SB), NOSPLIT, $0-25
// 首先将 ptr 的值放入 BX
MOVQ ptr+0(FP), BX
// 将假设的旧值放入 AX
MOVQ old+8(FP), AX
// 需要比较的新值放入到CX
MOVQ new+16(FP), CX
LOCK
CMPXCHGQ CX, 0(BX)
SETEQ ret+24(FP)
RET

MOV 指令有有好几种后缀 MOVB MOVW MOVL MOVQ 分别对应的是 1 字节 、2 字节 、4 字节、8 字节

TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0-17,$0-17表示的意思是这个TEXT block运行的时候,需要开辟的栈帧大小是0,而17 = 8 + 4 + 4 + 1 = sizeof(pointer of int32) + sizeof(int32) + sizeof(int32) + sizeof(bool)(返回值是 bool ,占据 1 个字节)

FP,是伪寄存器(pseudo) ,里边存的是 Frame Pointer, FP配合偏移 可以指向函数调用参数或者临时变量

MOVQ ptr+0(FP) BX 这一句话是指把函数的第一个参数ptr+0(FP)移动到BX寄存器中

MOVQ代表移动的是8个字节,Q 代表64bit ,参数的引用是 参数名称+偏移(FP),可以看到这里名称用了ptr,并不是val,变量名对汇编不会有什么影响,但是语法上是必须带上的,可读性也会更好些。

LOCK并不是指令,而是一个指令的前缀(instruction prefix),是用来修饰CMPXCHGL CX,0(BX) 的

The LOCK prefix ensures that the CPU has exclusive ownership of the appropriate cache line for the duration of the operation, and provides certain additional ordering guarantees. This may be achieved by asserting a bus lock, but the CPU will avoid this where possible. If the bus is locked then it is only for the duration of the locked instruction

CMPXCHGL 有两个操作数,CX 和 0(BX),0(BX)代表的是val的地址。

CMPXCHGL指令做的事情,首先会把0(BX)里的值和AX寄存器里存的值做比较,如果一样的话会把CX里边存的值保存到0(BX)这块地址里 (虽然这条指令里并没有出现AX,但是还是用到了,汇编里还是有不少这样的情况)

SETEQ 会在AX和CX相等的时候把1写进 ret+16(FP)(否则写 0)

好久之前写的 我记得当时参考了别的资料 一时找不见了 ,大家可以帮忙评论下 我加以下参考链接。