02-Channel源码剖析
章节概述
Channel 是 Go 语言并发编程的核心,它提供了 goroutine 之间的通信机制。本章将深入解析 Channel 的源码实现,包括数据结构、发送接收机制、select 多路复用等核心功能。
学习目标
- 理解 hchan 结构体的设计原理
- 掌握 send/recv 的完整流程
- 了解阻塞与唤醒机制
- 学会 select 多路复用的实现
- 能够识别和解决 Channel 相关的死锁问题
️ 核心数据结构
hchan 结构体
文件位置:src/runtime/chan.go
type hchan struct {
qcount uint // 队列中当前元素数量
dataqsiz uint // 环形缓冲区大小
buf unsafe.Pointer // 环形缓冲区指针
elemsize uint16 // 元素大小
closed uint32 // 关闭标志
elemtype *_type // 元素类型
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 等待接收的 goroutine 队列
sendq waitq // 等待发送的 goroutine 队列
lock mutex // 保护整个结构体的互斥锁
}
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
c *hchan
// ... 其他字段
}
关键字段解析
- buf:环形缓冲区,用于缓存数据
- qcount:当前队列中的元素数量
- dataqsiz:缓冲区大小,0 表示无缓冲
- sendx/recvx:发送和接收的索引位置
- recvq/sendq:等待队列,存储阻塞的 goroutine
- lock:保护 Channel 操作的互斥锁
Channel 创建
makechan 函数
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 检查参数合法性
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// 无缓冲 Channel 或元素大小为 0
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = unsafe.Pointer(c) // 指向自己
case elem.ptrdata == 0:
// 元素不包含指针
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指针
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
创建过程分析
- 参数检查:验证元素类型和大小
- 内存分配:根据元素类型选择分配策略
- 结构初始化:设置 Channel 的基本属性
发送操作源码
chansend 函数
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 快速路径:非阻塞且 Channel 未满
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// 检查 Channel 是否已关闭
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 1. 如果有等待的接收者,直接发送
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 2. 如果缓冲区未满,加入缓冲区
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 3. 缓冲区已满,阻塞等待
if !block {
unlock(&c.lock)
return false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// 被唤醒后清理
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
发送流程分析
- 快速路径:非阻塞且 Channel 未满时直接返回
- 直接发送:有等待的接收者时直接发送数据
- 缓冲发送:缓冲区未满时加入缓冲区
- 阻塞等待:缓冲区已满时阻塞等待
接收操作源码
chanrecv 函数
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 快速路径:非阻塞且 Channel 为空
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// 检查 Channel 是否已关闭且为空
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 1. 如果有等待的发送者,直接接收
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 2. 如果缓冲区有数据,从缓冲区接收
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 3. 缓冲区为空,阻塞等待
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 被唤醒后清理
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
接收流程分析
- 快速路径:非阻塞且 Channel 为空时直接返回
- 直接接收:有等待的发送者时直接接收数据
- 缓冲接收:缓冲区有数据时从缓冲区接收
- 阻塞等待:缓冲区为空时阻塞等待
Select 多路复用
selectgo 函数
文件位置:src/runtime/select.go
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 16]uint16)(unsafe.Pointer(order0))
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:]
// 1. 随机化顺序,保证公平性
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}
// 2. 按 Channel 地址排序,避免死锁
for i := 0; i < ncases; i++ {
j := pollorder[i]
c := scases[j].c
for k := i + 1; k < ncases; k++ {
kk := pollorder[k]
cc := scases[kk].c
if c != nil && cc != nil && c < cc {
pollorder[i], pollorder[k] = pollorder[k], pollorder[i]
}
}
}
// 3. 快速检查是否有就绪的 Channel
for i := 0; i < ncases; i++ {
cas := &scases[pollorder[i]]
c := cas.c
switch cas.kind {
case caseNil:
continue
case caseRecv:
if c.dataqsiz > 0 {
if c.qcount > 0 {
goto asyncsend
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
goto asyncrecv
}
}
if c.closed != 0 {
goto rclose
}
case caseSend:
if c.closed != 0 {
goto sclose
}
if c.dataqsiz > 0 {
if c.qcount < c.dataqsiz {
goto asyncsend
}
} else {
if sg := c.recvq.dequeue(); sg != nil {
goto asyncrecv
}
}
case caseDefault:
goto retc
}
}
// 4. 没有就绪的 Channel,阻塞等待
gp := getg()
if gp.selectDone != 0 {
throw("select: double select")
}
gp.selectDone = 1
gp.selectLock = &scases[0].c.lock
for i := 0; i < ncases; i++ {
cas := &scases[i]
c := cas.c
if c == nil {
continue
}
lock(&c.lock)
if cas.kind == caseRecv {
if c.closed != 0 {
unlock(&c.lock)
goto rclose
}
if c.qcount > 0 {
unlock(&c.lock)
goto asyncrecv
}
if sg := c.sendq.dequeue(); sg != nil {
unlock(&c.lock)
goto asyncrecv
}
} else {
if c.closed != 0 {
unlock(&c.lock)
goto sclose
}
if c.qcount < c.dataqsiz {
unlock(&c.lock)
goto asyncsend
}
if sg := c.recvq.dequeue(); sg != nil {
unlock(&c.lock)
goto asyncrecv
}
}
// 注册到等待队列
mysg := acquireSudog()
mysg.g = gp
mysg.isSelect = true
mysg.c = c
mysg.elem = cas.elem
mysg.waitlink = gp.selectDone
gp.selectDone = mysg
if cas.kind == caseRecv {
c.recvq.enqueue(mysg)
} else {
c.sendq.enqueue(mysg)
}
unlock(&c.lock)
}
// 阻塞等待
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
// 被唤醒后处理
gp.selectDone = 0
gp.selectLock = nil
// 清理其他 Channel 的等待
for i := 0; i < ncases; i++ {
cas := &scases[i]
if cas.kind == caseNil {
continue
}
c := cas.c
if c == nil {
continue
}
lock(&c.lock)
if cas.kind == caseRecv {
c.recvq.dequeue()
} else {
c.sendq.dequeue()
}
unlock(&c.lock)
}
return -1, false
}
Select 机制分析
- 随机化顺序:保证公平性,避免饥饿
- 锁排序:按 Channel 地址排序,避免死锁
- 快速检查:先检查是否有就绪的 Channel
- 阻塞等待:没有就绪时注册到所有 Channel 的等待队列
死锁分析
常见死锁场景
1. 无缓冲 Channel 死锁
// 错误示例:无缓冲 Channel 死锁
func deadlockExample() {
ch := make(chan int) // 无缓冲
ch <- 1 // 阻塞等待接收者
<-ch // 永远不会执行到这里
}
// 正确示例:使用 goroutine
func correctExample() {
ch := make(chan int)
go func() {
ch <- 1
}()
<-ch
}
2. 主 goroutine 阻塞
// 错误示例:主 goroutine 阻塞
func main() {
ch := make(chan int)
<-ch // 主 goroutine 阻塞,程序死锁
}
// 正确示例:使用 select 超时
func main() {
ch := make(chan int)
select {
case <-ch:
fmt.Println("received")
case <-time.After(1 * time.Second):
fmt.Println("timeout")
}
}
3. 循环等待
// 错误示例:循环等待
func circularWait() {
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
<-ch1
ch2 <- 1
}()
go func() {
<-ch2
ch1 <- 1
}()
// 两个 goroutine 互相等待
}
️ 实战代码
1. Mini Channel 实现
package main
import (
"fmt"
"sync"
"time"
)
// 简化的 Channel 实现
type MiniChan struct {
buf []interface{}
size int
count int
sendIdx int
recvIdx int
closed bool
lock sync.Mutex
sendCond *sync.Cond
recvCond *sync.Cond
}
func NewMiniChan(size int) *MiniChan {
ch := &MiniChan{
buf: make([]interface{}, size),
size: size,
}
ch.sendCond = sync.NewCond(&ch.lock)
ch.recvCond = sync.NewCond(&ch.lock)
return ch
}
func (ch *MiniChan) Send(value interface{}) bool {
ch.lock.Lock()
defer ch.lock.Unlock()
for {
if ch.closed {
panic("send on closed channel")
}
if ch.count < ch.size {
// 缓冲区未满,直接发送
ch.buf[ch.sendIdx] = value
ch.sendIdx = (ch.sendIdx + 1) % ch.size
ch.count++
ch.recvCond.Signal()
return true
}
// 缓冲区已满,等待
ch.sendCond.Wait()
}
}
func (ch *MiniChan) Recv() (interface{}, bool) {
ch.lock.Lock()
defer ch.lock.Unlock()
for {
if ch.count > 0 {
// 缓冲区有数据,直接接收
value := ch.buf[ch.recvIdx]
ch.buf[ch.recvIdx] = nil
ch.recvIdx = (ch.recvIdx + 1) % ch.size
ch.count--
ch.sendCond.Signal()
return value, true
}
if ch.closed {
return nil, false
}
// 缓冲区为空,等待
ch.recvCond.Wait()
}
}
func (ch *MiniChan) Close() {
ch.lock.Lock()
defer ch.lock.Unlock()
if ch.closed {
return
}
ch.closed = true
ch.sendCond.Broadcast()
ch.recvCond.Broadcast()
}
func main() {
// 测试无缓冲 Channel
ch1 := NewMiniChan(0)
go func() {
ch1.Send("hello")
}()
value, ok := ch1.Recv()
fmt.Printf("Received: %v, ok: %v\n", value, ok)
// 测试有缓冲 Channel
ch2 := NewMiniChan(2)
ch2.Send("world")
ch2.Send("golang")
value1, ok1 := ch2.Recv()
value2, ok2 := ch2.Recv()
fmt.Printf("Received: %v, ok: %v\n", value1, ok1)
fmt.Printf("Received: %v, ok: %v\n", value2, ok2)
// 测试关闭
ch2.Close()
value3, ok3 := ch2.Recv()
fmt.Printf("After close: %v, ok: %v\n", value3, ok3)
}
2. Select 模拟实现
package main
import (
"fmt"
"math/rand"
"time"
)
// 简化的 Select 实现
type SelectCase struct {
ch *MiniChan
value interface{}
kind int // 0: recv, 1: send, 2: default
}
const (
caseRecv = iota
caseSend
caseDefault
)
func Select(cases []SelectCase) (int, interface{}, bool) {
// 1. 随机化顺序
rand.Seed(time.Now().UnixNano())
for i := len(cases) - 1; i > 0; i-- {
j := rand.Intn(i + 1)
cases[i], cases[j] = cases[j], cases[i]
}
// 2. 快速检查
for i, cas := range cases {
switch cas.kind {
case caseRecv:
if value, ok := cas.ch.Recv(); ok {
return i, value, true
}
case caseSend:
if cas.ch.Send(cas.value) {
return i, nil, true
}
case caseDefault:
return i, nil, false
}
}
// 3. 阻塞等待(简化实现)
for {
for i, cas := range cases {
switch cas.kind {
case caseRecv:
if value, ok := cas.ch.Recv(); ok {
return i, value, true
}
case caseSend:
if cas.ch.Send(cas.value) {
return i, nil, true
}
}
}
time.Sleep(1 * time.Millisecond)
}
}
func main() {
ch1 := NewMiniChan(1)
ch2 := NewMiniChan(1)
// 测试 Select
go func() {
time.Sleep(100 * time.Millisecond)
ch1.Send("from ch1")
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2.Send("from ch2")
}()
cases := []SelectCase{
{ch: ch1, kind: caseRecv},
{ch: ch2, kind: caseRecv},
{kind: caseDefault},
}
for i := 0; i < 3; i++ {
chosen, value, ok := Select(cases)
fmt.Printf("Chosen: %d, Value: %v, OK: %v\n", chosen, value, ok)
}
}
3. Channel 性能测试
package main
import (
"fmt"
"sync"
"time"
)
func benchmarkChannel(size int, count int) time.Duration {
ch := make(chan int, size)
start := time.Now()
var wg sync.WaitGroup
wg.Add(2)
// 发送者
go func() {
defer wg.Done()
for i := 0; i < count; i++ {
ch <- i
}
close(ch)
}()
// 接收者
go func() {
defer wg.Done()
for range ch {
// 接收数据
}
}()
wg.Wait()
return time.Since(start)
}
func main() {
sizes := []int{0, 1, 10, 100, 1000}
count := 100000
for _, size := range sizes {
duration := benchmarkChannel(size, count)
fmt.Printf("Buffer size: %d, Duration: %v\n", size, duration)
}
}
性能分析
Channel 性能特点
- 无缓冲 Channel:延迟最低,但吞吐量受限
- 有缓冲 Channel:平衡延迟和吞吐量
- 大缓冲 Channel:吞吐量高,但延迟增加
优化建议
- 合理设置缓冲区大小:根据业务需求选择
- 避免频繁创建 Channel:复用 Channel 对象
- 使用 select 超时:避免永久阻塞
- 批量处理:减少 Channel 操作次数
面试题库
基础问题
Channel 的底层实现原理是什么?
- 基于环形缓冲区和等待队列
- 使用互斥锁保护并发访问
- 通过 sudog 结构管理阻塞的 goroutine
无缓冲和有缓冲 Channel 的区别?
- 无缓冲:同步通信,必须有接收者才能发送
- 有缓冲:异步通信,缓冲区未满时可以发送
Channel 的三种状态是什么?
- 阻塞等待:无数据可读或无空间可写
- 缓存发送:有缓冲区空间
- 已关闭:close 后不能再发送
进阶问题
为什么 Channel 是线程安全的?
- 使用互斥锁保护所有操作
- 原子操作更新状态
- 等待队列管理阻塞的 goroutine
Select 如何保证公平性?
- 随机化 case 顺序
- 避免某个 case 总是被优先选择
- 防止饥饿现象
Channel 死锁的常见原因?
- 无缓冲 Channel 没有接收者
- 主 goroutine 阻塞
- 循环等待
- 所有 goroutine 都在等待
源码问题
hchan 结构体的关键字段?
- buf:环形缓冲区
- sendx/recvx:发送和接收索引
- sendq/recvq:等待队列
- lock:互斥锁
Channel 的发送和接收流程?
- 快速路径:非阻塞检查
- 直接发送/接收:有等待者时
- 缓冲发送/接收:使用缓冲区
- 阻塞等待:加入等待队列
扩展阅读
相关章节
- 01-GMP调度模型深度解析 - Channel 与调度器的协作
- 05-并发模型与锁机制 - 同步原语的使用
- 07-Runtime全景融合 - 整体架构协作机制
下一章预告:我们将深入 Go 的内存管理机制,了解 mcache、mcentral、mheap 三层架构的设计原理。