08-高可用与容灾
📋 本章概览
本章深入探讨Kafka的高可用与容灾机制,这是Kafka在生产环境中稳定运行的关键保障。我们将从多副本容错、网络分区处理、磁盘故障恢复、跨数据中心部署等方面,全面解析Kafka如何实现高可用性和数据安全。
🎯 学习目标
- 理解Kafka的多副本容错机制
- 掌握网络分区和脑裂处理策略
- 了解磁盘故障恢复和备份机制
- 学习跨数据中心部署方案
- 掌握灾备方案设计和实施
🛡️ 多副本容错机制
副本分布策略
┌─────────────────────────────────────────────────────────────────┐
│ 多副本容错架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Topic: user-events ││
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││
│ │ │Partition│ │Partition│ │Partition│ │Partition│ ││
│ │ │ 0 │ │ 1 │ │ 2 │ │ 3 │ ││
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Broker 集群 ││
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ ││
│ │ │ │ │ │ │ │ ││
│ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ ││
│ │ │ │Part 0 │ │ │ │Part 0 │ │ │ │Part 0 │ │ ││
│ │ │ │Leader │ │ │ │Follower │ │ │ │Follower │ │ ││
│ │ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ ││
│ │ │ │ │ │ │ │ ││
│ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ ││
│ │ │ │Part 1 │ │ │ │Part 1 │ │ │ │Part 1 │ │ ││
│ │ │ │Follower │ │ │ │Leader │ │ │ │Follower │ │ ││
│ │ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ ││
│ │ └─────────────┘ └─────────────┘ └─────────────┘ ││
│ └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘
副本管理实现
// 副本管理器
type ReplicaManager struct {
brokerID int32
replicas map[string]map[int32]*ReplicaInfo
isrManager *ISRManager
hwmManager *HighWatermarkManager
failureDetector *FailureDetector
}
// 副本信息
type ReplicaInfo struct {
topic string
partition int32
brokerID int32
isLeader bool
isInISR bool
logEndOffset int64
highWatermark int64
lastCaughtUpTime int64
isAlive bool
}
// 副本故障检测
func (rm *ReplicaManager) detectReplicaFailures() {
ticker := time.NewTicker(30 * time.Second) // 30秒检测一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
rm.checkReplicaHealth()
}
}
}
// 检查副本健康状态
func (rm *ReplicaManager) checkReplicaHealth() {
for topic, partitions := range rm.replicas {
for partition, replica := range partitions {
// 1. 检查心跳超时
if rm.isHeartbeatTimeout(replica) {
rm.handleReplicaFailure(topic, partition, replica)
}
// 2. 检查同步延迟
if rm.isSyncLagTooHigh(replica) {
rm.handleSyncLag(topic, partition, replica)
}
}
}
}
// 处理副本故障
func (rm *ReplicaManager) handleReplicaFailure(topic string, partition int32, replica *ReplicaInfo) {
log.Printf("检测到副本故障: %s-%d on broker %d", topic, partition, replica.brokerID)
// 1. 标记副本为不可用
replica.isAlive = false
// 2. 从ISR中移除
if replica.isInISR {
rm.isrManager.removeFromISR(topic, partition, replica.brokerID)
}
// 3. 触发Leader选举(如果是Leader故障)
if replica.isLeader {
rm.triggerLeaderElection(topic, partition)
}
// 4. 通知Controller
rm.notifyControllerReplicaFailure(topic, partition, replica.brokerID)
}
// 触发Leader选举
func (rm *ReplicaManager) triggerLeaderElection(topic string, partition int32) {
// 1. 获取ISR列表
isr := rm.isrManager.getISR(topic, partition)
// 2. 选择新的Leader
newLeader := rm.selectNewLeader(topic, partition, isr)
// 3. 执行Leader切换
if newLeader != nil {
rm.performLeaderSwitch(topic, partition, newLeader)
}
}
// 选择新的Leader
func (rm *ReplicaManager) selectNewLeader(topic string, partition int32, isr []int32) *ReplicaInfo {
// 优先选择ISR中的副本
for _, brokerID := range isr {
replica := rm.replicas[topic][partition]
if replica.brokerID == brokerID && replica.isAlive {
return replica
}
}
return nil
}
故障恢复机制
// 故障恢复管理器
type FailureRecoveryManager struct {
replicaManager *ReplicaManager
controller *Controller
recoveryQueue chan *RecoveryTask
}
// 恢复任务
type RecoveryTask struct {
Topic string
Partition int32
BrokerID int32
Type RecoveryType
Priority int
}
// 恢复类型
type RecoveryType int
const (
ReplicaRecovery RecoveryType = iota
LeaderRecovery
ISRRecovery
DataRecovery
)
// 启动故障恢复
func (frm *FailureRecoveryManager) start() {
// 启动恢复工作器
for i := 0; i < 3; i++ {
go frm.recoveryWorker(i)
}
// 启动恢复调度器
go frm.recoveryScheduler()
}
// 恢复工作器
func (frm *FailureRecoveryManager) recoveryWorker(id int) {
for task := range frm.recoveryQueue {
log.Printf("恢复工作器 %d 处理任务: %v", id, task)
switch task.Type {
case ReplicaRecovery:
frm.recoverReplica(task)
case LeaderRecovery:
frm.recoverLeader(task)
case ISRRecovery:
frm.recoverISR(task)
case DataRecovery:
frm.recoverData(task)
}
}
}
// 恢复副本
func (frm *FailureRecoveryManager) recoverReplica(task *RecoveryTask) error {
// 1. 检查副本状态
replica := frm.replicaManager.getReplica(task.Topic, task.Partition, task.BrokerID)
if replica == nil {
return fmt.Errorf("副本不存在")
}
// 2. 启动副本同步
if err := frm.startReplicaSync(replica); err != nil {
return err
}
// 3. 监控同步进度
go frm.monitorReplicaSync(replica)
return nil
}
// 启动副本同步
func (frm *FailureRecoveryManager) startReplicaSync(replica *ReplicaInfo) error {
// 1. 获取Leader信息
leader := frm.replicaManager.getLeader(replica.topic, replica.partition)
if leader == nil {
return fmt.Errorf("无法找到Leader")
}
// 2. 计算同步起始点
startOffset := replica.logEndOffset
// 3. 开始同步
go func() {
for {
// 从Leader拉取数据
records, err := frm.fetchFromLeader(leader, startOffset)
if err != nil {
log.Printf("从Leader拉取数据失败: %v", err)
time.Sleep(1 * time.Second)
continue
}
// 写入本地日志
if err := frm.writeToLocalLog(replica, records); err != nil {
log.Printf("写入本地日志失败: %v", err)
time.Sleep(1 * time.Second)
continue
}
// 更新偏移量
startOffset += int64(len(records))
// 检查是否追上Leader
if startOffset >= leader.logEndOffset {
break
}
}
// 同步完成,重新加入ISR
frm.replicaManager.isrManager.addToISR(replica.topic, replica.partition, replica.brokerID)
}()
return nil
}
🌐 网络分区处理
网络分区检测
// 网络分区检测器
type NetworkPartitionDetector struct {
brokerID int32
peers map[int32]*Peer
heartbeatInterval time.Duration
partitionThreshold int
mu sync.RWMutex
}
// 对等节点
type Peer struct {
brokerID int32
host string
port int
lastHeartbeat int64
isAlive bool
client *BrokerClient
}
// 检测网络分区
func (npd *NetworkPartitionDetector) detectNetworkPartition() {
ticker := time.NewTicker(npd.heartbeatInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
npd.checkNetworkConnectivity()
}
}
}
// 检查网络连通性
func (npd *NetworkPartitionDetector) checkNetworkConnectivity() {
npd.mu.Lock()
defer npd.mu.Unlock()
var alivePeers []int32
var deadPeers []int32
for brokerID, peer := range npd.peers {
if npd.isPeerAlive(peer) {
alivePeers = append(alivePeers, brokerID)
} else {
deadPeers = append(deadPeers, brokerID)
}
}
// 检查是否发生网络分区
if len(deadPeers) > 0 {
npd.handleNetworkPartition(alivePeers, deadPeers)
}
}
// 检查对等节点是否存活
func (npd *NetworkPartitionDetector) isPeerAlive(peer *Peer) bool {
// 1. 检查心跳超时
if time.Now().UnixMilli()-peer.lastHeartbeat > int64(npd.heartbeatInterval)*2 {
return false
}
// 2. 发送ping请求
if err := npd.pingPeer(peer); err != nil {
return false
}
return true
}
// 处理网络分区
func (npd *NetworkPartitionDetector) handleNetworkPartition(alivePeers, deadPeers []int32) {
log.Printf("检测到网络分区,存活节点: %v,故障节点: %v", alivePeers, deadPeers)
// 1. 检查是否形成多数派
if len(alivePeers) > len(deadPeers) {
// 形成多数派,继续服务
npd.handleMajorityPartition(alivePeers, deadPeers)
} else {
// 形成少数派,停止服务
npd.handleMinorityPartition(alivePeers, deadPeers)
}
}
// 处理多数派分区
func (npd *NetworkPartitionDetector) handleMajorityPartition(alivePeers, deadPeers []int32) {
// 1. 继续提供服务
log.Printf("多数派分区,继续提供服务")
// 2. 从ISR中移除故障节点
for _, deadBroker := range deadPeers {
npd.removeDeadBrokerFromISR(deadBroker)
}
// 3. 触发Leader选举
npd.triggerLeaderElectionForDeadBrokers(deadPeers)
}
// 处理少数派分区
func (npd *NetworkPartitionDetector) handleMinorityPartition(alivePeers, deadPeers []int32) {
// 1. 停止接受写请求
log.Printf("少数派分区,停止接受写请求")
// 2. 设置只读模式
npd.setReadOnlyMode(true)
// 3. 等待网络恢复
go npd.waitForNetworkRecovery()
}
// 等待网络恢复
func (npd *NetworkPartitionDetector) waitForNetworkRecovery() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if npd.isNetworkRecovered() {
npd.handleNetworkRecovery()
return
}
}
}
}
// 检查网络是否恢复
func (npd *NetworkPartitionDetector) isNetworkRecovered() bool {
npd.mu.RLock()
defer npd.mu.RUnlock()
aliveCount := 0
for _, peer := range npd.peers {
if npd.isPeerAlive(peer) {
aliveCount++
}
}
// 如果存活节点数超过一半,认为网络恢复
return aliveCount > len(npd.peers)/2
}
// 处理网络恢复
func (npd *NetworkPartitionDetector) handleNetworkRecovery() {
log.Printf("网络恢复,恢复正常服务")
// 1. 取消只读模式
npd.setReadOnlyMode(false)
// 2. 重新同步数据
npd.resyncData()
// 3. 恢复ISR
npd.restoreISR()
}
脑裂防护
// 脑裂防护器
type SplitBrainProtector struct {
brokerID int32
controller *Controller
epochManager *EpochManager
fencingManager *FencingManager
}
// 纪元管理器
type EpochManager struct {
currentEpoch int32
epochHistory []EpochEntry
mu sync.RWMutex
}
// 纪元条目
type EpochEntry struct {
Epoch int32
LeaderID int32
StartTime int64
}
// 防护脑裂
func (sbp *SplitBrainProtector) protectAgainstSplitBrain() {
// 1. 检查纪元有效性
if !sbp.epochManager.isEpochValid() {
sbp.handleInvalidEpoch()
return
}
// 2. 检查Leader有效性
if !sbp.isLeaderValid() {
sbp.handleInvalidLeader()
return
}
// 3. 检查多数派
if !sbp.hasMajority() {
sbp.handleNoMajority()
return
}
}
// 检查纪元有效性
func (em *EpochManager) isEpochValid() bool {
em.mu.RLock()
defer em.mu.RUnlock()
// 检查纪元是否递增
if len(em.epochHistory) > 0 {
lastEpoch := em.epochHistory[len(em.epochHistory)-1].Epoch
if em.currentEpoch < lastEpoch {
return false
}
}
return true
}
// 检查Leader有效性
func (sbp *SplitBrainProtector) isLeaderValid() bool {
// 1. 检查Leader是否在ISR中
if !sbp.isLeaderInISR() {
return false
}
// 2. 检查Leader是否响应
if !sbp.isLeaderResponsive() {
return false
}
return true
}
// 检查是否有多数派
func (sbp *SplitBrainProtector) hasMajority() bool {
// 1. 获取集群节点数
totalNodes := sbp.controller.getClusterSize()
// 2. 获取存活节点数
aliveNodes := sbp.controller.getAliveNodes()
// 3. 检查是否超过半数
return aliveNodes > totalNodes/2
}
// 处理无效纪元
func (sbp *SplitBrainProtector) handleInvalidEpoch() {
log.Printf("检测到无效纪元,停止服务")
// 1. 停止接受请求
sbp.stopAcceptingRequests()
// 2. 等待纪元更新
go sbp.waitForEpochUpdate()
}
// 处理无效Leader
func (sbp *SplitBrainProtector) handleInvalidLeader() {
log.Printf("检测到无效Leader,触发重新选举")
// 1. 触发Leader选举
sbp.triggerLeaderElection()
// 2. 等待新Leader
go sbp.waitForNewLeader()
}
// 处理无多数派
func (sbp *SplitBrainProtector) handleNoMajority() {
log.Printf("检测到无多数派,停止服务")
// 1. 停止接受写请求
sbp.stopAcceptingWriteRequests()
// 2. 等待多数派恢复
go sbp.waitForMajority()
}
💾 磁盘故障恢复
磁盘监控
// 磁盘监控器
type DiskMonitor struct {
diskPaths []string
checkInterval time.Duration
alertThreshold float64
mu sync.RWMutex
diskStatus map[string]*DiskStatus
}
// 磁盘状态
type DiskStatus struct {
Path string
TotalSpace int64
UsedSpace int64
FreeSpace int64
UsagePercent float64
IsHealthy bool
LastCheck int64
}
// 监控磁盘
func (dm *DiskMonitor) monitorDisks() {
ticker := time.NewTicker(dm.checkInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
dm.checkAllDisks()
}
}
}
// 检查所有磁盘
func (dm *DiskMonitor) checkAllDisks() {
for _, path := range dm.diskPaths {
go dm.checkDisk(path)
}
}
// 检查单个磁盘
func (dm *DiskMonitor) checkDisk(path string) {
// 1. 获取磁盘使用情况
usage, err := dm.getDiskUsage(path)
if err != nil {
log.Printf("获取磁盘使用情况失败: %v", err)
return
}
// 2. 更新磁盘状态
dm.updateDiskStatus(path, usage)
// 3. 检查是否需要告警
if usage.UsagePercent > dm.alertThreshold {
dm.handleDiskAlert(path, usage)
}
}
// 获取磁盘使用情况
func (dm *DiskMonitor) getDiskUsage(path string) (*DiskStatus, error) {
var stat syscall.Statfs_t
if err := syscall.Statfs(path, &stat); err != nil {
return nil, err
}
totalSpace := int64(stat.Blocks) * int64(stat.Bsize)
freeSpace := int64(stat.Bavail) * int64(stat.Bsize)
usedSpace := totalSpace - freeSpace
usagePercent := float64(usedSpace) / float64(totalSpace) * 100
return &DiskStatus{
Path: path,
TotalSpace: totalSpace,
UsedSpace: usedSpace,
FreeSpace: freeSpace,
UsagePercent: usagePercent,
IsHealthy: usagePercent < dm.alertThreshold,
LastCheck: time.Now().UnixMilli(),
}, nil
}
// 处理磁盘告警
func (dm *DiskMonitor) handleDiskAlert(path string, status *DiskStatus) {
log.Printf("磁盘告警: %s 使用率 %.2f%%", path, status.UsagePercent)
// 1. 发送告警通知
dm.sendDiskAlert(path, status)
// 2. 触发磁盘清理
if status.UsagePercent > 90 {
dm.triggerDiskCleanup(path)
}
// 3. 触发数据迁移
if status.UsagePercent > 95 {
dm.triggerDataMigration(path)
}
}
数据备份与恢复
// 数据备份管理器
type DataBackupManager struct {
backupDir string
backupInterval time.Duration
retentionDays int
compressionType string
encryptionKey []byte
}
// 备份任务
type BackupTask struct {
Topic string
Partition int32
StartOffset int64
EndOffset int64
BackupPath string
Timestamp int64
}
// 执行备份
func (dbm *DataBackupManager) performBackup() {
ticker := time.NewTicker(dbm.backupInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
dbm.backupAllTopics()
}
}
}
// 备份所有Topic
func (dbm *DataBackupManager) backupAllTopics() {
// 1. 获取所有Topic
topics := dbm.getAllTopics()
// 2. 并行备份每个Topic
var wg sync.WaitGroup
for _, topic := range topics {
wg.Add(1)
go func(t string) {
defer wg.Done()
dbm.backupTopic(t)
}(topic)
}
wg.Wait()
}
// 备份Topic
func (dbm *DataBackupManager) backupTopic(topic string) {
// 1. 获取Topic的所有分区
partitions := dbm.getTopicPartitions(topic)
// 2. 备份每个分区
for _, partition := range partitions {
dbm.backupPartition(topic, partition)
}
}
// 备份分区
func (dbm *DataBackupManager) backupPartition(topic string, partition int32) {
// 1. 创建备份任务
task := &BackupTask{
Topic: topic,
Partition: partition,
StartOffset: 0,
EndOffset: dbm.getPartitionEndOffset(topic, partition),
BackupPath: dbm.generateBackupPath(topic, partition),
Timestamp: time.Now().UnixMilli(),
}
// 2. 执行备份
if err := dbm.executeBackup(task); err != nil {
log.Printf("备份失败: %v", err)
return
}
// 3. 验证备份
if err := dbm.verifyBackup(task); err != nil {
log.Printf("备份验证失败: %v", err)
return
}
log.Printf("备份完成: %s-%d", topic, partition)
}
// 执行备份
func (dbm *DataBackupManager) executeBackup(task *BackupTask) error {
// 1. 读取分区数据
data, err := dbm.readPartitionData(task.Topic, task.Partition, task.StartOffset, task.EndOffset)
if err != nil {
return err
}
// 2. 压缩数据
compressedData, err := dbm.compressData(data)
if err != nil {
return err
}
// 3. 加密数据
encryptedData, err := dbm.encryptData(compressedData)
if err != nil {
return err
}
// 4. 写入备份文件
return dbm.writeBackupFile(task.BackupPath, encryptedData)
}
// 数据恢复
func (dbm *DataBackupManager) restoreData(topic string, partition int32, backupPath string) error {
// 1. 读取备份文件
encryptedData, err := dbm.readBackupFile(backupPath)
if err != nil {
return err
}
// 2. 解密数据
compressedData, err := dbm.decryptData(encryptedData)
if err != nil {
return err
}
// 3. 解压数据
data, err := dbm.decompressData(compressedData)
if err != nil {
return err
}
// 4. 恢复分区数据
return dbm.restorePartitionData(topic, partition, data)
}
🌍 跨数据中心部署
MirrorMaker架构
┌─────────────────────────────────────────────────────────────────┐
│ 跨数据中心部署架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ 数据中心 A (主) ││
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ Kafka │ │ Kafka │ │ Kafka │ ││
│ │ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ ││
│ │ └─────────────┘ └─────────────┘ └─────────────┘ ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ 网络连接 ││
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ Mirror │ │ Mirror │ │ Mirror │ ││
│ │ │ Maker 1 │ │ Maker 2 │ │ Maker 3 │ ││
│ │ └─────────────┘ └─────────────┘ └─────────────┘ ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ 数据中心 B (备) ││
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ Kafka │ │ Kafka │ │ Kafka │ ││
│ │ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ ││
│ │ └─────────────┘ └─────────────┘ └─────────────┘ ││
│ └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘
MirrorMaker实现
// MirrorMaker
type MirrorMaker struct {
sourceCluster *ClusterConfig
targetCluster *ClusterConfig
topics []string
consumerGroup string
producerConfig *ProducerConfig
consumerConfig *ConsumerConfig
offsetManager *OffsetManager
errorHandler *ErrorHandler
}
// 集群配置
type ClusterConfig struct {
Name string
Brokers []string
Security *SecurityConfig
Properties map[string]string
}
// 启动MirrorMaker
func (mm *MirrorMaker) start() error {
// 1. 初始化消费者
consumer, err := mm.createConsumer()
if err != nil {
return err
}
// 2. 初始化生产者
producer, err := mm.createProducer()
if err != nil {
return err
}
// 3. 启动镜像任务
go mm.mirrorTask(consumer, producer)
return nil
}
// 镜像任务
func (mm *MirrorMaker) mirrorTask(consumer *Consumer, producer *Producer) {
for {
// 1. 从源集群拉取消息
records, err := consumer.poll()
if err != nil {
mm.errorHandler.handleError(err)
continue
}
// 2. 转换消息格式
transformedRecords := mm.transformRecords(records)
// 3. 发送到目标集群
if err := producer.sendBatch(transformedRecords); err != nil {
mm.errorHandler.handleError(err)
continue
}
// 4. 提交偏移量
if err := consumer.commitOffset(); err != nil {
mm.errorHandler.handleError(err)
}
}
}
// 转换消息格式
func (mm *MirrorMaker) transformRecords(records []*ConsumerRecord) []*ProducerRecord {
var transformedRecords []*ProducerRecord
for _, record := range records {
// 1. 转换Topic名称
targetTopic := mm.transformTopicName(record.Topic)
// 2. 转换消息内容
transformedRecord := &ProducerRecord{
Topic: targetTopic,
Partition: record.Partition,
Key: record.Key,
Value: record.Value,
Headers: record.Headers,
Timestamp: record.Timestamp,
}
transformedRecords = append(transformedRecords, transformedRecord)
}
return transformedRecords
}
// 转换Topic名称
func (mm *MirrorMaker) transformTopicName(sourceTopic string) string {
// 添加前缀或后缀
return fmt.Sprintf("%s.%s", mm.targetCluster.Name, sourceTopic)
}
双向同步
// 双向同步管理器
type BidirectionalSyncManager struct {
clusterA *ClusterConfig
clusterB *ClusterConfig
syncConfig *SyncConfig
conflictResolver *ConflictResolver
}
// 同步配置
type SyncConfig struct {
SyncInterval time.Duration
ConflictPolicy ConflictPolicy
MaxRetries int
RetryInterval time.Duration
}
// 冲突解决策略
type ConflictPolicy int
const (
LastWriteWins ConflictPolicy = iota
FirstWriteWins
ManualResolution
)
// 启动双向同步
func (bsm *BidirectionalSyncManager) start() error {
// 1. 启动A到B的同步
go bsm.syncAToB()
// 2. 启动B到A的同步
go bsm.syncBToA()
// 3. 启动冲突检测
go bsm.detectConflicts()
return nil
}
// A到B同步
func (bsm *BidirectionalSyncManager) syncAToB() {
ticker := time.NewTicker(bsm.syncConfig.SyncInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
bsm.performSync(bsm.clusterA, bsm.clusterB)
}
}
}
// B到A同步
func (bsm *BidirectionalSyncManager) syncBToA() {
ticker := time.NewTicker(bsm.syncConfig.SyncInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
bsm.performSync(bsm.clusterB, bsm.clusterA)
}
}
}
// 执行同步
func (bsm *BidirectionalSyncManager) performSync(source, target *ClusterConfig) {
// 1. 获取源集群的变更
changes, err := bsm.getChanges(source)
if err != nil {
log.Printf("获取变更失败: %v", err)
return
}
// 2. 应用变更到目标集群
for _, change := range changes {
if err := bsm.applyChange(target, change); err != nil {
log.Printf("应用变更失败: %v", err)
continue
}
}
}
// 检测冲突
func (bsm *BidirectionalSyncManager) detectConflicts() {
ticker := time.NewTicker(bsm.syncConfig.SyncInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
conflicts := bsm.findConflicts()
for _, conflict := range conflicts {
bsm.resolveConflict(conflict)
}
}
}
}
// 解决冲突
func (bsm *BidirectionalSyncManager) resolveConflict(conflict *Conflict) {
switch bsm.syncConfig.ConflictPolicy {
case LastWriteWins:
bsm.resolveLastWriteWins(conflict)
case FirstWriteWins:
bsm.resolveFirstWriteWins(conflict)
case ManualResolution:
bsm.resolveManual(conflict)
}
}
🎯 面试高频考点
1. Kafka如何保证高可用性?
答案要点:
- 多副本机制: 每个分区有多个副本
- ISR管理: 维护同步副本集合
- Leader选举: 故障时自动选举新Leader
- 故障检测: 实时监控副本健康状态
- 自动恢复: 故障恢复后自动重新同步
2. 网络分区时Kafka如何处理?
答案要点:
- 多数派原则: 只有多数派才能提供服务
- 脑裂防护: 使用纪元机制防止脑裂
- 只读模式: 少数派进入只读模式
- 自动恢复: 网络恢复后自动恢复正常
- 数据一致性: 确保数据不丢失
3. 磁盘故障如何恢复?
答案要点:
- 磁盘监控: 实时监控磁盘使用情况
- 数据备份: 定期备份重要数据
- 故障转移: 故障时转移到其他副本
- 数据恢复: 从备份恢复数据
- 预防措施: 使用RAID和冗余存储
4. 跨数据中心部署方案?
答案要点:
- MirrorMaker: 单向数据复制
- 双向同步: 支持双向数据同步
- 冲突解决: 处理数据冲突
- 网络优化: 优化跨网络传输
- 监控告警: 监控同步状态
📝 本章小结
本章深入解析了Kafka的高可用与容灾机制,包括:
- 多副本容错: 副本分布、故障检测、自动恢复
- 网络分区处理: 分区检测、脑裂防护、自动恢复
- 磁盘故障恢复: 磁盘监控、数据备份、故障转移
- 跨数据中心部署: MirrorMaker、双向同步、冲突解决
- 灾备方案: 完整的灾备策略和实施方法
高可用与容灾是Kafka在生产环境中的关键保障,理解了这些机制,就能更好地设计稳定可靠的Kafka集群。
下一章预告: 09-面试高频问题详解 - 深入解析Kafka面试中的核心问题