HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于

【编程难度第二名】一致性协议(Raft / Paxos) - 行业里最少人能写出来的代码

本系列文章

➤ [NO.1 调度器]

➤ NO.2 一致性协议(Paxos / Raft)

➤ [NO.3 高性能异步系统](消息队列、回调、重试)

➤ [NO.4 交易系统](钱的事不能错)

➤ [NO.5 普通业务系统](绝大多数人做的)

一、为什么一致性协议如此困难?

1.1 核心挑战

行业里最少人能真正写出来的代码。

代表系统:

  • etcd (Raft)
  • Consul (Raft)
  • Zookeeper (Zab,类 Paxos)
  • TiKV (Multi-Raft)
  • Google Spanner (Paxos)

为什么难:

  • 需要理解分布式一致性理论
  • 需要处理网络分区
  • 需要处理脑裂问题
  • 需要保证强一致性
  • Bug 很难复现和调试

但为什么比调度器简单一点:

  • 问题域相对单一(就是保证一致性)
  • 状态相对简单(主要是选举和日志复制)
  • 有明确的算法可以参考

二、核心难点深度解析

2.1 难点一:Leader 选举(Leader Election)

在分布式系统中,需要一个 Leader 来协调所有操作。但网络分区可能导致灾难:

经典脑裂场景:

场景:5 个节点的集群
时刻 T1: 网络分区,分成 [A,B] 和 [C,D,E] 两组
时刻 T2: [A,B] 选出 Leader A
时刻 T3: [C,D,E] 选出 Leader C
时刻 T4: 现在有两个 Leader! (脑裂)
时刻 T5: 客户端写入到 Leader A,另一个写入到 Leader C
时刻 T6: 网络恢复,数据不一致!

Raft 的解决方案

核心机制:

  1. Term(任期): 像"朝代",每次选举 Term 递增,旧 Term 的 Leader 自动失效
  2. 多数派原则: 必须获得超过半数节点的选票才能成为 Leader
  3. 选举超时随机化: 避免多个节点同时发起选举导致分票

关键代码逻辑(简化版):

type Node struct {
    id          string
    currentTerm int64
    votedFor    string
    state       NodeState  // Follower, Candidate, Leader
    votes       map[string]bool

    // 日志相关
    log          []LogEntry
    lastLogIndex int64
    lastLogTerm  int64

    // 通信
    peers       []Peer
    electionWon chan bool
}

type NodeState int

const (
    Follower NodeState = iota
    Candidate
    Leader
)

func (n *Node) StartElection() {
    // 1. 增加任期
    n.currentTerm++

    // 2. 转为候选人,给自己投票
    n.state = Candidate
    n.votedFor = n.id
    n.votes = map[string]bool{n.id: true}

    log.Info("starting election",
        "nodeID", n.id,
        "term", n.currentTerm)

    // 3. 向所有其他节点请求投票
    for _, peer := range n.peers {
        go n.requestVote(peer, n.currentTerm)
    }

    // 4. 设置选举超时(随机 150-300ms)
    timeout := randomTimeout(150, 300)
    timer := time.NewTimer(timeout)

    select {
    case <-timer.C:
        // 超时,重新选举
        log.Warn("election timeout, restarting",
            "nodeID", n.id,
            "term", n.currentTerm)
        n.StartElection()

    case <-n.electionWon:
        // 赢得选举,成为 Leader
        log.Info("election won",
            "nodeID", n.id,
            "term", n.currentTerm,
            "votes", len(n.votes))
        n.becomeLeader()
    }
}

func (n *Node) requestVote(peer Peer, term int64) {
    resp, err := peer.RequestVote(RequestVoteArgs{
        Term:         term,
        CandidateId:  n.id,
        LastLogIndex: n.lastLogIndex,
        LastLogTerm:  n.lastLogTerm,
    })

    if err != nil {
        log.Error("request vote failed",
            "peer", peer.id,
            "err", err)
        return
    }

    // 处理投票响应
    if resp.VoteGranted {
        n.mu.Lock()
        n.votes[peer.id] = true
        voteCount := len(n.votes)
        n.mu.Unlock()

        log.Info("received vote",
            "from", peer.id,
            "totalVotes", voteCount)

        // 检查是否获得多数票
        if voteCount > len(n.peers)/2 {
            select {
            case n.electionWon <- true:
            default:
            }
        }
    } else {
        log.Info("vote denied",
            "from", peer.id,
            "theirTerm", resp.Term)

        // 如果对方的 term 更高,更新自己的 term
        if resp.Term > n.currentTerm {
            n.currentTerm = resp.Term
            n.state = Follower
            n.votedFor = ""
        }
    }
}

func (n *Node) HandleRequestVote(args RequestVoteArgs) RequestVoteReply {
    n.mu.Lock()
    defer n.mu.Unlock()

    log.Info("received vote request",
        "from", args.CandidateId,
        "term", args.Term,
        "myTerm", n.currentTerm)

    // 1. 如果对方的 term 小于自己,拒绝
    if args.Term < n.currentTerm {
        return RequestVoteReply{
            Term:        n.currentTerm,
            VoteGranted: false,
        }
    }

    // 2. 如果对方的 term 大于自己,更新自己的 term
    if args.Term > n.currentTerm {
        n.currentTerm = args.Term
        n.state = Follower
        n.votedFor = ""
    }

    // 3. 检查是否已经投票
    if n.votedFor != "" && n.votedFor != args.CandidateId {
        log.Info("already voted for another candidate",
            "votedFor", n.votedFor)
        return RequestVoteReply{
            Term:        n.currentTerm,
            VoteGranted: false,
        }
    }

    // 4. 检查候选人的日志是否至少和自己一样新
    // (这是保证一致性的关键!)
    logOk := args.LastLogTerm > n.lastLogTerm ||
             (args.LastLogTerm == n.lastLogTerm &&
              args.LastLogIndex >= n.lastLogIndex)

    if !logOk {
        log.Info("candidate log not up-to-date",
            "candidateLastTerm", args.LastLogTerm,
            "myLastTerm", n.lastLogTerm)
        return RequestVoteReply{
            Term:        n.currentTerm,
            VoteGranted: false,
        }
    }

    // 5. 投票
    n.votedFor = args.CandidateId
    n.resetElectionTimeout()

    log.Info("granted vote",
        "to", args.CandidateId,
        "term", n.currentTerm)

    return RequestVoteReply{
        Term:        n.currentTerm,
        VoteGranted: true,
    }
}

为什么这样设计?

Term 机制:

  • 像"朝代纪年",每次选举 Term 递增
  • 旧 Term 的 Leader 自动失效
  • 收到更高 Term 的消息,立即更新并转为 Follower

多数派原则:

  • [A,B] 只有 2 票,达不到多数(需要 3 票)
  • 所以 A 不会成为真正的 Leader
  • [C,D,E] 有 3 票,C 可以成为 Leader
  • 关键: 任何两个多数派必有交集,保证不会有两个 Leader

日志新旧检查:

  • 只有拥有最新日志的节点才能成为 Leader
  • 保证新 Leader 不会丢失已提交的数据
  • 比较规则: Term 优先,Index 其次

2.2 难点二:日志复制(Log Replication)

这是一致性协议的核心。所有写操作必须通过 Leader,并复制到多数派节点。

复杂的复制流程

场景:写入一条日志
时刻 T1: Leader 收到写请求"SET x=1"
时刻 T2: Leader 写入本地日志 [index=5, term=3, cmd="SET x=1"]
时刻 T3: Leader 向所有 Follower 发送 AppendEntries
时刻 T4: Follower A 收到,写入成功
时刻 T5: Follower B 收到,写入成功
时刻 T6: Follower C 网络超时,未收到
时刻 T7: Leader 收到 2 个成功响应(加上自己共 3 个,多数派)
时刻 T8: Leader 提交这条日志,返回客户端成功
时刻 T9: Follower C 网络恢复,收到 AppendEntries
时刻 T10: Follower C 写入日志,追赶进度

关键代码实现

type LogEntry struct {
    Index   int64
    Term    int64
    Command interface{}
}

type Node struct {
    // ... 其他字段

    // 日志
    log          []LogEntry
    lastLogIndex int64
    commitIndex  int64
    lastApplied  int64

    // Leader 维护的状态
    nextIndex    map[string]int64  // 每个 Follower 的下一条日志索引
    matchIndex   map[string]int64  // 每个 Follower 已复制的最高日志索引
}

func (n *Node) AppendEntry(ctx context.Context, cmd interface{}) error {
    if n.state != Leader {
        return errors.New("not leader")
    }

    n.mu.Lock()

    // 1. 写入本地日志
    entry := LogEntry{
        Index:   n.lastLogIndex + 1,
        Term:    n.currentTerm,
        Command: cmd,
    }
    n.log = append(n.log, entry)
    n.lastLogIndex++

    log.Info("appended entry",
        "index", entry.Index,
        "term", entry.Term)

    n.mu.Unlock()

    // 2. 并发向所有 Follower 发送
    successCount := 1  // Leader 自己
    var mu sync.Mutex
    var wg sync.WaitGroup

    for _, peer := range n.peers {
        wg.Add(1)
        go func(p Peer) {
            defer wg.Done()

            n.mu.RLock()
            prevLogIndex := n.nextIndex[p.id] - 1
            prevLogTerm := n.log[prevLogIndex].Term
            n.mu.RUnlock()

            // 发送 AppendEntries RPC
            resp, err := p.AppendEntries(ctx, AppendEntriesArgs{
                Term:         n.currentTerm,
                LeaderId:     n.id,
                PrevLogIndex: prevLogIndex,
                PrevLogTerm:  prevLogTerm,
                Entries:      []LogEntry{entry},
                LeaderCommit: n.commitIndex,
            })

            if err != nil {
                log.Error("append entries failed",
                    "peer", p.id,
                    "err", err)
                return
            }

            if resp.Success {
                mu.Lock()
                successCount++
                n.matchIndex[p.id] = entry.Index
                n.nextIndex[p.id] = entry.Index + 1
                mu.Unlock()

                log.Info("append entries success",
                    "peer", p.id,
                    "index", entry.Index)
            } else {
                // 日志不匹配,需要回退
                n.mu.Lock()
                n.nextIndex[p.id]--
                n.mu.Unlock()

                log.Warn("append entries conflict",
                    "peer", p.id,
                    "prevIndex", prevLogIndex)
            }
        }(peer)
    }

    // 等待所有响应
    wg.Wait()

    // 3. 检查是否达到多数派
    if successCount > len(n.peers)/2 {
        // 提交日志
        n.mu.Lock()
        n.commitIndex = entry.Index
        n.mu.Unlock()

        log.Info("entry committed",
            "index", entry.Index,
            "replicas", successCount)

        // 应用到状态机
        n.applyLog(entry)

        return nil
    }

    log.Error("failed to reach quorum",
        "successCount", successCount,
        "required", len(n.peers)/2+1)

    return errors.New("failed to reach quorum")
}

func (n *Node) HandleAppendEntries(args AppendEntriesArgs) AppendEntriesReply {
    n.mu.Lock()
    defer n.mu.Unlock()

    log.Info("received append entries",
        "from", args.LeaderId,
        "term", args.Term,
        "prevIndex", args.PrevLogIndex,
        "entries", len(args.Entries))

    // 1. 检查 term
    if args.Term < n.currentTerm {
        log.Info("rejecting outdated append entries",
            "theirTerm", args.Term,
            "myTerm", n.currentTerm)
        return AppendEntriesReply{
            Term:    n.currentTerm,
            Success: false,
        }
    }

    // 2. 更新 term,转为 Follower
    if args.Term > n.currentTerm {
        n.currentTerm = args.Term
        n.state = Follower
        n.votedFor = ""
    }

    // 3. 重置选举超时(收到 Leader 心跳)
    n.resetElectionTimeout()

    // 4. 检查日志一致性
    if args.PrevLogIndex > 0 {
        if n.lastLogIndex < args.PrevLogIndex {
            // 日志太短
            log.Warn("log too short",
                "myLastIndex", n.lastLogIndex,
                "requiredIndex", args.PrevLogIndex)
            return AppendEntriesReply{
                Term:    n.currentTerm,
                Success: false,
            }
        }

        prevEntry := n.log[args.PrevLogIndex]
        if prevEntry.Term != args.PrevLogTerm {
            // 日志不匹配,删除冲突的日志
            log.Warn("log conflict detected",
                "index", args.PrevLogIndex,
                "myTerm", prevEntry.Term,
                "expectedTerm", args.PrevLogTerm)

            n.log = n.log[:args.PrevLogIndex]
            n.lastLogIndex = args.PrevLogIndex - 1

            return AppendEntriesReply{
                Term:    n.currentTerm,
                Success: false,
            }
        }
    }

    // 5. 追加新日志
    for _, entry := range args.Entries {
        n.log = append(n.log, entry)
        n.lastLogIndex++

        log.Info("appended entry",
            "index", entry.Index,
            "term", entry.Term)
    }

    // 6. 更新 commitIndex
    if args.LeaderCommit > n.commitIndex {
        oldCommitIndex := n.commitIndex
        n.commitIndex = min(args.LeaderCommit, n.lastLogIndex)

        log.Info("updated commit index",
            "old", oldCommitIndex,
            "new", n.commitIndex)

        // 应用已提交的日志到状态机
        for i := n.lastApplied + 1; i <= n.commitIndex; i++ {
            n.applyLog(n.log[i])
        }
        n.lastApplied = n.commitIndex
    }

    return AppendEntriesReply{
        Term:    n.currentTerm,
        Success: true,
    }
}

func (n *Node) applyLog(entry LogEntry) {
    log.Info("applying log to state machine",
        "index", entry.Index,
        "command", entry.Command)

    // 应用到状态机
    n.stateMachine.Apply(entry.Command)
}

关键设计点:

PrevLogIndex/PrevLogTerm:

  • 确保日志的连续性
  • Follower 必须拥有前一条日志,才能追加新日志
  • 如果不匹配,Leader 会不断回退直到找到一致的点

多数派提交:

  • 只有复制到多数派,才能提交
  • 提交后的日志永不丢失
  • 未提交的日志可能被覆盖

心跳机制:

  • Leader 定期发送空的 AppendEntries 作为心跳
  • 防止 Follower 选举超时
  • 同时推进 commitIndex

日志修复:

  • Follower 落后时,Leader 会不断重试
  • 通过 nextIndex 和 matchIndex 跟踪进度
  • 冲突时回退到一致的点

2.3 难点三:网络分区(Network Partition)

这是最难处理的场景,也是 Bug 最多的地方。

经典网络分区案例

初始状态:5 个节点 [A, B, C, D, E],Leader 是 A

时刻 T1: 网络分区,分成两组
  - 多数派: [A, B, C] (3 个节点)
  - 少数派: [D, E] (2 个节点)

时刻 T2: 客户端 1 向 A 写入 "SET x=1"
  - A 复制到 B, C,达到多数派
  - A 提交日志,返回客户端成功
  - D, E 收不到,日志落后

时刻 T3: [D, E] 选举超时
  - D 发起选举,term 变为 4
  - E 投票给 D
  - D 成为"假 Leader"(只有 2 票,不是多数派)

时刻 T4: 客户端 2 向 D 写入 "SET x=2"
  - D 尝试复制给 E,成功(2 个节点)
  - D 无法提交(达不到多数派 3)
  - 写入一直 Pending!

时刻 T5: 网络恢复
  - D 发现 A 的 term 可能更新
  - 或者 D 尝试复制到 A/B/C,发现自己不是真正的 Leader
  - D 转为 Follower
  - D 的日志"SET x=2"被 A 的日志覆盖(因为未提交)
  - 一致性恢复!

时刻 T6: 客户端 2 收到错误"not leader"
  - 重试,连接到真正的 Leader A
  - 重新写入"SET x=2"
  - 成功

Raft 如何保证安全性?

Leader 完整性(Leader Completeness):

只有拥有最新已提交日志的节点才能成为 Leader

证明:
1. 日志只有复制到多数派才能提交
2. Leader 选举需要多数派投票
3. 任何两个多数派必有交集
4. 因此新 Leader 必然拥有所有已提交的日志

多数派原则(Quorum):

任何操作(提交、选举)都需要多数派

效果:
- 不会有两个 Leader(两个多数派会冲突)
- 已提交的日志不会丢失(至少一个多数派成员会成为新 Leader)
- 网络分区时,少数派无法做任何决策

Term 递增(Term Monotonicity):

每次选举 Term 递增

效果:
- 旧 Term 的 Leader 自动失效
- 收到更高 Term 的消息,立即转为 Follower
- 防止"过时的 Leader"继续工作

2.4 难点四:为什么 Bug 难调试?

Bug 的特点

极低概率触发:

问题: Raft 实现中的一个 Bug 可能导致:
- 99.9% 的时间都正常
- 只在特定的网络分区场景下触发
- 复现条件极其苛刻

示例场景:
1. 5 个节点,网络分区
2. 同时有 3 个写请求
3. 其中一个节点网络抖动 200ms
4. 另一个节点 CPU 100%(GC)
5. 此时 Leader 切换
6. 某个日志复制到 2 个节点但未提交
7. 新 Leader 上任,这条日志被覆盖
8. 数据丢失!(如果实现有 Bug)

非确定性:

同样的代码:
- 运行 100 次,99 次正常
- 第 100 次数据不一致
- 无法稳定复现

时序依赖:

Bug 依赖精确的时序:
- 消息 A 必须在消息 B 之前到达
- 节点 C 必须在 T1 时刻挂掉
- 网络分区必须持续 X 秒

调试方法

1. Jepsen 混沌测试

# Jepsen 会模拟各种异常
lein run test --nodes n1,n2,n3,n4,n5 \
    --nemesis partition \
    --time-limit 300 \
    --rate 100

# 检查一致性
java -jar knossos.jar linearizability history.edn

2. 详细日志记录

type RaftLogger struct {
    file *os.File
}

func (l *RaftLogger) LogStateChange(
    nodeID string,
    oldState, newState NodeState,
    term int64,
) {
    entry := LogEntry{
        Timestamp: time.Now(),
        NodeID:    nodeID,
        Event:     "StateChange",
        Details: map[string]interface{}{
            "oldState": oldState,
            "newState": newState,
            "term":     term,
        },
    }

    // 写入日志文件
    json.NewEncoder(l.file).Encode(entry)
}

func (l *RaftLogger) LogRPC(
    from, to string,
    rpcType string,
    args, reply interface{},
) {
    entry := LogEntry{
        Timestamp: time.Now(),
        From:      from,
        To:        to,
        RPC:       rpcType,
        Args:      args,
        Reply:     reply,
    }

    json.NewEncoder(l.file).Encode(entry)
}

3. 时间线分析工具

# 分析日志,生成时间线
def analyze_raft_log(log_file):
    events = []

    for line in log_file:
        event = json.loads(line)
        events.append(event)

    # 按时间排序
    events.sort(key=lambda e: e['timestamp'])

    # 检查不一致
    check_linearizability(events)
    check_safety(events)

    # 生成可视化
    generate_timeline_html(events)

4. 模拟网络故障

type FaultyNetwork struct {
    // 模拟网络延迟
    DelayRange [2]time.Duration  // [min, max]

    // 模拟丢包率
    DropRate float64

    // 模拟网络分区
    Partitions [][]string
}

func (n *FaultyNetwork) Send(from, to string, msg Message) error {
    // 1. 检查是否被分区隔离
    if n.isPartitioned(from, to) {
        return errors.New("network partitioned")
    }

    // 2. 模拟丢包
    if rand.Float64() < n.DropRate {
        return errors.New("packet dropped")
    }

    // 3. 模拟延迟
    delay := n.randomDelay()
    time.Sleep(delay)

    // 4. 发送
    return n.realSend(from, to, msg)
}

三、工业级实现要点

3.1 日志压缩(Snapshot)

日志不能无限增长,需要定期做快照。

type Snapshot struct {
    LastIncludedIndex int64
    LastIncludedTerm  int64
    Data              []byte
}

func (n *Node) CreateSnapshot() error {
    n.mu.Lock()
    defer n.mu.Unlock()

    // 检查是否需要做快照
    if n.lastApplied - n.lastSnapshotIndex < snapshotThreshold {
        return nil
    }

    log.Info("creating snapshot",
        "lastApplied", n.lastApplied,
        "lastSnapshot", n.lastSnapshotIndex)

    // 1. 获取状态机快照
    snapshotData := n.stateMachine.Snapshot()

    // 2. 创建快照对象
    snapshot := Snapshot{
        LastIncludedIndex: n.lastApplied,
        LastIncludedTerm:  n.log[n.lastApplied].Term,
        Data:              snapshotData,
    }

    // 3. 保存快照到磁盘
    err := n.persister.SaveSnapshot(snapshot)
    if err != nil {
        return err
    }

    // 4. 删除旧日志
    oldLogLen := len(n.log)
    n.log = n.log[n.lastApplied-n.lastSnapshotIndex:]
    n.lastSnapshotIndex = n.lastApplied

    log.Info("snapshot created",
        "index", snapshot.LastIncludedIndex,
        "oldLogLen", oldLogLen,
        "newLogLen", len(n.log))

    return nil
}

func (n *Node) InstallSnapshot(snapshot Snapshot) error {
    n.mu.Lock()
    defer n.mu.Unlock()

    log.Info("installing snapshot",
        "lastIncludedIndex", snapshot.LastIncludedIndex)

    // 1. 检查快照是否更新
    if snapshot.LastIncludedIndex <= n.lastSnapshotIndex {
        return nil  // 旧快照,忽略
    }

    // 2. 应用快照到状态机
    err := n.stateMachine.Restore(snapshot.Data)
    if err != nil {
        return err
    }

    // 3. 更新日志
    n.lastSnapshotIndex = snapshot.LastIncludedIndex
    n.lastApplied = snapshot.LastIncludedIndex
    n.commitIndex = snapshot.LastIncludedIndex

    // 删除快照之前的日志
    if snapshot.LastIncludedIndex < n.lastLogIndex {
        n.log = n.log[snapshot.LastIncludedIndex-n.lastSnapshotIndex:]
    } else {
        n.log = []LogEntry{}
        n.lastLogIndex = snapshot.LastIncludedIndex
    }

    return nil
}

3.2 成员变更(Membership Change)

动态添加/删除节点是极其复杂的操作。

为什么难?

问题:直接从 3 节点变成 5 节点

旧配置: [A, B, C],多数派 = 2
新配置: [A, B, C, D, E],多数派 = 3

危险场景:
- [A, B] 认为自己是多数派(旧配置 2/3)
- [C, D, E] 认为自己是多数派(新配置 3/5)
- 出现两个 Leader!

解决方案:Joint Consensus(联合共识)

type Configuration struct {
    Old []string  // 旧配置
    New []string  // 新配置
}

func (n *Node) ChangeConfiguration(newMembers []string) error {
    // 阶段 1: 进入联合配置 C(old,new)
    jointConfig := Configuration{
        Old: n.config.Old,
        New: newMembers,
    }

    log.Info("entering joint consensus",
        "old", jointConfig.Old,
        "new", jointConfig.New)

    // 提交联合配置
    err := n.AppendEntry(context.Background(), jointConfig)
    if err != nil {
        return err
    }

    // 联合配置生效:
    // - 日志复制需要 old 的多数 AND new 的多数
    // - Leader 选举需要 old 的多数 AND new 的多数

    // 阶段 2: 提交新配置 C(new)
    time.Sleep(100 * time.Millisecond)  // 等待联合配置生效

    finalConfig := Configuration{
        Old: newMembers,
        New: nil,
    }

    log.Info("entering new configuration",
        "config", finalConfig.Old)

    err = n.AppendEntry(context.Background(), finalConfig)
    if err != nil {
        return err
    }

    // 新配置生效
    n.config = finalConfig
    n.peers = buildPeers(newMembers)

    return nil
}

func (n *Node) isQuorum(voters map[string]bool) bool {
    if n.config.New == nil {
        // 普通配置
        count := 0
        for _, member := range n.config.Old {
            if voters[member] {
                count++
            }
        }
        return count > len(n.config.Old)/2
    }

    // 联合配置:需要同时满足 old 和 new 的多数
    oldCount := 0
    for _, member := range n.config.Old {
        if voters[member] {
            oldCount++
        }
    }

    newCount := 0
    for _, member := range n.config.New {
        if voters[member] {
            newCount++
        }
    }

    return oldCount > len(n.config.Old)/2 &&
           newCount > len(n.config.New)/2
}

3.3 Read Index 优化

读操作也需要保证线性一致性。

问题:

场景:客户端读取 key="x"
1. 客户端连接到 Leader
2. Leader 直接返回本地状态机的值
3. 但是 Leader 可能已经不是真正的 Leader了!(网络分区)
4. 返回的是旧数据!

解决方案:Read Index

func (n *Node) Read(ctx context.Context, key string) (string, error) {
    if n.state != Leader {
        return "", errors.New("not leader")
    }

    // 1. 记录当前的 commitIndex(Read Index)
    readIndex := n.commitIndex

    log.Info("read request",
        "key", key,
        "readIndex", readIndex)

    // 2. 发送心跳到多数派(确认自己还是 Leader)
    if !n.checkQuorum(ctx) {
        return "", errors.New("lost leadership")
    }

    // 3. 等待 applyIndex >= readIndex
    for n.lastApplied < readIndex {
        time.Sleep(10 * time.Millisecond)
    }

    // 4. 读取状态机
    value := n.stateMachine.Get(key)

    log.Info("read completed",
        "key", key,
        "value", value)

    return value, nil
}

func (n *Node) checkQuorum(ctx context.Context) bool {
    responseCount := 1  // Leader 自己
    var mu sync.Mutex
    var wg sync.WaitGroup

    for _, peer := range n.peers {
        wg.Add(1)
        go func(p Peer) {
            defer wg.Done()

            // 发送心跳
            resp, err := p.AppendEntries(ctx, AppendEntriesArgs{
                Term:         n.currentTerm,
                LeaderId:     n.id,
                PrevLogIndex: n.lastLogIndex,
                PrevLogTerm:  n.log[n.lastLogIndex].Term,
                Entries:      nil,  // 空心跳
                LeaderCommit: n.commitIndex,
            })

            if err == nil && resp.Success {
                mu.Lock()
                responseCount++
                mu.Unlock()
            }
        }(peer)
    }

    wg.Wait()

    // 检查是否达到多数派
    return responseCount > len(n.peers)/2
}

Lease Read 优化:

// 更激进的优化:Leader 维护一个租约
type Node struct {
    // ... 其他字段
    leaseExpire time.Time
}

func (n *Node) renewLease() {
    // 发送心跳成功后,续约 lease
    n.leaseExpire = time.Now().Add(leaseTimeout)
}

func (n *Node) LeaseRead(key string) (string, error) {
    if n.state != Leader {
        return "", errors.New("not leader")
    }

    // 检查 lease 是否过期
    if time.Now().After(n.leaseExpire) {
        return "", errors.New("lease expired")
    }

    // 直接读取,无需等待
    return n.stateMachine.Get(key), nil
}

四、实战建议

4.1 不要从零实现 Raft!

强烈建议使用成熟的库:

  • etcd/raft: etcd 使用的 Raft 库,生产级
  • Hashicorp/raft: Consul 使用的 Raft 库,功能完善
  • tikv/raft-rs: Rust 实现,性能极高

示例:使用 etcd/raft

import "go.etcd.io/etcd/raft/v3"

type KVStore struct {
    node   raft.Node
    store  map[string]string
    mu     sync.RWMutex
}

func (kv *KVStore) Start() {
    c := &raft.Config{
        ID:              0x01,
        ElectionTick:    10,
        HeartbeatTick:   1,
        Storage:         raft.NewMemoryStorage(),
        MaxSizePerMsg:   4096,
        MaxInflightMsgs: 256,
    }

    kv.node = raft.StartNode(c, []raft.Peer{{ID: 0x01}})

    go kv.run()
}

func (kv *KVStore) run() {
    for {
        select {
        case <-kv.node.Tick():
            // 定时器触发

        case rd := <-kv.node.Ready():
            // 处理 Ready

            // 1. 保存状态
            if !raft.IsEmptySnap(rd.Snapshot) {
                kv.processSnapshot(rd.Snapshot)
            }
            kv.storage.Append(rd.Entries)

            // 2. 发送消息
            for _, msg := range rd.Messages {
                kv.send(msg)
            }

            // 3. 应用已提交的日志
            for _, entry := range rd.CommittedEntries {
                kv.apply(entry)
            }

            // 4. 通知 raft 已处理
            kv.node.Advance()
        }
    }
}

func (kv *KVStore) Set(key, value string) error {
    cmd := Command{
        Type:  "SET",
        Key:   key,
        Value: value,
    }

    data, _ := json.Marshal(cmd)

    return kv.node.Propose(context.Background(), data)
}

func (kv *KVStore) apply(entry raft.Entry) {
    var cmd Command
    json.Unmarshal(entry.Data, &cmd)

    kv.mu.Lock()
    defer kv.mu.Unlock()

    switch cmd.Type {
    case "SET":
        kv.store[cmd.Key] = cmd.Value
    case "DELETE":
        delete(kv.store, cmd.Key)
    }
}

4.2 但要理解核心概念

即使使用成熟的库,你也必须理解这些概念:

1. Term(任期)

  • 逻辑时钟
  • 检测过期的 Leader
  • 保证一致性的基础

2. Leader Election(选举)

  • 超时随机化
  • 多数派投票
  • 日志新旧检查

3. Log Replication(日志复制)

  • PrevLogIndex/PrevLogTerm
  • 多数派提交
  • 日志修复

4. Quorum(多数派)

  • 防止脑裂
  • 保证已提交日志不丢失
  • 任何操作都需要多数派

5. Network Partition(网络分区)

  • 最难的测试场景
  • 必须用 Jepsen 等工具测试
  • 理解 Safety 和 Liveness 的权衡

4.3 测试策略

1. 单元测试

func TestElection(t *testing.T) {
    // 创建 5 个节点
    nodes := createTestCluster(5)

    // 启动所有节点
    for _, node := range nodes {
        node.Start()
    }

    // 等待选举
    time.Sleep(1 * time.Second)

    // 检查只有一个 Leader
    leaders := countLeaders(nodes)
    assert.Equal(t, 1, leaders)
}

func TestLogReplication(t *testing.T) {
    nodes := createTestCluster(5)

    // 找到 Leader
    leader := findLeader(nodes)

    // 写入数据
    leader.Set("x", "1")

    // 等待复制
    time.Sleep(100 * time.Millisecond)

    // 检查所有节点
    for _, node := range nodes {
        value := node.Get("x")
        assert.Equal(t, "1", value)
    }
}

2. 混沌测试(Jepsen)

(defn raft-test
  []
  (assoc tests/noop-test
    :name      "raft"
    :db        (db "raft-node")
    :client    (client)
    :nemesis   (nemesis/partition-random-halves)
    :generator (gen/phases
                 (->> (gen/mix [r w])
                      (gen/stagger 1/10)
                      (gen/nemesis
                        (gen/seq (cycle [(gen/sleep 10)
                                         {:type :info, :f :start}
                                         (gen/sleep 10)
                                         {:type :info, :f :stop}])))
                      (gen/time-limit 300))
                 (gen/nemesis
                   (gen/once {:type :info, :f :stop}))
                 (gen/clients
                   (gen/once {:type :invoke, :f :read})))
    :checker   (checker/compose
                 {:perf   (checker/perf)
                  :linear (checker/linearizable)})))

3. 性能测试

func BenchmarkWrite(b *testing.B) {
    cluster := createTestCluster(5)
    leader := findLeader(cluster)

    b.ResetTimer()

    for i := 0; i < b.N; i++ {
        leader.Set(fmt.Sprintf("key-%d", i), "value")
    }
}

func BenchmarkRead(b *testing.B) {
    cluster := createTestCluster(5)
    leader := findLeader(cluster)

    // 预写数据
    leader.Set("x", "1")

    b.ResetTimer()

    for i := 0; i < b.N; i++ {
        leader.Get("x")
    }
}

五、总结

5.1 为什么一致性协议排第二?

难度仅次于调度器:

  1. 理论门槛高: 需要理解分布式一致性理论
  2. Bug 难调试: 低概率、非确定性、时序依赖
  3. 实现复杂: 选举、复制、网络分区、成员变更
  4. 测试困难: 需要混沌测试,模拟各种异常

但比调度器简单:

  1. 问题域单一: 就是保证一致性
  2. 状态相对简单: 主要是选举和日志复制
  3. 有明确算法: Raft 论文、TLA+ 形式化证明
  4. 有成熟的库: etcd/raft、Hashicorp/raft

5.2 关键要点

核心概念:

  • Term(任期)
  • Leader Election(选举)
  • Log Replication(日志复制)
  • Quorum(多数派)
  • Network Partition(网络分区)

安全性保证:

  • Leader 完整性
  • 状态机安全性
  • 日志匹配性

性能优化:

  • Snapshot(快照)
  • Pipeline(流水线)
  • Read Index
  • Lease Read

5.3 学习路径

  1. 理解基础: 阅读 Raft 论文
  2. 看代码: 阅读 etcd/raft 源码
  3. 写 Demo: 实现简单的 KV 存储
  4. 混沌测试: 用 Jepsen 测试
  5. 生产使用: 在真实项目中使用

一致性协议是分布式系统的基石,值得深入学习!

本系列文章

➤ [NO.1 调度器]

➤ NO.2 一致性协议(Paxos / Raft)

➤ [NO.3 高性能异步系统](消息队列、回调、重试)

➤ [NO.4 交易系统](钱的事不能错)

➤ [NO.5 普通业务系统](绝大多数人做的)