channel 是什么
Go语言当中的并发模型是 CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
如果说 goroutine 是 Go 程序并发的执行体,channel 就是它们之间的连接。channel 是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明 channel 的时候需要为其指定元素类型。
源码概览
接下来我们将基于go1.24.4版本来进行源码的深入分析,源码路径为 go1.24.4/src/runtime/chan.go
。在开始分析之前,让我们首先从全局梳理一下channel源码的结构。包含结构体如下:
包含函数和常量如下:
代码结构还是很清晰的,没有太复杂的结构体关系。
可以看到,有几个函数都是以reflect
来作为开头,并且它们都有go:linkname
这样的注释,例如:
1 | //go:linkname reflect_makechan reflect.makechan |
那这个go:linkname
有什么用呢?其实它是一个给 Go 编译器的指令。它的主要作用是在两个包之间建立一个符号链接,使得一个包可以访问另一个包中的私有函数或变量。
也就是说,这里将runtime.reflect_makechan
与reflect.makechan
链接起来,当在调用reflect.makechan
时,实际上执行的就是runtime.reflect_makechan
。go:linkname
允许用户绕过成员可见性限制。
源码剖析
核心数据结构
hchan
1 | type hchan struct { |
hchan
结构体是 Go 语言中 channel 的运行时内部表现
- qcount:当前 channel 剩余的元素个数
- dataqsize:环形队列的长度,即 channel 的容量
- buf:指向长度为 dataqsize 的数组,即环形队列的底层存储
- elemsize:每个元素的大小
- synctest:用于同步测试
- closed:标识channel 是否关闭
- timer:为 channel 提供数据的定时器
- elemtype:channel 中元素的类型
- sendx:环形队列中发送操作的索引
- recvx:环形队列中接收操作的索引
- recvq:等待接收的 goroutine 队列
- sendq:等待发送的 goroutine 队列
- lock:保护
hchan
所有字段的互斥锁
waitq
1 | type waitq struct { |
waitq
中包含first
和last
两个指向sudog
的指针,它本身是一个sudog
队列。
chantype
chantype
并不是一个在 Go 源代码中用type
关键字定义的具体结构体。它是由编译器在编译期间内部构建的一个类型信息结构,用来描述 channel 的元数据。
g
1 | type g struct { |
g
是goroutine 的内部表示(定义在src/runtime/runtime2.go
文件中,字段太多,不在此处一一列举),可以把g
理解为一个 goroutine 的所有状态的集合,就像操作系统中的 PCB 和 TCB 一样。
sudog
1 | type sudog struct { |
sudog
是”sudo-g”的缩写,作用是代表一个在同步对象上(如 channel 或 mutex)阻塞的 goroutine(它的结构体定义也在src/runtime/runtime2.go
文件中)。它与g
是多对多的关系。
当一个 goroutine 需要在 channel 上发送或接收数据,但操作无法立即完成时(例如,向一个满了的 channel 发送,或从一个空的 channel 接收),runtime 不会把整个庞大的g
结构体放入 channel 的等待队列,这样做既浪费空间,也增加了管理的复杂性。取而代之的是,runtime 会创建一个轻量级的sudog
结构体。
- g:goroutine
- next:队列中的下一个节点
- prev:队列中的前一个节点
- elem:读取/写入 channel 的数据的容器
- isSelect:标识当前协程是否处在 select 多路复用的流程中
- c:标识与当前 sudog 交互的channel
构造 channel
makechan
makechan
函数是make(chan T, size)
在运行时的真正实现。
1 | func makechan(t *chantype, size int) *hchan |
从方法签名中可以看到,输入包含两个参数:
t *chantype
:一个由编译器传入的、描述 channel 类型的内部结构。它包含了最关键的信息:channel 元素的类型t.Elem
size int
:channel 的缓冲区容量,即调用make
函数时传入的第二个参数
输出则是一个初始化好的、代表 channel 的运行时对象的指针。
进入函数首先进行参数校验,这部分代码确保传入的参数是有效且安全的,防止出现溢出或不符合硬件要求的内存分配。
1 | elem := t.Elem |
if elem.Size_ >= 1<<16
:检查 channel 类型中每个元素的大小是否超过了hchan
结构中elemsize
字段的表示范围。elemsize
是一个uint16,其最大值为2^16-1,这个检查确保元素大小不会溢出该字段。if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign
:内存对齐检查。maxAlign
是平台支持的最大对齐值。此检查确保hchan
结构体自身的大小是对齐的,并且元素的对其要求没有超出平台限制。1
2
3
4mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSie || size < 0 {
panic(plainError("makechan: size out of range"))
}mem, overflow := ...
:计算缓冲区所需的总内存大小,math.MulUintptr
会安全地进行乘法,并返回一个overflow
标识。if overflow || ...
:进行内存范围检查:overflow
:乘法计算本身溢出mem > maxAlloc-hchanSize
:检查请求的内存是否过大。maxAlloc
是 runtime 允许一次性分配的最大内存。这里用maxAlloc-hchanSize
是因为hchan
结构体和buf
缓冲区是在一次分配中完成的,所以要保证总大小不受限制size < 0
:channel 的容量不能为负数
接着就进入到核心的内存分配策略,这是makechan
最核心的部分,它根据缓冲区大小和元素类型采用了三种不同的内存分配策略,以达到最优的性能和 GC 效率。
1 | var c *hchan |
Case 1:无需缓冲区
- 这包含了创建无缓冲的 channel 和元素大小为 0 这两种情况。此时只会分配
hchan
结构体本身的内存,第二个参数nil
告诉 GC 这块内存(目前)不包含任何需要扫描的指针 c.buf = c.raceaddr()
是一个特殊处理,因为此时没有分配buf
,但是,race detector需要一个唯一的内存地址来进行同步事件的记录。因此,这里将buf
指向了hchan
内部的一个地址,为竞争检测器提供了一个“假的”缓冲区地址。
Case 2:有缓冲区但元素不含指针- 这里一次性分配了
hchan
和buf
所需的全部内存。并将buf
指向了hchan
结构体末尾的内存地址。 - 由于元素不含指针,因此也将
mallocgc
的第二个参数设置为了nil
,将整块内存都标记为了不需扫描,GC 扫描时就可以将整块内存跳过。
Case 3:有缓冲区,且元素包含指针 - 这种情况下就会进行两次独立的内存分配:
c = new(hchan)
为hchan
结构体本身分配内存c.buf = mallocgc(mem, elem, true)
为缓冲区buf
单独分配内存
- 这样做的目的也是为了 GC。因为
buf
中存放了包含指针的元素,GC 必须能扫描这块内存区域,通过mallocgc
中传入elem(*_type)
就为这块内存提供了类型信息,此时 GC 扫描时就能找到buf
中的所有指针。如果合并分配内存,GC 的管理就会变得很复杂。
在内存分配完成后,最后对hchan
的所有字段进行赋值。
1 | c.elemsize = uint16(elem.Size_) |
写流程
chansend
这个函数实现了向一个 channel 发送数据的所有逻辑。
1 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool |
可以看到,输入包含以下参数:
c *hchan
:目标 channel 的运行时表示ep unsafe.Pointer
:只想要发送的元素的内存地址。注意这里传递的是指针而不是值本身block bool
:如果为true
则代表这是一个阻塞发送,即常规的ch <- x
;否则代表这是一个非阻塞发送,通常用于select
语句的case
callerpc uintptr
:调用者的程序计数器,主要用于竞争检测器
返回值表示发送是否立即成功。
1 | if c == nil { |
if c == nil
,向一个nil
channel 发送数据会造成永久阻塞。因此如果是阻塞发送,则直接调用gopark
让当前的 goroutine 永远休眠;否则直接返回false
。if !block && c.closed == 0 && full(c)
,这是一个性能优化,它的目标是在不加锁的情况下,快速判断一个非阻塞的发送操作是否会失败,因为加锁是一个相对昂贵的动作。注释中解释了这背后的逻辑,即使在c.closed
和full(c)
的间隙中 channel 被关闭了,这个判断依然是逻辑自洽的,它表示某个时间点 channel 是“开放且满”的状态,因此返回false
是完全正确的。
如果Fast Path 没有命中,就需要加锁来访问 channel 的状态。
1 | lock(&c.lock) |
再次检查 channel 是否关闭。如果确实关闭了,则需要panic
。
接下来则会按优先级尝试两种最理想的发送方式。
1 | if sg := c.recvq.dequeue(); sg != nil { |
最高优先级的则是当接收等待队列recvq
中有正在等待的goroutine 的情况。这是最高效的通信方式,send
函数会直接将要发送的数据从发送方的内存(ep
)拷贝到接收方的内存(sg.elem
),并且这个过程完全绕过了 channel 的buf
缓冲区。func() { unlock(&c.lock) }
则是一个回调,确保在唤醒对方之前释放锁。
这里
recvq
中存在goroutine
,其实也意味着当前的channel 是空的,否则如果有数据的话就会直接被正在等待的goroutine
读走了。
1 | if c.qcount < c.dataqsiz { |
如果没有等待的接受者,检查 channel 的缓冲区 buf 是否还有空间(c.qcount < c.dataqsiz
)。如果存在空间,则会按顺序完成以下动作:
qp := chanbuf(c, c.sendx)
:计算出channelbuf
环形队列中下一个可用的存储槽的内存地址typedmemmove(c.elemtype, qp, ep)
:将要发送的数据从ep
拷贝到缓冲区槽qp
中c.sendx++
:更新发送索引sendx
,如果到达末尾则绕回 0c.qcount++
:将缓冲区中的元素计数加一unlock(&c.lock)
:释放锁,操作完成- 发送成功,返回
true
1 | if !block { |
如果以上两种情况都不满足,发送方就必须阻塞自己。
if !block
:如果此时仍未发送成功,并且这是一个非阻塞调用,那么就只能放弃了。释放锁并返回false
。- 准备阻塞:
gp := getg()
:获取当前 goroutine 的g
结构体mysg := acquireSudog()
:从池中获取一个sudog
结构体,作为当前 goroutine 在等待队列中的“排队凭证”- 填充
sudog
:将 goroutine、要发送的数据的指针(ep
)等关键信息存入mysg
c.sendq.enqueue(mysg)
:讲这个sudog
加入到 channel 的发送等待队列(sendq
)中
- 调用
gopark
让 goroutine 进入休眠。gopark
会在休眠前执行chenparkcommit
函数,这个函数会释放 channel 的锁。这是绝对必要的,如果不释放锁,就没有其他 goroutine 能够访问这个 channel,也没人能唤醒我们,导致死锁。 KeepAlive(ep)
:这是一个给编译器的指令,确保ep
指向的栈上变量在 goroutine 休眠期间不会被 GC 回收。
1 | // ... (woken up here) |
当一个接收者从 channel 取走数据后,它会唤醒一个在sendq
中等待的发送者。代码将从gopark
(阻塞点)之后继续执行。
closed := !mysg.success
:唤醒我们的接收者在完成操作之后,会设置mysg.success
标志。如果是一个接收者成功取走了数据,success
将会是true
。但如果是closechan
函数将我们唤醒,则会是false
。通过这个标志我们就能知道是被正常接收了,还是因为 channel 被关闭了。- 释放
sudog
,清理g
上的等待状态。 if closed
:如果是因为关闭而被唤醒,那么发送操作必须panic
。- 如果一切正常,则返回
true
。
send
1 | // send process a send operation on an empty channel c. |
这个send
函数专门处理一种特定的场景:当一个 goroutine 向一个空的 channel 发送数据时,有另一个 goroutine 已经在该 channel 上等待接收数据。在这种情况下,数据可以直接从发送者复制给接收者,而无需经过 channel 的内部缓冲区。
可以看到,输入包含以下参数:
c *hchan
:指向 channelsg *sudog
:一个在 channel 上等待的 goroutineep unsafe.Pointer
:指向要发送的数据unlockf func()
:用于给 channel 解锁skip int
:用于goready
函数,表示在恢复接收者goroutine 的堆栈跟踪时要跳过的帧数
1 | if sg.elem != nil { |
sendDirect
会将数据从发送者ep
直接拷贝到接收者sg
。它会根据 channel 的元素类型c.elemtype
来执行内存拷贝。
拷贝完成后,sg.elem
被设置为nil
,表示数据已经成功传递。
1 | gp := sg.g |
接下来则会做唤醒接收者goroutine 的操作:
gp := sg.g
:获取等待接收的 goroutine 的指针unlockf()
:在唤醒接收者之前,先释放 channel 的锁。这是为了避免死锁和提高并发性能。如果先唤醒再解锁,被唤醒的 goroutine 则可能会立即尝试获取锁,从而导致性能下降gp.param = unsafe.Pointer(sg)
:设置接收者 goroutine 的一个参数,用于传递这次通信的一些信息sg.success = true
:标记数据接收成功if sg.releasetime != 0 ...
:记录 goroutine 的释放时间goready(gp, skip+1)
:goready
函数会将接收者goroutine(gp)
的状态从“等待”变为“可运行”,并将其放入调度器的运行队列中。这样,调度器在下一次调度时就可以执行这个 goroutine 了。
sendDirect
1 | // Sends and receives on unbuffered or empty-buffered channels are the |
这里专门封装了一个函数来进行内存的拷贝,那为什么不直接用memcpy
或者typedmemmove
呢?
答案就在这个函数的注释里。GC 有一个基本假设:对一个 goroutine 的栈进行写入,只有当该 goroutine 自己在运行时,由它自己完成。而如果想要违反这一假设,就需要使用“写屏障”。如果采用typedmemmove
这个函数,虽然会调用一个写屏障(bulkBarrierPreWrite
),但那个写屏障是为“写入到堆”设计的。而在我们的场景里,目标地址(dst
)在另一个 goroutine 的栈上,所以这个写屏障不适用。因此,我们的解决方案是:调用底层的、无 GC 感知的memmove
函数来进行拷贝,并在此之前手动调用一个专门的写屏障typeBitsBulkBarrier
。这个函数会扫描src
指向的内存区域,如果根据t
的类型信息发现里面包含了任何指针,它就会对这些指针所指向的对象进行标记。这相当于告诉 GC:这些对象现在被接收者 goroutine 的栈引用了,它们是存活的,不要回收它们。
同时,在dst := sg.elem
中有一个重要的细节:在 Go 中,栈是可以收缩的。根据注释,一旦我们读出了这个值,就必须立即使用它,期间不能有任何可能导致当前 goroutine 被抢占的操作。因为如果发生抢占,接收者的栈可能会被移动(收缩),导致我们刚取出的dst
指针失效。幸运的是,这几行代码足够短,编译器能保证它们是原子执行的,不会被中断。
读流程
chanrecv
这个函数的目标是从指定的 channel 中接收一个值,并将其写入到 ep
指向的内存地址。同时,它也是 <-ch
操作的底层实现。
1 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) |
c *hchan
:指向 channel 的内部表示ep unsafe.Pointer
:指向用于存放接收到数据的内存位置。如果为nil
,则接收到的数据被丢弃block bool
:如果为true
,则表示这是一个阻塞操作(如val := <-ch
);否则表示这是一个非阻塞操作(如select
语句)- 返回值
(selected, received bool)
:selected
表示是否成功与 channel 进行了交互,被用于驱动select
语句。对于非阻塞调用,如果 channel 为空,selected
会是false
。received
表示是否接收到了一个真实的值。
1 | // ... |
首先是初始检查和“快速失败”阶段:
- 对于
nil
channel,尝试从它接收会永久阻塞。如果是非阻塞模式,则直接返回; - 针对非阻塞调用的一个重要优化,它在获取锁之前检查 channel 是否为空。
- 如果是打开的并且为空,那它没有可接收的数据,直接返回;
- 如果是关闭的,则需要再次检查是否为空,因为可能有数据在检查是否关闭和是否为空的间隙中被发送。如果为空,则
typedmemclr
清零目标内存(返回零值);否则需要进入下面的慢路径进行处理。
1 | lock(&c.lock) |
接下来,获取 channel 的锁并处理核心逻辑。可以看到,分别处理了 channel 已关闭和未关闭的逻辑:
- Case 1:channel 已关闭
- 如果 channel 被关闭且缓冲区为空,则解锁并返回零值
- 如果缓冲区仍有数据,则会继续向下执行,进入缓冲区接收数据的逻辑
- Case 2:channel 未关闭
- 检查发送等待队列
c.sendq
,如果dequeue
返回了一个sudog
,则会直接调用recv
函数,它会直接从等待的发送者那里拷贝数据到ep
,然后唤醒那个发送者 goroutine。这里的优化逻辑与chansend
中类似。
- 检查发送等待队列
1 | if c.qcount > 0 { |
如果前面的情况都不满足,则会检查缓冲区c.qcount
。如果有数据,那么计算出它在环形队列中的地址qp
,使用typedmemmove
将数据从源地址qp
拷贝到目标地址ep
,并清理源地址中的数据,更新索引和缓冲区计数,最后解锁返回。
1 | if !block { |
如果执行到了这部分,则说明 channel 是打开的,但是缓冲区里面没有数据,也没有等待中的发送者。
- 此时如果是非阻塞模式,则不再等待,直接返回。
- 如果是阻塞模式,则当前 goroutine 将进入等待状态。此时会将当前 goroutine 包装成一个
sudog
,并放入等待队列recvq
中。最后调用 gopark,它会释放 channel 的锁并让当前 goroutine 休眠,此时 CPU 则会切换去执行其他的 goroutine。
1 | // someone woke us up |
当gopark
返回后,代码会从这里继续执行。mysg.success
字段会被发送方设置,如果为true
,表示成功接收了一个值,如果 channel 在我们等待时被关闭了,关闭操作会唤醒我们并将success
设置为false
。最后,做一些清理工作,释放sudog
,返回。
recv
与send
类似,recv
的调用时机也是当sendq
中有正在等待的 goroutine 时。
1 | if c.dataqsiz == 0 { |
当 channel 无缓冲区时,可直接进行内存拷贝,将数据从发送者直接拷贝到接收者的内存空间。
1 | else { |
否则,则可以证明此时的缓冲区是满的。
- 首先让
qp
指向 channel 缓冲区中最旧的一个元素(即队头c.recvx
),将这个最旧的元素从缓冲区拷贝到接收者的目标地址 - 将等待中的发送者的数据拷贝到这个空位上
- 更新索引,让
sendx
和recvx
相同,维持缓冲区已满的状态
最后则是一些公共的收尾逻辑,与send
大致相同,不再赘述。
recvDirect
1 | func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { |
与sendDirect
类似,引入写屏障来进行跨 goroutine 的内存写入,不再赘述。
关闭流程
closechan
这个函数是close(ch)
的底层实现,用于关闭 channel,并释放发送队列和接收队列中的资源。
1 | if c == nil { |
首先进行一些检查工作:
close(nil)
:检查关闭 nil channel 的非法操作lock(&c.lock)
:获取锁,确保在关闭过程中没有其他 goroutine 对这个 channel 进行操作- 重复关闭检查
1 | // ... |
标记为关闭状态。
1 | var glist glist |
释放 channel 关联的接收队列。
- 创建一个 goroutine 列表
glist
,用于保存需要释放的所有 goroutine。 typedmemclr(c.elemtype, sg.elem)
:如果等待的接收者提供了一个内存地址用来接收值,现在需要将这块内存清理掉。sg.success = false
:做标记,当 goroutine 被唤醒后,会检查这个标志(可见chansend
和chanrecv
中的逻辑),据此判断它有没有收到一个真实的值。- 将这个 goroutine 放入临时列表
glist
中。
1 | // release all writers (they will panic) |
同样,释放 channel 关联的发送队列。只是对于发送者而言,如果检查发现success
为false
,它就知道 channel 在它等待时被关闭了,然后直接 panic。(具体逻辑见chansend
)。
1 | unlock(&c.lock) |
先解锁,再唤醒glist
中的所有 goroutine。这里只是为了减少锁的持有时间,提升并发性能。调用goready
可以将每个等待的 goroutine 放回调度器的可运行队列中。这些 goroutine 在下一次被调度时,就会执行chansend
/chanrecv
中的后续逻辑。
旁路 & 辅助函数
chanbuf
1 | // chanbuf(c, i) is pointer to the i'th slot in the buffer. |
一个用于返回环形队列缓冲区指定位置元素的辅助方法。
full
1 | // full reports whether a send on c would block (that is, the channel is full). |
从注释中可以看到,这个函数仅能表示调用瞬间的状态,用于快速检查缓冲区是否为满。对c.recvq.first
的检查是因为如果接收队列为空,那么发送者来了就得等,就可以认为 channel 是“满”的。
empty
1 | // empty reports whether a read from c would block (that is, the channel is |
类似full
,这里判断的是 channel 是否为空。而之所以这里用了atomic
的方式来读取,是因为empty
主要应用在chanrecv
当中,而在chanrecv
中对于代码的执行顺序有着严格的要求,即empty
必须在closed
的检查之前完成。atomic
除了可以保证原子性外,还能防止进行指令重排。
chanlen
1 | func chanlen(c *hchan) int { |
获取当前的缓冲区长度。
chancap
1 | func chancap(c *hchan) int { |
获取当前的缓冲区容量。
enqueue
1 | func (q *waitq) enqueue(sgp *sudog) { |
环形队列入队函数,链表操作,比较常规。
dequeue
1 | func (q *waitq) dequeue() *sudog { |
这里的实现不仅仅将sudog
进行出队动作,还考虑了并发条件下的竞争问题,主要关注if sgp.isSelect
分支。
首先,想象一个select
语句:
1 | select { |
如果ch1
和ch2
同时为空,当前 goroutine 必须同时等待这两个 channel。为了实现这一点,会创建一个sudog
,然后将其同时放入ch1
和ch2
的recvq
当中。这里就产生了一个问题:一个sudog
同时存在于多个地方。
那么此时,如果存在两个goroutine:a 和 b 同时调用ch <- val
进行取值,那么它们会发现自己的recvq
的头都是这个sudog
。现在,这两个 goroutine 就陷入了竞争,都想通过dequeue
来唤醒这个sudog
。但根据select
的规则,只有一个能赢,isSelect
分支的目的就是为了判断谁能抢占这个sudog
。isSelect
是一个标志,当一个sudog
因为select
而被放入等待队列时,这个标志就会被设置为true
,代表它是一个被抢占的资源。接下来,则会判断sudog
指向的g
中的一个原子字段selectDone
,可以把它想象成一个 goroutine 的“状态锁”,初始值为 0,表示还没有被抢占。CompareAndSwap(0, 1)
是原子交换操作,如果当前值为 0,则改为 1,然后返回true
;否则什么都不做,返回false
。
所以也就是说,看谁能成功将这个sudog.g
的selectDone
字段修改为 1,那么它就抢占成功了,此时就会将这个sudog
返回给它;否则抢占失败,继续循环,尝试获取队列中的下一个元素。
而注释中的窗口期内,输家在调用dequeue
时,仍然会看到这个sudog
,如果没有selectDone
这个原子锁,输家也会错误地认为自己抢占成功,可能会导致重复唤醒这个sudog
,或者出现其他的数据竞争问题。
扩展
为什么需要在
makechan
函数中做内存对齐检查?
首先,我们需要理解什么是内存对齐。
简单来说,CPU 访问内存不是逐个字节进行的,它通常以“字”(word)为单位进行读取,一个“字”在 64 位系统上通常是 8 字节。- 对齐访问(Aligned Access):如果一个 8 字节的数据存储在一个地址是 8 的倍数的位置,CPU 只需要一次内存操作就能读取整个数据。这是最高效的。
- 非对齐访问(Unaligned Access):如果这个 8 字节的数据存储在一个非 8 的倍数的地址,它就会跨越两个内存“字”的边界。这时,CPU 必须执行两次内存操作,分别读取两部分,然后在内部将它们拼接起来。这会带来显著的性能下降。
更糟糕的是,在某些 CPU 架构上,非对齐访问是不被允许的,会直接导致硬件异常。此外,原子操作也必须在对齐的内存上进行才能保证原子性。
因此,Go 的类型系统为每种类型都定义了一个对齐要求(_type.Align_
),编译器和运行时必须遵守这个要求来分配内存。深入分析内存对齐检查的两部分
maxAlign
是 Go 运行时定义的一个常量,代表了当前平台内存分配器所能保证的最大对齐基数。在 64 位的系统上,maxAlign
通常是 8。这意味着mallocgc
分配的任何内存块,其起始地址都至少是8 的倍数。第一部分:
hchanSize % maxAlign != 0
这个检查是在验证hchan
结构体自身的大小(hchanSize
)是否是平台最大对齐值(maxAlign
)的整数倍。那为什么它对“合并分配”如此重要?
让我们来回顾一下“合并分配”的优化:mallocgc(hchansize + mem, ...)
。这会分配一段连续的内存,其内存布局如下:1
2
3
4[ hchan 结构体 (hchanSize 字节) | 缓冲区 buf (mem 字节) ]
^ ^
| |
c (起始地址) c + hchanSize (buf 的起始地址)在
hchan
中,我们需要分配的内存大小可以大致用mallocgc(hchansize + unsafe.Sizeof(hchan.buf))
来表示。由于mallocgc
返回的起始地址 c 是保证对齐到maxAlign
的,如果hchanSize
自身也是maxAlign
的倍数,那么c+hchanSize
自然也是maxAlign
的倍数,这就保证了紧跟在hchan
结构体后面的缓冲区buf
的起始地址也是对齐的。第二部分:
elem.Align_ > maxAlign
这个检查是在验证 channel 中元素的对齐要求(elem.Align_
)是否超过了平台内存分配器所能保证的最大对齐值(maxAlign
)。这是一个安全性检查,它确保我们不会创建一个无法满足其对其要求的 channel。
正常来说,Go 中内置的数据类型都没有超过 8 字节对齐的,这里的检查更多是对通过unsafe
创建出来的类型进行检查。hchansize
的计算公式解析
可以看到,hchanSize
的计算公式为unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{})) & (maxAlign - 1))
,那为什么是这样算呢?
第一部分非常直接,unsafe.Sizeof(hchan{})
就是hchan
结构体在内存中占用的原始、未经任何处理的字节数。这个大小取决于结构体内部字段的类型和数量,以及编译器的布局策略。
第二部分的作用是计算为了达到对齐的要求,需要在原始大小上额外添加的填充字节数。具体而言就是将unsafe.Sizeof(hchan{})
取反后与maxAlign - 1
做&
操作,就可以得到这个需要填充的字节数。什么是写屏障?
写屏障是一小段由编译器插入的代码,它在每次“指针写入”操作时执行。它的主要作用是通知 GC:“我刚刚把一个指针写到了内存的这个位置,你可能需要更新你的可达对象信息了”。这可以防止 GC 错误地回收仍然在被引用的对象。chanrecv 中的 fast path 检查顺序问题
在代码中采用的顺序是:首先检查empty
,然后检查atomic.Load(&c.closed)
。
为什么这样是安全的?
这里的关键逻辑依赖于一个事实:一个 channel 一旦被关闭,就永远不可能被重新打开。 因此,如果我们在 T2 时刻观察到 channel 是打开的,我们就可以百分之百地推断出它在之前的 T1 时刻也一定是打开的。如果颠倒过来,则可以设想这样一个场景:假设有一个 channel,现在它是打开的并且有一个元素,有三个 goroutine 正在工作,a 正在执行这个 fast path,b 准备调用
close(ch)
,c 准备从 channel 接收数据。
时间线:- T1:a 执行第一个检查:
atomic.Load(&c.closed)
。此时结果为true
- T2:上下文切换到 b,它调用了
close(ch)
,现在 channel 被关闭 - T3:c 开始运行,它从缓冲区中取走了元素,现在
empty(c) == true
- T4:继续执行 a,此时
empty
检查通过
那么此时可以看到,channel 的真实状态是“open and not empty”,但这里却判断为了“open and empty”。
- T1:a 执行第一个检查:
为什么只对非阻塞的情况做 fast path 的处理?
因为对于非阻塞调用而言,它的核心需求是想立刻知道结果,那么就可以快速尝试一下,如果没有数据就马上返回。而阻塞的情况如果没有的话,还是得等数据进来,因此没有做 fast path 的必要。