HiHuo
首页
博客
手册
工具
首页
博客
手册
工具
  • 学习 Kafka

    • Kafka 学习手册 - 总览与导读
    • 01-核心概念与架构
    • 02-存储模块-日志与索引
    • 03-复制与ISR机制
    • 04-元数据管理与KRaft
    • 05-消费者组协调
    • 06-事务与Exactly-Once语义
    • 07-性能优化与调优
    • 08-高可用与容灾
    • 09-面试高频问题详解
    • 10-实战项目-Mini-Kafka实现

08-高可用与容灾

📋 本章概览

本章深入探讨Kafka的高可用与容灾机制,这是Kafka在生产环境中稳定运行的关键保障。我们将从多副本容错、网络分区处理、磁盘故障恢复、跨数据中心部署等方面,全面解析Kafka如何实现高可用性和数据安全。

🎯 学习目标

  • 理解Kafka的多副本容错机制
  • 掌握网络分区和脑裂处理策略
  • 了解磁盘故障恢复和备份机制
  • 学习跨数据中心部署方案
  • 掌握灾备方案设计和实施

🛡️ 多副本容错机制

副本分布策略

┌─────────────────────────────────────────────────────────────────┐
│                    多副本容错架构                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                    Topic: user-events                      ││
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐        ││
│  │  │Partition│  │Partition│  │Partition│  │Partition│        ││
│  │  │    0    │  │    1    │  │    2    │  │    3    │        ││
│  │  └─────────┘  └─────────┘  └─────────┘  └─────────┘        ││
│  └─────────────────────────────────────────────────────────────┘│
│                             │                                  │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                    Broker 集群                              ││
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        ││
│  │  │   Broker 1  │  │   Broker 2  │  │   Broker 3  │        ││
│  │  │             │  │             │  │             │        ││
│  │  │ ┌─────────┐ │  │ ┌─────────┐ │  │ ┌─────────┐ │        ││
│  │  │ │Part 0   │ │  │ │Part 0   │ │  │ │Part 0   │ │        ││
│  │  │ │Leader   │ │  │ │Follower │ │  │ │Follower │ │        ││
│  │  │ └─────────┘ │  │ └─────────┘ │  │ └─────────┘ │        ││
│  │  │             │  │             │  │             │        ││
│  │  │ ┌─────────┐ │  │ ┌─────────┐ │  │ ┌─────────┐ │        ││
│  │  │ │Part 1   │ │  │ │Part 1   │ │  │ │Part 1   │ │        ││
│  │  │ │Follower │ │  │ │Leader   │ │  │ │Follower │ │        ││
│  │  │ └─────────┘ │  │ └─────────┘ │  │ └─────────┘ │        ││
│  │  └─────────────┘  └─────────────┘  └─────────────┘        ││
│  └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘

副本管理实现

// 副本管理器
type ReplicaManager struct {
    brokerID        int32
    replicas        map[string]map[int32]*ReplicaInfo
    isrManager      *ISRManager
    hwmManager      *HighWatermarkManager
    failureDetector *FailureDetector
}

// 副本信息
type ReplicaInfo struct {
    topic        string
    partition    int32
    brokerID     int32
    isLeader     bool
    isInISR      bool
    logEndOffset int64
    highWatermark int64
    lastCaughtUpTime int64
    isAlive      bool
}

// 副本故障检测
func (rm *ReplicaManager) detectReplicaFailures() {
    ticker := time.NewTicker(30 * time.Second) // 30秒检测一次
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            rm.checkReplicaHealth()
        }
    }
}

// 检查副本健康状态
func (rm *ReplicaManager) checkReplicaHealth() {
    for topic, partitions := range rm.replicas {
        for partition, replica := range partitions {
            // 1. 检查心跳超时
            if rm.isHeartbeatTimeout(replica) {
                rm.handleReplicaFailure(topic, partition, replica)
            }
            
            // 2. 检查同步延迟
            if rm.isSyncLagTooHigh(replica) {
                rm.handleSyncLag(topic, partition, replica)
            }
        }
    }
}

// 处理副本故障
func (rm *ReplicaManager) handleReplicaFailure(topic string, partition int32, replica *ReplicaInfo) {
    log.Printf("检测到副本故障: %s-%d on broker %d", topic, partition, replica.brokerID)
    
    // 1. 标记副本为不可用
    replica.isAlive = false
    
    // 2. 从ISR中移除
    if replica.isInISR {
        rm.isrManager.removeFromISR(topic, partition, replica.brokerID)
    }
    
    // 3. 触发Leader选举(如果是Leader故障)
    if replica.isLeader {
        rm.triggerLeaderElection(topic, partition)
    }
    
    // 4. 通知Controller
    rm.notifyControllerReplicaFailure(topic, partition, replica.brokerID)
}

// 触发Leader选举
func (rm *ReplicaManager) triggerLeaderElection(topic string, partition int32) {
    // 1. 获取ISR列表
    isr := rm.isrManager.getISR(topic, partition)
    
    // 2. 选择新的Leader
    newLeader := rm.selectNewLeader(topic, partition, isr)
    
    // 3. 执行Leader切换
    if newLeader != nil {
        rm.performLeaderSwitch(topic, partition, newLeader)
    }
}

// 选择新的Leader
func (rm *ReplicaManager) selectNewLeader(topic string, partition int32, isr []int32) *ReplicaInfo {
    // 优先选择ISR中的副本
    for _, brokerID := range isr {
        replica := rm.replicas[topic][partition]
        if replica.brokerID == brokerID && replica.isAlive {
            return replica
        }
    }
    
    return nil
}

故障恢复机制

// 故障恢复管理器
type FailureRecoveryManager struct {
    replicaManager *ReplicaManager
    controller     *Controller
    recoveryQueue  chan *RecoveryTask
}

// 恢复任务
type RecoveryTask struct {
    Topic     string
    Partition int32
    BrokerID  int32
    Type      RecoveryType
    Priority  int
}

// 恢复类型
type RecoveryType int

const (
    ReplicaRecovery RecoveryType = iota
    LeaderRecovery
    ISRRecovery
    DataRecovery
)

// 启动故障恢复
func (frm *FailureRecoveryManager) start() {
    // 启动恢复工作器
    for i := 0; i < 3; i++ {
        go frm.recoveryWorker(i)
    }
    
    // 启动恢复调度器
    go frm.recoveryScheduler()
}

// 恢复工作器
func (frm *FailureRecoveryManager) recoveryWorker(id int) {
    for task := range frm.recoveryQueue {
        log.Printf("恢复工作器 %d 处理任务: %v", id, task)
        
        switch task.Type {
        case ReplicaRecovery:
            frm.recoverReplica(task)
        case LeaderRecovery:
            frm.recoverLeader(task)
        case ISRRecovery:
            frm.recoverISR(task)
        case DataRecovery:
            frm.recoverData(task)
        }
    }
}

// 恢复副本
func (frm *FailureRecoveryManager) recoverReplica(task *RecoveryTask) error {
    // 1. 检查副本状态
    replica := frm.replicaManager.getReplica(task.Topic, task.Partition, task.BrokerID)
    if replica == nil {
        return fmt.Errorf("副本不存在")
    }
    
    // 2. 启动副本同步
    if err := frm.startReplicaSync(replica); err != nil {
        return err
    }
    
    // 3. 监控同步进度
    go frm.monitorReplicaSync(replica)
    
    return nil
}

// 启动副本同步
func (frm *FailureRecoveryManager) startReplicaSync(replica *ReplicaInfo) error {
    // 1. 获取Leader信息
    leader := frm.replicaManager.getLeader(replica.topic, replica.partition)
    if leader == nil {
        return fmt.Errorf("无法找到Leader")
    }
    
    // 2. 计算同步起始点
    startOffset := replica.logEndOffset
    
    // 3. 开始同步
    go func() {
        for {
            // 从Leader拉取数据
            records, err := frm.fetchFromLeader(leader, startOffset)
            if err != nil {
                log.Printf("从Leader拉取数据失败: %v", err)
                time.Sleep(1 * time.Second)
                continue
            }
            
            // 写入本地日志
            if err := frm.writeToLocalLog(replica, records); err != nil {
                log.Printf("写入本地日志失败: %v", err)
                time.Sleep(1 * time.Second)
                continue
            }
            
            // 更新偏移量
            startOffset += int64(len(records))
            
            // 检查是否追上Leader
            if startOffset >= leader.logEndOffset {
                break
            }
        }
        
        // 同步完成,重新加入ISR
        frm.replicaManager.isrManager.addToISR(replica.topic, replica.partition, replica.brokerID)
    }()
    
    return nil
}

🌐 网络分区处理

网络分区检测

// 网络分区检测器
type NetworkPartitionDetector struct {
    brokerID        int32
    peers           map[int32]*Peer
    heartbeatInterval time.Duration
    partitionThreshold int
    mu              sync.RWMutex
}

// 对等节点
type Peer struct {
    brokerID    int32
    host        string
    port        int
    lastHeartbeat int64
    isAlive     bool
    client      *BrokerClient
}

// 检测网络分区
func (npd *NetworkPartitionDetector) detectNetworkPartition() {
    ticker := time.NewTicker(npd.heartbeatInterval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            npd.checkNetworkConnectivity()
        }
    }
}

// 检查网络连通性
func (npd *NetworkPartitionDetector) checkNetworkConnectivity() {
    npd.mu.Lock()
    defer npd.mu.Unlock()
    
    var alivePeers []int32
    var deadPeers []int32
    
    for brokerID, peer := range npd.peers {
        if npd.isPeerAlive(peer) {
            alivePeers = append(alivePeers, brokerID)
        } else {
            deadPeers = append(deadPeers, brokerID)
        }
    }
    
    // 检查是否发生网络分区
    if len(deadPeers) > 0 {
        npd.handleNetworkPartition(alivePeers, deadPeers)
    }
}

// 检查对等节点是否存活
func (npd *NetworkPartitionDetector) isPeerAlive(peer *Peer) bool {
    // 1. 检查心跳超时
    if time.Now().UnixMilli()-peer.lastHeartbeat > int64(npd.heartbeatInterval)*2 {
        return false
    }
    
    // 2. 发送ping请求
    if err := npd.pingPeer(peer); err != nil {
        return false
    }
    
    return true
}

// 处理网络分区
func (npd *NetworkPartitionDetector) handleNetworkPartition(alivePeers, deadPeers []int32) {
    log.Printf("检测到网络分区,存活节点: %v,故障节点: %v", alivePeers, deadPeers)
    
    // 1. 检查是否形成多数派
    if len(alivePeers) > len(deadPeers) {
        // 形成多数派,继续服务
        npd.handleMajorityPartition(alivePeers, deadPeers)
    } else {
        // 形成少数派,停止服务
        npd.handleMinorityPartition(alivePeers, deadPeers)
    }
}

// 处理多数派分区
func (npd *NetworkPartitionDetector) handleMajorityPartition(alivePeers, deadPeers []int32) {
    // 1. 继续提供服务
    log.Printf("多数派分区,继续提供服务")
    
    // 2. 从ISR中移除故障节点
    for _, deadBroker := range deadPeers {
        npd.removeDeadBrokerFromISR(deadBroker)
    }
    
    // 3. 触发Leader选举
    npd.triggerLeaderElectionForDeadBrokers(deadPeers)
}

// 处理少数派分区
func (npd *NetworkPartitionDetector) handleMinorityPartition(alivePeers, deadPeers []int32) {
    // 1. 停止接受写请求
    log.Printf("少数派分区,停止接受写请求")
    
    // 2. 设置只读模式
    npd.setReadOnlyMode(true)
    
    // 3. 等待网络恢复
    go npd.waitForNetworkRecovery()
}

// 等待网络恢复
func (npd *NetworkPartitionDetector) waitForNetworkRecovery() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            if npd.isNetworkRecovered() {
                npd.handleNetworkRecovery()
                return
            }
        }
    }
}

// 检查网络是否恢复
func (npd *NetworkPartitionDetector) isNetworkRecovered() bool {
    npd.mu.RLock()
    defer npd.mu.RUnlock()
    
    aliveCount := 0
    for _, peer := range npd.peers {
        if npd.isPeerAlive(peer) {
            aliveCount++
        }
    }
    
    // 如果存活节点数超过一半,认为网络恢复
    return aliveCount > len(npd.peers)/2
}

// 处理网络恢复
func (npd *NetworkPartitionDetector) handleNetworkRecovery() {
    log.Printf("网络恢复,恢复正常服务")
    
    // 1. 取消只读模式
    npd.setReadOnlyMode(false)
    
    // 2. 重新同步数据
    npd.resyncData()
    
    // 3. 恢复ISR
    npd.restoreISR()
}

脑裂防护

// 脑裂防护器
type SplitBrainProtector struct {
    brokerID        int32
    controller      *Controller
    epochManager    *EpochManager
    fencingManager  *FencingManager
}

// 纪元管理器
type EpochManager struct {
    currentEpoch    int32
    epochHistory    []EpochEntry
    mu              sync.RWMutex
}

// 纪元条目
type EpochEntry struct {
    Epoch     int32
    LeaderID  int32
    StartTime int64
}

// 防护脑裂
func (sbp *SplitBrainProtector) protectAgainstSplitBrain() {
    // 1. 检查纪元有效性
    if !sbp.epochManager.isEpochValid() {
        sbp.handleInvalidEpoch()
        return
    }
    
    // 2. 检查Leader有效性
    if !sbp.isLeaderValid() {
        sbp.handleInvalidLeader()
        return
    }
    
    // 3. 检查多数派
    if !sbp.hasMajority() {
        sbp.handleNoMajority()
        return
    }
}

// 检查纪元有效性
func (em *EpochManager) isEpochValid() bool {
    em.mu.RLock()
    defer em.mu.RUnlock()
    
    // 检查纪元是否递增
    if len(em.epochHistory) > 0 {
        lastEpoch := em.epochHistory[len(em.epochHistory)-1].Epoch
        if em.currentEpoch < lastEpoch {
            return false
        }
    }
    
    return true
}

// 检查Leader有效性
func (sbp *SplitBrainProtector) isLeaderValid() bool {
    // 1. 检查Leader是否在ISR中
    if !sbp.isLeaderInISR() {
        return false
    }
    
    // 2. 检查Leader是否响应
    if !sbp.isLeaderResponsive() {
        return false
    }
    
    return true
}

// 检查是否有多数派
func (sbp *SplitBrainProtector) hasMajority() bool {
    // 1. 获取集群节点数
    totalNodes := sbp.controller.getClusterSize()
    
    // 2. 获取存活节点数
    aliveNodes := sbp.controller.getAliveNodes()
    
    // 3. 检查是否超过半数
    return aliveNodes > totalNodes/2
}

// 处理无效纪元
func (sbp *SplitBrainProtector) handleInvalidEpoch() {
    log.Printf("检测到无效纪元,停止服务")
    
    // 1. 停止接受请求
    sbp.stopAcceptingRequests()
    
    // 2. 等待纪元更新
    go sbp.waitForEpochUpdate()
}

// 处理无效Leader
func (sbp *SplitBrainProtector) handleInvalidLeader() {
    log.Printf("检测到无效Leader,触发重新选举")
    
    // 1. 触发Leader选举
    sbp.triggerLeaderElection()
    
    // 2. 等待新Leader
    go sbp.waitForNewLeader()
}

// 处理无多数派
func (sbp *SplitBrainProtector) handleNoMajority() {
    log.Printf("检测到无多数派,停止服务")
    
    // 1. 停止接受写请求
    sbp.stopAcceptingWriteRequests()
    
    // 2. 等待多数派恢复
    go sbp.waitForMajority()
}

💾 磁盘故障恢复

磁盘监控

// 磁盘监控器
type DiskMonitor struct {
    diskPaths       []string
    checkInterval   time.Duration
    alertThreshold  float64
    mu              sync.RWMutex
    diskStatus      map[string]*DiskStatus
}

// 磁盘状态
type DiskStatus struct {
    Path        string
    TotalSpace  int64
    UsedSpace   int64
    FreeSpace   int64
    UsagePercent float64
    IsHealthy   bool
    LastCheck   int64
}

// 监控磁盘
func (dm *DiskMonitor) monitorDisks() {
    ticker := time.NewTicker(dm.checkInterval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            dm.checkAllDisks()
        }
    }
}

// 检查所有磁盘
func (dm *DiskMonitor) checkAllDisks() {
    for _, path := range dm.diskPaths {
        go dm.checkDisk(path)
    }
}

// 检查单个磁盘
func (dm *DiskMonitor) checkDisk(path string) {
    // 1. 获取磁盘使用情况
    usage, err := dm.getDiskUsage(path)
    if err != nil {
        log.Printf("获取磁盘使用情况失败: %v", err)
        return
    }
    
    // 2. 更新磁盘状态
    dm.updateDiskStatus(path, usage)
    
    // 3. 检查是否需要告警
    if usage.UsagePercent > dm.alertThreshold {
        dm.handleDiskAlert(path, usage)
    }
}

// 获取磁盘使用情况
func (dm *DiskMonitor) getDiskUsage(path string) (*DiskStatus, error) {
    var stat syscall.Statfs_t
    if err := syscall.Statfs(path, &stat); err != nil {
        return nil, err
    }
    
    totalSpace := int64(stat.Blocks) * int64(stat.Bsize)
    freeSpace := int64(stat.Bavail) * int64(stat.Bsize)
    usedSpace := totalSpace - freeSpace
    usagePercent := float64(usedSpace) / float64(totalSpace) * 100
    
    return &DiskStatus{
        Path:        path,
        TotalSpace:  totalSpace,
        UsedSpace:   usedSpace,
        FreeSpace:   freeSpace,
        UsagePercent: usagePercent,
        IsHealthy:   usagePercent < dm.alertThreshold,
        LastCheck:   time.Now().UnixMilli(),
    }, nil
}

// 处理磁盘告警
func (dm *DiskMonitor) handleDiskAlert(path string, status *DiskStatus) {
    log.Printf("磁盘告警: %s 使用率 %.2f%%", path, status.UsagePercent)
    
    // 1. 发送告警通知
    dm.sendDiskAlert(path, status)
    
    // 2. 触发磁盘清理
    if status.UsagePercent > 90 {
        dm.triggerDiskCleanup(path)
    }
    
    // 3. 触发数据迁移
    if status.UsagePercent > 95 {
        dm.triggerDataMigration(path)
    }
}

数据备份与恢复

// 数据备份管理器
type DataBackupManager struct {
    backupDir       string
    backupInterval  time.Duration
    retentionDays   int
    compressionType string
    encryptionKey   []byte
}

// 备份任务
type BackupTask struct {
    Topic     string
    Partition int32
    StartOffset int64
    EndOffset   int64
    BackupPath  string
    Timestamp   int64
}

// 执行备份
func (dbm *DataBackupManager) performBackup() {
    ticker := time.NewTicker(dbm.backupInterval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            dbm.backupAllTopics()
        }
    }
}

// 备份所有Topic
func (dbm *DataBackupManager) backupAllTopics() {
    // 1. 获取所有Topic
    topics := dbm.getAllTopics()
    
    // 2. 并行备份每个Topic
    var wg sync.WaitGroup
    for _, topic := range topics {
        wg.Add(1)
        go func(t string) {
            defer wg.Done()
            dbm.backupTopic(t)
        }(topic)
    }
    
    wg.Wait()
}

// 备份Topic
func (dbm *DataBackupManager) backupTopic(topic string) {
    // 1. 获取Topic的所有分区
    partitions := dbm.getTopicPartitions(topic)
    
    // 2. 备份每个分区
    for _, partition := range partitions {
        dbm.backupPartition(topic, partition)
    }
}

// 备份分区
func (dbm *DataBackupManager) backupPartition(topic string, partition int32) {
    // 1. 创建备份任务
    task := &BackupTask{
        Topic:       topic,
        Partition:   partition,
        StartOffset: 0,
        EndOffset:   dbm.getPartitionEndOffset(topic, partition),
        BackupPath:  dbm.generateBackupPath(topic, partition),
        Timestamp:   time.Now().UnixMilli(),
    }
    
    // 2. 执行备份
    if err := dbm.executeBackup(task); err != nil {
        log.Printf("备份失败: %v", err)
        return
    }
    
    // 3. 验证备份
    if err := dbm.verifyBackup(task); err != nil {
        log.Printf("备份验证失败: %v", err)
        return
    }
    
    log.Printf("备份完成: %s-%d", topic, partition)
}

// 执行备份
func (dbm *DataBackupManager) executeBackup(task *BackupTask) error {
    // 1. 读取分区数据
    data, err := dbm.readPartitionData(task.Topic, task.Partition, task.StartOffset, task.EndOffset)
    if err != nil {
        return err
    }
    
    // 2. 压缩数据
    compressedData, err := dbm.compressData(data)
    if err != nil {
        return err
    }
    
    // 3. 加密数据
    encryptedData, err := dbm.encryptData(compressedData)
    if err != nil {
        return err
    }
    
    // 4. 写入备份文件
    return dbm.writeBackupFile(task.BackupPath, encryptedData)
}

// 数据恢复
func (dbm *DataBackupManager) restoreData(topic string, partition int32, backupPath string) error {
    // 1. 读取备份文件
    encryptedData, err := dbm.readBackupFile(backupPath)
    if err != nil {
        return err
    }
    
    // 2. 解密数据
    compressedData, err := dbm.decryptData(encryptedData)
    if err != nil {
        return err
    }
    
    // 3. 解压数据
    data, err := dbm.decompressData(compressedData)
    if err != nil {
        return err
    }
    
    // 4. 恢复分区数据
    return dbm.restorePartitionData(topic, partition, data)
}

🌍 跨数据中心部署

MirrorMaker架构

┌─────────────────────────────────────────────────────────────────┐
│                   跨数据中心部署架构                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                   数据中心 A (主)                          ││
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        ││
│  │  │   Kafka     │  │   Kafka     │  │   Kafka     │        ││
│  │  │   Broker 1  │  │   Broker 2  │  │   Broker 3  │        ││
│  │  └─────────────┘  └─────────────┘  └─────────────┘        ││
│  └─────────────────────────────────────────────────────────────┘│
│                             │                                  │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                   网络连接                                 ││
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        ││
│  │  │   Mirror    │  │   Mirror    │  │   Mirror    │        ││
│  │  │   Maker 1   │  │   Maker 2   │  │   Maker 3   │        ││
│  │  └─────────────┘  └─────────────┘  └─────────────┘        ││
│  └─────────────────────────────────────────────────────────────┘│
│                             │                                  │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                   数据中心 B (备)                          ││
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        ││
│  │  │   Kafka     │  │   Kafka     │  │   Kafka     │        ││
│  │  │   Broker 1  │  │   Broker 2  │  │   Broker 3  │        ││
│  │  └─────────────┘  └─────────────┘  └─────────────┘        ││
│  └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘

MirrorMaker实现

// MirrorMaker
type MirrorMaker struct {
    sourceCluster    *ClusterConfig
    targetCluster    *ClusterConfig
    topics           []string
    consumerGroup    string
    producerConfig   *ProducerConfig
    consumerConfig   *ConsumerConfig
    offsetManager    *OffsetManager
    errorHandler     *ErrorHandler
}

// 集群配置
type ClusterConfig struct {
    Name        string
    Brokers     []string
    Security    *SecurityConfig
    Properties  map[string]string
}

// 启动MirrorMaker
func (mm *MirrorMaker) start() error {
    // 1. 初始化消费者
    consumer, err := mm.createConsumer()
    if err != nil {
        return err
    }
    
    // 2. 初始化生产者
    producer, err := mm.createProducer()
    if err != nil {
        return err
    }
    
    // 3. 启动镜像任务
    go mm.mirrorTask(consumer, producer)
    
    return nil
}

// 镜像任务
func (mm *MirrorMaker) mirrorTask(consumer *Consumer, producer *Producer) {
    for {
        // 1. 从源集群拉取消息
        records, err := consumer.poll()
        if err != nil {
            mm.errorHandler.handleError(err)
            continue
        }
        
        // 2. 转换消息格式
        transformedRecords := mm.transformRecords(records)
        
        // 3. 发送到目标集群
        if err := producer.sendBatch(transformedRecords); err != nil {
            mm.errorHandler.handleError(err)
            continue
        }
        
        // 4. 提交偏移量
        if err := consumer.commitOffset(); err != nil {
            mm.errorHandler.handleError(err)
        }
    }
}

// 转换消息格式
func (mm *MirrorMaker) transformRecords(records []*ConsumerRecord) []*ProducerRecord {
    var transformedRecords []*ProducerRecord
    
    for _, record := range records {
        // 1. 转换Topic名称
        targetTopic := mm.transformTopicName(record.Topic)
        
        // 2. 转换消息内容
        transformedRecord := &ProducerRecord{
            Topic:     targetTopic,
            Partition: record.Partition,
            Key:       record.Key,
            Value:     record.Value,
            Headers:   record.Headers,
            Timestamp: record.Timestamp,
        }
        
        transformedRecords = append(transformedRecords, transformedRecord)
    }
    
    return transformedRecords
}

// 转换Topic名称
func (mm *MirrorMaker) transformTopicName(sourceTopic string) string {
    // 添加前缀或后缀
    return fmt.Sprintf("%s.%s", mm.targetCluster.Name, sourceTopic)
}

双向同步

// 双向同步管理器
type BidirectionalSyncManager struct {
    clusterA    *ClusterConfig
    clusterB    *ClusterConfig
    syncConfig  *SyncConfig
    conflictResolver *ConflictResolver
}

// 同步配置
type SyncConfig struct {
    SyncInterval    time.Duration
    ConflictPolicy  ConflictPolicy
    MaxRetries      int
    RetryInterval   time.Duration
}

// 冲突解决策略
type ConflictPolicy int

const (
    LastWriteWins ConflictPolicy = iota
    FirstWriteWins
    ManualResolution
)

// 启动双向同步
func (bsm *BidirectionalSyncManager) start() error {
    // 1. 启动A到B的同步
    go bsm.syncAToB()
    
    // 2. 启动B到A的同步
    go bsm.syncBToA()
    
    // 3. 启动冲突检测
    go bsm.detectConflicts()
    
    return nil
}

// A到B同步
func (bsm *BidirectionalSyncManager) syncAToB() {
    ticker := time.NewTicker(bsm.syncConfig.SyncInterval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            bsm.performSync(bsm.clusterA, bsm.clusterB)
        }
    }
}

// B到A同步
func (bsm *BidirectionalSyncManager) syncBToA() {
    ticker := time.NewTicker(bsm.syncConfig.SyncInterval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            bsm.performSync(bsm.clusterB, bsm.clusterA)
        }
    }
}

// 执行同步
func (bsm *BidirectionalSyncManager) performSync(source, target *ClusterConfig) {
    // 1. 获取源集群的变更
    changes, err := bsm.getChanges(source)
    if err != nil {
        log.Printf("获取变更失败: %v", err)
        return
    }
    
    // 2. 应用变更到目标集群
    for _, change := range changes {
        if err := bsm.applyChange(target, change); err != nil {
            log.Printf("应用变更失败: %v", err)
            continue
        }
    }
}

// 检测冲突
func (bsm *BidirectionalSyncManager) detectConflicts() {
    ticker := time.NewTicker(bsm.syncConfig.SyncInterval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            conflicts := bsm.findConflicts()
            for _, conflict := range conflicts {
                bsm.resolveConflict(conflict)
            }
        }
    }
}

// 解决冲突
func (bsm *BidirectionalSyncManager) resolveConflict(conflict *Conflict) {
    switch bsm.syncConfig.ConflictPolicy {
    case LastWriteWins:
        bsm.resolveLastWriteWins(conflict)
    case FirstWriteWins:
        bsm.resolveFirstWriteWins(conflict)
    case ManualResolution:
        bsm.resolveManual(conflict)
    }
}

🎯 面试高频考点

1. Kafka如何保证高可用性?

答案要点:

  • 多副本机制: 每个分区有多个副本
  • ISR管理: 维护同步副本集合
  • Leader选举: 故障时自动选举新Leader
  • 故障检测: 实时监控副本健康状态
  • 自动恢复: 故障恢复后自动重新同步

2. 网络分区时Kafka如何处理?

答案要点:

  • 多数派原则: 只有多数派才能提供服务
  • 脑裂防护: 使用纪元机制防止脑裂
  • 只读模式: 少数派进入只读模式
  • 自动恢复: 网络恢复后自动恢复正常
  • 数据一致性: 确保数据不丢失

3. 磁盘故障如何恢复?

答案要点:

  • 磁盘监控: 实时监控磁盘使用情况
  • 数据备份: 定期备份重要数据
  • 故障转移: 故障时转移到其他副本
  • 数据恢复: 从备份恢复数据
  • 预防措施: 使用RAID和冗余存储

4. 跨数据中心部署方案?

答案要点:

  • MirrorMaker: 单向数据复制
  • 双向同步: 支持双向数据同步
  • 冲突解决: 处理数据冲突
  • 网络优化: 优化跨网络传输
  • 监控告警: 监控同步状态

📝 本章小结

本章深入解析了Kafka的高可用与容灾机制,包括:

  1. 多副本容错: 副本分布、故障检测、自动恢复
  2. 网络分区处理: 分区检测、脑裂防护、自动恢复
  3. 磁盘故障恢复: 磁盘监控、数据备份、故障转移
  4. 跨数据中心部署: MirrorMaker、双向同步、冲突解决
  5. 灾备方案: 完整的灾备策略和实施方法

高可用与容灾是Kafka在生产环境中的关键保障,理解了这些机制,就能更好地设计稳定可靠的Kafka集群。


下一章预告: 09-面试高频问题详解 - 深入解析Kafka面试中的核心问题

Prev
07-性能优化与调优
Next
09-面试高频问题详解