HiHuo
首页
博客
手册
工具
首页
博客
手册
工具
  • 学习 Kafka

    • Kafka 学习手册 - 总览与导读
    • 01-核心概念与架构
    • 02-存储模块-日志与索引
    • 03-复制与ISR机制
    • 04-元数据管理与KRaft
    • 05-消费者组协调
    • 06-事务与Exactly-Once语义
    • 07-性能优化与调优
    • 08-高可用与容灾
    • 09-面试高频问题详解
    • 10-实战项目-Mini-Kafka实现

03-复制与ISR机制

📋 本章概览

本章深入探讨Kafka的复制机制和ISR(In-Sync Replicas)管理,这是Kafka实现高可用性和数据一致性的核心机制。我们将从Leader/Follower模型、ISR维护、高水位机制、副本选举等方面,全面解析Kafka如何保证数据的可靠性和一致性。

🎯 学习目标

  • 理解Kafka的Leader/Follower复制模型
  • 掌握ISR(In-Sync Replicas)的维护机制
  • 了解高水位(HW)和日志末端偏移量(LEO)的概念
  • 学习LeaderEpoch防回退机制
  • 掌握副本选举流程和故障恢复

🏗️ 复制架构概览

基本复制模型

Topic: user-events, Partition: 0, Replication Factor: 3

┌─────────────────────────────────────────────────────────────────┐
│                        Kafka 集群                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│  │   Broker 1  │    │   Broker 2  │    │   Broker 3  │         │
│  │             │    │             │    │             │         │
│  │ ┌─────────┐ │    │ ┌─────────┐ │    │ ┌─────────┐ │         │
│  │ │  LEADER │ │◄───┤ │FOLLOWER │ │    │ │FOLLOWER │ │         │
│  │ │         │ │    │ │         │ │    │ │         │ │         │
│  │ │ Offset: │ │    │ │ Offset: │ │    │ │ Offset: │ │         │
│  │ │ 0,1,2,3 │ │    │ │ 0,1,2,3 │ │    │ │ 0,1,2   │ │         │
│  │ │ HW: 3   │ │    │ │ HW: 3   │ │    │ │ HW: 2   │ │         │
│  │ └─────────┘ │    │ └─────────┘ │    │ └─────────┘ │         │
│  └─────────────┘    └─────────────┘    └─────────────┘         │
│                                                                 │
│  ISR: [Broker1, Broker2]  (Broker3落后,不在ISR中)             │
└─────────────────────────────────────────────────────────────────┘

核心概念

概念定义作用
Leader分区的主副本处理所有读写请求
Follower分区的从副本从Leader同步数据
ISR同步副本集合与Leader保持同步的副本列表
HW高水位已提交消息的最高偏移量
LEO日志末端偏移量下一条消息的偏移量
LeaderEpochLeader纪元防止数据回退的版本号

🔄 Leader/Follower复制流程

消息写入流程

// Leader处理消息写入
type PartitionLeader struct {
    topic        string
    partition    int
    replicas     []ReplicaInfo
    isr          []int32  // ISR中的Broker ID列表
    hw           int64    // 高水位
    leo          int64    // 日志末端偏移量
    leaderEpoch  int32    // Leader纪元
}

type ReplicaInfo struct {
    brokerID    int32
    isLeader    bool
    isInISR     bool
    lastCaughtUpTime int64
    logEndOffset     int64
}

// 处理生产请求
func (pl *PartitionLeader) HandleProduceRequest(records []Record) (*ProduceResponse, error) {
    // 1. 验证ISR大小
    if len(pl.isr) < pl.minInSyncReplicas {
        return nil, fmt.Errorf("ISR大小不足: %d < %d", len(pl.isr), pl.minInSyncReplicas)
    }
    
    // 2. 追加到本地日志
    baseOffset, err := pl.appendToLocalLog(records)
    if err != nil {
        return nil, err
    }
    
    // 3. 更新LEO
    pl.leo = baseOffset + int64(len(records))
    
    // 4. 并行复制到Followers
    replicationResults := pl.replicateToFollowers(records, baseOffset)
    
    // 5. 更新ISR和HW
    pl.updateISRAndHW(replicationResults)
    
    // 6. 返回响应
    return &ProduceResponse{
        BaseOffset: baseOffset,
        LastOffset: pl.leo - 1,
        HighWatermark: pl.hw,
    }, nil
}

// 复制到Followers
func (pl *PartitionLeader) replicateToFollowers(records []Record, baseOffset int64) map[int32]ReplicationResult {
    results := make(map[int32]ReplicationResult)
    var wg sync.WaitGroup
    
    for _, replica := range pl.replicas {
        if replica.isLeader {
            continue // 跳过Leader自己
        }
        
        wg.Add(1)
        go func(r ReplicaInfo) {
            defer wg.Done()
            
            // 发送Fetch请求到Follower
            result := pl.sendFetchRequest(r.brokerID, baseOffset, records)
            results[r.brokerID] = result
        }(replica)
    }
    
    wg.Wait()
    return results
}

// 发送Fetch请求
func (pl *PartitionLeader) sendFetchRequest(brokerID int32, baseOffset int64, records []Record) ReplicationResult {
    // 构建Fetch请求
    fetchRequest := &FetchRequest{
        Topic:     pl.topic,
        Partition: pl.partition,
        Offset:    baseOffset,
        MaxBytes:  1024 * 1024, // 1MB
        LeaderEpoch: pl.leaderEpoch,
    }
    
    // 发送到Follower
    client := pl.getBrokerClient(brokerID)
    response, err := client.Fetch(fetchRequest)
    
    if err != nil {
        return ReplicationResult{
            BrokerID: brokerID,
            Success:  false,
            Error:    err,
        }
    }
    
    return ReplicationResult{
        BrokerID: brokerID,
        Success:  true,
        LogEndOffset: response.LogEndOffset,
        HighWatermark: response.HighWatermark,
    }
}

Follower同步流程

// Follower处理Fetch请求
type PartitionFollower struct {
    topic        string
    partition    int
    leaderID     int32
    leaderEpoch  int32
    hw           int64
    leo          int64
    log          *PartitionLog
}

// 处理Fetch请求
func (pf *PartitionFollower) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error) {
    // 1. 验证LeaderEpoch
    if req.LeaderEpoch < pf.leaderEpoch {
        return nil, fmt.Errorf("过期的LeaderEpoch: %d < %d", req.LeaderEpoch, pf.leaderEpoch)
    }
    
    // 2. 更新LeaderEpoch
    if req.LeaderEpoch > pf.leaderEpoch {
        pf.leaderEpoch = req.LeaderEpoch
    }
    
    // 3. 从指定偏移量读取消息
    records, err := pf.log.Fetch(req.Offset, req.MaxBytes)
    if err != nil {
        return nil, err
    }
    
    // 4. 更新LEO
    if len(records) > 0 {
        pf.leo = records[len(records)-1].Offset + 1
    }
    
    // 5. 返回响应
    return &FetchResponse{
        Topic:     pf.topic,
        Partition: pf.partition,
        Records:   records,
        LogEndOffset: pf.leo,
        HighWatermark: pf.hw,
    }, nil
}

// Follower主动拉取
func (pf *PartitionFollower) startReplication() {
    ticker := time.NewTicker(100 * time.Millisecond) // 100ms拉取一次
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            if err := pf.fetchFromLeader(); err != nil {
                log.Printf("从Leader拉取失败: %v", err)
            }
        }
    }
}

func (pf *PartitionFollower) fetchFromLeader() error {
    // 构建Fetch请求
    fetchRequest := &FetchRequest{
        Topic:     pf.topic,
        Partition: pf.partition,
        Offset:    pf.leo, // 从当前LEO开始拉取
        MaxBytes:  1024 * 1024,
        LeaderEpoch: pf.leaderEpoch,
    }
    
    // 发送到Leader
    leaderClient := pf.getLeaderClient()
    response, err := leaderClient.Fetch(fetchRequest)
    if err != nil {
        return err
    }
    
    // 处理响应
    if len(response.Records) > 0 {
        // 追加到本地日志
        if err := pf.log.Append(response.Records); err != nil {
            return err
        }
        
        // 更新LEO
        pf.leo = response.LogEndOffset
        
        // 更新HW
        pf.hw = response.HighWatermark
    }
    
    return nil
}

📊 ISR维护机制

ISR更新逻辑

// ISR管理器
type ISRManager struct {
    topic        string
    partition    int
    replicas     map[int32]*ReplicaInfo
    isr          []int32
    minInSyncReplicas int
    replicaMaxLagMs   int64
}

// 更新ISR
func (im *ISRManager) updateISR() {
    newISR := make([]int32, 0)
    currentTime := time.Now().UnixMilli()
    
    for brokerID, replica := range im.replicas {
        // 检查副本是否应该加入ISR
        if im.shouldBeInISR(replica, currentTime) {
            newISR = append(newISR, brokerID)
        }
    }
    
    // 排序ISR列表
    sort.Slice(newISR, func(i, j int) bool {
        return newISR[i] < newISR[j]
    })
    
    // 更新ISR
    if !im.isISREqual(im.isr, newISR) {
        oldISR := im.isr
        im.isr = newISR
        
        log.Printf("ISR更新: %v -> %v", oldISR, newISR)
        
        // 触发ISR变更事件
        im.onISRChange(oldISR, newISR)
    }
}

// 判断副本是否应该在ISR中
func (im *ISRManager) shouldBeInISR(replica *ReplicaInfo, currentTime int64) bool {
    // 1. 副本必须是活跃的
    if !replica.isAlive {
        return false
    }
    
    // 2. 检查副本是否落后太多
    if replica.lastCaughtUpTime > 0 {
        lag := currentTime - replica.lastCaughtUpTime
        if lag > im.replicaMaxLagMs {
            return false
        }
    }
    
    // 3. 检查副本的LEO是否足够接近Leader
    leader := im.getLeader()
    if leader != nil {
        leoLag := leader.logEndOffset - replica.logEndOffset
        if leoLag > im.maxLogLag {
            return false
        }
    }
    
    return true
}

// 处理副本状态更新
func (im *ISRManager) updateReplicaStatus(brokerID int32, logEndOffset int64, highWatermark int64) {
    replica, exists := im.replicas[brokerID]
    if !exists {
        return
    }
    
    // 更新副本状态
    replica.logEndOffset = logEndOffset
    replica.highWatermark = highWatermark
    replica.lastCaughtUpTime = time.Now().UnixMilli()
    
    // 检查是否需要更新ISR
    im.updateISR()
}

ISR收缩和扩展

// ISR收缩处理
func (im *ISRManager) handleISRShrink(oldISR, newISR []int32) {
    // 找出被移除的副本
    removedReplicas := im.findRemovedReplicas(oldISR, newISR)
    
    for _, brokerID := range removedReplicas {
        replica := im.replicas[brokerID]
        log.Printf("副本 %d 从ISR中移除,原因: 落后太多", brokerID)
        
        // 可以在这里添加监控和告警
        im.onReplicaRemovedFromISR(brokerID, replica)
    }
}

// ISR扩展处理
func (im *ISRManager) handleISRExpand(oldISR, newISR []int32) {
    // 找出新加入的副本
    addedReplicas := im.findAddedReplicas(oldISR, newISR)
    
    for _, brokerID := range addedReplicas {
        replica := im.replicas[brokerID]
        log.Printf("副本 %d 加入ISR", brokerID)
        
        // 可以在这里添加监控和告警
        im.onReplicaAddedToISR(brokerID, replica)
    }
}

// 检查ISR健康状态
func (im *ISRManager) checkISRHealth() error {
    // 检查ISR大小
    if len(im.isr) < im.minInSyncReplicas {
        return fmt.Errorf("ISR大小不足: %d < %d", len(im.isr), im.minInSyncReplicas)
    }
    
    // 检查Leader是否在ISR中
    leader := im.getLeader()
    if leader != nil && !im.isInISR(leader.brokerID) {
        return fmt.Errorf("Leader不在ISR中")
    }
    
    return nil
}

🌊 高水位(HW)机制

HW更新逻辑

// 高水位管理器
type HighWatermarkManager struct {
    topic        string
    partition    int
    hw           int64
    leo          int64
    isr          []int32
    replicas     map[int32]*ReplicaInfo
}

// 更新高水位
func (hwm *HighWatermarkManager) updateHighWatermark() {
    if len(hwm.isr) == 0 {
        return
    }
    
    // 计算所有ISR副本的最小LEO
    minLEO := int64(math.MaxInt64)
    for _, brokerID := range hwm.isr {
        replica := hwm.replicas[brokerID]
        if replica.logEndOffset < minLEO {
            minLEO = replica.logEndOffset
        }
    }
    
    // 更新HW
    if minLEO != math.MaxInt64 && minLEO > hwm.hw {
        oldHW := hwm.hw
        hwm.hw = minLEO
        
        log.Printf("HW更新: %d -> %d", oldHW, hwm.hw)
        
        // 通知所有副本更新HW
        hwm.notifyReplicasHWUpdate()
    }
}

// 通知副本HW更新
func (hwm *HighWatermarkManager) notifyReplicasHWUpdate() {
    for _, brokerID := range hwm.isr {
        replica := hwm.replicas[brokerID]
        
        // 发送HW更新请求
        go func(r *ReplicaInfo) {
            client := hwm.getBrokerClient(r.brokerID)
            updateRequest := &UpdateHighWatermarkRequest{
                Topic:     hwm.topic,
                Partition: hwm.partition,
                HighWatermark: hwm.hw,
                LeaderEpoch: hwm.leaderEpoch,
            }
            
            if err := client.UpdateHighWatermark(updateRequest); err != nil {
                log.Printf("更新副本 %d 的HW失败: %v", r.brokerID, err)
            }
        }(replica)
    }
}

// 检查消息是否已提交
func (hwm *HighWatermarkManager) isCommitted(offset int64) bool {
    return offset < hwm.hw
}

// 获取可读偏移量范围
func (hwm *HighWatermarkManager) getReadableOffsetRange() (int64, int64) {
    return 0, hwm.hw - 1
}

消费者可见性控制

// 消费者可见性管理器
type ConsumerVisibilityManager struct {
    hwm *HighWatermarkManager
}

// 过滤已提交的消息
func (cvm *ConsumerVisibilityManager) filterCommittedMessages(records []Record) []Record {
    var committedRecords []Record
    
    for _, record := range records {
        if cvm.hwm.isCommitted(record.Offset) {
            committedRecords = append(committedRecords, record)
        }
    }
    
    return committedRecords
}

// 处理消费者读取请求
func (cvm *ConsumerVisibilityManager) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error) {
    // 1. 从日志中读取消息
    allRecords, err := cvm.readFromLog(req.Offset, req.MaxBytes)
    if err != nil {
        return nil, err
    }
    
    // 2. 过滤已提交的消息
    committedRecords := cvm.filterCommittedMessages(allRecords)
    
    // 3. 返回响应
    return &FetchResponse{
        Topic:     req.Topic,
        Partition: req.Partition,
        Records:   committedRecords,
        HighWatermark: cvm.hwm.hw,
    }, nil
}

🛡️ LeaderEpoch防回退机制

LeaderEpoch管理

// LeaderEpoch管理器
type LeaderEpochManager struct {
    topic        string
    partition    int
    currentEpoch int32
    epochHistory []EpochEntry
}

type EpochEntry struct {
    Epoch     int32
    StartOffset int64
    LeaderID  int32
}

// 开始新的LeaderEpoch
func (lem *LeaderEpochManager) startNewEpoch(leaderID int32, startOffset int64) int32 {
    newEpoch := lem.currentEpoch + 1
    
    // 添加新的纪元条目
    entry := EpochEntry{
        Epoch:      newEpoch,
        StartOffset: startOffset,
        LeaderID:   leaderID,
    }
    lem.epochHistory = append(lem.epochHistory, entry)
    
    // 更新当前纪元
    lem.currentEpoch = newEpoch
    
    log.Printf("开始新的LeaderEpoch: %d, Leader: %d, StartOffset: %d", 
        newEpoch, leaderID, startOffset)
    
    return newEpoch
}

// 验证LeaderEpoch
func (lem *LeaderEpochManager) validateLeaderEpoch(epoch int32) error {
    if epoch < lem.currentEpoch {
        return fmt.Errorf("过期的LeaderEpoch: %d < %d", epoch, lem.currentEpoch)
    }
    
    if epoch > lem.currentEpoch {
        return fmt.Errorf("未来的LeaderEpoch: %d > %d", epoch, lem.currentEpoch)
    }
    
    return nil
}

// 根据偏移量查找LeaderEpoch
func (lem *LeaderEpochManager) findLeaderEpoch(offset int64) int32 {
    // 从最新的纪元开始查找
    for i := len(lem.epochHistory) - 1; i >= 0; i-- {
        entry := lem.epochHistory[i]
        if offset >= entry.StartOffset {
            return entry.Epoch
        }
    }
    
    return 0 // 默认返回0
}

防回退检查

// 防回退检查器
type FencingChecker struct {
    epochManager *LeaderEpochManager
    log          *PartitionLog
}

// 检查写入请求是否会导致回退
func (fc *FencingChecker) checkWriteRequest(req *ProduceRequest) error {
    // 1. 验证LeaderEpoch
    if err := fc.epochManager.validateLeaderEpoch(req.LeaderEpoch); err != nil {
        return err
    }
    
    // 2. 检查偏移量是否会导致回退
    if req.BaseOffset < fc.log.getLastOffset() {
        return fmt.Errorf("写入偏移量 %d 小于当前最后偏移量 %d", 
            req.BaseOffset, fc.log.getLastOffset())
    }
    
    // 3. 检查是否有重复的偏移量
    if req.BaseOffset == fc.log.getLastOffset() {
        // 检查消息内容是否相同
        if fc.isDuplicateMessage(req.BaseOffset, req.Records) {
            return fmt.Errorf("重复的消息偏移量: %d", req.BaseOffset)
        }
    }
    
    return nil
}

// 检查是否为重复消息
func (fc *FencingChecker) isDuplicateMessage(offset int64, records []Record) bool {
    // 读取现有消息
    existingRecords, err := fc.log.Fetch(offset, 1024*1024)
    if err != nil {
        return false
    }
    
    // 比较消息内容
    if len(existingRecords) != len(records) {
        return false
    }
    
    for i, existing := range existingRecords {
        if !fc.compareRecords(existing, records[i]) {
            return false
        }
    }
    
    return true
}

🗳️ 副本选举机制

选举流程

// 副本选举器
type ReplicaElectionManager struct {
    topic        string
    partition    int
    replicas     map[int32]*ReplicaInfo
    isr          []int32
    controller   *Controller
}

// 触发Leader选举
func (rem *ReplicaElectionManager) triggerLeaderElection() error {
    log.Printf("开始Leader选举,Topic: %s, Partition: %d", rem.topic, rem.partition)
    
    // 1. 检查ISR是否为空
    if len(rem.isr) == 0 {
        return fmt.Errorf("ISR为空,无法进行Leader选举")
    }
    
    // 2. 选择新的Leader
    newLeaderID := rem.selectNewLeader()
    if newLeaderID == -1 {
        return fmt.Errorf("无法选择新的Leader")
    }
    
    // 3. 执行Leader切换
    if err := rem.performLeaderSwitch(newLeaderID); err != nil {
        return err
    }
    
    log.Printf("Leader选举完成,新Leader: %d", newLeaderID)
    return nil
}

// 选择新的Leader
func (rem *ReplicaElectionManager) selectNewLeader() int32 {
    // 优先选择ISR中的副本
    for _, brokerID := range rem.isr {
        replica := rem.replicas[brokerID]
        if replica.isAlive && rem.isPreferredLeader(brokerID) {
            return brokerID
        }
    }
    
    // 如果没有Preferred Leader,选择第一个可用的ISR副本
    for _, brokerID := range rem.isr {
        replica := rem.replicas[brokerID]
        if replica.isAlive {
            return brokerID
        }
    }
    
    return -1
}

// 执行Leader切换
func (rem *ReplicaElectionManager) performLeaderSwitch(newLeaderID int32) error {
    // 1. 更新LeaderEpoch
    newEpoch := rem.epochManager.startNewEpoch(newLeaderID, rem.getCurrentLEO())
    
    // 2. 通知所有副本
    if err := rem.notifyReplicasLeaderChange(newLeaderID, newEpoch); err != nil {
        return err
    }
    
    // 3. 更新元数据
    if err := rem.updateMetadata(newLeaderID, newEpoch); err != nil {
        return err
    }
    
    return nil
}

// 通知副本Leader变更
func (rem *ReplicaElectionManager) notifyReplicasLeaderChange(newLeaderID int32, newEpoch int32) error {
    for brokerID, replica := range rem.replicas {
        if brokerID == newLeaderID {
            continue // 跳过新Leader
        }
        
        go func(bID int32, r *ReplicaInfo) {
            client := rem.getBrokerClient(bID)
            request := &LeaderChangeRequest{
                Topic:     rem.topic,
                Partition: rem.partition,
                NewLeaderID: newLeaderID,
                NewEpoch:    newEpoch,
            }
            
            if err := client.NotifyLeaderChange(request); err != nil {
                log.Printf("通知副本 %d Leader变更失败: %v", bID, err)
            }
        }(brokerID, replica)
    }
    
    return nil
}

故障检测和恢复

// 故障检测器
type FailureDetector struct {
    topic        string
    partition    int
    replicas     map[int32]*ReplicaInfo
    isr          []int32
    lastHeartbeat map[int32]int64
    heartbeatTimeout int64
}

// 检测副本故障
func (fd *FailureDetector) detectFailures() []int32 {
    var failedReplicas []int32
    currentTime := time.Now().UnixMilli()
    
    for brokerID, replica := range fd.replicas {
        lastHeartbeat := fd.lastHeartbeat[brokerID]
        
        // 检查心跳超时
        if currentTime-lastHeartbeat > fd.heartbeatTimeout {
            if replica.isAlive {
                log.Printf("检测到副本 %d 故障", brokerID)
                replica.isAlive = false
                failedReplicas = append(failedReplicas, brokerID)
            }
        }
    }
    
    return failedReplicas
}

// 处理副本故障
func (fd *FailureDetector) handleReplicaFailure(failedReplicas []int32) {
    for _, brokerID := range failedReplicas {
        replica := fd.replicas[brokerID]
        
        // 从ISR中移除
        if replica.isInISR {
            fd.removeFromISR(brokerID)
        }
        
        // 触发Leader选举(如果是Leader故障)
        if replica.isLeader {
            fd.triggerLeaderElection()
        }
    }
}

// 副本恢复处理
func (fd *FailureDetector) handleReplicaRecovery(brokerID int32) {
    replica := fd.replicas[brokerID]
    
    // 标记为活跃
    replica.isAlive = true
    fd.lastHeartbeat[brokerID] = time.Now().UnixMilli()
    
    log.Printf("副本 %d 恢复", brokerID)
    
    // 检查是否可以重新加入ISR
    if fd.canRejoinISR(brokerID) {
        fd.addToISR(brokerID)
    }
}

🎯 面试高频考点

1. ISR的作用和机制?

答案要点:

  • 定义: ISR是与Leader保持同步的副本集合
  • 维护: 通过心跳和拉取请求监控副本状态
  • 更新: 副本落后时从ISR移除,追上时重新加入
  • 作用: 保证数据一致性和可用性
  • 配置: min.insync.replicas控制最小ISR大小

2. 高水位(HW)的作用?

答案要点:

  • 定义: 已提交消息的最高偏移量
  • 计算: 所有ISR副本的最小LEO
  • 作用: 控制消费者可见性,防止读取未提交消息
  • 更新: 每次ISR更新后重新计算
  • 保证: 只有HW以下的消息对消费者可见

3. LeaderEpoch如何防止数据回退?

答案要点:

  • 机制: 每个Leader有唯一的纪元号
  • 验证: 写入前验证LeaderEpoch的有效性
  • 防护: 防止旧Leader恢复后覆盖新数据
  • 历史: 维护纪元历史,支持故障恢复
  • 检查: 结合偏移量检查,确保数据完整性

4. 副本选举的流程?

答案要点:

  • 触发: Leader故障或ISR变化时触发
  • 选择: 优先选择ISR中的Preferred Leader
  • 切换: 更新LeaderEpoch,通知所有副本
  • 恢复: 新Leader开始处理读写请求
  • 同步: 其他副本开始从新Leader同步

📝 本章小结

本章深入解析了Kafka的复制和ISR机制,包括:

  1. 复制模型: Leader/Follower架构,异步复制机制
  2. ISR管理: 同步副本集合的维护和更新
  3. 高水位: 数据一致性保证和消费者可见性控制
  4. LeaderEpoch: 防回退机制和故障恢复
  5. 副本选举: 故障检测和Leader切换流程

这些机制共同构成了Kafka高可用性和数据一致性的基础,理解了这些原理,就能更好地进行集群配置和故障处理。


下一章预告: 04-元数据管理与KRaft - 深入理解Kafka的元数据管理演进

Prev
02-存储模块-日志与索引
Next
04-元数据管理与KRaft