Go chan

hchan

This is the structure of a channel.

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue; > 0 means buffered, 0 means unbuffered
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16         // size of a single element
    closed   uint32         // flag indicating whether the channel has been closed
    elemtype *_type         // element type descriptor
    sendx    uint           // send index into the buffer array
    recvx    uint           // receive index into the buffer array
    recvq    waitq          // list of goroutines waiting to receive
    sendq    waitq          // list of goroutines waiting to send

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex // mutex protecting concurrent access to the channel
}

waitq

type waitq struct {
    first *sudog // pointer to the first sudog in the wait queue
    last  *sudog // pointer to the last sudog in the wait queue
}

sudog

sudog represents a waiting g

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g // pointer to the waiting goroutine's g structure

    // next, prev: link sudogs into a list so they can be managed in the wait queue.
    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    // acquiretime, releasetime: record when the waiter acquired and was released.
    acquiretime int64
    releasetime int64
    ticket      uint32

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool

    // success indicates whether communication over channel c
    // succeeded. It is true if the goroutine was awoken because a
    // value was delivered over channel c, and false if awoken
    // because c was closed.
    success bool

    // waitlink, waittail: link waiters together; used by g.waiting lists
    // and by the semaphore semaRoot tree.
    parent   *sudog // semaRoot binary tree
    waitlink *sudog // g.waiting list or semaRoot
    waittail *sudog // semaRoot
    c        *hchan // channel this sudog is waiting on
}

scase

This is the structure generated for each case in a select statement.

// scase describes a single case in a select statement: the channel involved
// and the associated data element.
type scase struct {
    c           *hchan         // chan
    elem        unsafe.Pointer // data element
    kind        uint16         // kind of case: nil, recv, send, or default
    pc          uintptr        // race pc (for race detector / msan)
    releasetime int64
}
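
As a quick illustration (ordinary user code, not runtime code), a select like the one below is the kind of statement the compiler lowers into a set of scase entries, one per case; the channels and values here are arbitrary.

package main

import "fmt"

func main() {
    c1 := make(chan int, 1)
    c2 := make(chan string, 1)
    c1 <- 42

    // Each case corresponds to one scase: the channel involved (c),
    // the element being sent or received (elem), and the kind of case
    // (recv, send, or default).
    select {
    case v := <-c1:
        fmt.Println("received from c1:", v)
    case c2 <- "hello":
        fmt.Println("sent to c2")
    default:
        fmt.Println("no case ready")
    }
}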

From the above structures we can see that a channel is, in essence, a buffer pool plus two queues (sendq and recvq). So how does data move between them? There is a widely shared diagram online that illustrates the process vividly.


Illustration

Unbuffered (synchronous)

(diagram)

Buffered (asynchronous)

(diagram)

Combining the structures above with the illustration, we can make a rough guess at the channel's send and recv process (a small runnable example follows the list below).

  1. If it is a recv (<-channel) request, first check whether a goroutine in the sendq queue is waiting to hand over data.
    1. If sendq is not empty and the buffer pool is not empty, the senders in sendq are waiting for buffer space. The receiving g takes an element from the buffer pool, then moves the data carried by the first g in sendq into the buf buffer pool.
    2. If sendq is not empty but the buffer pool is empty, this is an unbuffered chan: simply take the data from the first g in sendq.
    3. If sendq is empty, check the buffer pool; if it has data, take an element and return.
    4. If sendq is empty and the buffer pool has no data, block and wait here.
  2. If it is a send, the process mirrors recv (check recvq first, then the buffer, then block).
  3. If the channel is closed at this point, wake up all waiting g's in the wait queues (sendq and recvq) and let them see that the channel's closed flag is set.
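
To make the blocking behavior in this conjecture concrete, here is a small runnable example: on an unbuffered channel the sender parks on sendq until a receiver arrives, while a buffered channel accepts sends up to dataqsiz with no receiver present.

package main

import (
    "fmt"
    "time"
)

func main() {
    // Unbuffered: the send blocks in sendq until main receives below.
    unbuffered := make(chan int)
    go func() {
        unbuffered <- 1 // parks here until <-unbuffered runs
    }()
    time.Sleep(100 * time.Millisecond) // let the goroutine block first
    fmt.Println("received:", <-unbuffered)

    // Buffered with dataqsiz = 2: the first two sends go straight into
    // buf and return immediately; a third send would block.
    buffered := make(chan int, 2)
    buffered <- 1
    buffered <- 2
    fmt.Println("two buffered sends completed without a receiver, len =", len(buffered))
}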

Next we trace through the source code to verify and correct this conjecture.


Source code analysis

Receiving and Sending

main

Let's use the go tooling to see which runtime functions channel creation, c <- i, and <-c compile down to.

func main() {
    c1 := make(chan int)
    c2 := make(chan int, 2)
    go func() {
        c1 <- 1
        c2 <- 2
    }()
    <-c1
    <-c2
    close(c1)
    close(c2)
}
go build -gcflags=all="-N -l" main.go
go tool objdump -s "main.main" main

After filtering for the CALL instructions:

go tool objdump -s "main.main" main | grep CALL

main.go:4	0x459a01	e8faa1faff	CALL runtime.makechan(SB)
main.go:5	0x459a17	e8e4a1faff	CALL runtime.makechan(SB)
main.go:6	0x459a28	e8f31dfbff	CALL runtime.newobject(SB)
main.go:6	0x459a60	e89bd1ffff	CALL runtime.gcWriteBarrierDX(SB)
main.go:6	0x459a8a	e871d1ffff	CALL runtime.gcWriteBarrierDX(SB)
main.go:6	0x459a98	e8a3eafdff	CALL runtime.newproc(SB)
main.go:10	0x459aa4	e8d7adfaff	CALL runtime.chanrecv1(SB)
main.go:11	0x459ab0	e8cbadfaff	CALL runtime.chanrecv1(SB)
main.go:12	0x459aba	e841abfaff	CALL runtime.closechan(SB)
main.go:13	0x459ac4	e837abfaff	CALL runtime.closechan(SB)
main.go:3	0x459ad3	e848cdffff	CALL runtime.morestack_noctxt.abi0(SB)
main.go:7	0x459b12	e869a3faff	CALL runtime.chansend1(SB)
main.go:8	0x459b23	e858a3faff	CALL runtime.chansend1(SB)
main.go:6	0x459b32	e849ccffff	CALL runtime.morestack.abi0(SB)
  1. makechan: the function that creates the channel; the same function is used with or without a buffer
  2. chanrecv1: the function called for <-c1
  3. closechan: the function called when close(c1) closes the channel
  4. chansend1: the function called for c1 <- 1, i.e. sending data

makechan

Creating a channel mainly consists of allocating memory for the hchan structure and the buf buffer pool, then initializing the hchan fields.

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}
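
A quick way to see dataqsiz and qcount from user code: cap reports the buffer size set up by makechan, and len reports the number of elements currently queued.

package main

import "fmt"

func main() {
    c := make(chan int, 3)      // makechan allocates hchan plus a 3-element buf
    fmt.Println(cap(c), len(c)) // 3 0 -> dataqsiz, qcount

    c <- 1
    c <- 2
    fmt.Println(cap(c), len(c)) // 3 2 -> qcount grows as elements are buffered
}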

chanrecv1

chanrecv1 is a thin wrapper around chanrecv. chanrecv receives from the channel and writes the received data into ep.

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    // If c is nil, the block parameter decides whether the receive blocks forever.
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // Non-blocking receive on an empty channel: fail and return.
    // Fast path: check for failed non-blocking operation without acquiring the lock.
    if !block && empty(c) {
        // After observing that the channel is not ready for receiving, we observe whether the
        // channel is closed.
        //
        // Reordering of these checks could lead to incorrect behavior when racing with a close.
        // For example, if the channel was open and not empty, was closed, and then drained,
        // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
        // we use atomic loads for both checks, and rely on emptying and closing to happen in
        // separate critical sections under the same lock. This assumption fails when closing
        // an unbuffered channel with a blocked send, but that is an error condition anyway.
        // If the channel is not closed, return.
        if atomic.Load(&c.closed) == 0 {
            // Because a channel cannot be reopened, the later observation of the channel
            // being not closed implies that it was also not closed at the moment of the
            // first observation. We behave as if we observed the channel at that moment
            // and report that the receive cannot proceed.
            return
        }
        // The channel is irreversibly closed. Re-check whether the channel has any pending data
        // to receive, which could have arrived between the empty and closed checks above.
        // Sequential consistency is also required here, when racing with such a send.
        if empty(c) {
            // The channel is irreversibly closed and empty.
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

    // If the channel is closed and the buffer has no pending elements, the receive
    // returns the zero value.
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    // Dequeue a waiting sender from sendq; if one exists, receive the value directly and return.
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // If the channel buffer has elements, receive directly from the buffer and return.
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    // Non-blocking operation: report that the receive was not selected.
    if !block {
        unlock(&c.lock)
        return false, false
    }

    // Prepare to block on this receive.
    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}

From the above logic we can see four possibilities for how the data is transferred:

  • sendq is not empty and buf is empty (a synchronously blocked sender): take the sudog at the head of sendq and copy sudog.elem directly to the target address ep
  • sendq is not empty and buf is not empty (an asynchronously blocked sender): copy the element at the head of buf to the target address ep, take the sudog at the head of sendq, copy its sudog.elem data to the tail of buf, and release the sudog
  • sendq is empty but buf is not empty (asynchronous, no blocked sender): copy the element at the head of buf to the target address ep
  • sendq is empty and buf is empty (synchronous, nothing to receive): block yourself by acquiring a sudog, putting it on the channel's recvq queue, and waiting for a sending g to wake you up after copying its data to your target address
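
The block parameter of chanrecv is also how non-blocking receives are implemented: a select with a receive case and a default clause compiles down to the non-blocking path (block == false), which returns immediately instead of parking. A small user-level illustration (tryRecv is just a helper written for this example):

package main

import "fmt"

// tryRecv performs a non-blocking receive: the select/default form
// ends up in chanrecv with block == false instead of parking the goroutine.
func tryRecv(c chan int) (int, bool) {
    select {
    case v := <-c:
        return v, true
    default:
        return 0, false
    }
}

func main() {
    c := make(chan int, 1)

    if _, ok := tryRecv(c); !ok {
        fmt.Println("channel empty, receive returned immediately")
    }

    c <- 7
    if v, ok := tryRecv(c); ok {
        fmt.Println("got buffered value:", v)
    }
}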

If you think about it, you may notice a problem. After gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) puts the receiving g to sleep, the g is later woken up and resumes from that point, yet none of the code that follows appears to actually fetch the data for it. So where does the data get copied?

The recv method below answers this question.


recv

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
// and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
// written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // Unbuffered channel.
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg) // race detection: synchronize channel and sender
        }
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep) // copy the data directly from the sender
        }
    } else {
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        qp := chanbuf(c, c.recvx) // address of the element at the head of the queue
        if raceenabled {
            racenotify(c, c.recvx, nil) // race detection: the receiver reads this slot
            racenotify(c, c.recvx, sg)  // race detection: the sender writes this slot
        }
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++ // advance the receive index
        if c.recvx == c.dataqsiz {
            c.recvx = 0 // wrap around at the end of the buffer
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz; the queue is full, so sendx == recvx
    }
    sg.elem = nil
    gp := sg.g
    unlockf() // unlock the channel
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks() // record the release time
    }
    goready(gp, skip+1) // mark the sender's goroutine as runnable again
}

Combining the logic above, we can see that by the time a waiting g is woken up, the data associated with its sudog has already been consumed (or delivered) by the channel, so when the g wakes up there is no data-transfer work left for it to do.

acquireSudog

acquireSudog obtains a sudog structure. It uses a two-level caching scheme based on p: each p keeps a local slice of cached sudogs, much like the local run queues the scheduler uses for g's, while a global linked list of cached sudogs is kept on the sched structure. When a p's local sudog cache runs empty or overflows, it rebalances against the global sched cache.

func acquireSudog() *sudog {
    // Pin the current m (thread) so we keep a stable p.
    mp := acquirem()
    pp := mp.p.ptr() // current p (processor) context
    // If the current p's cache has no sudogs, pull a batch from the global sched cache.
    if len(pp.sudogcache) == 0 {
        lock(&sched.sudoglock) // lock the global sudog cache
        // First, try to grab a batch from central cache.
        for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
            s := sched.sudogcache
            sched.sudogcache = s.next
            s.next = nil
            pp.sudogcache = append(pp.sudogcache, s)
        }
        unlock(&sched.sudoglock)
        // If the central cache is empty, allocate a new one.
        if len(pp.sudogcache) == 0 {
            pp.sudogcache = append(pp.sudogcache, new(sudog))
        }
    }
    // Take the last sudog from the local cache and shrink the sudogcache slice.
    n := len(pp.sudogcache)
    s := pp.sudogcache[n-1]
    pp.sudogcache[n-1] = nil
    pp.sudogcache = pp.sudogcache[:n-1]
    if s.elem != nil {
        throw("acquireSudog: found s.elem != nil in cache")
    }
    releasem(mp) // unpin the current m (thread)
    return s
}

releaseSudog

releaseSudog returns the sudog that has just been used to the cache, rebalancing between the p-local sudog cache and the global queue.

func releaseSudog(s *sudog) {
    mp := acquirem() // avoid rescheduling to another P
    pp := mp.p.ptr()
    // If the p-local cache is full, move half of it to the global sched cache.
    if len(pp.sudogcache) == cap(pp.sudogcache) {
        // Transfer half of local cache to the central cache.
        var first, last *sudog
        for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
            n := len(pp.sudogcache)
            p := pp.sudogcache[n-1]
            pp.sudogcache[n-1] = nil
            pp.sudogcache = pp.sudogcache[:n-1]
            if first == nil {
                first = p
            } else {
                last.next = p
            }
            last = p
        }
        lock(&sched.sudoglock)
        last.next = sched.sudogcache
        sched.sudogcache = first
        unlock(&sched.sudoglock)
    }
    // Put the released sudog back into the local cache slice.
    pp.sudogcache = append(pp.sudogcache, s)
    releasem(mp)
}

chansend1

The sending logic is similar to the receiving logic.

func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not closed, we observe that the channel is
    // not ready for sending. Each of these observations is a single word-sized read
    // (first c.closed and second full()).
    // Because a closed channel cannot transition from 'ready for sending' to
    // 'not ready for sending', even if the channel is closed between the two observations,
    // they imply a moment between the two when the channel was both not yet closed
    // and not ready for sending. We behave as if we observed the channel at that moment,
    // and report that the send cannot proceed.
    //
    // It is okay if the reads are reordered here: if we observe that the channel is not
    // ready for sending and then observe that it is not closed, that implies that the
    // channel wasn't closed during the first observation. However, nothing here
    // guarantees forward progress. We rely on the side effects of lock release in
    // chanrecv() and closechan() to update this thread's view of c.closed and full().
    if !block && c.closed == 0 && full(c) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // As in chanrecv: a waiting receiver implies the buffer is either absent or empty,
    // so the value is handed over directly, bypassing the buffer.
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // Space is available in the channel buffer. Enqueue the element to send.
    if c.qcount < c.dataqsiz {
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    if closed {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    return true
}
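
Symmetrically to the receive side, the full() fast path above is what a non-blocking send hits: a select with a send case and a default clause reaches chansend with block == false and fails fast when the buffer is full. A small illustration (trySend is just a helper written for this example):

package main

import "fmt"

// trySend performs a non-blocking send: with a default clause the send
// reaches chansend with block == false and never parks.
func trySend(c chan int, v int) bool {
    select {
    case c <- v:
        return true
    default:
        return false
    }
}

func main() {
    c := make(chan int, 1)
    fmt.Println(trySend(c, 1)) // true: buffer has space, the value is enqueued
    fmt.Println(trySend(c, 2)) // false: buffer full, the send would block, so it fails fast
}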

send

send mirrors recv. Because the sudog was taken from recvq, the buffer must be empty, so send never needs to put data into the buffer. send is therefore simpler than recv: it only has to hand over the data and wake up the receiving g.

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {
            racesync(c, sg) // unbuffered channel: race-detector synchronization with the receiver
        } else {
            // Pretend we go through the buffer, even though
            // we copy directly. Note that we need to increment
            // the head/tail locations only when raceenabled.
            racenotify(c, c.recvx, nil) // tell the race detector about the read of this slot
            racenotify(c, c.recvx, sg)  // and about the write of this slot
            c.recvx++                   // advance the receive index
            if c.recvx == c.dataqsiz {  // circular queue: wrap around at the end of the buffer
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
    if sg.elem != nil { // the waiting receiver supplied a destination for the value
        sendDirect(c.elemtype, sg, ep) // copy the element directly to the waiting receiver
        sg.elem = nil                  // clear the element pointer
    }
    gp := sg.g
    unlockf()                     // unlock the channel
    gp.param = unsafe.Pointer(sg) // store the sudog in the goroutine's param field
    sg.success = true             // mark the communication as successful
    if sg.releasetime != 0 {      // record the release time if requested
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1) // mark the receiver's goroutine as runnable
}

closechan

With sending and receiving covered, the last step is closing the channel.

func closechan(c *hchan) {
    if c == nil {
        panic(plainError("close of nil channel")) // closing a nil chan panics
    }

    lock(&c.lock) // acquire the lock
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel")) // closing an already-closed channel panics
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan)) // race detection
        racerelease(c.raceaddr())
    }

    c.closed = 1 // mark the channel as closed

    var glist gList
    // release all readers
    for {
        sg := c.recvq.dequeue() // dequeue a waiting sudog from the receive queue
        if sg == nil {
            break
        }
        if sg.elem != nil { // the receiver supplied a destination for the data
            typedmemclr(c.elemtype, sg.elem) // zero the receiver's element
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg) // store the sudog in the goroutine's param field
        sg.success = false            // the wakeup is due to close, not a delivered value
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp) // collect the goroutine in glist
    }

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue() // dequeue a waiting sudog from the send queue
        if sg == nil {
            break
        }
        sg.elem = nil // clear the sudog's element
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg) // store the sudog in the goroutine's param field
        sg.success = false            // the send did not succeed
        if raceenabled {
            raceacquireg(gp, c.raceaddr()) // race detection
        }
        glist.push(gp) // collect the goroutine in glist
    }
    unlock(&c.lock) // release the channel lock

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

After the chan is closed, every sudog blocked on recvq and sendq is dequeued, its data and state are cleared, and sg.success is set to false so that the upper-level logic (chanrecv/chansend) knows the wakeup was caused by close rather than by a delivered value.

After all the waiting g's are woken, each of them resumes the remaining logic in chansend or chanrecv, which includes releasing its own sudog (which is why closechan itself does not need to release any sudog).
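
This behavior is visible from user code: after close, a blocked receiver is woken with the zero value and ok == false (sg.success == false), while a sender that touches the closed channel panics. A small demonstration (the recover is only there to keep the example from crashing):

package main

import "fmt"

func main() {
    c := make(chan int)
    done := make(chan struct{})

    go func() {
        // This receiver parks on recvq; closechan wakes it with
        // success = false, which shows up here as ok == false.
        v, ok := <-c
        fmt.Println("receiver woken by close:", v, ok) // 0 false
        close(done)
    }()

    close(c)
    <-done

    // Sending on a closed channel panics with "send on closed channel".
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("send panicked:", r)
        }
    }()
    c <- 1
}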

Summary

Words alone only go so far. While searching online I came across two flow charts that are best viewed together.

send process

Receiving process (recv)

Reposted from Programming VIP.

I have added some comments and translated some of the English explanations. If you spot any problems, feel free to leave a comment and I will correct them promptly.