HiHuo
首页
博客
手册
工具
首页
博客
手册
工具
  • 学习 Kafka

    • Kafka 学习手册 - 总览与导读
    • 01-核心概念与架构
    • 02-存储模块-日志与索引
    • 03-复制与ISR机制
    • 04-元数据管理与KRaft
    • 05-消费者组协调
    • 06-事务与Exactly-Once语义
    • 07-性能优化与调优
    • 08-高可用与容灾
    • 09-面试高频问题详解
    • 10-实战项目-Mini-Kafka实现

05-消费者组协调

📋 本章概览

本章深入探讨Kafka的消费者组协调机制,这是Kafka实现负载均衡和容错的关键组件。我们将从Consumer Group概念、GroupCoordinator工作原理、分区分配策略、Rebalance机制等方面,全面解析Kafka如何协调多个消费者协同工作。

🎯 学习目标

  • 理解Consumer Group的概念和作用
  • 掌握GroupCoordinator的工作原理
  • 了解Join/Sync/Heartbeat协议
  • 学习分区分配策略(Range/RoundRobin/Sticky)
  • 掌握Rebalance机制和优化策略

👥 Consumer Group概念

基本架构

┌─────────────────────────────────────────────────────────────────┐
│                    Consumer Group 架构                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                    Topic: user-events                      ││
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐        ││
│  │  │Partition│  │Partition│  │Partition│  │Partition│        ││
│  │  │    0    │  │    1    │  │    2    │  │    3    │        ││
│  │  └─────────┘  └─────────┘  └─────────┘  └─────────┘        ││
│  └─────────────────────────────────────────────────────────────┘│
│                             │                                  │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                Consumer Group: analytics-group              ││
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        ││
│  │  │  Consumer 1 │  │  Consumer 2 │  │  Consumer 3 │        ││
│  │  │             │  │             │  │             │        ││
│  │  │ Partition 0 │  │ Partition 1 │  │ Partition 2 │        ││
│  │  │ Partition 2 │  │ Partition 3 │  │             │        ││
│  │  └─────────────┘  └─────────────┘  └─────────────┘        ││
│  └─────────────────────────────────────────────────────────────┘│
│                             │                                  │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                Group Coordinator                            ││
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        ││
│  │  │   Join      │  │    Sync     │  │  Heartbeat  │        ││
│  │  │  Protocol   │  │  Protocol   │  │  Protocol   │        ││
│  │  └─────────────┘  └─────────────┘  └─────────────┘        ││
│  └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘

核心概念

概念定义作用
Consumer Group消费者组共同消费一个或多个Topic的消费者集合
Group Coordinator组协调器管理消费者组的Broker节点
Group Leader组领导者负责分区分配的消费者
Rebalance再均衡重新分配分区给消费者的过程
Offset偏移量消费者在分区中的消费位置

🎯 GroupCoordinator工作原理

GroupCoordinator结构

// GroupCoordinator结构
type GroupCoordinator struct {
    brokerID        int32
    groupMetadata   map[string]*GroupMetadata
    offsetManager   *OffsetManager
    rebalanceManager *RebalanceManager
    mu              sync.RWMutex
}

// 消费者组元数据
type GroupMetadata struct {
    groupID           string
    state             GroupState
    protocolType      string
    generation        int32
    leaderID          string
    members           map[string]*MemberMetadata
    subscription      map[string][]int32 // Topic -> Partitions
    assignment        map[string][]int32 // Member -> Partitions
    lastRebalanceTime int64
}

// 消费者状态
type GroupState int

const (
    Empty GroupState = iota
    PreparingRebalance
    CompletingRebalance
    Stable
    Dead
)

// 成员元数据
type MemberMetadata struct {
    memberID        string
    clientID        string
    clientHost      string
    sessionTimeout  int32
    rebalanceTimeout int32
    protocolType    string
    supportedProtocols []string
    subscription    map[string][]int32
    assignment      []int32
    lastHeartbeat   int64
}

GroupCoordinator核心功能

// 处理消费者加入请求
func (gc *GroupCoordinator) handleJoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
    gc.mu.Lock()
    defer gc.mu.Unlock()
    
    groupID := request.GroupID
    memberID := request.MemberID
    
    // 获取或创建组元数据
    groupMetadata := gc.getOrCreateGroup(groupID)
    
    // 验证会话超时
    if !gc.validateSessionTimeout(request.SessionTimeout) {
        return nil, fmt.Errorf("无效的会话超时时间")
    }
    
    // 处理成员加入
    if err := gc.addMember(groupMetadata, request); err != nil {
        return nil, err
    }
    
    // 检查是否需要触发Rebalance
    if gc.shouldTriggerRebalance(groupMetadata) {
        gc.triggerRebalance(groupMetadata)
    }
    
    // 返回响应
    return &JoinGroupResponse{
        GenerationID: groupMetadata.generation,
        GroupProtocol: groupMetadata.protocolType,
        LeaderID:     groupMetadata.leaderID,
        MemberID:     memberID,
        Members:      gc.getMemberList(groupMetadata),
    }, nil
}

// 添加成员
func (gc *GroupCoordinator) addMember(groupMetadata *GroupMetadata, request *JoinGroupRequest) error {
    memberID := request.MemberID
    if memberID == "" {
        memberID = gc.generateMemberID(request.ClientID, request.ClientHost)
    }
    
    // 创建成员元数据
    member := &MemberMetadata{
        memberID:        memberID,
        clientID:        request.ClientID,
        clientHost:      request.ClientHost,
        sessionTimeout:  request.SessionTimeout,
        rebalanceTimeout: request.RebalanceTimeout,
        protocolType:    request.ProtocolType,
        supportedProtocols: request.Protocols,
        lastHeartbeat:   time.Now().UnixMilli(),
    }
    
    // 添加到组
    groupMetadata.members[memberID] = member
    
    // 更新组状态
    if groupMetadata.state == Empty {
        groupMetadata.state = PreparingRebalance
    }
    
    return nil
}

// 检查是否需要触发Rebalance
func (gc *GroupCoordinator) shouldTriggerRebalance(groupMetadata *GroupMetadata) bool {
    // 1. 新成员加入
    if groupMetadata.state == Empty {
        return true
    }
    
    // 2. 成员离开
    if gc.hasMemberLeft(groupMetadata) {
        return true
    }
    
    // 3. 订阅变更
    if gc.hasSubscriptionChanged(groupMetadata) {
        return true
    }
    
    // 4. 分区数量变更
    if gc.hasPartitionCountChanged(groupMetadata) {
        return true
    }
    
    return false
}

🔄 Join/Sync/Heartbeat协议

Join协议

// Join协议处理
func (gc *GroupCoordinator) handleJoinProtocol(groupMetadata *GroupMetadata) error {
    // 1. 收集所有成员的支持协议
    supportedProtocols := gc.collectSupportedProtocols(groupMetadata)
    
    // 2. 选择协议
    selectedProtocol := gc.selectProtocol(supportedProtocols)
    groupMetadata.protocolType = selectedProtocol
    
    // 3. 选择Group Leader
    leaderID := gc.selectGroupLeader(groupMetadata)
    groupMetadata.leaderID = leaderID
    
    // 4. 更新Generation
    groupMetadata.generation++
    
    // 5. 设置状态
    groupMetadata.state = CompletingRebalance
    
    return nil
}

// 选择Group Leader
func (gc *GroupCoordinator) selectGroupLeader(groupMetadata *GroupMetadata) string {
    // 选择第一个成员作为Leader
    for memberID := range groupMetadata.members {
        return memberID
    }
    return ""
}

// 收集支持协议
func (gc *GroupCoordinator) collectSupportedProtocols(groupMetadata *GroupMetadata) map[string][]string {
    protocols := make(map[string][]string)
    
    for _, member := range groupMetadata.members {
        for _, protocol := range member.supportedProtocols {
            if protocols[protocol] == nil {
                protocols[protocol] = make([]string, 0)
            }
            protocols[protocol] = append(protocols[protocol], member.memberID)
        }
    }
    
    return protocols
}

Sync协议

// Sync协议处理
func (gc *GroupCoordinator) handleSyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
    gc.mu.Lock()
    defer gc.mu.Unlock()
    
    groupID := request.GroupID
    memberID := request.MemberID
    
    // 获取组元数据
    groupMetadata, exists := gc.groupMetadata[groupID]
    if !exists {
        return nil, fmt.Errorf("组不存在: %s", groupID)
    }
    
    // 验证成员
    member, exists := groupMetadata.members[memberID]
    if !exists {
        return nil, fmt.Errorf("成员不存在: %s", memberID)
    }
    
    // 验证Generation
    if request.GenerationID != groupMetadata.generation {
        return nil, fmt.Errorf("Generation不匹配")
    }
    
    // 处理Leader的分配
    if memberID == groupMetadata.leaderID {
        if err := gc.handleLeaderSync(groupMetadata, request); err != nil {
            return nil, err
        }
    }
    
    // 返回分配结果
    return &SyncGroupResponse{
        GenerationID: groupMetadata.generation,
        MemberAssignment: member.assignment,
    }, nil
}

// 处理Leader的Sync
func (gc *GroupCoordinator) handleLeaderSync(groupMetadata *GroupMetadata, request *SyncGroupRequest) error {
    // 1. 解析分配结果
    assignment, err := gc.parseAssignment(request.MemberAssignment)
    if err != nil {
        return err
    }
    
    // 2. 验证分配
    if err := gc.validateAssignment(groupMetadata, assignment); err != nil {
        return err
    }
    
    // 3. 应用分配
    gc.applyAssignment(groupMetadata, assignment)
    
    // 4. 更新组状态
    groupMetadata.state = Stable
    groupMetadata.lastRebalanceTime = time.Now().UnixMilli()
    
    return nil
}

// 应用分配
func (gc *GroupCoordinator) applyAssignment(groupMetadata *GroupMetadata, assignment map[string][]int32) {
    for memberID, partitions := range assignment {
        if member, exists := groupMetadata.members[memberID]; exists {
            member.assignment = partitions
        }
    }
    
    // 更新组分配
    groupMetadata.assignment = assignment
}

Heartbeat协议

// Heartbeat协议处理
func (gc *GroupCoordinator) handleHeartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
    gc.mu.Lock()
    defer gc.mu.Unlock()
    
    groupID := request.GroupID
    memberID := request.MemberID
    
    // 获取组元数据
    groupMetadata, exists := gc.groupMetadata[groupID]
    if !exists {
        return &HeartbeatResponse{Error: "组不存在"}, nil
    }
    
    // 获取成员
    member, exists := groupMetadata.members[memberID]
    if !exists {
        return &HeartbeatResponse{Error: "成员不存在"}, nil
    }
    
    // 验证Generation
    if request.GenerationID != groupMetadata.generation {
        return &HeartbeatResponse{Error: "Generation不匹配"}, nil
    }
    
    // 更新心跳时间
    member.lastHeartbeat = time.Now().UnixMilli()
    
    // 检查组状态
    if groupMetadata.state == Stable {
        return &HeartbeatResponse{Error: ""}, nil
    } else {
        return &HeartbeatResponse{Error: "需要重新加入组"}, nil
    }
}

// 心跳超时检查
func (gc *GroupCoordinator) checkHeartbeatTimeout() {
    gc.mu.Lock()
    defer gc.mu.Unlock()
    
    currentTime := time.Now().UnixMilli()
    
    for groupID, groupMetadata := range gc.groupMetadata {
        var expiredMembers []string
        
        for memberID, member := range groupMetadata.members {
            if currentTime-member.lastHeartbeat > int64(member.sessionTimeout)*1000 {
                expiredMembers = append(expiredMembers, memberID)
            }
        }
        
        // 移除过期成员
        if len(expiredMembers) > 0 {
            gc.removeExpiredMembers(groupMetadata, expiredMembers)
        }
    }
}

// 移除过期成员
func (gc *GroupCoordinator) removeExpiredMembers(groupMetadata *GroupMetadata, expiredMembers []string) {
    for _, memberID := range expiredMembers {
        delete(groupMetadata.members, memberID)
        log.Printf("移除过期成员: %s", memberID)
    }
    
    // 触发Rebalance
    if len(groupMetadata.members) > 0 {
        gc.triggerRebalance(groupMetadata)
    } else {
        groupMetadata.state = Empty
    }
}

📊 分区分配策略

Range分配策略

// Range分配策略
type RangeAssignor struct{}

// 分配分区
func (ra *RangeAssignor) assign(groupMetadata *GroupMetadata) map[string][]int32 {
    assignment := make(map[string][]int32)
    
    // 获取所有Topic和分区
    topicPartitions := ra.getTopicPartitions(groupMetadata)
    
    for topic, partitions := range topicPartitions {
        // 获取订阅该Topic的成员
        members := ra.getSubscribedMembers(groupMetadata, topic)
        if len(members) == 0 {
            continue
        }
        
        // 按Range分配
        partitionsPerMember := len(partitions) / len(members)
        extraPartitions := len(partitions) % len(members)
        
        startIndex := 0
        for i, memberID := range members {
            // 计算该成员分配的分区数量
            memberPartitionCount := partitionsPerMember
            if i < extraPartitions {
                memberPartitionCount++
            }
            
            // 分配分区
            endIndex := startIndex + memberPartitionCount
            memberPartitions := partitions[startIndex:endIndex]
            
            if assignment[memberID] == nil {
                assignment[memberID] = make([]int32, 0)
            }
            assignment[memberID] = append(assignment[memberID], memberPartitions...)
            
            startIndex = endIndex
        }
    }
    
    return assignment
}

// 获取Topic分区
func (ra *RangeAssignor) getTopicPartitions(groupMetadata *GroupMetadata) map[string][]int32 {
    topicPartitions := make(map[string][]int32)
    
    for topic := range groupMetadata.subscription {
        // 从元数据获取分区信息
        partitions := ra.getPartitionsForTopic(topic)
        topicPartitions[topic] = partitions
    }
    
    return topicPartitions
}

RoundRobin分配策略

// RoundRobin分配策略
type RoundRobinAssignor struct{}

// 分配分区
func (rra *RoundRobinAssignor) assign(groupMetadata *GroupMetadata) map[string][]int32 {
    assignment := make(map[string][]int32)
    
    // 获取所有成员
    members := ra.getAllMembers(groupMetadata)
    if len(members) == 0 {
        return assignment
    }
    
    // 获取所有分区
    allPartitions := ra.getAllPartitions(groupMetadata)
    
    // 按RoundRobin分配
    for i, partition := range allPartitions {
        memberIndex := i % len(members)
        memberID := members[memberIndex]
        
        if assignment[memberID] == nil {
            assignment[memberID] = make([]int32, 0)
        }
        assignment[memberID] = append(assignment[memberID], partition)
    }
    
    return assignment
}

// 获取所有分区
func (rra *RoundRobinAssignor) getAllPartitions(groupMetadata *GroupMetadata) []int32 {
    var allPartitions []int32
    
    for topic, partitions := range groupMetadata.subscription {
        for _, partition := range partitions {
            allPartitions = append(allPartitions, partition)
        }
    }
    
    // 排序确保一致性
    sort.Slice(allPartitions, func(i, j int) bool {
        return allPartitions[i] < allPartitions[j]
    })
    
    return allPartitions
}

Sticky分配策略

// Sticky分配策略
type StickyAssignor struct {
    previousAssignment map[string][]int32
}

// 分配分区
func (sa *StickyAssignor) assign(groupMetadata *GroupMetadata) map[string][]int32 {
    assignment := make(map[string][]int32)
    
    // 获取当前成员和分区
    currentMembers := ra.getAllMembers(groupMetadata)
    currentPartitions := ra.getAllPartitions(groupMetadata)
    
    // 初始化分配
    for _, memberID := range currentMembers {
        assignment[memberID] = make([]int32, 0)
    }
    
    // 1. 保持现有分配
    sa.preserveExistingAssignment(assignment, currentMembers, currentPartitions)
    
    // 2. 分配未分配的分区
    unassignedPartitions := sa.getUnassignedPartitions(assignment, currentPartitions)
    sa.assignUnassignedPartitions(assignment, unassignedPartitions)
    
    // 3. 平衡分配
    sa.balanceAssignment(assignment)
    
    // 保存当前分配
    sa.previousAssignment = assignment
    
    return assignment
}

// 保持现有分配
func (sa *StickyAssignor) preserveExistingAssignment(assignment map[string][]int32, 
    currentMembers []string, currentPartitions []int32) {
    
    for memberID, partitions := range sa.previousAssignment {
        // 检查成员是否仍然存在
        if !ra.contains(currentMembers, memberID) {
            continue
        }
        
        // 检查分区是否仍然存在
        validPartitions := make([]int32, 0)
        for _, partition := range partitions {
            if ra.contains(currentPartitions, partition) {
                validPartitions = append(validPartitions, partition)
            }
        }
        
        assignment[memberID] = validPartitions
    }
}

// 平衡分配
func (sa *StickyAssignor) balanceAssignment(assignment map[string][]int32) {
    // 计算平均分配数量
    totalPartitions := 0
    for _, partitions := range assignment {
        totalPartitions += len(partitions)
    }
    
    if len(assignment) == 0 {
        return
    }
    
    avgPartitions := totalPartitions / len(assignment)
    extraPartitions := totalPartitions % len(assignment)
    
    // 重新平衡
    for memberID, partitions := range assignment {
        targetCount := avgPartitions
        if len(partitions) > avgPartitions {
            targetCount = avgPartitions + extraPartitions
        }
        
        if len(partitions) > targetCount {
            // 移除多余分区
            excess := len(partitions) - targetCount
            assignment[memberID] = partitions[:len(partitions)-excess]
        }
    }
}

🔄 Rebalance机制

Rebalance流程

// Rebalance管理器
type RebalanceManager struct {
    coordinator *GroupCoordinator
    rebalanceTimeout int32
}

// 触发Rebalance
func (rm *RebalanceManager) triggerRebalance(groupMetadata *GroupMetadata) error {
    log.Printf("触发Rebalance,组: %s", groupMetadata.groupID)
    
    // 1. 设置状态
    groupMetadata.state = PreparingRebalance
    groupMetadata.generation++
    
    // 2. 通知所有成员
    if err := rm.notifyMembersRebalance(groupMetadata); err != nil {
        return err
    }
    
    // 3. 等待所有成员重新加入
    if err := rm.waitForAllMembersJoin(groupMetadata); err != nil {
        return err
    }
    
    // 4. 执行分区分配
    if err := rm.performPartitionAssignment(groupMetadata); err != nil {
        return err
    }
    
    // 5. 通知成员分配结果
    if err := rm.notifyMembersAssignment(groupMetadata); err != nil {
        return err
    }
    
    // 6. 设置稳定状态
    groupMetadata.state = Stable
    
    return nil
}

// 等待所有成员重新加入
func (rm *RebalanceManager) waitForAllMembersJoin(groupMetadata *GroupMetadata) error {
    timeout := time.Duration(rm.rebalanceTimeout) * time.Millisecond
    deadline := time.Now().Add(timeout)
    
    for time.Now().Before(deadline) {
        // 检查是否所有成员都已加入
        if rm.allMembersJoined(groupMetadata) {
            return nil
        }
        
        time.Sleep(100 * time.Millisecond)
    }
    
    return fmt.Errorf("等待成员加入超时")
}

// 执行分区分配
func (rm *RebalanceManager) performPartitionAssignment(groupMetadata *GroupMetadata) error {
    // 选择分配策略
    assignor := rm.selectAssignor(groupMetadata)
    
    // 执行分配
    assignment := assignor.assign(groupMetadata)
    
    // 验证分配
    if err := rm.validateAssignment(groupMetadata, assignment); err != nil {
        return err
    }
    
    // 应用分配
    rm.applyAssignment(groupMetadata, assignment)
    
    return nil
}

// 选择分配策略
func (rm *RebalanceManager) selectAssignor(groupMetadata *GroupMetadata) PartitionAssignor {
    // 根据协议类型选择分配策略
    switch groupMetadata.protocolType {
    case "range":
        return &RangeAssignor{}
    case "roundrobin":
        return &RoundRobinAssignor{}
    case "sticky":
        return &StickyAssignor{}
    default:
        return &RangeAssignor{} // 默认使用Range
    }
}

Rebalance优化

// Rebalance优化器
type RebalanceOptimizer struct {
    minRebalanceInterval int64
    lastRebalanceTime    map[string]int64
}

// 检查是否可以优化Rebalance
func (ro *RebalanceOptimizer) canOptimizeRebalance(groupMetadata *GroupMetadata) bool {
    groupID := groupMetadata.groupID
    lastTime := ro.lastRebalanceTime[groupID]
    currentTime := time.Now().UnixMilli()
    
    // 检查最小间隔
    if currentTime-lastTime < ro.minRebalanceInterval {
        return false
    }
    
    // 检查成员变化
    if ro.isMinorChange(groupMetadata) {
        return true
    }
    
    return false
}

// 判断是否为小变化
func (ro *RebalanceOptimizer) isMinorChange(groupMetadata *GroupMetadata) bool {
    // 1. 检查成员数量变化
    memberCountChange := ro.getMemberCountChange(groupMetadata)
    if memberCountChange > 1 {
        return false
    }
    
    // 2. 检查订阅变化
    if ro.hasSubscriptionChanged(groupMetadata) {
        return false
    }
    
    // 3. 检查分区数量变化
    if ro.hasPartitionCountChanged(groupMetadata) {
        return false
    }
    
    return true
}

// 增量Rebalance
func (ro *RebalanceOptimizer) incrementalRebalance(groupMetadata *GroupMetadata) error {
    // 1. 识别变化
    changes := ro.identifyChanges(groupMetadata)
    
    // 2. 计算最小影响范围
    affectedMembers := ro.getAffectedMembers(changes)
    
    // 3. 只重新分配受影响的分区
    if err := ro.reassignAffectedPartitions(groupMetadata, affectedMembers); err != nil {
        return err
    }
    
    return nil
}

📍 Offset管理

Offset存储

// Offset管理器
type OffsetManager struct {
    coordinator *GroupCoordinator
    offsetStore *OffsetStore
}

// 提交Offset
func (om *OffsetManager) commitOffset(request *CommitOffsetRequest) (*CommitOffsetResponse, error) {
    // 1. 验证组和成员
    if err := om.validateGroupAndMember(request.GroupID, request.MemberID); err != nil {
        return nil, err
    }
    
    // 2. 验证Generation
    if err := om.validateGeneration(request.GroupID, request.GenerationID); err != nil {
        return nil, err
    }
    
    // 3. 提交Offset
    for topic, partitions := range request.Offsets {
        for partition, offset := range partitions {
            if err := om.offsetStore.commitOffset(request.GroupID, topic, partition, offset); err != nil {
                return nil, err
            }
        }
    }
    
    return &CommitOffsetResponse{Error: ""}, nil
}

// 获取Offset
func (om *OffsetManager) fetchOffset(request *FetchOffsetRequest) (*FetchOffsetResponse, error) {
    offsets := make(map[string]map[int32]int64)
    
    for topic, partitions := range request.Partitions {
        offsets[topic] = make(map[int32]int64)
        
        for _, partition := range partitions {
            offset, err := om.offsetStore.getOffset(request.GroupID, topic, partition)
            if err != nil {
                return nil, err
            }
            offsets[topic][partition] = offset
        }
    }
    
    return &FetchOffsetResponse{Offsets: offsets}, nil
}

// Offset存储
type OffsetStore struct {
    storage map[string]map[string]map[int32]int64 // Group -> Topic -> Partition -> Offset
    mu      sync.RWMutex
}

// 提交Offset
func (os *OffsetStore) commitOffset(groupID, topic string, partition int32, offset int64) error {
    os.mu.Lock()
    defer os.mu.Unlock()
    
    if os.storage[groupID] == nil {
        os.storage[groupID] = make(map[string]map[int32]int64)
    }
    
    if os.storage[groupID][topic] == nil {
        os.storage[groupID][topic] = make(map[int32]int64)
    }
    
    os.storage[groupID][topic][partition] = offset
    
    // 持久化到磁盘
    return os.persistOffset(groupID, topic, partition, offset)
}

// 获取Offset
func (os *OffsetStore) getOffset(groupID, topic string, partition int32) (int64, error) {
    os.mu.RLock()
    defer os.mu.RUnlock()
    
    if os.storage[groupID] == nil {
        return -1, nil // 没有提交的Offset
    }
    
    if os.storage[groupID][topic] == nil {
        return -1, nil
    }
    
    offset, exists := os.storage[groupID][topic][partition]
    if !exists {
        return -1, nil
    }
    
    return offset, nil
}

🎯 面试高频考点

1. Consumer Group的作用?

答案要点:

  • 负载均衡: 分区在组内消费者间分配
  • 容错性: 消费者故障时分区重新分配
  • 并行处理: 不同分区可以并行消费
  • Offset管理: 组级别的Offset提交和恢复
  • 协调机制: 通过GroupCoordinator协调

2. Rebalance的触发条件?

答案要点:

  • 成员变化: 消费者加入或离开
  • 订阅变化: 消费者订阅的Topic变化
  • 分区变化: Topic分区数量变化
  • 会话超时: 消费者心跳超时
  • 手动触发: 管理员手动触发

3. 分区分配策略的差异?

答案要点:

  • Range: 按范围分配,可能不均匀
  • RoundRobin: 轮询分配,相对均匀
  • Sticky: 粘性分配,减少Rebalance影响
  • 选择依据: 根据业务需求选择策略
  • 性能影响: 不同策略对Rebalance性能影响不同

4. GroupCoordinator的职责?

答案要点:

  • 成员管理: 管理消费者组的成员
  • 协议协调: 处理Join/Sync/Heartbeat协议
  • 分区分配: 协调分区分配过程
  • 状态管理: 维护组的状态和Generation
  • 故障处理: 处理成员故障和恢复

📝 本章小结

本章深入解析了Kafka的消费者组协调机制,包括:

  1. Consumer Group: 负载均衡、容错性、并行处理
  2. GroupCoordinator: 成员管理、协议协调、状态维护
  3. 协调协议: Join/Sync/Heartbeat协议的工作流程
  4. 分区分配: Range/RoundRobin/Sticky策略的差异
  5. Rebalance: 触发条件、执行流程、优化策略
  6. Offset管理: 提交、获取、持久化机制

消费者组协调是Kafka实现高可用性和负载均衡的关键机制,理解了这些原理,就能更好地进行消费者应用的开发和调优。


下一章预告: 06-事务与Exactly-Once语义 - 深入理解Kafka的事务机制和一致性保证

Prev
04-元数据管理与KRaft
Next
06-事务与Exactly-Once语义