Go Select

main

Let's disassemble the following program with go tool objdump and analyze the output.

package main

import (
	"fmt"
	"time"
)

func main() {
	c1 := make(chan int)
	c2 := make(chan int)
	go func() {
		time.Sleep(time.Second)
		<-c2
		c1 <- 1
	}()
	select {
	case v := <-c1:
		fmt.Printf("%d <- c1", v)
	case c2 <- 1:
		fmt.Println("c2 <- 1")
	}
}

Filtering the disassembly for CALL instructions gives:

main.go:9 0x4a05c6 e81542f6ff CALL runtime.makechan(SB)
main.go:10 0x4a05ec e8ef41f6ff CALL runtime.makechan(SB)
main.go:11 0x4a0620 e82b3bf9ff CALL runtime.newproc(SB)
main.go:16 0x4a0654 e82c94fbff CALL 0x459a85
main.go:16 0x4a06e3 e8d8b7f9ff CALL runtime.selectgo(SB)
main.go:18 0x4a074c e8df8df6ff CALL runtime.convT2E64(SB)
main.go:18 0x4a07ec e8cf89ffff CALL fmt.Printf(SB)
main.go:18 0x4a0806 e8f587fbff CALL runtime.gcWriteBarrier(SB)
main.go:20 0x4a088c e87f8bffff CALL fmt.Println(SB)
main.go:8 0x4a0898 e85369fbff CALL runtime.morestack_noctxt(SB)
main.go:12 0x4a0945 e8868efaff CALL time.Sleep(SB)
main.go:13 0x4a095c e8ff4bf6ff CALL runtime.chanrecv1(SB)
main.go:14 0x4a0976 e85541f6ff CALL runtime.chansend1(SB)
main.go:11 0x4a0985 e86668fbff CALL runtime.morestack_noctxt(SB)

As you can see, the implementation of select depends on the selectgo function.

I thought that was all and was about to start analyzing the selectgo function. But while experimenting a little further, I found another situation.

func main() {
	c1 := make(chan int)
	go func() {
		time.Sleep(time.Second)
		c1 <- 1
	}()
	select {
	case <-c1:
		fmt.Printf("c1 <- 1")
	default:
		fmt.Println("default")
	}
}

The results are as follows:

main.go:9 0x49eca8 e8335bf6ff CALL runtime.makechan(SB)
main.go:11 0x49eccf e85c54f9ff CALL runtime.newproc(SB)
main.go:17 0x49ece6 e83570f6ff CALL runtime.selectnbrecv(SB)
main.go:18 0x49ed1c e88f8bffff CALL fmt.Printf(SB)
main.go:22 0x49ed8f e86c8dffff CALL fmt.Println(SB)
main.go:8 0x49ed96 e8556cfbff CALL runtime.morestack_noctxt(SB)
main.go:12 0x49ee35 e87692faff CALL time.Sleep(SB)
main.go:13 0x49ee4f e87c5cf6ff CALL runtime.chansend1(SB)
main.go:11 0x49ee5e e88d6bfbff CALL runtime.morestack_noctxt(SB)

As you can see, the implementation of select here relies on the underlying selectnbrecv function. Since there is a selectnbrecv function, is there also a selectnbsend function? Let's keep trying.

func main() {
	c1 := make(chan int)
	go func() {
		time.Sleep(time.Second)
		<-c1
	}()
	select {
	case c1 <- 1:
		fmt.Printf("c1 <- 1")
	default:
		fmt.Println("default")
	}
}

The analysis results:

main.go:9 0x49ecb3 e8285bf6ff CALL runtime.makechan(SB)
main.go:11 0x49ecda e85154f9ff CALL runtime.newproc(SB)
main.go:17 0x49ed05 e81670f6ff CALL runtime.selectnbsend(SB)
main.go:18 0x49ed3b e8708bffff CALL fmt.Printf(SB)
main.go:22 0x49edb4 e8478dffff CALL fmt.Println(SB)
main.go:8 0x49edbb e8306cfbff CALL runtime.morestack_noctxt(SB)
main.go:12 0x49ee65 e84692faff CALL time.Sleep(SB)
main.go:13 0x49ee7c e8df66f6ff CALL runtime.chanrecv1(SB)
main.go:11 0x49ee8b e8606bfbff CALL runtime.morestack_noctxt(SB)

Here the select statement is implemented with the selectnbsend function. Continuing to experiment along these lines leads to the following conclusions:

  • If the select statement has exactly one case, which receives from a channel, plus a default, selectnbrecv is called.

  • If the select statement has exactly one case, which sends to a channel, plus a default, selectnbsend is called.

  • If the select statement has multiple cases that send to or receive from one or more channels, selectgo is called.
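
The three shapes can be put side by side in one small program. This is only an illustrative sketch: the helper names (recvOrDefault, sendOrDefault, recvEither) are made up for this example, and the runtime function named in each comment is what the disassembly above showed for that Go version, not something the program itself can verify.

```go
package main

import "fmt"

// recvOrDefault has one receive case plus default.
// The listing above shows this shape compiled to runtime.selectnbrecv.
func recvOrDefault(c chan int) (int, bool) {
	select {
	case v := <-c:
		return v, true
	default:
		return 0, false
	}
}

// sendOrDefault has one send case plus default (runtime.selectnbsend above).
func sendOrDefault(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

// recvEither has more than one channel case, the general shape
// (runtime.selectgo above).
func recvEither(a, b chan int) int {
	select {
	case v := <-a:
		return v
	case v := <-b:
		return v
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(sendOrDefault(c, 7)) // buffer has room: true
	fmt.Println(recvOrDefault(c))    // buffered value is ready: 7 true
	fmt.Println(recvOrDefault(c))    // nothing ready, default taken: 0 false

	a, b := make(chan int, 1), make(chan int, 1)
	b <- 9
	fmt.Println(recvEither(a, b)) // only b is ready: 9
}
```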

Okay, let's trace selectgo. Before doing so, though, we need to look at reflect_rselect; otherwise the parameters of the selectgo function are thoroughly confusing.

reflect_rselect

func reflect_rselect(cases []runtimeSelect) (int, bool) {
	// With no cases at all, park the current goroutine forever.
	if len(cases) == 0 {
		block()
	}
	sel := make([]scase, len(cases))
	order := make([]uint16, 2*len(cases))
	for i := range cases {
		rc := &cases[i]
		switch rc.dir {
		case selectDefault:
			sel[i] = scase{kind: caseDefault}
		case selectSend:
			// For a send such as c <- 1, rc.val is the address of the value 1.
			sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val}
		case selectRecv:
			// For a receive such as v := <-c, rc.val is the address of v.
			sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val}
		}
	}
	// Note: this three-argument call comes from an older runtime than the
	// six-argument selectgo listed below.
	return selectgo(&sel[0], &order[0], len(cases))
}
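
reflect_rselect is the runtime side of the public reflect.Select function, so the easiest way to exercise it is through that API. The sketch below uses only the standard reflect package; the helper name selectDynamic is made up for this example.

```go
package main

import (
	"fmt"
	"reflect"
)

// selectDynamic builds a select at run time: one receive case per channel
// in recvs, plus a default case. It returns the index of the chosen case
// and the received value (0 when no value was received).
func selectDynamic(recvs []chan int) (int, int) {
	cases := make([]reflect.SelectCase, 0, len(recvs)+1)
	for _, c := range recvs {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(c),
		})
	}
	cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})

	chosen, v, ok := reflect.Select(cases)
	if !ok { // default chosen (or a closed channel): no value received
		return chosen, 0
	}
	return chosen, int(v.Int())
}

func main() {
	a, b := make(chan int, 1), make(chan int, 1)
	fmt.Println(selectDynamic([]chan int{a, b})) // nothing ready: 2 0
	b <- 42
	fmt.Println(selectDynamic([]chan int{a, b})) // b is ready: 1 42
}
```

Each reflect.SelectCase plays the role of one runtimeSelect entry above: Dir corresponds to rc.dir and Chan to rc.ch.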

selectgo

// selectgo implements the select statement.
//
// cas0 points to an array of type [ncases]scase, and order0 points to
// an array of type [2*ncases]uint16 where ncases must be <= 65536.
// Both reside on the goroutine's stack (regardless of any escaping in
// selectgo).
//
// For race detector builds, pc0 points to an array of type
// [ncases]uintptr (also on the stack); for other builds, it's set to
// nil.
//
// selectgo returns the index of the chosen scase, which matches the
// ordinal position of its respective select{recv,send,default} call.
// Also, if the chosen scase was a receive operation, it reports whether
// a value was received.
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
if debugSelect {
print("select: cas0=", cas0, "\n")
}

// NOTE: In order to maintain a lean stack size, the number of scases
// is capped at 65536.
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]
// NOTE: pollorder/lockorder's underlying array was not zero-initialized by compiler.

// Even when raceenabled is true, there might be select
// statements in packages compiled without -race (e.g.,
// ensureSigM in runtime/signal_unix.go).
var pcs []uintptr
if raceenabled && pc0 != nil {
pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
pcs = pc1[:ncases:ncases]
}
casePC := func(casi int) uintptr {
if pcs == nil {
return 0
}
return pcs[casi]
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

// The compiler rewrites selects that statically have
// only 0 or 1 cases plus default into simpler constructs.
// The only way we can end up with such small sel.ncase
// values here is for a larger select in which most channels
// have been nilled out. The general code handles those
// cases correctly, and they are rare enough not to bother
// optimizing (and needing to test).

// generate permuted order
norder := 0
for i := range scases {
cas := &scases[i]

// Omit cases without channels from the poll and lock orders.
if cas.c == nil {
cas.elem = nil // allow GC
continue
}

j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]

// sort the cases by Hchan address to get the locking order.
// simple heap sort, to guarantee n log n time and constant stack footprint.
for i := range lockorder {
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
for i := len(lockorder) - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}

if debugSelect {
for i := 0; i+1 < len(lockorder); i++ {
if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
throw("select: broken sort")
}
}
}

// lock all the channels involved in the select
sellock(scases, lockorder)

var (
gp *g
sg *sudog
c *hchan
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)
// pass 1 - look for something already waiting
var casi int
var cas *scase
var caseSuccess bool
var caseReleaseTime int64 = -1
var recvOK bool
for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c

if casi >= nsends {
sg = c.sendq.dequeue() // dequeue a waiting sender from the channel's send queue
if sg != nil {
goto recv // a sender was waiting: jump to the recv label
}
if c.qcount > 0 { // the buffer holds data: jump to the bufrecv label
goto bufrecv
}
if c.closed != 0 { // the channel is closed: jump to the rclose label
goto rclose
}
} else {
if raceenabled {
racereadpc(c.raceaddr(), casePC(casi), chansendpc)
}
if c.closed != 0 { // the channel is closed: jump to the sclose label
goto sclose
}
sg = c.recvq.dequeue() // dequeue a waiting receiver from the channel's receive queue
if sg != nil { // a receiver was waiting: jump to the send label
goto send
}
if c.qcount < c.dataqsiz { // the buffer has free space: jump to the bufsend label
goto bufsend
}
}
}

if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}
// pass 2 - enqueue on all chans
gp = getg()
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog() // get a sudog and bind the current goroutine to it
sg.g = gp
sg.isSelect = true
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// Construct waiting list in lock order.
*nextp = sg
nextp = &sg.waitlink
// Enqueue on the wait queue that matches the case's direction.
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}

// wait for someone to wake us up
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
gp.activeStackChans = false

// Re-lock all the channels.
sellock(scases, lockorder)

gp.selectDone = 0
sg = (*sudog)(gp.param) // param holds the sudog that woke this goroutine; sg.success records whether it was a real send/receive or a close

gp.param = nil

// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
casi = -1
cas = nil
caseSuccess = false
// gp.waiting links the sudogs of the cases in lock order.
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
// We have been woken up, so clear this goroutine's waiting list.
gp.waiting = nil

for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist { // equal means this case's channel operation is what woke the goroutine
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
} else {
// This case did not wake us: remove our sudog from its channel's send or receive queue.
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
// Release this case's sudog, then move on to the next one.
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}

if cas == nil {
throw("selectgo: bad wakeup")
}

c = cas.c

if debugSelect {
print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
}

if casi < nsends {
if !caseSuccess {
goto sclose
}
} else {
recvOK = caseSuccess
}

if raceenabled {
if casi < nsends {
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
} else if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
}
}
if msanenabled {
if casi < nsends {
msanread(cas.elem, c.elemtype.size)
} else if cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
}
}
if asanenabled {
if casi < nsends {
asanread(cas.elem, c.elemtype.size)
} else if cas.elem != nil {
asanwrite(cas.elem, c.elemtype.size)
}
}

selunlock(scases, lockorder)
goto retc

bufrecv:
// can receive from buffer
if raceenabled {
if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
}
racenotify(c, c.recvx, nil)
}
if msanenabled && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
}
if asanenabled && cas.elem != nil {
asanwrite(cas.elem, c.elemtype.size)
}
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc

bufsend:
// can send to buffer
if raceenabled {
racenotify(c, c.sendx, nil)
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
if asanenabled {
asanread(cas.elem, c.elemtype.size)
}
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc

recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncrecv: cas0=", cas0, " c=", c, "\n")
}
recvOK = true
goto retc

rclose:
// read at end of closed channel
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
if raceenabled {
raceacquire(c.raceaddr())
}
goto retc

send:
// can send to a sleeping receiver (sg)
if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
if asanenabled {
asanread(cas.elem, c.elemtype.size)
}
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncsend: cas0=", cas0, " c=", c, "\n")
}
goto retc

retc:
if caseReleaseTime > 0 {
blockevent(caseReleaseTime-t0, 1)
}
return casi, recvOK

sclose:
// send on closed channel
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}
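
Two preparatory steps in selectgo are easy to miss in the long listing. The poll order is a random permutation of the case indices, so no case is favored when several are ready. The lock order is the same indices sorted by channel address, so every goroutine locks channels in one global order. The sketch below mirrors only those two steps in ordinary Go. It is a simplified model, not the runtime's code: math/rand stands in for fastrandn, sort.Slice stands in for the hand-written heap sort, and the addrs slice of made-up addresses is a hypothetical stand-in for c.sortkey().

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// pollOrder returns a random permutation of 0..n-1, built with the same
// "inside-out" shuffle as the pollorder loop in selectgo.
func pollOrder(n int) []uint16 {
	order := make([]uint16, n)
	for i := 0; i < n; i++ {
		j := rand.Intn(i + 1) // stand-in for fastrandn(uint32(norder + 1))
		order[i] = order[j]
		order[j] = uint16(i)
	}
	return order
}

// lockOrder returns the case indices sorted by (made-up) channel address.
func lockOrder(poll []uint16, addrs []uintptr) []uint16 {
	order := append([]uint16(nil), poll...)
	sort.Slice(order, func(a, b int) bool {
		return addrs[order[a]] < addrs[order[b]]
	})
	return order
}

func main() {
	addrs := []uintptr{0x30, 0x10, 0x20} // hypothetical addresses for cases 0, 1, 2
	poll := pollOrder(len(addrs))
	fmt.Println("poll order:", poll)                   // varies from run to run
	fmt.Println("lock order:", lockOrder(poll, addrs)) // always [1 2 0]
}
```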

selectnbrecv

When a select has only one case plus a default, and that case receives data, the select is implemented by calling the selectnbrecv function.

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
	selected, _ = chanrecv(c, elem, false)
	return
}

Here you can see that selectnbrecv is implemented by calling chanrecv, the same function that backs an ordinary <-c1 receive expression. The only difference is the final argument, false, which makes the receive non-blocking. In effect the select degenerates into a single <-c that returns immediately.
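
One consequence worth seeing in action is that a closed channel counts as ready: the receive case is selected and yields the zero value instead of falling through to default. selectnbrecv itself is internal to the runtime and cannot be called from user code, so the sketch below shows the behavior through an ordinary select; tryRecv is a name made up for this example.

```go
package main

import "fmt"

// tryRecv attempts a receive without blocking and reports whether the
// receive case was selected.
func tryRecv(c <-chan int) (v int, selected bool) {
	select {
	case v = <-c:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(tryRecv(c)) // empty channel, default taken: 0 false
	c <- 5
	fmt.Println(tryRecv(c)) // value ready: 5 true
	close(c)
	fmt.Println(tryRecv(c)) // closed channel, case still selected: 0 true
}
```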

selectnbsend

Like selectnbrecv, when the select has only one case plus a default and that case sends to a channel, selectnbsend falls back to chansend, the function behind an ordinary c <- 1 expression, again in non-blocking mode.

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}
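
The non-blocking send can be observed from user code in the same way. The sketch below is an illustration only (trySend is a made-up helper): it shows the send case being skipped both when the buffer is full and when the channel is nil, the two situations in which a blocking send would have to wait.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether the
// send case was selected.
func trySend(c chan<- int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(trySend(c, 1)) // buffer has room: true
	fmt.Println(trySend(c, 2)) // buffer is full, default taken: false

	var nilChan chan int
	fmt.Println(trySend(nilChan, 3)) // nil channel is never ready: false
}
```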

Summary

So, the select process is roughly as follows:

  • Check each case in turn; if its channel operation can proceed without blocking, jump straight to executing it.

  • If the send or receive of every case would block, check whether there is a default; if so, execute default.

  • If there is no default, create a sudog for each case, enqueue it on the sendq or recvq of that case's channel, and park the goroutine.

  • When one of the sudogs is woken up, clear the data and other fields of all the sudogs and remove the remaining sudogs from their queues.

  • At this point, the select operation ends.
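
The blocking path described above can be exercised with a select that has no default. In the sketch below (waitFor is a made-up helper), the first call is completed by a sender and the second by a close. Whether the select finds its partner already waiting or has to park and be woken depends on scheduling, but the result is the same either way.

```go
package main

import "fmt"

// waitFor blocks in a two-case select until a value arrives on data
// or done is closed, and reports which of the two happened.
func waitFor(data <-chan int, done <-chan struct{}) (int, string) {
	select {
	case v := <-data:
		return v, "data"
	case _, ok := <-done:
		if !ok {
			return 0, "closed" // woken by close(done)
		}
		return 0, "signal"
	}
}

func main() {
	data := make(chan int)
	done := make(chan struct{})

	go func() { data <- 1 }()
	fmt.Println(waitFor(data, done)) // 1 data

	go func() { close(done) }()
	fmt.Println(waitFor(data, done)) // 0 closed
}
```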

I still can't resist complaining a little: the selectgo function sprawls over more than 300 lines and jumps around with a number of gotos. Could it really not be split up? Still, this is code written by masters, and it deserves respect.

Reprinted from Programming VIP