```go
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue; > 0 means buffered, 0 means unbuffered
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16         // size of a single element
	closed   uint32         // flag indicating whether the channel has been closed
	elemtype *_type         // element type descriptor
	sendx    uint           // send index into buf
	recvx    uint           // receive index into buf
	recvq    waitq          // list of recv waiters (goroutines blocked receiving)
	sendq    waitq          // list of send waiters (goroutines blocked sending)

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex // mutex guarding concurrent access to the channel
}
```
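The user-visible len and cap of a channel read these fields directly: len(ch) reports qcount and cap(ch) reports dataqsiz. A quick demonstration (plain demo code, not runtime source):

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 3) // dataqsiz = 3: buffered
	ch <- 1
	ch <- 2
	fmt.Println(len(ch), cap(ch)) // 2 3 — qcount and dataqsiz

	unbuf := make(chan int)             // dataqsiz = 0: unbuffered
	fmt.Println(len(unbuf), cap(unbuf)) // 0 0
}
```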
waitq
```go
type waitq struct {
	first *sudog // first sudog in the wait queue
	last  *sudog // last sudog in the wait queue
}
```
```go
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g // the waiting goroutine

	// next and prev link sudogs into a wait queue.
	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	// success indicates whether communication over channel c
	// succeeded. It is true if the goroutine was awoken because a
	// value was delivered over channel c, and false if awoken
	// because c was closed.
	success bool

	// waitlink and waittail link lists of waiters, used by
	// g.waiting and by semaphore roots.
	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // the channel this sudog is waiting on
}
```
scase
This is the structure the compiler generates for each case in a select statement.
```go
// scase describes a single case in a select statement,
// including the channel and the associated data element.
type scase struct {
	c           *hchan         // chan
	elem        unsafe.Pointer // data element
	kind        uint16         // kind of this case: caseNil, caseRecv, caseSend, or caseDefault
	pc          uintptr        // race pc (for race detector / msan)
	releasetime int64
}
```
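To make the mapping concrete, here is a plain user-level select (demo code, not runtime source); each case below compiles into one scase, distinguished by kind:

```go
package main

func main() {
	in := make(chan int, 1)
	out := make(chan int, 1)
	in <- 1

	select {
	case v := <-in: // recv-kind scase: c = in, elem points at v
		_ = v
	case out <- 0: // send-kind scase: c = out, elem points at the value 0
	default: // default-kind scase: no channel attached
	}
}
```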
From these structures we can see that a channel is essentially a ring buffer plus two wait queues (sendq and recvq). So how does data actually move between them? A diagram that circulates online illustrates this vividly.
Combining the structures and the illustration, we can sketch the channel's send and receive flow (a runnable demo follows this list).
For a receive (<-channel), first check whether any goroutine in sendq is waiting to hand over data.
If sendq is not empty and the buffer is not empty, senders are queued because the buffer is full: the receiving g takes an element from the buffer, then moves the data carried by the first g in sendq into the freed buffer slot.
If sendq is not empty but the buffer is empty, this is an unbuffered channel: take the data directly from the first g in sendq.
If sendq is empty, check the buffer; if it holds data, take an element and go.
If sendq is empty and the buffer holds no data, block and wait.
A send follows the mirror image of this process.
If the channel is closed, wake every g blocked in the wait queues (sendq and recvq) and tell them the channel has been closed (channel.closed = true).
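These conjectures are observable from user code. A small demo of the unbuffered case, where the receiver takes the value directly from the blocked sender (hypothetical demo; the Sleep only makes it likely the sender is already parked):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string) // unbuffered: no buf, only sendq/recvq

	go func() {
		ch <- "hello" // parks on sendq until a receiver arrives
	}()

	time.Sleep(10 * time.Millisecond) // make it likely the sender is parked
	fmt.Println(<-ch)                 // direct handoff from the sender's sudog
}
```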
Next, let's trace the source code to confirm and correct these conjectures.
makechan

```go
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}
```
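The three allocation branches are easy to trigger from user code (demo code; the branch notes are read off the switch above):

```go
package main

func main() {
	// mem == 0: zero-size elements, so only the hchan header is
	// allocated and buf points at c.raceaddr().
	a := make(chan struct{}, 8)

	// elem.ptrdata == 0: int holds no pointers, so hchan and buf are
	// allocated in one call, buf sitting right after the header.
	b := make(chan int, 8)

	// default: *int holds pointers, so buf gets its own GC-scanned
	// allocation separate from the hchan header.
	c := make(chan *int, 8)

	_, _, _ = a, b, c
}
```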
chanrecv

```go
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	// If c is nil, the block parameter decides whether the receive blocks forever.
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock. This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	// If the channel is closed and the buffer holds no pending elements,
	// the receive fails: zero *ep and return.
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	// Try to dequeue a waiting sender; if one exists, receive the value directly.
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// The channel buffer holds elements: receive directly from the queue.
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	// Non-blocking and nothing available: report the receive as not selected.
	if !block {
		unlock(&c.lock)
		return false, false
	}

	// Prepare to block.
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
```
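How user syntax maps onto chanrecv's parameters and results (demo code; the mapping follows from the function's contract above): a select with a default clause compiles to a call with block = false, and `v, ok := <-ch` surfaces the received result.

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 1)

	// select/default compiles to chanrecv(c, &v, false): the fast path
	// sees empty(c) on an open channel, returns (false, false), and the
	// default branch runs.
	select {
	case v := <-ch:
		fmt.Println("got", v)
	default:
		fmt.Println("would block") // printed
	}

	ch <- 42
	close(ch)
	v, ok := <-ch
	fmt.Println(v, ok) // 42 true — (selected, received) == (true, true)
	v, ok = <-ch
	fmt.Println(v, ok) // 0 false — closed and drained: (true, false)
}
```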
From the above logic, we can see four possible paths for data transfer on the receive side (a runnable demo of the second path follows this list):
sendq is not empty and buf is empty (a sender g is blocked on an unbuffered channel): dequeue the sudog at the head of sendq and copy its sudog.elem directly to the target address ep.
sendq is not empty and buf is not empty (sender g's are blocked on a full buffered channel): copy the element at the head of buf to the target address ep, dequeue the sudog at the head of sendq, copy its sudog.elem into the freed slot at the tail of buf, and release the sudog.
sendq is empty and buf is not empty (buffered channel with data, nothing blocked): copy the element at the head of buf to the target address ep.
sendq is empty and buf is empty (nothing to receive): block; acquire a sudog, enqueue it on the channel's recvq, and wait for a sending g to copy its data to the target address and wake us.
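A runnable demo of the second path (hypothetical demo code; the Sleep only makes it likely the sender has parked): the buffer is full and a sender is blocked, so one receive returns the buffer head while the parked sender's value slides into the freed tail slot, preserving FIFO order.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2 // buffer is now full: qcount == dataqsiz

	go func() {
		ch <- 3 // parks on sendq with sudog.elem pointing at 3
	}()
	time.Sleep(10 * time.Millisecond)

	// recv takes the buffer head (1), copies the parked sender's 3 into
	// the freed tail slot, and wakes the sender.
	fmt.Println(<-ch, <-ch, <-ch) // 1 2 3 — FIFO order holds
}
```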
If you think about it, you may notice a problem: after the gopark call in chanrecv puts the receiving g to sleep, the g is eventually woken and resumes execution right after gopark. Yet this g blocked precisely because it had no data to receive, and none of the code following the wake-up appears to copy any data. So who performs the data transfer?
```go
// recv processes a receive operation on a full channel c.
// There are 2 parts:
//  1) The value sent by the sender sg is put into the channel
//     and the sender is woken up to go on its merry way.
//  2) The value received by the receiver (the current G) is
//     written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		// Unbuffered channel.
		if raceenabled {
			racesync(c, sg) // race detector: synchronize channel and sender
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx) // address of the element at the head of the queue
		if raceenabled {
			racenotify(c, c.recvx, nil) // race detector: receiver reads this slot
			racenotify(c, c.recvx, sg)  // race detector: sender writes this slot
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++ // advance the receive index
		if c.recvx == c.dataqsiz {
			c.recvx = 0 // wrap around at the end of the buffer
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz — queue is still full
	}
	sg.elem = nil
	gp := sg.g
	unlockf() // unlock the channel
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks() // record release time
	}
	goready(gp, skip+1) // wake up the sender's goroutine
}
```
Combining this with the earlier logic, we see that by the time a blocked g is woken, the channel has already consumed (or filled in) the data its sudog points at. So after waking, g has no data transfer left to perform; it only cleans up its sudog and returns.
acquireSudog
acquireSudog obtains a sudog using a two-level caching scheme, much like the per-P and global run queues the scheduler uses for g's: each P keeps a local slice of cached sudogs, and the global sched struct maintains a linked list of cached sudogs. When a P's local cache runs dry (or overflows), it rebalances against the global sched cache.
```go
func acquireSudog() *sudog {
	mp := acquirem() // pin to the current m (thread) so we aren't rescheduled
	pp := mp.p.ptr() // the current p (processor)
	// If the local cache has no sudog, pull a batch from the global sched cache.
	if len(pp.sudogcache) == 0 {
		lock(&sched.sudoglock)
		// First, try to grab a batch from central cache.
		for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
			s := sched.sudogcache
			sched.sudogcache = s.next
			s.next = nil
			pp.sudogcache = append(pp.sudogcache, s)
		}
		unlock(&sched.sudoglock)
		// If the central cache is empty, allocate a new one.
		if len(pp.sudogcache) == 0 {
			pp.sudogcache = append(pp.sudogcache, new(sudog))
		}
	}
	// Pop the last sudog from the local cache and shrink the slice.
	n := len(pp.sudogcache)
	s := pp.sudogcache[n-1]
	pp.sudogcache[n-1] = nil
	pp.sudogcache = pp.sudogcache[:n-1]
	if s.elem != nil {
		throw("acquireSudog: found s.elem != nil in cache")
	}
	releasem(mp) // release the current m
	return s
}
```
releaseSudog
releaseSudog returns a sudog that is no longer in use to the cache, rebalancing between the P's local cache and the global queue.
```go
func releaseSudog(s *sudog) {
	mp := acquirem() // avoid rescheduling to another P
	pp := mp.p.ptr()
	// If the local cache is full, move half of it to the global sched cache.
	if len(pp.sudogcache) == cap(pp.sudogcache) {
		// Transfer half of local cache to the central cache.
		var first, last *sudog
		for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
			n := len(pp.sudogcache)
			p := pp.sudogcache[n-1]
			pp.sudogcache[n-1] = nil
			pp.sudogcache = pp.sudogcache[:n-1]
			if first == nil {
				first = p
			} else {
				last.next = p
			}
			last = p
		}
		lock(&sched.sudoglock)
		last.next = sched.sudogcache
		sched.sudogcache = first
		unlock(&sched.sudoglock)
	}
	// Put the released sudog into the local cache slice.
	pp.sudogcache = append(pp.sudogcache, s)
	releasem(mp)
}
```
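The same two-level (per-P local, then global) free-list pattern can be reproduced in ordinary Go. Below is a minimal self-contained sketch under assumed names (node, central, localCap — none of these are runtime API) showing the get/put balancing that acquireSudog and releaseSudog perform:

```go
package main

import "sync"

type node struct{ next *node }

// central plays the role of sched.sudogcache + sched.sudoglock:
// a global linked list protected by a mutex.
var central struct {
	sync.Mutex
	head *node
}

const localCap = 16 // stand-in for cap(pp.sudogcache)

// get pops from the local slice, refilling it to half capacity from
// central when empty — the acquireSudog strategy.
func get(local *[]*node) *node {
	if len(*local) == 0 {
		central.Lock()
		for len(*local) < localCap/2 && central.head != nil {
			n := central.head
			central.head = n.next
			n.next = nil
			*local = append(*local, n)
		}
		central.Unlock()
		if len(*local) == 0 { // central was empty: allocate fresh
			*local = append(*local, new(node))
		}
	}
	n := (*local)[len(*local)-1]
	(*local)[len(*local)-1] = nil
	*local = (*local)[:len(*local)-1]
	return n
}

// put pushes onto the local slice, spilling half of it to central
// when full — the releaseSudog strategy.
func put(local *[]*node, n *node) {
	if len(*local) == localCap {
		var first, last *node
		for len(*local) > localCap/2 {
			p := (*local)[len(*local)-1]
			*local = (*local)[:len(*local)-1]
			if first == nil {
				first = p
			} else {
				last.next = p
			}
			last = p
		}
		central.Lock()
		last.next = central.head
		central.head = first
		central.Unlock()
	}
	*local = append(*local, n)
}

func main() {
	local := make([]*node, 0, localCap)
	n := get(&local)
	put(&local, n)
}
```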
chansend1
chansend1 is the compiler's entry point for a plain ch <- v; it simply calls chansend with block = true. The sending logic mirrors the receiving logic.
```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// Found a waiting receiver. We pass the value we want to send
	// directly to the receiver, bypassing the channel buffer (if any).
	// As with recv: either there is no buffer, or the buffer is empty.
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// Space is available in the channel buffer. Enqueue the element to send.
	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
```
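At the user level (demo code; the branch notes point back at chansend above): a select with default compiles to chansend with block = false, and sending on a closed channel panics.

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	ch <- 1 // buffer now full

	// chansend(c, ep, false, ...): the fast path sees full(c) on an
	// open channel and returns false, so default runs.
	select {
	case ch <- 2:
		fmt.Println("sent")
	default:
		fmt.Println("would block") // printed
	}

	close(ch)
	// Takes the c.closed != 0 branch under the lock and panics.
	defer func() { fmt.Println("recovered:", recover()) }()
	ch <- 3 // panic: send on closed channel
}
```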
send
send mirrors recv, and because the sudog was dequeued from recvq, the buffer must be empty (a receiver only blocks when there is nothing to take), so send never needs to put data into the buffer. That makes send simpler than recv: it only copies the data to the receiver and wakes its g (see the abridged source below).
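For reference, the runtime's send helper looks roughly like this (abridged from runtime/chan.go with the race-detector bookkeeping elided; details vary by Go version):

```go
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if sg.elem != nil {
		// Copy the value straight to the receiver's target address.
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1) // wake up the receiver's goroutine
}
```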
closechan

```go
func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	c.closed = 1

	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue() // dequeue a waiting sudog from the receive queue
		if sg == nil {
			break
		}
		if sg.elem != nil {
			// A receiver is waiting for data: zero its target element.
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg) // hand the sudog back to the goroutine
		sg.success = false            // mark the receive as failed (channel closed)
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue() // dequeue a waiting sudog from the send queue
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false // mark the send as failed (channel closed)
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}
```
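The observable consequences in user code (demo, not runtime source): values already in the buffer survive the close, and once drained, receives return the zero value with ok == false.

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)

	// Buffered elements survive the close (the qcount > 0 branch in chanrecv).
	v, ok := <-ch
	fmt.Println(v, ok) // 1 true
	v, ok = <-ch
	fmt.Println(v, ok) // 2 true

	// Drained and closed: chanrecv zeros *ep and returns (true, false).
	v, ok = <-ch
	fmt.Println(v, ok) // 0 false

	// range exits once the channel is closed and drained.
	for range ch {
	}
}
```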
When a channel is closed, every sudog blocked on recvq and sendq (at most one of the two queues can be non-empty) is dequeued, its data and state are cleared, and sg.success is set to false so that the upper-level logic knows the wake-up was caused by closing the channel.
After being woken, each g resumes the remaining logic in chansend or chanrecv, which is where its sudog is released (this is why closechan itself never releases sudogs): receivers get the zero value with success == false, and senders panic.
Summary
Prose can only convey so much. While researching this topic I came across two flow charts online that are best viewed together with the explanation above.