Go Runtime调度器GMP模型
章节概述
Go语言的调度器是其高并发能力的核心。本章将深入剖析GMP模型的设计原理、工作机制,以及它如何与Linux调度器协同工作,让你真正理解为什么Go能够轻松支持百万级Goroutine。
学习目标:
- 深入理解GMP模型的设计原理
- 掌握Goroutine调度的完整流程
- 理解Go调度器与Linux调度器的关系
- 学会使用工具分析和优化Goroutine调度
核心概念
1. 为什么需要Go调度器?
操作系统线程的问题:
1. 创建开销大
- 初始栈空间: 2MB
- 创建时间: 微秒级
2. 上下文切换开销大
- 寄存器保存/恢复
- TLB刷新
- CPU cache失效
- 成本: 1-3微秒
3. 数量受限
- 受内存限制
- 系统调度开销大
- 通常只能创建几千个
Goroutine的优势:
1. 轻量级
- 初始栈: 2KB (可动态扩展)
- 创建时间: 纳秒级
2. 快速切换
- 用户态切换
- 只保存3个寄存器
- 成本: 约200纳秒
3. 海量支持
- 单机可创建数百万个
- 调度效率高
2. GMP模型架构
┌────────────────────────────────────────────────┐
│ Go Runtime │
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ G │ │ G │ │ G │ │ G │ ... │
│ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ │
│ │ │ │ │ │
│ ┌──▼──────────▼─────────▼─────────▼───┐ │
│ │ P (Processor) │ │
│ │ - 本地队列 (最多256个G) │ │
│ │ - mcache (内存分配缓存) │ │
│ └──────────────┬───────────────────────┘ │
│ │ │
│ ┌──────────────▼───────────────┐ │
│ │ M (Machine/OS Thread) │ │
│ │ - 运行P的Goroutine │ │
│ │ - 关联一个OS线程 │ │
│ └──────────────┬───────────────┘ │
└─────────────────┼──────────────────────────────┘
│
┌─────────────────▼──────────────────────────────┐
│ Linux Kernel │
│ - CFS调度器调度M(OS线程) │
│ - M在CPU核心上执行 │
└─────────────────────────────────────────────────┘
三个核心概念:
G (Goroutine):
- 代表一个Goroutine
- 包含栈、指令指针、等待信息等
- 调度的基本单位
P (Processor):
- 逻辑处理器,代表执行Go代码的资源
- 数量由GOMAXPROCS决定
- 维护一个本地Goroutine队列
- 提供内存分配缓存(mcache)
M (Machine):
- 对应一个操作系统线程
- 必须关联一个P才能执行Go代码
- M会在多个P之间切换
3. 调度流程
创建Goroutine:
go func() { ... }
↓
创建G结构体
↓
优先放入P的本地队列
↓
如果本地队列满,放入全局队列
执行Goroutine:
M从P的本地队列获取G
↓
执行G的代码
↓
遇到阻塞/时间片用完
↓
G放回队列,M获取下一个G
Work Stealing:
P的本地队列空
↓
尝试从全局队列获取
↓
仍然没有
↓
从其他P的本地队列"偷取"一半
↓
仍然没有
↓
M进入空闲状态
4. 调度时机
主动调度:
// 1. Goroutine执行结束
func example() {
// ...
} // 函数返回,G结束
// 2. 主动让出CPU
runtime.Gosched()
// 3. 阻塞操作
ch := make(chan int)
<-ch // 阻塞,G进入等待状态
time.Sleep(time.Second) // 阻塞
被动调度(抢占):
1. 基于协作的抢占(Go 1.13及之前)
- 在函数调用时检查抢占标志
- 缺点:紧密循环无法抢占
2. 基于信号的抢占(Go 1.14+)
- sysmon线程检测长时间运行的G
- 发送SIGURG信号
- G在信号处理中保存状态并让出
源码解析
1. G的结构
文件: runtime/runtime2.go
type g struct {
stack stack // 栈空间
stackguard0 uintptr // 栈溢出检查
_panic *_panic // panic链表
_defer *_defer // defer链表
m *m // 当前运行在哪个M上
sched gobuf // 调度信息(PC, SP等)
atomicstatus uint32 // G的状态
goid int64 // Goroutine ID
waitsince int64 // G等待开始时间
waitreason waitReason // 等待原因
// ...
}
// G的调度信息
type gobuf struct {
sp uintptr // 栈指针
pc uintptr // 程序计数器
g guintptr // G自身
ret sys.Uintreg // 返回值
// ...
}
// G的状态
const (
_Gidle = iota // 0: 刚创建,未初始化
_Grunnable // 1: 可运行,在队列中
_Grunning // 2: 正在运行
_Gsyscall // 3: 执行系统调用
_Gwaiting // 4: 等待中(如等待channel)
_Gdead // 6: 已结束
// ...
)
2. P的结构
type p struct {
id int32
status uint32 // P的状态
m muintptr // 关联的M
mcache *mcache // 内存分配缓存
// 本地Goroutine队列
runqhead uint32
runqtail uint32
runq [256]guintptr // 最多256个G
// runnext是下一个要运行的G,优先级高
runnext guintptr
// 空闲G链表
gFree struct {
gList
n int32
}
// ...
}
// P的状态
const (
_Pidle = iota // 0: 空闲
_Prunning // 1: 运行中
_Psyscall // 2: 执行系统调用
_Pgcstop // 3: GC暂停
_Pdead // 4: 已销毁
)
3. M的结构
type m struct {
g0 *g // 用于执行调度代码的g
curg *g // 当前正在运行的G
p puintptr // 关联的P
nextp puintptr // 唤醒后要关联的P
oldp puintptr // 执行系统调用前关联的P
id int64
spinning bool // 是否正在寻找G
blocked bool // 是否阻塞
park note // 睡眠/唤醒
alllink *m // 所有M的链表
// 线程本地存储
tls [6]uintptr
// ...
}
4. 调度器的核心函数
文件: runtime/proc.go
// 调度器入口:寻找可运行的G
func schedule() {
_g_ := getg()
var gp *g
var inheritTime bool
top:
pp := _g_.m.p.ptr()
// 1. 检查是否需要GC
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
// 2. 每执行61次,从全局队列获取G(保证公平性)
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
// 3. 从P的本地队列获取G
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
// 4. 从全局队列或其他P偷取
if gp == nil {
gp, inheritTime = findrunnable()
}
// 5. 执行G
execute(gp, inheritTime)
}
// 执行Goroutine
func execute(gp *g, inheritTime bool) {
_g_ := getg()
// 关联G和M
_g_.m.curg = gp
gp.m = _g_.m
// 切换状态
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
// 抢占检查
gp.stackguard0 = gp.stack.lo + _StackGuard
// 切换到G的栈并执行
gogo(&gp.sched)
}
// Work Stealing
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
// 1. 从全局队列获取
if sched.runqsize > 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 2. 从网络轮询器获取就绪的G
if netpollinited() && atomic.Load(&netpollWaiters) > 0 {
if list := netpoll(0); !list.empty() {
gp := list.pop()
injectglist(&list)
return gp, false
}
}
// 3. 从其他P偷取
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
// 偷取一半的G
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
return gp, false
}
}
}
// 4. 再次检查全局队列
if sched.runqsize > 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 5. 进入空闲状态
stopm()
goto top
}
5. 创建Goroutine
// go func() { ... } 编译后会调用
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg()
pc := getcallerpc()
// 在系统栈上执行newproc1
systemstack(func() {
newg := newproc1(fn, argp, siz, gp, pc)
_p_ := getg().m.p.ptr()
// 将新G放入P的本地队列
runqput(_p_, newg, true)
// 如果有空闲P且没有spinning的M,唤醒一个M
if mainStarted {
wakep()
}
})
}
// 创建G的实现
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
_g_ := getg()
_p_ := _g_.m.p.ptr()
// 1. 从P的空闲G链表获取G,如果没有则新建
newg := gfget(_p_)
if newg == nil {
newg = malg(_StackMin) // 分配2KB栈
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg)
}
// 2. 初始化G的栈空间
totalSize := 4*sys.PtrSize + uintptr(siz) + sys.MinFrameSize
sp := newg.stack.hi - totalSize
// 3. 初始化调度信息
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum
newg.sched.g = guintptr(unsafe.Pointer(newg))
// 4. 设置入口函数
gostartcallfn(&newg.sched, fn)
// 5. 设置状态为可运行
casgstatus(newg, _Gdead, _Grunnable)
// 6. 分配goid
if _p_.goidcache == _p_.goidcacheend {
_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
}
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
return newg
}
️ 观测和调优
1. 查看调度信息
GODEBUG环境变量:
# 查看调度器跟踪
GODEBUG=schedtrace=1000 ./your_program
# 输出示例:
# SCHED 0ms: gomaxprocs=8 idleprocs=6 threads=4 spinningthreads=0 idlethreads=0 runqueue=0 [0 0 0 0 0 0 0 0]
# 解释:
# gomaxprocs=8: GOMAXPROCS设置为8
# idleprocs=6: 6个P空闲
# threads=4: 4个OS线程
# spinningthreads=0: 0个spinning的M
# runqueue=0: 全局队列中有0个G
# [0 0 0 0 0 0 0 0]: 每个P的本地队列长度
详细版本:
GODEBUG=schedtrace=1000,scheddetail=1 ./your_program
# 输出每个G/M/P的详细信息
2. runtime包查询
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
// 查看GOMAXPROCS
fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
// 查看Goroutine数量
fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())
// 查看OS线程数量
fmt.Printf("NumCPU: %d\n", runtime.NumCPU())
// 启动大量Goroutine
for i := 0; i < 10000; i++ {
go func() {
time.Sleep(10 * time.Second)
}()
}
time.Sleep(1 * time.Second)
fmt.Printf("NumGoroutine after creation: %d\n", runtime.NumGoroutine())
// 查看调度统计
var stats runtime.MemStats
runtime.ReadMemStats(&stats)
fmt.Printf("NumGC: %d\n", stats.NumGC)
}
3. pprof分析
package main
import (
"net/http"
_ "net/http/pprof"
"runtime"
"time"
)
func main() {
// 启动pprof
go func() {
http.ListenAndServe("localhost:6060", nil)
}()
// 创建大量Goroutine
for i := 0; i < 100; i++ {
go worker(i)
}
select {}
}
func worker(id int) {
for {
// 模拟工作
runtime.Gosched()
time.Sleep(time.Millisecond)
}
}
查看Goroutine信息:
# Goroutine profile
go tool pprof http://localhost:6060/debug/pprof/goroutine
# 命令:
# (pprof) top # 查看最多的Goroutine
# (pprof) list worker # 查看worker函数
# (pprof) web # 生成图形化视图
4. trace分析
package main
import (
"os"
"runtime/trace"
"time"
)
func main() {
// 开启trace
f, _ := os.Create("trace.out")
defer f.Close()
trace.Start(f)
defer trace.Stop()
// 运行一些Goroutine
for i := 0; i < 10; i++ {
go work()
}
time.Sleep(1 * time.Second)
}
func work() {
for i := 0; i < 1000000; i++ {
_ = i * i
}
}
查看trace:
go tool trace trace.out
# 浏览器会打开,可以看到:
# - Goroutine创建和销毁
# - P的执行情况
# - M的阻塞和唤醒
# - GC事件
实践示例
示例1:GOMAXPROCS的影响
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func cpuBound() {
for i := 0; i < 100000000; i++ {
_ = i * i
}
}
func testWithGOMAXPROCS(n int) {
runtime.GOMAXPROCS(n)
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 8; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cpuBound()
}()
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("GOMAXPROCS=%d: %v\n", n, elapsed)
}
func main() {
fmt.Printf("CPU核心数: %d\n\n", runtime.NumCPU())
testWithGOMAXPROCS(1)
testWithGOMAXPROCS(2)
testWithGOMAXPROCS(4)
testWithGOMAXPROCS(8)
}
示例2:观察Work Stealing
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func main() {
runtime.GOMAXPROCS(4)
var wg sync.WaitGroup
// 在P0上创建大量Goroutine
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 模拟工作
for j := 0; j < 1000000; j++ {
_ = j * j
}
if id%100 == 0 {
fmt.Printf("Goroutine %d done\n", id)
}
}(i)
}
wg.Wait()
}
示例3:Goroutine泄漏检测
package main
import (
"fmt"
"runtime"
"time"
)
func leakGoroutine() {
// 永远阻塞的channel
ch := make(chan int)
go func() {
<-ch // 永远等待
}()
}
func main() {
fmt.Printf("Initial goroutines: %d\n", runtime.NumGoroutine())
for i := 0; i < 10; i++ {
leakGoroutine()
}
time.Sleep(1 * time.Second)
fmt.Printf("After leak: %d\n", runtime.NumGoroutine())
// 使用pprof检查
// goroutine数量会持续增长
}
常见问题
Q1: GOMAXPROCS应该设置为多少?
A:
- 默认值:等于CPU核心数(
runtime.NumCPU()
) - CPU密集型:保持默认即可
- IO密集型:可以适当增大
- 容器环境:需要根据CPU quota设置
注意: 过大的GOMAXPROCS会增加调度开销。
Q2: 如何避免Goroutine泄漏?
A: 常见泄漏场景和解决方案:
// 场景1:channel永远阻塞
// 错误
ch := make(chan int)
go func() {
<-ch // 如果没人发送,永远阻塞
}()
// 正确:使用context或timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
select {
case <-ch:
case <-ctx.Done():
return
}
// 场景2:未关闭的HTTP请求
// 错误
resp, _ := http.Get(url)
// 忘记Close
// 正确
resp, _ := http.Get(url)
defer resp.Body.Close()
Q3: Goroutine数量多少合适?
A:
- 没有固定上限,取决于具体场景
- 一般可以创建数万到数百万个
- 关键是避免创建无限多的Goroutine
- 使用worker pool模式控制并发度
Q4: 如何优化Goroutine调度性能?
A: 优化建议:
- 减少不必要的Goroutine
- 避免频繁的channel操作
- 减少锁竞争
- 使用sync.Pool复用对象
- 避免在热路径上分配内存
复习题
选择题
GMP模型中的P代表什么?
- A. Process
- B. Processor
- C. Program
- D. Priority
Goroutine的初始栈大小是多少?
- A. 2MB
- B. 1MB
- C. 2KB
- D. 4KB
GOMAXPROCS控制什么?
- A. Goroutine数量
- B. OS线程数量
- C. P的数量
- D. M的数量
Work Stealing是什么?
- A. 从其他P偷取G
- B. 偷取CPU时间
- C. 偷取内存
- D. 偷取锁
Goroutine切换的开销约为?
- A. 1-3微秒
- B. 200纳秒
- C. 10毫秒
- D. 100纳秒
简答题
解释GMP模型的三个核心概念及其关系。
描述Goroutine的调度时机有哪些?
什么是Work Stealing?它解决了什么问题?
Go调度器与Linux调度器有什么区别?
如何检测和避免Goroutine泄漏?
实战题
性能分析题: 一个Web服务,并发请求下性能不佳。使用pprof发现有大量Goroutine阻塞在channel上。请分析可能的原因并给出优化方案。
调优题: 在一个8核服务器上运行Go程序,发现CPU使用率只有25%。请给出可能的原因和排查步骤。
设计题: 设计一个支持优雅关闭的worker pool,能够控制并发Goroutine数量。
扩展阅读
推荐资源
深入方向
- Go调度器的演进历史
- 基于信号的抢占机制
- Network poller的实现
- 与其他语言调度器的对比
下一章预告: 我们将探讨系统性能分析的方法论,包括如何系统地定位和解决性能瓶颈。