HiHuo
首页
博客
手册
工具
首页
博客
手册
工具
  • Go 架构进阶

    • Go 架构进阶学习手册 - 总目录
    • 01-GMP调度模型深度解析
    • 02-Channel源码剖析
    • 03-内存模型与GC机制
    • 04-垃圾回收器全链路
    • 05-并发模型与锁机制
    • 06-网络模型与Netpoll
    • 07-Runtime全景融合
    • 08-性能优化实战
    • 09-微服务架构实践

05-并发模型与锁机制

章节概述

Go 语言的并发模型基于 CSP(Communicating Sequential Processes)理论,提供了丰富的同步原语。本章将深入解析 Mutex、RWMutex、Atomic 等锁机制的源码实现,帮助读者理解 Go 并发编程的核心原理。

学习目标

  • 理解 Mutex 的状态位设计和自旋机制
  • 掌握 RWMutex 的读写分离实现
  • 了解 Atomic 操作的底层原理
  • 学会信号量机制的使用
  • 能够识别和解决并发问题

️ 并发模型架构

同步原语层次

┌─────────────────────────────────────────────────────────┐
│                    Go 并发模型                          │
├─────────────────────────────────────────────────────────┤
│  Channel 通信     Mutex 互斥锁    RWMutex 读写锁       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐    │
│  │ 消息传递    │  │ 排他访问    │  │ 读写分离    │    │
│  │ 无锁通信    │  │ 自旋+信号量 │  │ 读多写少    │    │
│  └─────────────┘  └─────────────┘  └─────────────┘    │
│                                                         │
│  Atomic 原子操作   WaitGroup 等待   Once 单次执行      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐    │
│  │ 硬件级操作  │  │ 协程同步    │  │ 初始化保护  │    │
│  │ 无锁编程    │  │ 计数等待    │  │ 单例模式    │    │
│  └─────────────┘  └─────────────┘  └─────────────┘    │
└─────────────────────────────────────────────────────────┘

锁的层次结构

┌─────────────────────────────────────────────────────────┐
│                   锁的层次结构                          │
├─────────────────────────────────────────────────────────┤
│  用户层: sync.Mutex, sync.RWMutex, sync.Once           │
│  ┌─────────────────────────────────────────────────────┐ │
│  │ 高层同步原语,提供易用的接口                        │ │
│  └─────────────────────────────────────────────────────┘ │
│                                                         │
│  Runtime层: runtime.sema, runtime.mutex                 │
│  ┌─────────────────────────────────────────────────────┐ │
│  │ 底层同步机制,提供阻塞和唤醒功能                    │ │
│  └─────────────────────────────────────────────────────┘ │
│                                                         │
│  硬件层: atomic operations, CPU instructions            │
│  ┌─────────────────────────────────────────────────────┐ │
│  │ 原子操作,保证操作的原子性                          │ │
│  └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘

Mutex 源码分析

Mutex 结构体

文件位置:src/sync/mutex.go

type Mutex struct {
    state int32  // 状态位
    sema  uint32 // 信号量
}

const (
    mutexLocked = 1 << iota // 锁是否被持有
    mutexWoken              // 是否有唤醒的 goroutine
    mutexStarving           // 是否处于饥饿模式
    mutexWaiterShift = iota // 等待者数量偏移
)

状态位解析

// state 字段的位布局
// 0: 锁是否被持有 (mutexLocked)
// 1: 是否有唤醒的 goroutine (mutexWoken)
// 2: 是否处于饥饿模式 (mutexStarving)
// 3-31: 等待者数量 (mutexWaiterShift)

Lock 方法实现

func (m *Mutex) Lock() {
    // 快速路径:如果锁未被持有,直接获取
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    
    // 慢路径:竞争获取锁
    m.lockSlow()
}

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    
    for {
        // 1. 如果锁未被占用且不在饥饿模式,尝试获取锁
        if old&(mutexLocked|mutexStarving) == 0 {
            new := old | mutexLocked
            if old&mutexWoken != 0 {
                new &^= mutexWoken
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                break
            }
            old = m.state
            continue
        }
        
        // 2. 自旋等待
        if old&mutexLocked != 0 && old&mutexStarving == 0 && canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            doSpin()
            iter++
            old = m.state
            continue
        }
        
        // 3. 进入等待队列
        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexStarving != 0 {
            new |= mutexStarving
        }
        if awoke {
            new &^= mutexWoken
        }
        
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break
            }
            
            // 阻塞等待
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            
            if old&mutexStarving != 0 {
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

Unlock 方法实现

func (m *Mutex) Unlock() {
    // 快速路径:直接释放锁
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    
    if new&mutexStarving == 0 {
        // 正常模式:唤醒一个等待者
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // 饥饿模式:直接交给下一个等待者
        runtime_Semrelease(&m.sema, true, 1)
    }
}

RWMutex 源码分析

RWMutex 结构体

文件位置:src/sync/rwmutex.go

type RWMutex struct {
    w           Mutex  // 写锁
    writerSem   uint32 // 写者信号量
    readerSem   uint32 // 读者信号量
    readerCount int32  // 读者数量
    readerWait  int32  // 等待的读者数量
}

读锁实现

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // 有写者等待,阻塞读者
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}

func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // 读者数量为负,说明有写者等待
        rw.rUnlockSlow(r)
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        throw("sync: RUnlock of unlocked RWMutex")
    }
    
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // 最后一个读者,唤醒写者
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

写锁实现

func (rw *RWMutex) Lock() {
    // 获取写锁
    rw.w.Lock()
    
    // 等待所有读者完成
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

func (rw *RWMutex) Unlock() {
    // 恢复读者计数
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        throw("sync: Unlock of unlocked RWMutex")
    }
    
    // 唤醒所有等待的读者
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    
    // 释放写锁
    rw.w.Unlock()
}

Atomic 操作

原子操作类型

文件位置:src/runtime/internal/atomic

// 原子加法
func Xadd(ptr *uint32, delta int32) int32

// 原子比较并交换
func Cas(ptr *uint32, old, new uint32) bool

// 原子加载
func Load(ptr *uint32) uint32

// 原子存储
func Store(ptr *uint32, val uint32)

// 原子交换
func Xchg(ptr *uint32, new uint32) uint32

原子操作实现

// 汇编实现示例
TEXT ·Cas(SB),NOSPLIT,$0
    MOVQ ptr+0(FP), BX
    MOVL old+8(FP), AX
    MOVL new+12(FP), CX
    LOCK
    CMPXCHGL CX, 0(BX)
    SETEQ ret+16(FP)
    RET

原子操作使用

package main

import (
    "fmt"
    "runtime"
    "sync/atomic"
    "time"
)

func main() {
    var counter int64
    var wg sync.WaitGroup
    
    // 启动多个 goroutine 并发计数
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                atomic.AddInt64(&counter, 1)
            }
        }()
    }
    
    wg.Wait()
    fmt.Printf("Counter: %d\n", counter)
}

信号量机制

信号量结构

文件位置:src/runtime/sema.go

type semaRoot struct {
    lock  mutex
    treap *sudog
    nwait uint32
}

var semtable [251]struct {
    root semaRoot
    pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

信号量操作

func runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
    root := semroot(addr)
    s := acquireSudog()
    s.g = getg()
    s.isSelect = false
    s.next = nil
    s.prev = nil
    s.elem = unsafe.Pointer(addr)
    s.waitlink = nil
    s.c = nil
    
    root.lock()
    root.nwait++
    root.treap = s
    root.unlock()
    
    goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
}

func runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
    root := semroot(addr)
    root.lock()
    root.nwait--
    s := root.treap
    if s != nil {
        root.treap = s.next
        root.unlock()
        s.next = nil
        s.prev = nil
        s.elem = nil
        s.c = nil
        goready(s.g, 3)
    } else {
        root.unlock()
    }
}

️ 实战代码

1. 自实现轻量级锁

package main

import (
    "fmt"
    "runtime"
    "sync/atomic"
    "time"
)

// 自旋锁实现
type SpinLock struct {
    state int32
}

func (sl *SpinLock) Lock() {
    for !atomic.CompareAndSwapInt32(&sl.state, 0, 1) {
        // 自旋等待
        runtime.Gosched()
    }
}

func (sl *SpinLock) Unlock() {
    atomic.StoreInt32(&sl.state, 0)
}

// 读写锁实现
type SimpleRWMutex struct {
    readCount  int32
    writeCount int32
    readLock   SpinLock
    writeLock  SpinLock
}

func (rw *SimpleRWMutex) RLock() {
    rw.readLock.Lock()
    defer rw.readLock.Unlock()
    
    atomic.AddInt32(&rw.readCount, 1)
}

func (rw *SimpleRWMutex) RUnlock() {
    rw.readLock.Lock()
    defer rw.readLock.Unlock()
    
    atomic.AddInt32(&rw.readCount, -1)
}

func (rw *SimpleRWMutex) Lock() {
    rw.writeLock.Lock()
    defer rw.writeLock.Unlock()
    
    atomic.AddInt32(&rw.writeCount, 1)
    
    // 等待所有读者完成
    for atomic.LoadInt32(&rw.readCount) > 0 {
        runtime.Gosched()
    }
}

func (rw *SimpleRWMutex) Unlock() {
    atomic.AddInt32(&rw.writeCount, -1)
    rw.writeLock.Unlock()
}

func main() {
    // 测试自旋锁
    var sl SpinLock
    var counter int32
    
    for i := 0; i < 1000; i++ {
        go func() {
            sl.Lock()
            counter++
            sl.Unlock()
        }()
    }
    
    time.Sleep(1 * time.Second)
    fmt.Printf("Counter: %d\n", counter)
    
    // 测试读写锁
    var rw SimpleRWMutex
    var data int32
    
    // 启动读者
    for i := 0; i < 100; i++ {
        go func() {
            rw.RLock()
            _ = data
            rw.RUnlock()
        }()
    }
    
    // 启动写者
    for i := 0; i < 10; i++ {
        go func() {
            rw.Lock()
            data++
            rw.Unlock()
        }()
    }
    
    time.Sleep(1 * time.Second)
    fmt.Printf("Data: %d\n", data)
}

2. 并发安全计数器

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// 使用 atomic 的计数器
type AtomicCounter struct {
    value int64
}

func (c *AtomicCounter) Add(delta int64) int64 {
    return atomic.AddInt64(&c.value, delta)
}

func (c *AtomicCounter) Load() int64 {
    return atomic.LoadInt64(&c.value)
}

func (c *AtomicCounter) Store(value int64) {
    atomic.StoreInt64(&c.value, value)
}

func (c *AtomicCounter) CompareAndSwap(old, new int64) bool {
    return atomic.CompareAndSwapInt64(&c.value, old, new)
}

// 使用 Mutex 的计数器
type MutexCounter struct {
    value int64
    mutex sync.Mutex
}

func (c *MutexCounter) Add(delta int64) int64 {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    c.value += delta
    return c.value
}

func (c *MutexCounter) Load() int64 {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    return c.value
}

// 性能测试
func benchmarkCounter(counter interface{}, count int) time.Duration {
    start := time.Now()
    
    var wg sync.WaitGroup
    for i := 0; i < count; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            switch c := counter.(type) {
            case *AtomicCounter:
                c.Add(1)
            case *MutexCounter:
                c.Add(1)
            }
        }()
    }
    
    wg.Wait()
    return time.Since(start)
}

func main() {
    count := 100000
    
    // 测试 atomic 计数器
    atomicCounter := &AtomicCounter{}
    duration1 := benchmarkCounter(atomicCounter, count)
    fmt.Printf("Atomic counter: %v, value: %d\n", duration1, atomicCounter.Load())
    
    // 测试 Mutex 计数器
    mutexCounter := &MutexCounter{}
    duration2 := benchmarkCounter(mutexCounter, count)
    fmt.Printf("Mutex counter: %v, value: %d\n", duration2, mutexCounter.Load())
    
    // 性能对比
    fmt.Printf("Atomic is %.2fx faster\n", float64(duration2)/float64(duration1))
}

3. 并发安全 Map

package main

import (
    "fmt"
    "sync"
    "time"
)

// 使用 RWMutex 的并发安全 Map
type SafeMap struct {
    data map[string]interface{}
    mutex sync.RWMutex
}

func NewSafeMap() *SafeMap {
    return &SafeMap{
        data: make(map[string]interface{}),
    }
}

func (m *SafeMap) Set(key string, value interface{}) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.data[key] = value
}

func (m *SafeMap) Get(key string) (interface{}, bool) {
    m.mutex.RLock()
    defer m.mutex.RUnlock()
    value, ok := m.data[key]
    return value, ok
}

func (m *SafeMap) Delete(key string) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    delete(m.data, key)
}

func (m *SafeMap) Len() int {
    m.mutex.RLock()
    defer m.mutex.RUnlock()
    return len(m.data)
}

// 使用 atomic 的并发安全 Map
type AtomicMap struct {
    data map[string]interface{}
    mutex sync.Mutex
}

func NewAtomicMap() *AtomicMap {
    return &AtomicMap{
        data: make(map[string]interface{}),
    }
}

func (m *AtomicMap) Set(key string, value interface{}) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.data[key] = value
}

func (m *AtomicMap) Get(key string) (interface{}, bool) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    value, ok := m.data[key]
    return value, ok
}

func main() {
    // 测试 SafeMap
    safeMap := NewSafeMap()
    
    // 启动写者
    for i := 0; i < 100; i++ {
        go func(id int) {
            safeMap.Set(fmt.Sprintf("key%d", id), id)
        }(i)
    }
    
    // 启动读者
    for i := 0; i < 100; i++ {
        go func(id int) {
            value, ok := safeMap.Get(fmt.Sprintf("key%d", id))
            if ok {
                fmt.Printf("Read key%d: %v\n", id, value)
            }
        }(i)
    }
    
    time.Sleep(1 * time.Second)
    fmt.Printf("SafeMap length: %d\n", safeMap.Len())
}

性能分析

锁性能对比

package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

func benchmarkMutex(count int) time.Duration {
    var mutex sync.Mutex
    var counter int64
    
    start := time.Now()
    var wg sync.WaitGroup
    
    for i := 0; i < count; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mutex.Lock()
            counter++
            mutex.Unlock()
        }()
    }
    
    wg.Wait()
    return time.Since(start)
}

func benchmarkAtomic(count int) time.Duration {
    var counter int64
    
    start := time.Now()
    var wg sync.WaitGroup
    
    for i := 0; i < count; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1)
        }()
    }
    
    wg.Wait()
    return time.Since(start)
}

func benchmarkChannel(count int) time.Duration {
    ch := make(chan int, 1)
    ch <- 0
    
    start := time.Now()
    var wg sync.WaitGroup
    
    for i := 0; i < count; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter := <-ch
            counter++
            ch <- counter
        }()
    }
    
    wg.Wait()
    return time.Since(start)
}

func main() {
    count := 100000
    
    // 测试 Mutex
    duration1 := benchmarkMutex(count)
    fmt.Printf("Mutex: %v\n", duration1)
    
    // 测试 Atomic
    duration2 := benchmarkAtomic(count)
    fmt.Printf("Atomic: %v\n", duration2)
    
    // 测试 Channel
    duration3 := benchmarkChannel(count)
    fmt.Printf("Channel: %v\n", duration3)
    
    // 性能对比
    fmt.Printf("Atomic is %.2fx faster than Mutex\n", float64(duration1)/float64(duration2))
    fmt.Printf("Atomic is %.2fx faster than Channel\n", float64(duration3)/float64(duration2))
}

面试题库

基础问题

  1. Mutex 的状态位设计?

    • 0位:锁是否被持有
    • 1位:是否有唤醒的 goroutine
    • 2位:是否处于饥饿模式
    • 3-31位:等待者数量
  2. RWMutex 的读写分离原理?

    • 读者计数:readerCount
    • 写者等待:writerSem
    • 读者等待:readerSem
    • 写者优先策略
  3. Atomic 操作的优势?

    • 硬件级原子性
    • 无锁操作
    • 性能高
    • 适合简单计数

进阶问题

  1. Mutex 的自旋机制?

    • 在用户态空转等待
    • 减少系统调用开销
    • 适合短时间持有锁的场景
  2. 饥饿模式的作用?

    • 防止长时间等待的 goroutine 饿死
    • 保证公平性
    • 提高整体性能
  3. 如何选择同步原语?

    • 简单计数:atomic
    • 复杂逻辑:mutex
    • 读多写少:rwmutex
    • 通信:channel

源码问题

  1. Mutex 的 lockSlow 流程?

    • 检查锁状态
    • 自旋等待
    • 进入等待队列
    • 处理饥饿模式
  2. 信号量的实现原理?

    • 基于 sudog 结构
    • 使用 treap 树管理
    • 通过 gopark 阻塞
    • 通过 goready 唤醒

扩展阅读

  • Go 并发源码分析
  • Go 内存模型
  • Go 并发模式
  • Go 性能优化

相关章节

  • 01-GMP调度模型深度解析 - 锁与调度的关系
  • 02-Channel源码剖析 - Channel 与锁的协作
  • 07-Runtime全景融合 - 整体架构协作机制

下一章预告:我们将深入 Go 的网络模型和 Netpoll 机制,了解异步 I/O 和 epoll 的实现原理。

Prev
04-垃圾回收器全链路
Next
06-网络模型与Netpoll