【编程难度第二名】一致性协议(Raft / Paxos) - 行业里最少人能写出来的代码
本系列文章
➤ [NO.1 调度器]
➤ NO.2 一致性协议(Paxos / Raft)
➤ [NO.3 高性能异步系统](消息队列、回调、重试)
➤ [NO.4 交易系统](钱的事不能错)
➤ [NO.5 普通业务系统](绝大多数人做的)
一、为什么一致性协议如此困难?
1.1 核心挑战
行业里最少人能真正写出来的代码。
代表系统:
- etcd (Raft)
- Consul (Raft)
- Zookeeper (Zab,类 Paxos)
- TiKV (Multi-Raft)
- Google Spanner (Paxos)
为什么难:
- 需要理解分布式一致性理论
- 需要处理网络分区
- 需要处理脑裂问题
- 需要保证强一致性
- Bug 很难复现和调试
但为什么比调度器简单一点:
- 问题域相对单一(就是保证一致性)
- 状态相对简单(主要是选举和日志复制)
- 有明确的算法可以参考
二、核心难点深度解析
2.1 难点一:Leader 选举(Leader Election)
在分布式系统中,需要一个 Leader 来协调所有操作。但网络分区可能导致灾难:
经典脑裂场景:
场景:5 个节点的集群
时刻 T1: 网络分区,分成 [A,B] 和 [C,D,E] 两组
时刻 T2: [A,B] 选出 Leader A
时刻 T3: [C,D,E] 选出 Leader C
时刻 T4: 现在有两个 Leader! (脑裂)
时刻 T5: 客户端写入到 Leader A,另一个写入到 Leader C
时刻 T6: 网络恢复,数据不一致!
Raft 的解决方案
核心机制:
- Term(任期): 像"朝代",每次选举 Term 递增,旧 Term 的 Leader 自动失效
- 多数派原则: 必须获得超过半数节点的选票才能成为 Leader
- 选举超时随机化: 避免多个节点同时发起选举导致分票
关键代码逻辑(简化版):
type Node struct {
id string
currentTerm int64
votedFor string
state NodeState // Follower, Candidate, Leader
votes map[string]bool
// 日志相关
log []LogEntry
lastLogIndex int64
lastLogTerm int64
// 通信
peers []Peer
electionWon chan bool
}
type NodeState int
const (
Follower NodeState = iota
Candidate
Leader
)
func (n *Node) StartElection() {
// 1. 增加任期
n.currentTerm++
// 2. 转为候选人,给自己投票
n.state = Candidate
n.votedFor = n.id
n.votes = map[string]bool{n.id: true}
log.Info("starting election",
"nodeID", n.id,
"term", n.currentTerm)
// 3. 向所有其他节点请求投票
for _, peer := range n.peers {
go n.requestVote(peer, n.currentTerm)
}
// 4. 设置选举超时(随机 150-300ms)
timeout := randomTimeout(150, 300)
timer := time.NewTimer(timeout)
select {
case <-timer.C:
// 超时,重新选举
log.Warn("election timeout, restarting",
"nodeID", n.id,
"term", n.currentTerm)
n.StartElection()
case <-n.electionWon:
// 赢得选举,成为 Leader
log.Info("election won",
"nodeID", n.id,
"term", n.currentTerm,
"votes", len(n.votes))
n.becomeLeader()
}
}
func (n *Node) requestVote(peer Peer, term int64) {
resp, err := peer.RequestVote(RequestVoteArgs{
Term: term,
CandidateId: n.id,
LastLogIndex: n.lastLogIndex,
LastLogTerm: n.lastLogTerm,
})
if err != nil {
log.Error("request vote failed",
"peer", peer.id,
"err", err)
return
}
// 处理投票响应
if resp.VoteGranted {
n.mu.Lock()
n.votes[peer.id] = true
voteCount := len(n.votes)
n.mu.Unlock()
log.Info("received vote",
"from", peer.id,
"totalVotes", voteCount)
// 检查是否获得多数票
if voteCount > len(n.peers)/2 {
select {
case n.electionWon <- true:
default:
}
}
} else {
log.Info("vote denied",
"from", peer.id,
"theirTerm", resp.Term)
// 如果对方的 term 更高,更新自己的 term
if resp.Term > n.currentTerm {
n.currentTerm = resp.Term
n.state = Follower
n.votedFor = ""
}
}
}
func (n *Node) HandleRequestVote(args RequestVoteArgs) RequestVoteReply {
n.mu.Lock()
defer n.mu.Unlock()
log.Info("received vote request",
"from", args.CandidateId,
"term", args.Term,
"myTerm", n.currentTerm)
// 1. 如果对方的 term 小于自己,拒绝
if args.Term < n.currentTerm {
return RequestVoteReply{
Term: n.currentTerm,
VoteGranted: false,
}
}
// 2. 如果对方的 term 大于自己,更新自己的 term
if args.Term > n.currentTerm {
n.currentTerm = args.Term
n.state = Follower
n.votedFor = ""
}
// 3. 检查是否已经投票
if n.votedFor != "" && n.votedFor != args.CandidateId {
log.Info("already voted for another candidate",
"votedFor", n.votedFor)
return RequestVoteReply{
Term: n.currentTerm,
VoteGranted: false,
}
}
// 4. 检查候选人的日志是否至少和自己一样新
// (这是保证一致性的关键!)
logOk := args.LastLogTerm > n.lastLogTerm ||
(args.LastLogTerm == n.lastLogTerm &&
args.LastLogIndex >= n.lastLogIndex)
if !logOk {
log.Info("candidate log not up-to-date",
"candidateLastTerm", args.LastLogTerm,
"myLastTerm", n.lastLogTerm)
return RequestVoteReply{
Term: n.currentTerm,
VoteGranted: false,
}
}
// 5. 投票
n.votedFor = args.CandidateId
n.resetElectionTimeout()
log.Info("granted vote",
"to", args.CandidateId,
"term", n.currentTerm)
return RequestVoteReply{
Term: n.currentTerm,
VoteGranted: true,
}
}
为什么这样设计?
Term 机制:
- 像"朝代纪年",每次选举 Term 递增
- 旧 Term 的 Leader 自动失效
- 收到更高 Term 的消息,立即更新并转为 Follower
多数派原则:
- [A,B] 只有 2 票,达不到多数(需要 3 票)
- 所以 A 不会成为真正的 Leader
- [C,D,E] 有 3 票,C 可以成为 Leader
- 关键: 任何两个多数派必有交集,保证不会有两个 Leader
日志新旧检查:
- 只有拥有最新日志的节点才能成为 Leader
- 保证新 Leader 不会丢失已提交的数据
- 比较规则: Term 优先,Index 其次
2.2 难点二:日志复制(Log Replication)
这是一致性协议的核心。所有写操作必须通过 Leader,并复制到多数派节点。
复杂的复制流程
场景:写入一条日志
时刻 T1: Leader 收到写请求"SET x=1"
时刻 T2: Leader 写入本地日志 [index=5, term=3, cmd="SET x=1"]
时刻 T3: Leader 向所有 Follower 发送 AppendEntries
时刻 T4: Follower A 收到,写入成功
时刻 T5: Follower B 收到,写入成功
时刻 T6: Follower C 网络超时,未收到
时刻 T7: Leader 收到 2 个成功响应(加上自己共 3 个,多数派)
时刻 T8: Leader 提交这条日志,返回客户端成功
时刻 T9: Follower C 网络恢复,收到 AppendEntries
时刻 T10: Follower C 写入日志,追赶进度
关键代码实现
type LogEntry struct {
Index int64
Term int64
Command interface{}
}
type Node struct {
// ... 其他字段
// 日志
log []LogEntry
lastLogIndex int64
commitIndex int64
lastApplied int64
// Leader 维护的状态
nextIndex map[string]int64 // 每个 Follower 的下一条日志索引
matchIndex map[string]int64 // 每个 Follower 已复制的最高日志索引
}
func (n *Node) AppendEntry(ctx context.Context, cmd interface{}) error {
if n.state != Leader {
return errors.New("not leader")
}
n.mu.Lock()
// 1. 写入本地日志
entry := LogEntry{
Index: n.lastLogIndex + 1,
Term: n.currentTerm,
Command: cmd,
}
n.log = append(n.log, entry)
n.lastLogIndex++
log.Info("appended entry",
"index", entry.Index,
"term", entry.Term)
n.mu.Unlock()
// 2. 并发向所有 Follower 发送
successCount := 1 // Leader 自己
var mu sync.Mutex
var wg sync.WaitGroup
for _, peer := range n.peers {
wg.Add(1)
go func(p Peer) {
defer wg.Done()
n.mu.RLock()
prevLogIndex := n.nextIndex[p.id] - 1
prevLogTerm := n.log[prevLogIndex].Term
n.mu.RUnlock()
// 发送 AppendEntries RPC
resp, err := p.AppendEntries(ctx, AppendEntriesArgs{
Term: n.currentTerm,
LeaderId: n.id,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: []LogEntry{entry},
LeaderCommit: n.commitIndex,
})
if err != nil {
log.Error("append entries failed",
"peer", p.id,
"err", err)
return
}
if resp.Success {
mu.Lock()
successCount++
n.matchIndex[p.id] = entry.Index
n.nextIndex[p.id] = entry.Index + 1
mu.Unlock()
log.Info("append entries success",
"peer", p.id,
"index", entry.Index)
} else {
// 日志不匹配,需要回退
n.mu.Lock()
n.nextIndex[p.id]--
n.mu.Unlock()
log.Warn("append entries conflict",
"peer", p.id,
"prevIndex", prevLogIndex)
}
}(peer)
}
// 等待所有响应
wg.Wait()
// 3. 检查是否达到多数派
if successCount > len(n.peers)/2 {
// 提交日志
n.mu.Lock()
n.commitIndex = entry.Index
n.mu.Unlock()
log.Info("entry committed",
"index", entry.Index,
"replicas", successCount)
// 应用到状态机
n.applyLog(entry)
return nil
}
log.Error("failed to reach quorum",
"successCount", successCount,
"required", len(n.peers)/2+1)
return errors.New("failed to reach quorum")
}
func (n *Node) HandleAppendEntries(args AppendEntriesArgs) AppendEntriesReply {
n.mu.Lock()
defer n.mu.Unlock()
log.Info("received append entries",
"from", args.LeaderId,
"term", args.Term,
"prevIndex", args.PrevLogIndex,
"entries", len(args.Entries))
// 1. 检查 term
if args.Term < n.currentTerm {
log.Info("rejecting outdated append entries",
"theirTerm", args.Term,
"myTerm", n.currentTerm)
return AppendEntriesReply{
Term: n.currentTerm,
Success: false,
}
}
// 2. 更新 term,转为 Follower
if args.Term > n.currentTerm {
n.currentTerm = args.Term
n.state = Follower
n.votedFor = ""
}
// 3. 重置选举超时(收到 Leader 心跳)
n.resetElectionTimeout()
// 4. 检查日志一致性
if args.PrevLogIndex > 0 {
if n.lastLogIndex < args.PrevLogIndex {
// 日志太短
log.Warn("log too short",
"myLastIndex", n.lastLogIndex,
"requiredIndex", args.PrevLogIndex)
return AppendEntriesReply{
Term: n.currentTerm,
Success: false,
}
}
prevEntry := n.log[args.PrevLogIndex]
if prevEntry.Term != args.PrevLogTerm {
// 日志不匹配,删除冲突的日志
log.Warn("log conflict detected",
"index", args.PrevLogIndex,
"myTerm", prevEntry.Term,
"expectedTerm", args.PrevLogTerm)
n.log = n.log[:args.PrevLogIndex]
n.lastLogIndex = args.PrevLogIndex - 1
return AppendEntriesReply{
Term: n.currentTerm,
Success: false,
}
}
}
// 5. 追加新日志
for _, entry := range args.Entries {
n.log = append(n.log, entry)
n.lastLogIndex++
log.Info("appended entry",
"index", entry.Index,
"term", entry.Term)
}
// 6. 更新 commitIndex
if args.LeaderCommit > n.commitIndex {
oldCommitIndex := n.commitIndex
n.commitIndex = min(args.LeaderCommit, n.lastLogIndex)
log.Info("updated commit index",
"old", oldCommitIndex,
"new", n.commitIndex)
// 应用已提交的日志到状态机
for i := n.lastApplied + 1; i <= n.commitIndex; i++ {
n.applyLog(n.log[i])
}
n.lastApplied = n.commitIndex
}
return AppendEntriesReply{
Term: n.currentTerm,
Success: true,
}
}
func (n *Node) applyLog(entry LogEntry) {
log.Info("applying log to state machine",
"index", entry.Index,
"command", entry.Command)
// 应用到状态机
n.stateMachine.Apply(entry.Command)
}
关键设计点:
PrevLogIndex/PrevLogTerm:
- 确保日志的连续性
- Follower 必须拥有前一条日志,才能追加新日志
- 如果不匹配,Leader 会不断回退直到找到一致的点
多数派提交:
- 只有复制到多数派,才能提交
- 提交后的日志永不丢失
- 未提交的日志可能被覆盖
心跳机制:
- Leader 定期发送空的 AppendEntries 作为心跳
- 防止 Follower 选举超时
- 同时推进 commitIndex
日志修复:
- Follower 落后时,Leader 会不断重试
- 通过 nextIndex 和 matchIndex 跟踪进度
- 冲突时回退到一致的点
2.3 难点三:网络分区(Network Partition)
这是最难处理的场景,也是 Bug 最多的地方。
经典网络分区案例
初始状态:5 个节点 [A, B, C, D, E],Leader 是 A
时刻 T1: 网络分区,分成两组
- 多数派: [A, B, C] (3 个节点)
- 少数派: [D, E] (2 个节点)
时刻 T2: 客户端 1 向 A 写入 "SET x=1"
- A 复制到 B, C,达到多数派
- A 提交日志,返回客户端成功
- D, E 收不到,日志落后
时刻 T3: [D, E] 选举超时
- D 发起选举,term 变为 4
- E 投票给 D
- D 成为"假 Leader"(只有 2 票,不是多数派)
时刻 T4: 客户端 2 向 D 写入 "SET x=2"
- D 尝试复制给 E,成功(2 个节点)
- D 无法提交(达不到多数派 3)
- 写入一直 Pending!
时刻 T5: 网络恢复
- D 发现 A 的 term 可能更新
- 或者 D 尝试复制到 A/B/C,发现自己不是真正的 Leader
- D 转为 Follower
- D 的日志"SET x=2"被 A 的日志覆盖(因为未提交)
- 一致性恢复!
时刻 T6: 客户端 2 收到错误"not leader"
- 重试,连接到真正的 Leader A
- 重新写入"SET x=2"
- 成功
Raft 如何保证安全性?
Leader 完整性(Leader Completeness):
只有拥有最新已提交日志的节点才能成为 Leader
证明:
1. 日志只有复制到多数派才能提交
2. Leader 选举需要多数派投票
3. 任何两个多数派必有交集
4. 因此新 Leader 必然拥有所有已提交的日志
多数派原则(Quorum):
任何操作(提交、选举)都需要多数派
效果:
- 不会有两个 Leader(两个多数派会冲突)
- 已提交的日志不会丢失(至少一个多数派成员会成为新 Leader)
- 网络分区时,少数派无法做任何决策
Term 递增(Term Monotonicity):
每次选举 Term 递增
效果:
- 旧 Term 的 Leader 自动失效
- 收到更高 Term 的消息,立即转为 Follower
- 防止"过时的 Leader"继续工作
2.4 难点四:为什么 Bug 难调试?
Bug 的特点
极低概率触发:
问题: Raft 实现中的一个 Bug 可能导致:
- 99.9% 的时间都正常
- 只在特定的网络分区场景下触发
- 复现条件极其苛刻
示例场景:
1. 5 个节点,网络分区
2. 同时有 3 个写请求
3. 其中一个节点网络抖动 200ms
4. 另一个节点 CPU 100%(GC)
5. 此时 Leader 切换
6. 某个日志复制到 2 个节点但未提交
7. 新 Leader 上任,这条日志被覆盖
8. 数据丢失!(如果实现有 Bug)
非确定性:
同样的代码:
- 运行 100 次,99 次正常
- 第 100 次数据不一致
- 无法稳定复现
时序依赖:
Bug 依赖精确的时序:
- 消息 A 必须在消息 B 之前到达
- 节点 C 必须在 T1 时刻挂掉
- 网络分区必须持续 X 秒
调试方法
1. Jepsen 混沌测试
# Jepsen 会模拟各种异常
lein run test --nodes n1,n2,n3,n4,n5 \
--nemesis partition \
--time-limit 300 \
--rate 100
# 检查一致性
java -jar knossos.jar linearizability history.edn
2. 详细日志记录
type RaftLogger struct {
file *os.File
}
func (l *RaftLogger) LogStateChange(
nodeID string,
oldState, newState NodeState,
term int64,
) {
entry := LogEntry{
Timestamp: time.Now(),
NodeID: nodeID,
Event: "StateChange",
Details: map[string]interface{}{
"oldState": oldState,
"newState": newState,
"term": term,
},
}
// 写入日志文件
json.NewEncoder(l.file).Encode(entry)
}
func (l *RaftLogger) LogRPC(
from, to string,
rpcType string,
args, reply interface{},
) {
entry := LogEntry{
Timestamp: time.Now(),
From: from,
To: to,
RPC: rpcType,
Args: args,
Reply: reply,
}
json.NewEncoder(l.file).Encode(entry)
}
3. 时间线分析工具
# 分析日志,生成时间线
def analyze_raft_log(log_file):
events = []
for line in log_file:
event = json.loads(line)
events.append(event)
# 按时间排序
events.sort(key=lambda e: e['timestamp'])
# 检查不一致
check_linearizability(events)
check_safety(events)
# 生成可视化
generate_timeline_html(events)
4. 模拟网络故障
type FaultyNetwork struct {
// 模拟网络延迟
DelayRange [2]time.Duration // [min, max]
// 模拟丢包率
DropRate float64
// 模拟网络分区
Partitions [][]string
}
func (n *FaultyNetwork) Send(from, to string, msg Message) error {
// 1. 检查是否被分区隔离
if n.isPartitioned(from, to) {
return errors.New("network partitioned")
}
// 2. 模拟丢包
if rand.Float64() < n.DropRate {
return errors.New("packet dropped")
}
// 3. 模拟延迟
delay := n.randomDelay()
time.Sleep(delay)
// 4. 发送
return n.realSend(from, to, msg)
}
三、工业级实现要点
3.1 日志压缩(Snapshot)
日志不能无限增长,需要定期做快照。
type Snapshot struct {
LastIncludedIndex int64
LastIncludedTerm int64
Data []byte
}
func (n *Node) CreateSnapshot() error {
n.mu.Lock()
defer n.mu.Unlock()
// 检查是否需要做快照
if n.lastApplied - n.lastSnapshotIndex < snapshotThreshold {
return nil
}
log.Info("creating snapshot",
"lastApplied", n.lastApplied,
"lastSnapshot", n.lastSnapshotIndex)
// 1. 获取状态机快照
snapshotData := n.stateMachine.Snapshot()
// 2. 创建快照对象
snapshot := Snapshot{
LastIncludedIndex: n.lastApplied,
LastIncludedTerm: n.log[n.lastApplied].Term,
Data: snapshotData,
}
// 3. 保存快照到磁盘
err := n.persister.SaveSnapshot(snapshot)
if err != nil {
return err
}
// 4. 删除旧日志
oldLogLen := len(n.log)
n.log = n.log[n.lastApplied-n.lastSnapshotIndex:]
n.lastSnapshotIndex = n.lastApplied
log.Info("snapshot created",
"index", snapshot.LastIncludedIndex,
"oldLogLen", oldLogLen,
"newLogLen", len(n.log))
return nil
}
func (n *Node) InstallSnapshot(snapshot Snapshot) error {
n.mu.Lock()
defer n.mu.Unlock()
log.Info("installing snapshot",
"lastIncludedIndex", snapshot.LastIncludedIndex)
// 1. 检查快照是否更新
if snapshot.LastIncludedIndex <= n.lastSnapshotIndex {
return nil // 旧快照,忽略
}
// 2. 应用快照到状态机
err := n.stateMachine.Restore(snapshot.Data)
if err != nil {
return err
}
// 3. 更新日志
n.lastSnapshotIndex = snapshot.LastIncludedIndex
n.lastApplied = snapshot.LastIncludedIndex
n.commitIndex = snapshot.LastIncludedIndex
// 删除快照之前的日志
if snapshot.LastIncludedIndex < n.lastLogIndex {
n.log = n.log[snapshot.LastIncludedIndex-n.lastSnapshotIndex:]
} else {
n.log = []LogEntry{}
n.lastLogIndex = snapshot.LastIncludedIndex
}
return nil
}
3.2 成员变更(Membership Change)
动态添加/删除节点是极其复杂的操作。
为什么难?
问题:直接从 3 节点变成 5 节点
旧配置: [A, B, C],多数派 = 2
新配置: [A, B, C, D, E],多数派 = 3
危险场景:
- [A, B] 认为自己是多数派(旧配置 2/3)
- [C, D, E] 认为自己是多数派(新配置 3/5)
- 出现两个 Leader!
解决方案:Joint Consensus(联合共识)
type Configuration struct {
Old []string // 旧配置
New []string // 新配置
}
func (n *Node) ChangeConfiguration(newMembers []string) error {
// 阶段 1: 进入联合配置 C(old,new)
jointConfig := Configuration{
Old: n.config.Old,
New: newMembers,
}
log.Info("entering joint consensus",
"old", jointConfig.Old,
"new", jointConfig.New)
// 提交联合配置
err := n.AppendEntry(context.Background(), jointConfig)
if err != nil {
return err
}
// 联合配置生效:
// - 日志复制需要 old 的多数 AND new 的多数
// - Leader 选举需要 old 的多数 AND new 的多数
// 阶段 2: 提交新配置 C(new)
time.Sleep(100 * time.Millisecond) // 等待联合配置生效
finalConfig := Configuration{
Old: newMembers,
New: nil,
}
log.Info("entering new configuration",
"config", finalConfig.Old)
err = n.AppendEntry(context.Background(), finalConfig)
if err != nil {
return err
}
// 新配置生效
n.config = finalConfig
n.peers = buildPeers(newMembers)
return nil
}
func (n *Node) isQuorum(voters map[string]bool) bool {
if n.config.New == nil {
// 普通配置
count := 0
for _, member := range n.config.Old {
if voters[member] {
count++
}
}
return count > len(n.config.Old)/2
}
// 联合配置:需要同时满足 old 和 new 的多数
oldCount := 0
for _, member := range n.config.Old {
if voters[member] {
oldCount++
}
}
newCount := 0
for _, member := range n.config.New {
if voters[member] {
newCount++
}
}
return oldCount > len(n.config.Old)/2 &&
newCount > len(n.config.New)/2
}
3.3 Read Index 优化
读操作也需要保证线性一致性。
问题:
场景:客户端读取 key="x"
1. 客户端连接到 Leader
2. Leader 直接返回本地状态机的值
3. 但是 Leader 可能已经不是真正的 Leader了!(网络分区)
4. 返回的是旧数据!
解决方案:Read Index
func (n *Node) Read(ctx context.Context, key string) (string, error) {
if n.state != Leader {
return "", errors.New("not leader")
}
// 1. 记录当前的 commitIndex(Read Index)
readIndex := n.commitIndex
log.Info("read request",
"key", key,
"readIndex", readIndex)
// 2. 发送心跳到多数派(确认自己还是 Leader)
if !n.checkQuorum(ctx) {
return "", errors.New("lost leadership")
}
// 3. 等待 applyIndex >= readIndex
for n.lastApplied < readIndex {
time.Sleep(10 * time.Millisecond)
}
// 4. 读取状态机
value := n.stateMachine.Get(key)
log.Info("read completed",
"key", key,
"value", value)
return value, nil
}
func (n *Node) checkQuorum(ctx context.Context) bool {
responseCount := 1 // Leader 自己
var mu sync.Mutex
var wg sync.WaitGroup
for _, peer := range n.peers {
wg.Add(1)
go func(p Peer) {
defer wg.Done()
// 发送心跳
resp, err := p.AppendEntries(ctx, AppendEntriesArgs{
Term: n.currentTerm,
LeaderId: n.id,
PrevLogIndex: n.lastLogIndex,
PrevLogTerm: n.log[n.lastLogIndex].Term,
Entries: nil, // 空心跳
LeaderCommit: n.commitIndex,
})
if err == nil && resp.Success {
mu.Lock()
responseCount++
mu.Unlock()
}
}(peer)
}
wg.Wait()
// 检查是否达到多数派
return responseCount > len(n.peers)/2
}
Lease Read 优化:
// 更激进的优化:Leader 维护一个租约
type Node struct {
// ... 其他字段
leaseExpire time.Time
}
func (n *Node) renewLease() {
// 发送心跳成功后,续约 lease
n.leaseExpire = time.Now().Add(leaseTimeout)
}
func (n *Node) LeaseRead(key string) (string, error) {
if n.state != Leader {
return "", errors.New("not leader")
}
// 检查 lease 是否过期
if time.Now().After(n.leaseExpire) {
return "", errors.New("lease expired")
}
// 直接读取,无需等待
return n.stateMachine.Get(key), nil
}
四、实战建议
4.1 不要从零实现 Raft!
强烈建议使用成熟的库:
- etcd/raft: etcd 使用的 Raft 库,生产级
- Hashicorp/raft: Consul 使用的 Raft 库,功能完善
- tikv/raft-rs: Rust 实现,性能极高
示例:使用 etcd/raft
import "go.etcd.io/etcd/raft/v3"
type KVStore struct {
node raft.Node
store map[string]string
mu sync.RWMutex
}
func (kv *KVStore) Start() {
c := &raft.Config{
ID: 0x01,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: raft.NewMemoryStorage(),
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
}
kv.node = raft.StartNode(c, []raft.Peer{{ID: 0x01}})
go kv.run()
}
func (kv *KVStore) run() {
for {
select {
case <-kv.node.Tick():
// 定时器触发
case rd := <-kv.node.Ready():
// 处理 Ready
// 1. 保存状态
if !raft.IsEmptySnap(rd.Snapshot) {
kv.processSnapshot(rd.Snapshot)
}
kv.storage.Append(rd.Entries)
// 2. 发送消息
for _, msg := range rd.Messages {
kv.send(msg)
}
// 3. 应用已提交的日志
for _, entry := range rd.CommittedEntries {
kv.apply(entry)
}
// 4. 通知 raft 已处理
kv.node.Advance()
}
}
}
func (kv *KVStore) Set(key, value string) error {
cmd := Command{
Type: "SET",
Key: key,
Value: value,
}
data, _ := json.Marshal(cmd)
return kv.node.Propose(context.Background(), data)
}
func (kv *KVStore) apply(entry raft.Entry) {
var cmd Command
json.Unmarshal(entry.Data, &cmd)
kv.mu.Lock()
defer kv.mu.Unlock()
switch cmd.Type {
case "SET":
kv.store[cmd.Key] = cmd.Value
case "DELETE":
delete(kv.store, cmd.Key)
}
}
4.2 但要理解核心概念
即使使用成熟的库,你也必须理解这些概念:
1. Term(任期)
- 逻辑时钟
- 检测过期的 Leader
- 保证一致性的基础
2. Leader Election(选举)
- 超时随机化
- 多数派投票
- 日志新旧检查
3. Log Replication(日志复制)
- PrevLogIndex/PrevLogTerm
- 多数派提交
- 日志修复
4. Quorum(多数派)
- 防止脑裂
- 保证已提交日志不丢失
- 任何操作都需要多数派
5. Network Partition(网络分区)
- 最难的测试场景
- 必须用 Jepsen 等工具测试
- 理解 Safety 和 Liveness 的权衡
4.3 测试策略
1. 单元测试
func TestElection(t *testing.T) {
// 创建 5 个节点
nodes := createTestCluster(5)
// 启动所有节点
for _, node := range nodes {
node.Start()
}
// 等待选举
time.Sleep(1 * time.Second)
// 检查只有一个 Leader
leaders := countLeaders(nodes)
assert.Equal(t, 1, leaders)
}
func TestLogReplication(t *testing.T) {
nodes := createTestCluster(5)
// 找到 Leader
leader := findLeader(nodes)
// 写入数据
leader.Set("x", "1")
// 等待复制
time.Sleep(100 * time.Millisecond)
// 检查所有节点
for _, node := range nodes {
value := node.Get("x")
assert.Equal(t, "1", value)
}
}
2. 混沌测试(Jepsen)
(defn raft-test
[]
(assoc tests/noop-test
:name "raft"
:db (db "raft-node")
:client (client)
:nemesis (nemesis/partition-random-halves)
:generator (gen/phases
(->> (gen/mix [r w])
(gen/stagger 1/10)
(gen/nemesis
(gen/seq (cycle [(gen/sleep 10)
{:type :info, :f :start}
(gen/sleep 10)
{:type :info, :f :stop}])))
(gen/time-limit 300))
(gen/nemesis
(gen/once {:type :info, :f :stop}))
(gen/clients
(gen/once {:type :invoke, :f :read})))
:checker (checker/compose
{:perf (checker/perf)
:linear (checker/linearizable)})))
3. 性能测试
func BenchmarkWrite(b *testing.B) {
cluster := createTestCluster(5)
leader := findLeader(cluster)
b.ResetTimer()
for i := 0; i < b.N; i++ {
leader.Set(fmt.Sprintf("key-%d", i), "value")
}
}
func BenchmarkRead(b *testing.B) {
cluster := createTestCluster(5)
leader := findLeader(cluster)
// 预写数据
leader.Set("x", "1")
b.ResetTimer()
for i := 0; i < b.N; i++ {
leader.Get("x")
}
}
五、总结
5.1 为什么一致性协议排第二?
难度仅次于调度器:
- 理论门槛高: 需要理解分布式一致性理论
- Bug 难调试: 低概率、非确定性、时序依赖
- 实现复杂: 选举、复制、网络分区、成员变更
- 测试困难: 需要混沌测试,模拟各种异常
但比调度器简单:
- 问题域单一: 就是保证一致性
- 状态相对简单: 主要是选举和日志复制
- 有明确算法: Raft 论文、TLA+ 形式化证明
- 有成熟的库: etcd/raft、Hashicorp/raft
5.2 关键要点
核心概念:
- Term(任期)
- Leader Election(选举)
- Log Replication(日志复制)
- Quorum(多数派)
- Network Partition(网络分区)
安全性保证:
- Leader 完整性
- 状态机安全性
- 日志匹配性
性能优化:
- Snapshot(快照)
- Pipeline(流水线)
- Read Index
- Lease Read
5.3 学习路径
- 理解基础: 阅读 Raft 论文
- 看代码: 阅读 etcd/raft 源码
- 写 Demo: 实现简单的 KV 存储
- 混沌测试: 用 Jepsen 测试
- 生产使用: 在真实项目中使用
一致性协议是分布式系统的基石,值得深入学习!
本系列文章
➤ [NO.1 调度器]
➤ NO.2 一致性协议(Paxos / Raft)
➤ [NO.3 高性能异步系统](消息队列、回调、重试)
➤ [NO.4 交易系统](钱的事不能错)
➤ [NO.5 普通业务系统](绝大多数人做的)