05-消费者组协调
📋 本章概览
本章深入探讨Kafka的消费者组协调机制,这是Kafka实现负载均衡和容错的关键组件。我们将从Consumer Group概念、GroupCoordinator工作原理、分区分配策略、Rebalance机制等方面,全面解析Kafka如何协调多个消费者协同工作。
🎯 学习目标
- 理解Consumer Group的概念和作用
- 掌握GroupCoordinator的工作原理
- 了解Join/Sync/Heartbeat协议
- 学习分区分配策略(Range/RoundRobin/Sticky)
- 掌握Rebalance机制和优化策略
👥 Consumer Group概念
基本架构
┌─────────────────────────────────────────────────────────────────┐
│ Consumer Group 架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Topic: user-events ││
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││
│ │ │Partition│ │Partition│ │Partition│ │Partition│ ││
│ │ │ 0 │ │ 1 │ │ 2 │ │ 3 │ ││
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Consumer Group: analytics-group ││
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ Consumer 1 │ │ Consumer 2 │ │ Consumer 3 │ ││
│ │ │ │ │ │ │ │ ││
│ │ │ Partition 0 │ │ Partition 1 │ │ Partition 2 │ ││
│ │ │ Partition 2 │ │ Partition 3 │ │ │ ││
│ │ └─────────────┘ └─────────────┘ └─────────────┘ ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Group Coordinator ││
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ Join │ │ Sync │ │ Heartbeat │ ││
│ │ │ Protocol │ │ Protocol │ │ Protocol │ ││
│ │ └─────────────┘ └─────────────┘ └─────────────┘ ││
│ └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘
核心概念
概念 | 定义 | 作用 |
---|---|---|
Consumer Group | 消费者组 | 共同消费一个或多个Topic的消费者集合 |
Group Coordinator | 组协调器 | 管理消费者组的Broker节点 |
Group Leader | 组领导者 | 负责分区分配的消费者 |
Rebalance | 再均衡 | 重新分配分区给消费者的过程 |
Offset | 偏移量 | 消费者在分区中的消费位置 |
🎯 GroupCoordinator工作原理
GroupCoordinator结构
// GroupCoordinator结构
type GroupCoordinator struct {
brokerID int32
groupMetadata map[string]*GroupMetadata
offsetManager *OffsetManager
rebalanceManager *RebalanceManager
mu sync.RWMutex
}
// 消费者组元数据
type GroupMetadata struct {
groupID string
state GroupState
protocolType string
generation int32
leaderID string
members map[string]*MemberMetadata
subscription map[string][]int32 // Topic -> Partitions
assignment map[string][]int32 // Member -> Partitions
lastRebalanceTime int64
}
// 消费者状态
type GroupState int
const (
Empty GroupState = iota
PreparingRebalance
CompletingRebalance
Stable
Dead
)
// 成员元数据
type MemberMetadata struct {
memberID string
clientID string
clientHost string
sessionTimeout int32
rebalanceTimeout int32
protocolType string
supportedProtocols []string
subscription map[string][]int32
assignment []int32
lastHeartbeat int64
}
GroupCoordinator核心功能
// 处理消费者加入请求
func (gc *GroupCoordinator) handleJoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
gc.mu.Lock()
defer gc.mu.Unlock()
groupID := request.GroupID
memberID := request.MemberID
// 获取或创建组元数据
groupMetadata := gc.getOrCreateGroup(groupID)
// 验证会话超时
if !gc.validateSessionTimeout(request.SessionTimeout) {
return nil, fmt.Errorf("无效的会话超时时间")
}
// 处理成员加入
if err := gc.addMember(groupMetadata, request); err != nil {
return nil, err
}
// 检查是否需要触发Rebalance
if gc.shouldTriggerRebalance(groupMetadata) {
gc.triggerRebalance(groupMetadata)
}
// 返回响应
return &JoinGroupResponse{
GenerationID: groupMetadata.generation,
GroupProtocol: groupMetadata.protocolType,
LeaderID: groupMetadata.leaderID,
MemberID: memberID,
Members: gc.getMemberList(groupMetadata),
}, nil
}
// 添加成员
func (gc *GroupCoordinator) addMember(groupMetadata *GroupMetadata, request *JoinGroupRequest) error {
memberID := request.MemberID
if memberID == "" {
memberID = gc.generateMemberID(request.ClientID, request.ClientHost)
}
// 创建成员元数据
member := &MemberMetadata{
memberID: memberID,
clientID: request.ClientID,
clientHost: request.ClientHost,
sessionTimeout: request.SessionTimeout,
rebalanceTimeout: request.RebalanceTimeout,
protocolType: request.ProtocolType,
supportedProtocols: request.Protocols,
lastHeartbeat: time.Now().UnixMilli(),
}
// 添加到组
groupMetadata.members[memberID] = member
// 更新组状态
if groupMetadata.state == Empty {
groupMetadata.state = PreparingRebalance
}
return nil
}
// 检查是否需要触发Rebalance
func (gc *GroupCoordinator) shouldTriggerRebalance(groupMetadata *GroupMetadata) bool {
// 1. 新成员加入
if groupMetadata.state == Empty {
return true
}
// 2. 成员离开
if gc.hasMemberLeft(groupMetadata) {
return true
}
// 3. 订阅变更
if gc.hasSubscriptionChanged(groupMetadata) {
return true
}
// 4. 分区数量变更
if gc.hasPartitionCountChanged(groupMetadata) {
return true
}
return false
}
🔄 Join/Sync/Heartbeat协议
Join协议
// Join协议处理
func (gc *GroupCoordinator) handleJoinProtocol(groupMetadata *GroupMetadata) error {
// 1. 收集所有成员的支持协议
supportedProtocols := gc.collectSupportedProtocols(groupMetadata)
// 2. 选择协议
selectedProtocol := gc.selectProtocol(supportedProtocols)
groupMetadata.protocolType = selectedProtocol
// 3. 选择Group Leader
leaderID := gc.selectGroupLeader(groupMetadata)
groupMetadata.leaderID = leaderID
// 4. 更新Generation
groupMetadata.generation++
// 5. 设置状态
groupMetadata.state = CompletingRebalance
return nil
}
// 选择Group Leader
func (gc *GroupCoordinator) selectGroupLeader(groupMetadata *GroupMetadata) string {
// 选择第一个成员作为Leader
for memberID := range groupMetadata.members {
return memberID
}
return ""
}
// 收集支持协议
func (gc *GroupCoordinator) collectSupportedProtocols(groupMetadata *GroupMetadata) map[string][]string {
protocols := make(map[string][]string)
for _, member := range groupMetadata.members {
for _, protocol := range member.supportedProtocols {
if protocols[protocol] == nil {
protocols[protocol] = make([]string, 0)
}
protocols[protocol] = append(protocols[protocol], member.memberID)
}
}
return protocols
}
Sync协议
// Sync协议处理
func (gc *GroupCoordinator) handleSyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
gc.mu.Lock()
defer gc.mu.Unlock()
groupID := request.GroupID
memberID := request.MemberID
// 获取组元数据
groupMetadata, exists := gc.groupMetadata[groupID]
if !exists {
return nil, fmt.Errorf("组不存在: %s", groupID)
}
// 验证成员
member, exists := groupMetadata.members[memberID]
if !exists {
return nil, fmt.Errorf("成员不存在: %s", memberID)
}
// 验证Generation
if request.GenerationID != groupMetadata.generation {
return nil, fmt.Errorf("Generation不匹配")
}
// 处理Leader的分配
if memberID == groupMetadata.leaderID {
if err := gc.handleLeaderSync(groupMetadata, request); err != nil {
return nil, err
}
}
// 返回分配结果
return &SyncGroupResponse{
GenerationID: groupMetadata.generation,
MemberAssignment: member.assignment,
}, nil
}
// 处理Leader的Sync
func (gc *GroupCoordinator) handleLeaderSync(groupMetadata *GroupMetadata, request *SyncGroupRequest) error {
// 1. 解析分配结果
assignment, err := gc.parseAssignment(request.MemberAssignment)
if err != nil {
return err
}
// 2. 验证分配
if err := gc.validateAssignment(groupMetadata, assignment); err != nil {
return err
}
// 3. 应用分配
gc.applyAssignment(groupMetadata, assignment)
// 4. 更新组状态
groupMetadata.state = Stable
groupMetadata.lastRebalanceTime = time.Now().UnixMilli()
return nil
}
// 应用分配
func (gc *GroupCoordinator) applyAssignment(groupMetadata *GroupMetadata, assignment map[string][]int32) {
for memberID, partitions := range assignment {
if member, exists := groupMetadata.members[memberID]; exists {
member.assignment = partitions
}
}
// 更新组分配
groupMetadata.assignment = assignment
}
Heartbeat协议
// Heartbeat协议处理
func (gc *GroupCoordinator) handleHeartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
gc.mu.Lock()
defer gc.mu.Unlock()
groupID := request.GroupID
memberID := request.MemberID
// 获取组元数据
groupMetadata, exists := gc.groupMetadata[groupID]
if !exists {
return &HeartbeatResponse{Error: "组不存在"}, nil
}
// 获取成员
member, exists := groupMetadata.members[memberID]
if !exists {
return &HeartbeatResponse{Error: "成员不存在"}, nil
}
// 验证Generation
if request.GenerationID != groupMetadata.generation {
return &HeartbeatResponse{Error: "Generation不匹配"}, nil
}
// 更新心跳时间
member.lastHeartbeat = time.Now().UnixMilli()
// 检查组状态
if groupMetadata.state == Stable {
return &HeartbeatResponse{Error: ""}, nil
} else {
return &HeartbeatResponse{Error: "需要重新加入组"}, nil
}
}
// 心跳超时检查
func (gc *GroupCoordinator) checkHeartbeatTimeout() {
gc.mu.Lock()
defer gc.mu.Unlock()
currentTime := time.Now().UnixMilli()
for groupID, groupMetadata := range gc.groupMetadata {
var expiredMembers []string
for memberID, member := range groupMetadata.members {
if currentTime-member.lastHeartbeat > int64(member.sessionTimeout)*1000 {
expiredMembers = append(expiredMembers, memberID)
}
}
// 移除过期成员
if len(expiredMembers) > 0 {
gc.removeExpiredMembers(groupMetadata, expiredMembers)
}
}
}
// 移除过期成员
func (gc *GroupCoordinator) removeExpiredMembers(groupMetadata *GroupMetadata, expiredMembers []string) {
for _, memberID := range expiredMembers {
delete(groupMetadata.members, memberID)
log.Printf("移除过期成员: %s", memberID)
}
// 触发Rebalance
if len(groupMetadata.members) > 0 {
gc.triggerRebalance(groupMetadata)
} else {
groupMetadata.state = Empty
}
}
📊 分区分配策略
Range分配策略
// Range分配策略
type RangeAssignor struct{}
// 分配分区
func (ra *RangeAssignor) assign(groupMetadata *GroupMetadata) map[string][]int32 {
assignment := make(map[string][]int32)
// 获取所有Topic和分区
topicPartitions := ra.getTopicPartitions(groupMetadata)
for topic, partitions := range topicPartitions {
// 获取订阅该Topic的成员
members := ra.getSubscribedMembers(groupMetadata, topic)
if len(members) == 0 {
continue
}
// 按Range分配
partitionsPerMember := len(partitions) / len(members)
extraPartitions := len(partitions) % len(members)
startIndex := 0
for i, memberID := range members {
// 计算该成员分配的分区数量
memberPartitionCount := partitionsPerMember
if i < extraPartitions {
memberPartitionCount++
}
// 分配分区
endIndex := startIndex + memberPartitionCount
memberPartitions := partitions[startIndex:endIndex]
if assignment[memberID] == nil {
assignment[memberID] = make([]int32, 0)
}
assignment[memberID] = append(assignment[memberID], memberPartitions...)
startIndex = endIndex
}
}
return assignment
}
// 获取Topic分区
func (ra *RangeAssignor) getTopicPartitions(groupMetadata *GroupMetadata) map[string][]int32 {
topicPartitions := make(map[string][]int32)
for topic := range groupMetadata.subscription {
// 从元数据获取分区信息
partitions := ra.getPartitionsForTopic(topic)
topicPartitions[topic] = partitions
}
return topicPartitions
}
RoundRobin分配策略
// RoundRobin分配策略
type RoundRobinAssignor struct{}
// 分配分区
func (rra *RoundRobinAssignor) assign(groupMetadata *GroupMetadata) map[string][]int32 {
assignment := make(map[string][]int32)
// 获取所有成员
members := ra.getAllMembers(groupMetadata)
if len(members) == 0 {
return assignment
}
// 获取所有分区
allPartitions := ra.getAllPartitions(groupMetadata)
// 按RoundRobin分配
for i, partition := range allPartitions {
memberIndex := i % len(members)
memberID := members[memberIndex]
if assignment[memberID] == nil {
assignment[memberID] = make([]int32, 0)
}
assignment[memberID] = append(assignment[memberID], partition)
}
return assignment
}
// 获取所有分区
func (rra *RoundRobinAssignor) getAllPartitions(groupMetadata *GroupMetadata) []int32 {
var allPartitions []int32
for topic, partitions := range groupMetadata.subscription {
for _, partition := range partitions {
allPartitions = append(allPartitions, partition)
}
}
// 排序确保一致性
sort.Slice(allPartitions, func(i, j int) bool {
return allPartitions[i] < allPartitions[j]
})
return allPartitions
}
Sticky分配策略
// Sticky分配策略
type StickyAssignor struct {
previousAssignment map[string][]int32
}
// 分配分区
func (sa *StickyAssignor) assign(groupMetadata *GroupMetadata) map[string][]int32 {
assignment := make(map[string][]int32)
// 获取当前成员和分区
currentMembers := ra.getAllMembers(groupMetadata)
currentPartitions := ra.getAllPartitions(groupMetadata)
// 初始化分配
for _, memberID := range currentMembers {
assignment[memberID] = make([]int32, 0)
}
// 1. 保持现有分配
sa.preserveExistingAssignment(assignment, currentMembers, currentPartitions)
// 2. 分配未分配的分区
unassignedPartitions := sa.getUnassignedPartitions(assignment, currentPartitions)
sa.assignUnassignedPartitions(assignment, unassignedPartitions)
// 3. 平衡分配
sa.balanceAssignment(assignment)
// 保存当前分配
sa.previousAssignment = assignment
return assignment
}
// 保持现有分配
func (sa *StickyAssignor) preserveExistingAssignment(assignment map[string][]int32,
currentMembers []string, currentPartitions []int32) {
for memberID, partitions := range sa.previousAssignment {
// 检查成员是否仍然存在
if !ra.contains(currentMembers, memberID) {
continue
}
// 检查分区是否仍然存在
validPartitions := make([]int32, 0)
for _, partition := range partitions {
if ra.contains(currentPartitions, partition) {
validPartitions = append(validPartitions, partition)
}
}
assignment[memberID] = validPartitions
}
}
// 平衡分配
func (sa *StickyAssignor) balanceAssignment(assignment map[string][]int32) {
// 计算平均分配数量
totalPartitions := 0
for _, partitions := range assignment {
totalPartitions += len(partitions)
}
if len(assignment) == 0 {
return
}
avgPartitions := totalPartitions / len(assignment)
extraPartitions := totalPartitions % len(assignment)
// 重新平衡
for memberID, partitions := range assignment {
targetCount := avgPartitions
if len(partitions) > avgPartitions {
targetCount = avgPartitions + extraPartitions
}
if len(partitions) > targetCount {
// 移除多余分区
excess := len(partitions) - targetCount
assignment[memberID] = partitions[:len(partitions)-excess]
}
}
}
🔄 Rebalance机制
Rebalance流程
// Rebalance管理器
type RebalanceManager struct {
coordinator *GroupCoordinator
rebalanceTimeout int32
}
// 触发Rebalance
func (rm *RebalanceManager) triggerRebalance(groupMetadata *GroupMetadata) error {
log.Printf("触发Rebalance,组: %s", groupMetadata.groupID)
// 1. 设置状态
groupMetadata.state = PreparingRebalance
groupMetadata.generation++
// 2. 通知所有成员
if err := rm.notifyMembersRebalance(groupMetadata); err != nil {
return err
}
// 3. 等待所有成员重新加入
if err := rm.waitForAllMembersJoin(groupMetadata); err != nil {
return err
}
// 4. 执行分区分配
if err := rm.performPartitionAssignment(groupMetadata); err != nil {
return err
}
// 5. 通知成员分配结果
if err := rm.notifyMembersAssignment(groupMetadata); err != nil {
return err
}
// 6. 设置稳定状态
groupMetadata.state = Stable
return nil
}
// 等待所有成员重新加入
func (rm *RebalanceManager) waitForAllMembersJoin(groupMetadata *GroupMetadata) error {
timeout := time.Duration(rm.rebalanceTimeout) * time.Millisecond
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
// 检查是否所有成员都已加入
if rm.allMembersJoined(groupMetadata) {
return nil
}
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("等待成员加入超时")
}
// 执行分区分配
func (rm *RebalanceManager) performPartitionAssignment(groupMetadata *GroupMetadata) error {
// 选择分配策略
assignor := rm.selectAssignor(groupMetadata)
// 执行分配
assignment := assignor.assign(groupMetadata)
// 验证分配
if err := rm.validateAssignment(groupMetadata, assignment); err != nil {
return err
}
// 应用分配
rm.applyAssignment(groupMetadata, assignment)
return nil
}
// 选择分配策略
func (rm *RebalanceManager) selectAssignor(groupMetadata *GroupMetadata) PartitionAssignor {
// 根据协议类型选择分配策略
switch groupMetadata.protocolType {
case "range":
return &RangeAssignor{}
case "roundrobin":
return &RoundRobinAssignor{}
case "sticky":
return &StickyAssignor{}
default:
return &RangeAssignor{} // 默认使用Range
}
}
Rebalance优化
// Rebalance优化器
type RebalanceOptimizer struct {
minRebalanceInterval int64
lastRebalanceTime map[string]int64
}
// 检查是否可以优化Rebalance
func (ro *RebalanceOptimizer) canOptimizeRebalance(groupMetadata *GroupMetadata) bool {
groupID := groupMetadata.groupID
lastTime := ro.lastRebalanceTime[groupID]
currentTime := time.Now().UnixMilli()
// 检查最小间隔
if currentTime-lastTime < ro.minRebalanceInterval {
return false
}
// 检查成员变化
if ro.isMinorChange(groupMetadata) {
return true
}
return false
}
// 判断是否为小变化
func (ro *RebalanceOptimizer) isMinorChange(groupMetadata *GroupMetadata) bool {
// 1. 检查成员数量变化
memberCountChange := ro.getMemberCountChange(groupMetadata)
if memberCountChange > 1 {
return false
}
// 2. 检查订阅变化
if ro.hasSubscriptionChanged(groupMetadata) {
return false
}
// 3. 检查分区数量变化
if ro.hasPartitionCountChanged(groupMetadata) {
return false
}
return true
}
// 增量Rebalance
func (ro *RebalanceOptimizer) incrementalRebalance(groupMetadata *GroupMetadata) error {
// 1. 识别变化
changes := ro.identifyChanges(groupMetadata)
// 2. 计算最小影响范围
affectedMembers := ro.getAffectedMembers(changes)
// 3. 只重新分配受影响的分区
if err := ro.reassignAffectedPartitions(groupMetadata, affectedMembers); err != nil {
return err
}
return nil
}
📍 Offset管理
Offset存储
// Offset管理器
type OffsetManager struct {
coordinator *GroupCoordinator
offsetStore *OffsetStore
}
// 提交Offset
func (om *OffsetManager) commitOffset(request *CommitOffsetRequest) (*CommitOffsetResponse, error) {
// 1. 验证组和成员
if err := om.validateGroupAndMember(request.GroupID, request.MemberID); err != nil {
return nil, err
}
// 2. 验证Generation
if err := om.validateGeneration(request.GroupID, request.GenerationID); err != nil {
return nil, err
}
// 3. 提交Offset
for topic, partitions := range request.Offsets {
for partition, offset := range partitions {
if err := om.offsetStore.commitOffset(request.GroupID, topic, partition, offset); err != nil {
return nil, err
}
}
}
return &CommitOffsetResponse{Error: ""}, nil
}
// 获取Offset
func (om *OffsetManager) fetchOffset(request *FetchOffsetRequest) (*FetchOffsetResponse, error) {
offsets := make(map[string]map[int32]int64)
for topic, partitions := range request.Partitions {
offsets[topic] = make(map[int32]int64)
for _, partition := range partitions {
offset, err := om.offsetStore.getOffset(request.GroupID, topic, partition)
if err != nil {
return nil, err
}
offsets[topic][partition] = offset
}
}
return &FetchOffsetResponse{Offsets: offsets}, nil
}
// Offset存储
type OffsetStore struct {
storage map[string]map[string]map[int32]int64 // Group -> Topic -> Partition -> Offset
mu sync.RWMutex
}
// 提交Offset
func (os *OffsetStore) commitOffset(groupID, topic string, partition int32, offset int64) error {
os.mu.Lock()
defer os.mu.Unlock()
if os.storage[groupID] == nil {
os.storage[groupID] = make(map[string]map[int32]int64)
}
if os.storage[groupID][topic] == nil {
os.storage[groupID][topic] = make(map[int32]int64)
}
os.storage[groupID][topic][partition] = offset
// 持久化到磁盘
return os.persistOffset(groupID, topic, partition, offset)
}
// 获取Offset
func (os *OffsetStore) getOffset(groupID, topic string, partition int32) (int64, error) {
os.mu.RLock()
defer os.mu.RUnlock()
if os.storage[groupID] == nil {
return -1, nil // 没有提交的Offset
}
if os.storage[groupID][topic] == nil {
return -1, nil
}
offset, exists := os.storage[groupID][topic][partition]
if !exists {
return -1, nil
}
return offset, nil
}
🎯 面试高频考点
1. Consumer Group的作用?
答案要点:
- 负载均衡: 分区在组内消费者间分配
- 容错性: 消费者故障时分区重新分配
- 并行处理: 不同分区可以并行消费
- Offset管理: 组级别的Offset提交和恢复
- 协调机制: 通过GroupCoordinator协调
2. Rebalance的触发条件?
答案要点:
- 成员变化: 消费者加入或离开
- 订阅变化: 消费者订阅的Topic变化
- 分区变化: Topic分区数量变化
- 会话超时: 消费者心跳超时
- 手动触发: 管理员手动触发
3. 分区分配策略的差异?
答案要点:
- Range: 按范围分配,可能不均匀
- RoundRobin: 轮询分配,相对均匀
- Sticky: 粘性分配,减少Rebalance影响
- 选择依据: 根据业务需求选择策略
- 性能影响: 不同策略对Rebalance性能影响不同
4. GroupCoordinator的职责?
答案要点:
- 成员管理: 管理消费者组的成员
- 协议协调: 处理Join/Sync/Heartbeat协议
- 分区分配: 协调分区分配过程
- 状态管理: 维护组的状态和Generation
- 故障处理: 处理成员故障和恢复
📝 本章小结
本章深入解析了Kafka的消费者组协调机制,包括:
- Consumer Group: 负载均衡、容错性、并行处理
- GroupCoordinator: 成员管理、协议协调、状态维护
- 协调协议: Join/Sync/Heartbeat协议的工作流程
- 分区分配: Range/RoundRobin/Sticky策略的差异
- Rebalance: 触发条件、执行流程、优化策略
- Offset管理: 提交、获取、持久化机制
消费者组协调是Kafka实现高可用性和负载均衡的关键机制,理解了这些原理,就能更好地进行消费者应用的开发和调优。
下一章预告: 06-事务与Exactly-Once语义 - 深入理解Kafka的事务机制和一致性保证