05-并发模型与锁机制
章节概述
Go 语言的并发模型基于 CSP(Communicating Sequential Processes)理论,提供了丰富的同步原语。本章将深入解析 Mutex、RWMutex、Atomic 等锁机制的源码实现,帮助读者理解 Go 并发编程的核心原理。
学习目标
- 理解 Mutex 的状态位设计和自旋机制
- 掌握 RWMutex 的读写分离实现
- 了解 Atomic 操作的底层原理
- 学会信号量机制的使用
- 能够识别和解决并发问题
️ 并发模型架构
同步原语层次
┌─────────────────────────────────────────────────────────┐
│ Go 并发模型 │
├─────────────────────────────────────────────────────────┤
│ Channel 通信 Mutex 互斥锁 RWMutex 读写锁 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 消息传递 │ │ 排他访问 │ │ 读写分离 │ │
│ │ 无锁通信 │ │ 自旋+信号量 │ │ 读多写少 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Atomic 原子操作 WaitGroup 等待 Once 单次执行 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 硬件级操作 │ │ 协程同步 │ │ 初始化保护 │ │
│ │ 无锁编程 │ │ 计数等待 │ │ 单例模式 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
锁的层次结构
┌─────────────────────────────────────────────────────────┐
│ 锁的层次结构 │
├─────────────────────────────────────────────────────────┤
│ 用户层: sync.Mutex, sync.RWMutex, sync.Once │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 高层同步原语,提供易用的接口 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Runtime层: runtime.sema, runtime.mutex │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 底层同步机制,提供阻塞和唤醒功能 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 硬件层: atomic operations, CPU instructions │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 原子操作,保证操作的原子性 │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
Mutex 源码分析
Mutex 结构体
文件位置:src/sync/mutex.go
type Mutex struct {
state int32 // 状态位
sema uint32 // 信号量
}
const (
mutexLocked = 1 << iota // 锁是否被持有
mutexWoken // 是否有唤醒的 goroutine
mutexStarving // 是否处于饥饿模式
mutexWaiterShift = iota // 等待者数量偏移
)
状态位解析
// state 字段的位布局
// 0: 锁是否被持有 (mutexLocked)
// 1: 是否有唤醒的 goroutine (mutexWoken)
// 2: 是否处于饥饿模式 (mutexStarving)
// 3-31: 等待者数量 (mutexWaiterShift)
Lock 方法实现
func (m *Mutex) Lock() {
// 快速路径:如果锁未被持有,直接获取
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// 慢路径:竞争获取锁
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// 1. 如果锁未被占用且不在饥饿模式,尝试获取锁
if old&(mutexLocked|mutexStarving) == 0 {
new := old | mutexLocked
if old&mutexWoken != 0 {
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
break
}
old = m.state
continue
}
// 2. 自旋等待
if old&mutexLocked != 0 && old&mutexStarving == 0 && canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
doSpin()
iter++
old = m.state
continue
}
// 3. 进入等待队列
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexStarving != 0 {
new |= mutexStarving
}
if awoke {
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// 阻塞等待
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
Unlock 方法实现
func (m *Mutex) Unlock() {
// 快速路径:直接释放锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
// 正常模式:唤醒一个等待者
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式:直接交给下一个等待者
runtime_Semrelease(&m.sema, true, 1)
}
}
RWMutex 源码分析
RWMutex 结构体
文件位置:src/sync/rwmutex.go
type RWMutex struct {
w Mutex // 写锁
writerSem uint32 // 写者信号量
readerSem uint32 // 读者信号量
readerCount int32 // 读者数量
readerWait int32 // 等待的读者数量
}
读锁实现
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 有写者等待,阻塞读者
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// 读者数量为负,说明有写者等待
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
throw("sync: RUnlock of unlocked RWMutex")
}
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 最后一个读者,唤醒写者
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
写锁实现
func (rw *RWMutex) Lock() {
// 获取写锁
rw.w.Lock()
// 等待所有读者完成
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
func (rw *RWMutex) Unlock() {
// 恢复读者计数
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
throw("sync: Unlock of unlocked RWMutex")
}
// 唤醒所有等待的读者
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 释放写锁
rw.w.Unlock()
}
Atomic 操作
原子操作类型
文件位置:src/runtime/internal/atomic
// 原子加法
func Xadd(ptr *uint32, delta int32) int32
// 原子比较并交换
func Cas(ptr *uint32, old, new uint32) bool
// 原子加载
func Load(ptr *uint32) uint32
// 原子存储
func Store(ptr *uint32, val uint32)
// 原子交换
func Xchg(ptr *uint32, new uint32) uint32
原子操作实现
// 汇编实现示例
TEXT ·Cas(SB),NOSPLIT,$0
MOVQ ptr+0(FP), BX
MOVL old+8(FP), AX
MOVL new+12(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret+16(FP)
RET
原子操作使用
package main
import (
"fmt"
"runtime"
"sync/atomic"
"time"
)
func main() {
var counter int64
var wg sync.WaitGroup
// 启动多个 goroutine 并发计数
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
atomic.AddInt64(&counter, 1)
}
}()
}
wg.Wait()
fmt.Printf("Counter: %d\n", counter)
}
信号量机制
信号量结构
文件位置:src/runtime/sema.go
type semaRoot struct {
lock mutex
treap *sudog
nwait uint32
}
var semtable [251]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
信号量操作
func runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
root := semroot(addr)
s := acquireSudog()
s.g = getg()
s.isSelect = false
s.next = nil
s.prev = nil
s.elem = unsafe.Pointer(addr)
s.waitlink = nil
s.c = nil
root.lock()
root.nwait++
root.treap = s
root.unlock()
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
}
func runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
root := semroot(addr)
root.lock()
root.nwait--
s := root.treap
if s != nil {
root.treap = s.next
root.unlock()
s.next = nil
s.prev = nil
s.elem = nil
s.c = nil
goready(s.g, 3)
} else {
root.unlock()
}
}
️ 实战代码
1. 自实现轻量级锁
package main
import (
"fmt"
"runtime"
"sync/atomic"
"time"
)
// 自旋锁实现
type SpinLock struct {
state int32
}
func (sl *SpinLock) Lock() {
for !atomic.CompareAndSwapInt32(&sl.state, 0, 1) {
// 自旋等待
runtime.Gosched()
}
}
func (sl *SpinLock) Unlock() {
atomic.StoreInt32(&sl.state, 0)
}
// 读写锁实现
type SimpleRWMutex struct {
readCount int32
writeCount int32
readLock SpinLock
writeLock SpinLock
}
func (rw *SimpleRWMutex) RLock() {
rw.readLock.Lock()
defer rw.readLock.Unlock()
atomic.AddInt32(&rw.readCount, 1)
}
func (rw *SimpleRWMutex) RUnlock() {
rw.readLock.Lock()
defer rw.readLock.Unlock()
atomic.AddInt32(&rw.readCount, -1)
}
func (rw *SimpleRWMutex) Lock() {
rw.writeLock.Lock()
defer rw.writeLock.Unlock()
atomic.AddInt32(&rw.writeCount, 1)
// 等待所有读者完成
for atomic.LoadInt32(&rw.readCount) > 0 {
runtime.Gosched()
}
}
func (rw *SimpleRWMutex) Unlock() {
atomic.AddInt32(&rw.writeCount, -1)
rw.writeLock.Unlock()
}
func main() {
// 测试自旋锁
var sl SpinLock
var counter int32
for i := 0; i < 1000; i++ {
go func() {
sl.Lock()
counter++
sl.Unlock()
}()
}
time.Sleep(1 * time.Second)
fmt.Printf("Counter: %d\n", counter)
// 测试读写锁
var rw SimpleRWMutex
var data int32
// 启动读者
for i := 0; i < 100; i++ {
go func() {
rw.RLock()
_ = data
rw.RUnlock()
}()
}
// 启动写者
for i := 0; i < 10; i++ {
go func() {
rw.Lock()
data++
rw.Unlock()
}()
}
time.Sleep(1 * time.Second)
fmt.Printf("Data: %d\n", data)
}
2. 并发安全计数器
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 使用 atomic 的计数器
type AtomicCounter struct {
value int64
}
func (c *AtomicCounter) Add(delta int64) int64 {
return atomic.AddInt64(&c.value, delta)
}
func (c *AtomicCounter) Load() int64 {
return atomic.LoadInt64(&c.value)
}
func (c *AtomicCounter) Store(value int64) {
atomic.StoreInt64(&c.value, value)
}
func (c *AtomicCounter) CompareAndSwap(old, new int64) bool {
return atomic.CompareAndSwapInt64(&c.value, old, new)
}
// 使用 Mutex 的计数器
type MutexCounter struct {
value int64
mutex sync.Mutex
}
func (c *MutexCounter) Add(delta int64) int64 {
c.mutex.Lock()
defer c.mutex.Unlock()
c.value += delta
return c.value
}
func (c *MutexCounter) Load() int64 {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.value
}
// 性能测试
func benchmarkCounter(counter interface{}, count int) time.Duration {
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < count; i++ {
wg.Add(1)
go func() {
defer wg.Done()
switch c := counter.(type) {
case *AtomicCounter:
c.Add(1)
case *MutexCounter:
c.Add(1)
}
}()
}
wg.Wait()
return time.Since(start)
}
func main() {
count := 100000
// 测试 atomic 计数器
atomicCounter := &AtomicCounter{}
duration1 := benchmarkCounter(atomicCounter, count)
fmt.Printf("Atomic counter: %v, value: %d\n", duration1, atomicCounter.Load())
// 测试 Mutex 计数器
mutexCounter := &MutexCounter{}
duration2 := benchmarkCounter(mutexCounter, count)
fmt.Printf("Mutex counter: %v, value: %d\n", duration2, mutexCounter.Load())
// 性能对比
fmt.Printf("Atomic is %.2fx faster\n", float64(duration2)/float64(duration1))
}
3. 并发安全 Map
package main
import (
"fmt"
"sync"
"time"
)
// 使用 RWMutex 的并发安全 Map
type SafeMap struct {
data map[string]interface{}
mutex sync.RWMutex
}
func NewSafeMap() *SafeMap {
return &SafeMap{
data: make(map[string]interface{}),
}
}
func (m *SafeMap) Set(key string, value interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.data[key] = value
}
func (m *SafeMap) Get(key string) (interface{}, bool) {
m.mutex.RLock()
defer m.mutex.RUnlock()
value, ok := m.data[key]
return value, ok
}
func (m *SafeMap) Delete(key string) {
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.data, key)
}
func (m *SafeMap) Len() int {
m.mutex.RLock()
defer m.mutex.RUnlock()
return len(m.data)
}
// 使用 atomic 的并发安全 Map
type AtomicMap struct {
data map[string]interface{}
mutex sync.Mutex
}
func NewAtomicMap() *AtomicMap {
return &AtomicMap{
data: make(map[string]interface{}),
}
}
func (m *AtomicMap) Set(key string, value interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.data[key] = value
}
func (m *AtomicMap) Get(key string) (interface{}, bool) {
m.mutex.Lock()
defer m.mutex.Unlock()
value, ok := m.data[key]
return value, ok
}
func main() {
// 测试 SafeMap
safeMap := NewSafeMap()
// 启动写者
for i := 0; i < 100; i++ {
go func(id int) {
safeMap.Set(fmt.Sprintf("key%d", id), id)
}(i)
}
// 启动读者
for i := 0; i < 100; i++ {
go func(id int) {
value, ok := safeMap.Get(fmt.Sprintf("key%d", id))
if ok {
fmt.Printf("Read key%d: %v\n", id, value)
}
}(i)
}
time.Sleep(1 * time.Second)
fmt.Printf("SafeMap length: %d\n", safeMap.Len())
}
性能分析
锁性能对比
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
func benchmarkMutex(count int) time.Duration {
var mutex sync.Mutex
var counter int64
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < count; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mutex.Lock()
counter++
mutex.Unlock()
}()
}
wg.Wait()
return time.Since(start)
}
func benchmarkAtomic(count int) time.Duration {
var counter int64
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < count; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
return time.Since(start)
}
func benchmarkChannel(count int) time.Duration {
ch := make(chan int, 1)
ch <- 0
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < count; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter := <-ch
counter++
ch <- counter
}()
}
wg.Wait()
return time.Since(start)
}
func main() {
count := 100000
// 测试 Mutex
duration1 := benchmarkMutex(count)
fmt.Printf("Mutex: %v\n", duration1)
// 测试 Atomic
duration2 := benchmarkAtomic(count)
fmt.Printf("Atomic: %v\n", duration2)
// 测试 Channel
duration3 := benchmarkChannel(count)
fmt.Printf("Channel: %v\n", duration3)
// 性能对比
fmt.Printf("Atomic is %.2fx faster than Mutex\n", float64(duration1)/float64(duration2))
fmt.Printf("Atomic is %.2fx faster than Channel\n", float64(duration3)/float64(duration2))
}
面试题库
基础问题
Mutex 的状态位设计?
- 0位:锁是否被持有
- 1位:是否有唤醒的 goroutine
- 2位:是否处于饥饿模式
- 3-31位:等待者数量
RWMutex 的读写分离原理?
- 读者计数:readerCount
- 写者等待:writerSem
- 读者等待:readerSem
- 写者优先策略
Atomic 操作的优势?
- 硬件级原子性
- 无锁操作
- 性能高
- 适合简单计数
进阶问题
Mutex 的自旋机制?
- 在用户态空转等待
- 减少系统调用开销
- 适合短时间持有锁的场景
饥饿模式的作用?
- 防止长时间等待的 goroutine 饿死
- 保证公平性
- 提高整体性能
如何选择同步原语?
- 简单计数:atomic
- 复杂逻辑:mutex
- 读多写少:rwmutex
- 通信:channel
源码问题
Mutex 的 lockSlow 流程?
- 检查锁状态
- 自旋等待
- 进入等待队列
- 处理饥饿模式
信号量的实现原理?
- 基于 sudog 结构
- 使用 treap 树管理
- 通过 gopark 阻塞
- 通过 goready 唤醒
扩展阅读
相关章节
- 01-GMP调度模型深度解析 - 锁与调度的关系
- 02-Channel源码剖析 - Channel 与锁的协作
- 07-Runtime全景融合 - 整体架构协作机制
下一章预告:我们将深入 Go 的网络模型和 Netpoll 机制,了解异步 I/O 和 epoll 的实现原理。