03-复制与ISR机制
📋 本章概览
本章深入探讨Kafka的复制机制和ISR(In-Sync Replicas)管理,这是Kafka实现高可用性和数据一致性的核心机制。我们将从Leader/Follower模型、ISR维护、高水位机制、副本选举等方面,全面解析Kafka如何保证数据的可靠性和一致性。
🎯 学习目标
- 理解Kafka的Leader/Follower复制模型
- 掌握ISR(In-Sync Replicas)的维护机制
- 了解高水位(HW)和日志末端偏移量(LEO)的概念
- 学习LeaderEpoch防回退机制
- 掌握副本选举流程和故障恢复
🏗️ 复制架构概览
基本复制模型
Topic: user-events, Partition: 0, Replication Factor: 3
┌─────────────────────────────────────────────────────────────────┐
│ Kafka 集群 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ │ │ │ │ │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │ LEADER │ │◄───┤ │FOLLOWER │ │ │ │FOLLOWER │ │ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ │ Offset: │ │ │ │ Offset: │ │ │ │ Offset: │ │ │
│ │ │ 0,1,2,3 │ │ │ │ 0,1,2,3 │ │ │ │ 0,1,2 │ │ │
│ │ │ HW: 3 │ │ │ │ HW: 3 │ │ │ │ HW: 2 │ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ISR: [Broker1, Broker2] (Broker3落后,不在ISR中) │
└─────────────────────────────────────────────────────────────────┘
核心概念
概念 | 定义 | 作用 |
---|---|---|
Leader | 分区的主副本 | 处理所有读写请求 |
Follower | 分区的从副本 | 从Leader同步数据 |
ISR | 同步副本集合 | 与Leader保持同步的副本列表 |
HW | 高水位 | 已提交消息的最高偏移量 |
LEO | 日志末端偏移量 | 下一条消息的偏移量 |
LeaderEpoch | Leader纪元 | 防止数据回退的版本号 |
🔄 Leader/Follower复制流程
消息写入流程
// Leader处理消息写入
type PartitionLeader struct {
topic string
partition int
replicas []ReplicaInfo
isr []int32 // ISR中的Broker ID列表
hw int64 // 高水位
leo int64 // 日志末端偏移量
leaderEpoch int32 // Leader纪元
}
type ReplicaInfo struct {
brokerID int32
isLeader bool
isInISR bool
lastCaughtUpTime int64
logEndOffset int64
}
// 处理生产请求
func (pl *PartitionLeader) HandleProduceRequest(records []Record) (*ProduceResponse, error) {
// 1. 验证ISR大小
if len(pl.isr) < pl.minInSyncReplicas {
return nil, fmt.Errorf("ISR大小不足: %d < %d", len(pl.isr), pl.minInSyncReplicas)
}
// 2. 追加到本地日志
baseOffset, err := pl.appendToLocalLog(records)
if err != nil {
return nil, err
}
// 3. 更新LEO
pl.leo = baseOffset + int64(len(records))
// 4. 并行复制到Followers
replicationResults := pl.replicateToFollowers(records, baseOffset)
// 5. 更新ISR和HW
pl.updateISRAndHW(replicationResults)
// 6. 返回响应
return &ProduceResponse{
BaseOffset: baseOffset,
LastOffset: pl.leo - 1,
HighWatermark: pl.hw,
}, nil
}
// 复制到Followers
func (pl *PartitionLeader) replicateToFollowers(records []Record, baseOffset int64) map[int32]ReplicationResult {
results := make(map[int32]ReplicationResult)
var wg sync.WaitGroup
for _, replica := range pl.replicas {
if replica.isLeader {
continue // 跳过Leader自己
}
wg.Add(1)
go func(r ReplicaInfo) {
defer wg.Done()
// 发送Fetch请求到Follower
result := pl.sendFetchRequest(r.brokerID, baseOffset, records)
results[r.brokerID] = result
}(replica)
}
wg.Wait()
return results
}
// 发送Fetch请求
func (pl *PartitionLeader) sendFetchRequest(brokerID int32, baseOffset int64, records []Record) ReplicationResult {
// 构建Fetch请求
fetchRequest := &FetchRequest{
Topic: pl.topic,
Partition: pl.partition,
Offset: baseOffset,
MaxBytes: 1024 * 1024, // 1MB
LeaderEpoch: pl.leaderEpoch,
}
// 发送到Follower
client := pl.getBrokerClient(brokerID)
response, err := client.Fetch(fetchRequest)
if err != nil {
return ReplicationResult{
BrokerID: brokerID,
Success: false,
Error: err,
}
}
return ReplicationResult{
BrokerID: brokerID,
Success: true,
LogEndOffset: response.LogEndOffset,
HighWatermark: response.HighWatermark,
}
}
Follower同步流程
// Follower处理Fetch请求
type PartitionFollower struct {
topic string
partition int
leaderID int32
leaderEpoch int32
hw int64
leo int64
log *PartitionLog
}
// 处理Fetch请求
func (pf *PartitionFollower) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error) {
// 1. 验证LeaderEpoch
if req.LeaderEpoch < pf.leaderEpoch {
return nil, fmt.Errorf("过期的LeaderEpoch: %d < %d", req.LeaderEpoch, pf.leaderEpoch)
}
// 2. 更新LeaderEpoch
if req.LeaderEpoch > pf.leaderEpoch {
pf.leaderEpoch = req.LeaderEpoch
}
// 3. 从指定偏移量读取消息
records, err := pf.log.Fetch(req.Offset, req.MaxBytes)
if err != nil {
return nil, err
}
// 4. 更新LEO
if len(records) > 0 {
pf.leo = records[len(records)-1].Offset + 1
}
// 5. 返回响应
return &FetchResponse{
Topic: pf.topic,
Partition: pf.partition,
Records: records,
LogEndOffset: pf.leo,
HighWatermark: pf.hw,
}, nil
}
// Follower主动拉取
func (pf *PartitionFollower) startReplication() {
ticker := time.NewTicker(100 * time.Millisecond) // 100ms拉取一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := pf.fetchFromLeader(); err != nil {
log.Printf("从Leader拉取失败: %v", err)
}
}
}
}
func (pf *PartitionFollower) fetchFromLeader() error {
// 构建Fetch请求
fetchRequest := &FetchRequest{
Topic: pf.topic,
Partition: pf.partition,
Offset: pf.leo, // 从当前LEO开始拉取
MaxBytes: 1024 * 1024,
LeaderEpoch: pf.leaderEpoch,
}
// 发送到Leader
leaderClient := pf.getLeaderClient()
response, err := leaderClient.Fetch(fetchRequest)
if err != nil {
return err
}
// 处理响应
if len(response.Records) > 0 {
// 追加到本地日志
if err := pf.log.Append(response.Records); err != nil {
return err
}
// 更新LEO
pf.leo = response.LogEndOffset
// 更新HW
pf.hw = response.HighWatermark
}
return nil
}
📊 ISR维护机制
ISR更新逻辑
// ISR管理器
type ISRManager struct {
topic string
partition int
replicas map[int32]*ReplicaInfo
isr []int32
minInSyncReplicas int
replicaMaxLagMs int64
}
// 更新ISR
func (im *ISRManager) updateISR() {
newISR := make([]int32, 0)
currentTime := time.Now().UnixMilli()
for brokerID, replica := range im.replicas {
// 检查副本是否应该加入ISR
if im.shouldBeInISR(replica, currentTime) {
newISR = append(newISR, brokerID)
}
}
// 排序ISR列表
sort.Slice(newISR, func(i, j int) bool {
return newISR[i] < newISR[j]
})
// 更新ISR
if !im.isISREqual(im.isr, newISR) {
oldISR := im.isr
im.isr = newISR
log.Printf("ISR更新: %v -> %v", oldISR, newISR)
// 触发ISR变更事件
im.onISRChange(oldISR, newISR)
}
}
// 判断副本是否应该在ISR中
func (im *ISRManager) shouldBeInISR(replica *ReplicaInfo, currentTime int64) bool {
// 1. 副本必须是活跃的
if !replica.isAlive {
return false
}
// 2. 检查副本是否落后太多
if replica.lastCaughtUpTime > 0 {
lag := currentTime - replica.lastCaughtUpTime
if lag > im.replicaMaxLagMs {
return false
}
}
// 3. 检查副本的LEO是否足够接近Leader
leader := im.getLeader()
if leader != nil {
leoLag := leader.logEndOffset - replica.logEndOffset
if leoLag > im.maxLogLag {
return false
}
}
return true
}
// 处理副本状态更新
func (im *ISRManager) updateReplicaStatus(brokerID int32, logEndOffset int64, highWatermark int64) {
replica, exists := im.replicas[brokerID]
if !exists {
return
}
// 更新副本状态
replica.logEndOffset = logEndOffset
replica.highWatermark = highWatermark
replica.lastCaughtUpTime = time.Now().UnixMilli()
// 检查是否需要更新ISR
im.updateISR()
}
ISR收缩和扩展
// ISR收缩处理
func (im *ISRManager) handleISRShrink(oldISR, newISR []int32) {
// 找出被移除的副本
removedReplicas := im.findRemovedReplicas(oldISR, newISR)
for _, brokerID := range removedReplicas {
replica := im.replicas[brokerID]
log.Printf("副本 %d 从ISR中移除,原因: 落后太多", brokerID)
// 可以在这里添加监控和告警
im.onReplicaRemovedFromISR(brokerID, replica)
}
}
// ISR扩展处理
func (im *ISRManager) handleISRExpand(oldISR, newISR []int32) {
// 找出新加入的副本
addedReplicas := im.findAddedReplicas(oldISR, newISR)
for _, brokerID := range addedReplicas {
replica := im.replicas[brokerID]
log.Printf("副本 %d 加入ISR", brokerID)
// 可以在这里添加监控和告警
im.onReplicaAddedToISR(brokerID, replica)
}
}
// 检查ISR健康状态
func (im *ISRManager) checkISRHealth() error {
// 检查ISR大小
if len(im.isr) < im.minInSyncReplicas {
return fmt.Errorf("ISR大小不足: %d < %d", len(im.isr), im.minInSyncReplicas)
}
// 检查Leader是否在ISR中
leader := im.getLeader()
if leader != nil && !im.isInISR(leader.brokerID) {
return fmt.Errorf("Leader不在ISR中")
}
return nil
}
🌊 高水位(HW)机制
HW更新逻辑
// 高水位管理器
type HighWatermarkManager struct {
topic string
partition int
hw int64
leo int64
isr []int32
replicas map[int32]*ReplicaInfo
}
// 更新高水位
func (hwm *HighWatermarkManager) updateHighWatermark() {
if len(hwm.isr) == 0 {
return
}
// 计算所有ISR副本的最小LEO
minLEO := int64(math.MaxInt64)
for _, brokerID := range hwm.isr {
replica := hwm.replicas[brokerID]
if replica.logEndOffset < minLEO {
minLEO = replica.logEndOffset
}
}
// 更新HW
if minLEO != math.MaxInt64 && minLEO > hwm.hw {
oldHW := hwm.hw
hwm.hw = minLEO
log.Printf("HW更新: %d -> %d", oldHW, hwm.hw)
// 通知所有副本更新HW
hwm.notifyReplicasHWUpdate()
}
}
// 通知副本HW更新
func (hwm *HighWatermarkManager) notifyReplicasHWUpdate() {
for _, brokerID := range hwm.isr {
replica := hwm.replicas[brokerID]
// 发送HW更新请求
go func(r *ReplicaInfo) {
client := hwm.getBrokerClient(r.brokerID)
updateRequest := &UpdateHighWatermarkRequest{
Topic: hwm.topic,
Partition: hwm.partition,
HighWatermark: hwm.hw,
LeaderEpoch: hwm.leaderEpoch,
}
if err := client.UpdateHighWatermark(updateRequest); err != nil {
log.Printf("更新副本 %d 的HW失败: %v", r.brokerID, err)
}
}(replica)
}
}
// 检查消息是否已提交
func (hwm *HighWatermarkManager) isCommitted(offset int64) bool {
return offset < hwm.hw
}
// 获取可读偏移量范围
func (hwm *HighWatermarkManager) getReadableOffsetRange() (int64, int64) {
return 0, hwm.hw - 1
}
消费者可见性控制
// 消费者可见性管理器
type ConsumerVisibilityManager struct {
hwm *HighWatermarkManager
}
// 过滤已提交的消息
func (cvm *ConsumerVisibilityManager) filterCommittedMessages(records []Record) []Record {
var committedRecords []Record
for _, record := range records {
if cvm.hwm.isCommitted(record.Offset) {
committedRecords = append(committedRecords, record)
}
}
return committedRecords
}
// 处理消费者读取请求
func (cvm *ConsumerVisibilityManager) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error) {
// 1. 从日志中读取消息
allRecords, err := cvm.readFromLog(req.Offset, req.MaxBytes)
if err != nil {
return nil, err
}
// 2. 过滤已提交的消息
committedRecords := cvm.filterCommittedMessages(allRecords)
// 3. 返回响应
return &FetchResponse{
Topic: req.Topic,
Partition: req.Partition,
Records: committedRecords,
HighWatermark: cvm.hwm.hw,
}, nil
}
🛡️ LeaderEpoch防回退机制
LeaderEpoch管理
// LeaderEpoch管理器
type LeaderEpochManager struct {
topic string
partition int
currentEpoch int32
epochHistory []EpochEntry
}
type EpochEntry struct {
Epoch int32
StartOffset int64
LeaderID int32
}
// 开始新的LeaderEpoch
func (lem *LeaderEpochManager) startNewEpoch(leaderID int32, startOffset int64) int32 {
newEpoch := lem.currentEpoch + 1
// 添加新的纪元条目
entry := EpochEntry{
Epoch: newEpoch,
StartOffset: startOffset,
LeaderID: leaderID,
}
lem.epochHistory = append(lem.epochHistory, entry)
// 更新当前纪元
lem.currentEpoch = newEpoch
log.Printf("开始新的LeaderEpoch: %d, Leader: %d, StartOffset: %d",
newEpoch, leaderID, startOffset)
return newEpoch
}
// 验证LeaderEpoch
func (lem *LeaderEpochManager) validateLeaderEpoch(epoch int32) error {
if epoch < lem.currentEpoch {
return fmt.Errorf("过期的LeaderEpoch: %d < %d", epoch, lem.currentEpoch)
}
if epoch > lem.currentEpoch {
return fmt.Errorf("未来的LeaderEpoch: %d > %d", epoch, lem.currentEpoch)
}
return nil
}
// 根据偏移量查找LeaderEpoch
func (lem *LeaderEpochManager) findLeaderEpoch(offset int64) int32 {
// 从最新的纪元开始查找
for i := len(lem.epochHistory) - 1; i >= 0; i-- {
entry := lem.epochHistory[i]
if offset >= entry.StartOffset {
return entry.Epoch
}
}
return 0 // 默认返回0
}
防回退检查
// 防回退检查器
type FencingChecker struct {
epochManager *LeaderEpochManager
log *PartitionLog
}
// 检查写入请求是否会导致回退
func (fc *FencingChecker) checkWriteRequest(req *ProduceRequest) error {
// 1. 验证LeaderEpoch
if err := fc.epochManager.validateLeaderEpoch(req.LeaderEpoch); err != nil {
return err
}
// 2. 检查偏移量是否会导致回退
if req.BaseOffset < fc.log.getLastOffset() {
return fmt.Errorf("写入偏移量 %d 小于当前最后偏移量 %d",
req.BaseOffset, fc.log.getLastOffset())
}
// 3. 检查是否有重复的偏移量
if req.BaseOffset == fc.log.getLastOffset() {
// 检查消息内容是否相同
if fc.isDuplicateMessage(req.BaseOffset, req.Records) {
return fmt.Errorf("重复的消息偏移量: %d", req.BaseOffset)
}
}
return nil
}
// 检查是否为重复消息
func (fc *FencingChecker) isDuplicateMessage(offset int64, records []Record) bool {
// 读取现有消息
existingRecords, err := fc.log.Fetch(offset, 1024*1024)
if err != nil {
return false
}
// 比较消息内容
if len(existingRecords) != len(records) {
return false
}
for i, existing := range existingRecords {
if !fc.compareRecords(existing, records[i]) {
return false
}
}
return true
}
🗳️ 副本选举机制
选举流程
// 副本选举器
type ReplicaElectionManager struct {
topic string
partition int
replicas map[int32]*ReplicaInfo
isr []int32
controller *Controller
}
// 触发Leader选举
func (rem *ReplicaElectionManager) triggerLeaderElection() error {
log.Printf("开始Leader选举,Topic: %s, Partition: %d", rem.topic, rem.partition)
// 1. 检查ISR是否为空
if len(rem.isr) == 0 {
return fmt.Errorf("ISR为空,无法进行Leader选举")
}
// 2. 选择新的Leader
newLeaderID := rem.selectNewLeader()
if newLeaderID == -1 {
return fmt.Errorf("无法选择新的Leader")
}
// 3. 执行Leader切换
if err := rem.performLeaderSwitch(newLeaderID); err != nil {
return err
}
log.Printf("Leader选举完成,新Leader: %d", newLeaderID)
return nil
}
// 选择新的Leader
func (rem *ReplicaElectionManager) selectNewLeader() int32 {
// 优先选择ISR中的副本
for _, brokerID := range rem.isr {
replica := rem.replicas[brokerID]
if replica.isAlive && rem.isPreferredLeader(brokerID) {
return brokerID
}
}
// 如果没有Preferred Leader,选择第一个可用的ISR副本
for _, brokerID := range rem.isr {
replica := rem.replicas[brokerID]
if replica.isAlive {
return brokerID
}
}
return -1
}
// 执行Leader切换
func (rem *ReplicaElectionManager) performLeaderSwitch(newLeaderID int32) error {
// 1. 更新LeaderEpoch
newEpoch := rem.epochManager.startNewEpoch(newLeaderID, rem.getCurrentLEO())
// 2. 通知所有副本
if err := rem.notifyReplicasLeaderChange(newLeaderID, newEpoch); err != nil {
return err
}
// 3. 更新元数据
if err := rem.updateMetadata(newLeaderID, newEpoch); err != nil {
return err
}
return nil
}
// 通知副本Leader变更
func (rem *ReplicaElectionManager) notifyReplicasLeaderChange(newLeaderID int32, newEpoch int32) error {
for brokerID, replica := range rem.replicas {
if brokerID == newLeaderID {
continue // 跳过新Leader
}
go func(bID int32, r *ReplicaInfo) {
client := rem.getBrokerClient(bID)
request := &LeaderChangeRequest{
Topic: rem.topic,
Partition: rem.partition,
NewLeaderID: newLeaderID,
NewEpoch: newEpoch,
}
if err := client.NotifyLeaderChange(request); err != nil {
log.Printf("通知副本 %d Leader变更失败: %v", bID, err)
}
}(brokerID, replica)
}
return nil
}
故障检测和恢复
// 故障检测器
type FailureDetector struct {
topic string
partition int
replicas map[int32]*ReplicaInfo
isr []int32
lastHeartbeat map[int32]int64
heartbeatTimeout int64
}
// 检测副本故障
func (fd *FailureDetector) detectFailures() []int32 {
var failedReplicas []int32
currentTime := time.Now().UnixMilli()
for brokerID, replica := range fd.replicas {
lastHeartbeat := fd.lastHeartbeat[brokerID]
// 检查心跳超时
if currentTime-lastHeartbeat > fd.heartbeatTimeout {
if replica.isAlive {
log.Printf("检测到副本 %d 故障", brokerID)
replica.isAlive = false
failedReplicas = append(failedReplicas, brokerID)
}
}
}
return failedReplicas
}
// 处理副本故障
func (fd *FailureDetector) handleReplicaFailure(failedReplicas []int32) {
for _, brokerID := range failedReplicas {
replica := fd.replicas[brokerID]
// 从ISR中移除
if replica.isInISR {
fd.removeFromISR(brokerID)
}
// 触发Leader选举(如果是Leader故障)
if replica.isLeader {
fd.triggerLeaderElection()
}
}
}
// 副本恢复处理
func (fd *FailureDetector) handleReplicaRecovery(brokerID int32) {
replica := fd.replicas[brokerID]
// 标记为活跃
replica.isAlive = true
fd.lastHeartbeat[brokerID] = time.Now().UnixMilli()
log.Printf("副本 %d 恢复", brokerID)
// 检查是否可以重新加入ISR
if fd.canRejoinISR(brokerID) {
fd.addToISR(brokerID)
}
}
🎯 面试高频考点
1. ISR的作用和机制?
答案要点:
- 定义: ISR是与Leader保持同步的副本集合
- 维护: 通过心跳和拉取请求监控副本状态
- 更新: 副本落后时从ISR移除,追上时重新加入
- 作用: 保证数据一致性和可用性
- 配置: min.insync.replicas控制最小ISR大小
2. 高水位(HW)的作用?
答案要点:
- 定义: 已提交消息的最高偏移量
- 计算: 所有ISR副本的最小LEO
- 作用: 控制消费者可见性,防止读取未提交消息
- 更新: 每次ISR更新后重新计算
- 保证: 只有HW以下的消息对消费者可见
3. LeaderEpoch如何防止数据回退?
答案要点:
- 机制: 每个Leader有唯一的纪元号
- 验证: 写入前验证LeaderEpoch的有效性
- 防护: 防止旧Leader恢复后覆盖新数据
- 历史: 维护纪元历史,支持故障恢复
- 检查: 结合偏移量检查,确保数据完整性
4. 副本选举的流程?
答案要点:
- 触发: Leader故障或ISR变化时触发
- 选择: 优先选择ISR中的Preferred Leader
- 切换: 更新LeaderEpoch,通知所有副本
- 恢复: 新Leader开始处理读写请求
- 同步: 其他副本开始从新Leader同步
📝 本章小结
本章深入解析了Kafka的复制和ISR机制,包括:
- 复制模型: Leader/Follower架构,异步复制机制
- ISR管理: 同步副本集合的维护和更新
- 高水位: 数据一致性保证和消费者可见性控制
- LeaderEpoch: 防回退机制和故障恢复
- 副本选举: 故障检测和Leader切换流程
这些机制共同构成了Kafka高可用性和数据一致性的基础,理解了这些原理,就能更好地进行集群配置和故障处理。
下一章预告: 04-元数据管理与KRaft - 深入理解Kafka的元数据管理演进