HiHuo

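Sentinel Mode Implementation

Learning Objectives

  • Understand in depth how Redis Sentinel monitors nodes and detects failures
  • Master the logic that decides subjective down (SDOWN) and objective down (ODOWN)
  • Implement the automatic failover flow and the leader election algorithm
  • Understand the communication protocol inside a Sentinel cluster and how configuration is propagated
  • Understand the high-availability guarantees that Sentinel provides

Sentinel Mode Overview

1. Sentinel Architecture

Redis Sentinel runs several Sentinel processes that all monitor the same master and its replicas. When the master fails, the Sentinels perform an automatic failover, which keeps the deployment highly available: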

┌─────────────────────────────────────────────────────────────┐
│                 Sentinel Mode Architecture                  │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐      │
│  │ Sentinel 1  │    │ Sentinel 2  │    │ Sentinel 3  │      │
│  │ ┌─────────┐ │    │ ┌─────────┐ │    │ ┌─────────┐ │      │
│  │ │ monitor │ │    │ │ monitor │ │    │ │ monitor │ │      │
│  │ │ master  │ │    │ │ master  │ │    │ │ master  │ │      │
│  │ └─────────┘ │    │ └─────────┘ │    │ └─────────┘ │      │
│  │ ┌─────────┐ │    │ ┌─────────┐ │    │ ┌─────────┐ │      │
│  │ │ monitor │ │    │ │ monitor │ │    │ │ monitor │ │      │
│  │ │ replicas│ │    │ │ replicas│ │    │ │ replicas│ │      │
│  │ └─────────┘ │    │ └─────────┘ │    │ └─────────┘ │      │
│  └─────────────┘    └─────────────┘    └─────────────┘      │
│         │                  │                  │             │
│         └──────────────────┼──────────────────┘             │
│                            │                                │
│  ┌─────────────────────────┴─────────────────────────────┐  │
│  │                   Monitored targets                   │  │
│  │ ┌─────────────┐    ┌─────────────┐    ┌─────────────┐ │  │
│  │ │   Master    │    │  Replica 1  │    │  Replica 2  │ │  │
│  │ └─────────────┘    └─────────────┘    └─────────────┘ │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
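Why three Sentinels rather than one or two? Two thresholds are involved. The configured quorum decides when the master is objectively down, while actually running the failover requires the elected leader to collect votes from a majority of all Sentinels (and at least quorum votes). The sketch below shows only that arithmetic; `majority`, `isObjectivelyDown` and `canFailover` are hypothetical helper names, not part of the implementation in this chapter.

```go
package main

import "fmt"

// majority returns the smallest vote count that is more than half of total.
func majority(total int) int {
    return total/2 + 1
}

// isObjectivelyDown: the master is ODOWN once `agree` Sentinels
// (including the asking one) report it down and that reaches the quorum.
func isObjectivelyDown(agree, quorum int) bool {
    return agree >= quorum
}

// canFailover: a leader may fail over only with votes from a majority of
// all Sentinels, and never with fewer than quorum votes.
func canFailover(total, quorum, votes int) bool {
    need := majority(total)
    if quorum > need {
        need = quorum
    }
    return votes >= need
}

func main() {
    for _, total := range []int{1, 2, 3, 5} {
        fmt.Printf("sentinels=%d majority=%d tolerates %d failure(s)\n",
            total, majority(total), total-majority(total))
    }
    // Three Sentinels with quorum 2: one Sentinel may crash and the
    // remaining two can still declare ODOWN and elect a leader.
    fmt.Println(isObjectivelyDown(2, 2), canFailover(3, 2, 2))
}
```

With two Sentinels the majority is 2, so losing either one blocks failover; three is the smallest deployment that tolerates one failed Sentinel.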

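2. Sentinel State Machine

State                        Description      Transition condition
SENTINEL_STATE_NONE          Uninitialized    Initial state
SENTINEL_STATE_CONNECT       Connecting       Starts connecting to the master
SENTINEL_STATE_CONNECTED     Connected        Connection established
SENTINEL_STATE_SUBSCRIBE     Subscribing      Subscribes to master events
SENTINEL_STATE_MONITORING    Monitoring       Starts monitoring the master
SENTINEL_STATE_FAILOVER      Failing over     Executes a failover
SENTINEL_STATE_RECONFIG      Reconfiguring    Updates the configuration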
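The table lists the states but not which transitions are legal. One way to make the lifecycle explicit is a transition map that is checked on every state change. This is a sketch under assumptions: the edges below (CONNECT retrying itself, FAILOVER falling back to MONITORING when no replica can be promoted, RECONFIG returning to MONITORING) are an interpretation of the table, and `canTransition` is a hypothetical helper.

```go
package main

import "fmt"

type SentinelState int

const (
    SENTINEL_STATE_NONE SentinelState = iota
    SENTINEL_STATE_CONNECT
    SENTINEL_STATE_CONNECTED
    SENTINEL_STATE_SUBSCRIBE
    SENTINEL_STATE_MONITORING
    SENTINEL_STATE_FAILOVER
    SENTINEL_STATE_RECONFIG
)

var stateNames = [...]string{
    "NONE", "CONNECT", "CONNECTED", "SUBSCRIBE", "MONITORING", "FAILOVER", "RECONFIG",
}

// String makes states readable in log output.
func (s SentinelState) String() string {
    if s < 0 || int(s) >= len(stateNames) {
        return fmt.Sprintf("SentinelState(%d)", int(s))
    }
    return stateNames[s]
}

// transitions holds the assumed legal next states for each state.
var transitions = map[SentinelState][]SentinelState{
    SENTINEL_STATE_NONE:       {SENTINEL_STATE_CONNECT},
    SENTINEL_STATE_CONNECT:    {SENTINEL_STATE_CONNECTED, SENTINEL_STATE_CONNECT}, // retry
    SENTINEL_STATE_CONNECTED:  {SENTINEL_STATE_SUBSCRIBE},
    SENTINEL_STATE_SUBSCRIBE:  {SENTINEL_STATE_MONITORING},
    SENTINEL_STATE_MONITORING: {SENTINEL_STATE_FAILOVER},
    SENTINEL_STATE_FAILOVER:   {SENTINEL_STATE_RECONFIG, SENTINEL_STATE_MONITORING}, // abort
    SENTINEL_STATE_RECONFIG:   {SENTINEL_STATE_MONITORING},
}

// canTransition reports whether from -> to is an allowed edge.
func canTransition(from, to SentinelState) bool {
    for _, next := range transitions[from] {
        if next == to {
            return true
        }
    }
    return false
}

func main() {
    path := []SentinelState{
        SENTINEL_STATE_CONNECT, SENTINEL_STATE_CONNECTED, SENTINEL_STATE_SUBSCRIBE,
        SENTINEL_STATE_MONITORING, SENTINEL_STATE_FAILOVER, SENTINEL_STATE_RECONFIG,
        SENTINEL_STATE_MONITORING,
    }
    state := SENTINEL_STATE_NONE
    for _, next := range path {
        if !canTransition(state, next) {
            fmt.Printf("illegal transition %v -> %v\n", state, next)
            return
        }
        fmt.Printf("%v -> %v\n", state, next)
        state = next
    }
}
```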

Go Sentinel Implementation

1. Sentinel Node Implementation

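// sentinel/sentinel.go
package sentinel

import (
    "fmt"
    "net"
    "strconv"
    "strings"
    "sync"
    "time"
)

// SentinelState is the lifecycle state of a Sentinel node.
type SentinelState int

const (
    SENTINEL_STATE_NONE SentinelState = iota
    SENTINEL_STATE_CONNECT
    SENTINEL_STATE_CONNECTED
    SENTINEL_STATE_SUBSCRIBE
    SENTINEL_STATE_MONITORING
    SENTINEL_STATE_FAILOVER
    SENTINEL_STATE_RECONFIG
)

// ioTimeout bounds every dial and request, so an unreachable node cannot
// stall the monitoring loop.
const ioTimeout = 500 * time.Millisecond

// MonitoredTarget is a master or replica watched by the Sentinel.
type MonitoredTarget struct {
    Name              string
    Host              string
    Port              int
    Role              string    // "master" or "slave"
    State             string    // "up" or "down"
    ODown             bool      // a quorum of Sentinels agrees the master is down
    LastPing          time.Time // last successful PING reply
    DownTime          time.Time // first failed PING of the current outage
    FailoverStartTime time.Time // zero until a failover has been attempted
    mu                sync.RWMutex
}

// addr returns the target's "host:port" address.
func (t *MonitoredTarget) addr() string {
    return net.JoinHostPort(t.Host, strconv.Itoa(t.Port))
}

// Sentinel is a single Sentinel node.
type Sentinel struct {
    ID             string
    Addr           string
    State          SentinelState
    Targets        map[string]*MonitoredTarget
    OtherSentinels map[string]*Sentinel
    isRunning      bool
    stopCh         chan struct{}
    wg             sync.WaitGroup
    mu             sync.RWMutex // guards State, isRunning and writes to the maps
    config         *SentinelConfig
}

// SentinelConfig holds the main sentinel.conf options for one master.
type SentinelConfig struct {
    MasterName      string
    MasterHost      string
    MasterPort      int
    Quorum          int // Sentinels that must agree for ODOWN (0 = majority)
    DownAfterMs     int // down-after-milliseconds
    FailoverTimeout int // failover-timeout, in milliseconds
    ParallelSyncs   int // parallel-syncs (not used by this simplified version)
}

// NewSentinel creates a Sentinel node.
func NewSentinel(id, addr string, config *SentinelConfig) *Sentinel {
    return &Sentinel{
        ID:             id,
        Addr:           addr,
        State:          SENTINEL_STATE_NONE,
        Targets:        make(map[string]*MonitoredTarget),
        OtherSentinels: make(map[string]*Sentinel),
        stopCh:         make(chan struct{}),
        config:         config,
    }
}

// Start launches the monitoring loop in the background.
func (s *Sentinel) Start() error {
    s.mu.Lock()
    defer s.mu.Unlock()

    if s.isRunning {
        return fmt.Errorf("sentinel already running")
    }

    s.isRunning = true
    s.wg.Add(1)
    go s.sentinelLoop()
    return nil
}

// sentinelLoop runs all monitoring work on a single goroutine, once per second.
func (s *Sentinel) sentinelLoop() {
    defer s.wg.Done()

    s.initializeTargets()
    s.startMonitoring()

    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            s.performMonitoring()
        case <-s.stopCh:
            return
        }
    }
}

// setState updates the Sentinel state under its lock.
func (s *Sentinel) setState(state SentinelState) {
    s.mu.Lock()
    s.State = state
    s.mu.Unlock()
}

// targetsSnapshot copies the target list so it can be iterated without
// holding s.mu during network I/O.
func (s *Sentinel) targetsSnapshot() []*MonitoredTarget {
    s.mu.RLock()
    defer s.mu.RUnlock()
    targets := make([]*MonitoredTarget, 0, len(s.Targets))
    for _, t := range s.Targets {
        targets = append(targets, t)
    }
    return targets
}

// peersSnapshot copies the list of other known Sentinels.
func (s *Sentinel) peersSnapshot() []*Sentinel {
    s.mu.RLock()
    defer s.mu.RUnlock()
    peers := make([]*Sentinel, 0, len(s.OtherSentinels))
    for _, p := range s.OtherSentinels {
        peers = append(peers, p)
    }
    return peers
}

// currentMaster returns the target whose role is currently "master"; after a
// failover this is the promoted replica.
func (s *Sentinel) currentMaster() *MonitoredTarget {
    for _, t := range s.targetsSnapshot() {
        t.mu.RLock()
        isMaster := t.Role == "master"
        t.mu.RUnlock()
        if isMaster {
            return t
        }
    }
    return nil
}

// initializeTargets registers the configured master.
func (s *Sentinel) initializeTargets() {
    master := &MonitoredTarget{
        Name:     s.config.MasterName,
        Host:     s.config.MasterHost,
        Port:     s.config.MasterPort,
        Role:     "master",
        State:    "up",
        LastPing: time.Now(),
    }

    s.mu.Lock()
    s.Targets[s.config.MasterName] = master
    s.mu.Unlock()
}

// startMonitoring connects to the master, then discovers replicas and peers.
func (s *Sentinel) startMonitoring() {
    s.setState(SENTINEL_STATE_MONITORING)

    if err := s.connectToMaster(); err != nil {
        fmt.Printf("Failed to connect to master: %v\n", err)
        return
    }
    s.discoverSlaves()
    s.discoverOtherSentinels()
}

// dial opens a TCP connection with a timeout and sets an I/O deadline.
func dial(addr string) (net.Conn, error) {
    conn, err := net.DialTimeout("tcp", addr, ioTimeout)
    if err != nil {
        return nil, err
    }
    if err := conn.SetDeadline(time.Now().Add(ioTimeout)); err != nil {
        conn.Close()
        return nil, err
    }
    return conn, nil
}

// encodeCommand builds a RESP array of bulk strings. Computing each $<len>
// prefix avoids hand-counted (and easily wrong) lengths.
func encodeCommand(args ...string) []byte {
    var b strings.Builder
    fmt.Fprintf(&b, "*%d\r\n", len(args))
    for _, arg := range args {
        fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(arg), arg)
    }
    return []byte(b.String())
}

// request sends one command and returns the raw reply from a single read,
// which is enough for the short replies used here; a real client parses RESP.
func request(conn net.Conn, args ...string) (string, error) {
    if _, err := conn.Write(encodeCommand(args...)); err != nil {
        return "", err
    }
    buf := make([]byte, 4096)
    n, err := conn.Read(buf)
    if err != nil {
        return "", err
    }
    return string(buf[:n]), nil
}

// sendCommand opens a short-lived connection to target and runs one command.
func (s *Sentinel) sendCommand(target *MonitoredTarget, args ...string) error {
    conn, err := dial(target.addr())
    if err != nil {
        return err
    }
    defer conn.Close()

    reply, err := request(conn, args...)
    if err != nil {
        return err
    }
    if strings.HasPrefix(reply, "-") {
        return fmt.Errorf("%s: %s", args[0], strings.TrimSpace(reply))
    }
    return nil
}

// markUp records a successful PING and clears the outage bookkeeping.
func markUp(t *MonitoredTarget) {
    t.mu.Lock()
    t.State = "up"
    t.ODown = false
    t.LastPing = time.Now()
    t.DownTime = time.Time{}
    t.mu.Unlock()
}

// connectToMaster checks that the master answers PING.
func (s *Sentinel) connectToMaster() error {
    master := s.currentMaster()
    if master == nil {
        return fmt.Errorf("no master configured")
    }
    conn, err := dial(master.addr())
    if err != nil {
        return err
    }
    defer conn.Close()

    if err := s.sendPing(conn); err != nil {
        return err
    }
    markUp(master)
    return nil
}

// sendPing sends PING and expects +PONG.
func (s *Sentinel) sendPing(conn net.Conn) error {
    reply, err := request(conn, "PING")
    if err != nil {
        return err
    }
    if reply != "+PONG\r\n" {
        return fmt.Errorf("unexpected response: %q", reply)
    }
    return nil
}

// discoverSlaves asks the master for its replicas with INFO replication.
func (s *Sentinel) discoverSlaves() {
    master := s.currentMaster()
    if master == nil {
        return
    }
    conn, err := dial(master.addr())
    if err != nil {
        return
    }
    defer conn.Close()

    info, err := request(conn, "INFO", "replication")
    if err != nil {
        return
    }
    s.parseSlaveInfo(info)
}

// parseSlaveInfo is simplified: a real implementation parses the
// "slaveN:ip=...,port=..." lines of the INFO output. Here a single replica at
// 127.0.0.1:6380 is assumed.
func (s *Sentinel) parseSlaveInfo(info string) {
    slave := &MonitoredTarget{
        Name:     "slave1",
        Host:     "127.0.0.1",
        Port:     6380,
        Role:     "slave",
        State:    "up",
        LastPing: time.Now(),
    }

    s.mu.Lock()
    s.Targets[slave.Name] = slave
    s.mu.Unlock()
}

// discoverOtherSentinels announces this Sentinel on the master. Real Sentinels
// both PUBLISH hello messages to, and SUBSCRIBE to, the __sentinel__:hello
// channel; this simplified version only publishes its ID.
func (s *Sentinel) discoverOtherSentinels() {
    master := s.currentMaster()
    if master == nil {
        return
    }
    conn, err := dial(master.addr())
    if err != nil {
        return
    }
    defer conn.Close()

    reply, err := request(conn, "PUBLISH", "__sentinel__:hello", s.ID)
    if err != nil {
        return
    }
    s.parseSentinelInfo(reply)
}

// parseSentinelInfo is simplified: PUBLISH only returns the number of
// subscribers, so peers are really learned from received hello messages.
// Here one peer is assumed.
func (s *Sentinel) parseSentinelInfo(info string) {
    const peerID = "sentinel2"
    if s.ID == peerID {
        return // never register ourselves as a peer
    }

    s.mu.Lock()
    s.OtherSentinels[peerID] = &Sentinel{ID: peerID, Addr: "127.0.0.1:26380"}
    s.mu.Unlock()
}

// performMonitoring runs one round of health checks and failover handling.
func (s *Sentinel) performMonitoring() {
    s.checkMasterHealth()
    s.checkSlavesHealth()
    s.checkOtherSentinelsHealth()
    s.performFailoverIfNeeded()
}

// checkMasterHealth PINGs the current master.
func (s *Sentinel) checkMasterHealth() {
    master := s.currentMaster()
    if master == nil {
        return
    }
    conn, err := dial(master.addr())
    if err != nil {
        s.handleMasterDown(master)
        return
    }
    defer conn.Close()

    if err := s.sendPing(conn); err != nil {
        s.handleMasterDown(master)
        return
    }
    markUp(master)
}

// handleMasterDown records a failed PING. The master only becomes subjectively
// down (SDOWN) after failing for down-after-milliseconds, so the check runs on
// every tick of the outage, not just on the first failure.
func (s *Sentinel) handleMasterDown(master *MonitoredTarget) {
    master.mu.Lock()
    if master.DownTime.IsZero() {
        master.DownTime = time.Now() // first failure of this outage
    }
    sdown := s.isSubjectivelyDown(master)
    becameSDown := sdown && master.State == "up"
    if sdown {
        master.State = "down"
    }
    master.mu.Unlock()

    if becameSDown {
        fmt.Printf("Master %s is subjectively down\n", master.Name)
    }
    if sdown {
        s.handleSubjectiveDown(master)
    }
}

// isSubjectivelyDown reports whether the master has been failing for longer
// than down-after-milliseconds. The caller must hold master.mu.
func (s *Sentinel) isSubjectivelyDown(master *MonitoredTarget) bool {
    return !master.DownTime.IsZero() &&
        time.Since(master.DownTime) > time.Duration(s.config.DownAfterMs)*time.Millisecond
}

// handleSubjectiveDown asks the other Sentinels whether they agree; it runs on
// every tick while the master stays in SDOWN.
func (s *Sentinel) handleSubjectiveDown(master *MonitoredTarget) {
    s.askOtherSentinels(master)
}

// askOtherSentinels counts the Sentinels (this one included) that consider the
// master down; reaching the quorum makes it objectively down (ODOWN).
func (s *Sentinel) askOtherSentinels(master *MonitoredTarget) {
    peers := s.peersSnapshot()
    agree := 1 // this Sentinel already sees the master in SDOWN
    for _, peer := range peers {
        if s.askSentinel(peer, master) {
            agree++
        }
    }
    if agree >= s.quorum(len(peers)+1) {
        s.handleObjectiveDown(master)
    }
}

// quorum returns the configured quorum, or a majority of all known Sentinels
// when none is configured.
func (s *Sentinel) quorum(total int) int {
    if s.config.Quorum > 0 {
        return s.config.Quorum
    }
    return total/2 + 1
}

// askSentinel sends SENTINEL is-master-down-by-addr <ip> <port> <current-epoch>
// <runid>. The run ID "*" asks only for the peer's opinion, not for its vote;
// epoch tracking is omitted, so the epoch is always 0 here.
func (s *Sentinel) askSentinel(sentinel *Sentinel, master *MonitoredTarget) bool {
    conn, err := dial(sentinel.Addr)
    if err != nil {
        return false
    }
    defer conn.Close()

    reply, err := request(conn, "SENTINEL", "is-master-down-by-addr",
        master.Host, strconv.Itoa(master.Port), "0", "*")
    if err != nil {
        return false
    }
    return s.handleSentinelResponse(reply)
}

// handleSentinelResponse inspects the reply, a three-element array (down
// state, leader run ID, leader epoch) whose first element is :1 when the peer
// also considers the master down.
func (s *Sentinel) handleSentinelResponse(response string) bool {
    return strings.HasPrefix(response, "*3\r\n:1\r\n")
}

// handleObjectiveDown marks the master ODOWN; performFailoverIfNeeded then
// starts the failover from the monitoring loop.
func (s *Sentinel) handleObjectiveDown(master *MonitoredTarget) {
    master.mu.Lock()
    wasODown := master.ODown
    master.ODown = true
    master.mu.Unlock()

    if !wasODown {
        fmt.Printf("Master %s is objectively down\n", master.Name)
    }
}

// startFailover records the attempt and promotes a replica. Leader election
// (winning a majority of votes for the current epoch) is omitted: this
// Sentinel acts as the leader directly.
func (s *Sentinel) startFailover(master *MonitoredTarget) {
    master.mu.Lock()
    master.FailoverStartTime = time.Now()
    master.mu.Unlock()

    s.setState(SENTINEL_STATE_FAILOVER)

    newMaster := s.selectNewMaster()
    if newMaster == nil {
        fmt.Println("No suitable slave found for failover")
        s.setState(SENTINEL_STATE_MONITORING)
        return
    }
    s.executeFailover(master, newMaster)
}

// selectNewMaster picks an available replica. Simplified: the smallest name
// wins, which at least makes the choice deterministic (Redis ranks replicas by
// replica-priority, replication offset and run ID).
func (s *Sentinel) selectNewMaster() *MonitoredTarget {
    var best *MonitoredTarget
    for _, target := range s.targetsSnapshot() {
        target.mu.RLock()
        ok := target.Role == "slave" && target.State == "up"
        target.mu.RUnlock()
        if ok && (best == nil || target.Name < best.Name) {
            best = target
        }
    }
    return best
}

// executeFailover promotes newMaster and reconfigures the rest of the setup.
func (s *Sentinel) executeFailover(oldMaster, newMaster *MonitoredTarget) {
    fmt.Printf("Executing failover from %s to %s\n", oldMaster.Name, newMaster.Name)

    // 1. Promote the chosen replica; abort (and retry later) if that fails.
    if err := s.promoteSlaveToMaster(newMaster); err != nil {
        fmt.Printf("Promoting %s failed: %v\n", newMaster.Name, err)
        s.setState(SENTINEL_STATE_MONITORING)
        return
    }
    s.setState(SENTINEL_STATE_RECONFIG)

    // 2. Point the remaining replicas at the new master.
    s.updateOtherSlaves(newMaster)

    // 3. Demote the old master; it rejoins as a replica when it comes back.
    s.demoteOldMaster(oldMaster)

    // 4. Update the Sentinel configuration.
    s.updateSentinelConfig(newMaster)

    // 5. Return to normal monitoring.
    s.completeFailover(newMaster)
}

// promoteSlaveToMaster sends REPLICAOF NO ONE (SLAVEOF NO ONE before Redis 5).
func (s *Sentinel) promoteSlaveToMaster(slave *MonitoredTarget) error {
    fmt.Printf("Promoting slave %s to master\n", slave.Name)
    if err := s.sendCommand(slave, "REPLICAOF", "NO", "ONE"); err != nil {
        return err
    }

    slave.mu.Lock()
    slave.Role = "master"
    slave.mu.Unlock()
    return nil
}

// updateOtherSlaves reconfigures every other replica (parallel-syncs is
// ignored: replicas are reconfigured one after another).
func (s *Sentinel) updateOtherSlaves(newMaster *MonitoredTarget) {
    for _, target := range s.targetsSnapshot() {
        target.mu.RLock()
        isSlave := target.Role == "slave"
        target.mu.RUnlock()
        if isSlave && target != newMaster {
            s.sendSlaveOfCommand(target, newMaster)
        }
    }
}

// sendSlaveOfCommand sends REPLICAOF <host> <port> to one replica.
func (s *Sentinel) sendSlaveOfCommand(slave, master *MonitoredTarget) {
    fmt.Printf("Updating slave %s to follow master %s\n", slave.Name, master.Name)
    if err := s.sendCommand(slave, "REPLICAOF", master.Host, strconv.Itoa(master.Port)); err != nil {
        fmt.Printf("Reconfiguring %s failed: %v\n", slave.Name, err)
    }
}

// demoteOldMaster records the old master as a replica. Sentinel does not shut
// the old master down; a real Sentinel sends it REPLICAOF once it is
// reachable again.
func (s *Sentinel) demoteOldMaster(master *MonitoredTarget) {
    fmt.Printf("Demoting old master %s to slave\n", master.Name)

    master.mu.Lock()
    master.Role = "slave"
    master.ODown = false
    master.mu.Unlock()
}

// updateSentinelConfig would rewrite the config file in a real Sentinel; here
// the current master is tracked by role, so only a message is printed.
func (s *Sentinel) updateSentinelConfig(newMaster *MonitoredTarget) {
    fmt.Printf("Updating sentinel config to point to new master %s (%s)\n",
        newMaster.Name, newMaster.addr())
}

// completeFailover returns the Sentinel to the monitoring state.
func (s *Sentinel) completeFailover(newMaster *MonitoredTarget) {
    fmt.Printf("Failover completed successfully, new master is %s\n", newMaster.Name)
    s.setState(SENTINEL_STATE_MONITORING)
}

// checkSlavesHealth PINGs every replica.
func (s *Sentinel) checkSlavesHealth() {
    for _, target := range s.targetsSnapshot() {
        target.mu.RLock()
        isSlave := target.Role == "slave"
        target.mu.RUnlock()
        if isSlave {
            s.checkTargetHealth(target)
        }
    }
}

// checkOtherSentinelsHealth PINGs every known peer Sentinel.
func (s *Sentinel) checkOtherSentinelsHealth() {
    for _, sentinel := range s.peersSnapshot() {
        s.checkSentinelHealth(sentinel)
    }
}

// checkTargetHealth PINGs one target and updates its state.
func (s *Sentinel) checkTargetHealth(target *MonitoredTarget) {
    conn, err := dial(target.addr())
    if err != nil {
        s.handleTargetDown(target)
        return
    }
    defer conn.Close()

    if err := s.sendPing(conn); err != nil {
        s.handleTargetDown(target)
        return
    }
    markUp(target)
}

// handleTargetDown marks a replica down on its first failed PING.
func (s *Sentinel) handleTargetDown(target *MonitoredTarget) {
    target.mu.Lock()
    defer target.mu.Unlock()

    if target.State == "up" {
        target.State = "down"
        target.DownTime = time.Now()
        fmt.Printf("Target %s is down\n", target.Name)
    }
}

// checkSentinelHealth PINGs one peer Sentinel.
func (s *Sentinel) checkSentinelHealth(sentinel *Sentinel) {
    conn, err := dial(sentinel.Addr)
    if err != nil {
        fmt.Printf("Sentinel %s is down\n", sentinel.ID)
        return
    }
    defer conn.Close()

    if err := s.sendPing(conn); err != nil {
        fmt.Printf("Sentinel %s is down\n", sentinel.ID)
        return
    }
    fmt.Printf("Sentinel %s is up\n", sentinel.ID)
}

// performFailoverIfNeeded starts a failover for a master in ODOWN.
func (s *Sentinel) performFailoverIfNeeded() {
    for _, target := range s.targetsSnapshot() {
        target.mu.RLock()
        odown := target.Role == "master" && target.ODown
        target.mu.RUnlock()
        if odown && s.shouldStartFailover(target) {
            s.startFailover(target)
            return // at most one failover per tick
        }
    }
}

// shouldStartFailover allows the first attempt immediately and a retry only
// after 2 * failover-timeout, which is Redis Sentinel's retry rule.
func (s *Sentinel) shouldStartFailover(master *MonitoredTarget) bool {
    master.mu.RLock()
    defer master.mu.RUnlock()

    if master.FailoverStartTime.IsZero() {
        return true
    }
    retryAfter := 2 * time.Duration(s.config.FailoverTimeout) * time.Millisecond
    return time.Since(master.FailoverStartTime) > retryAfter
}

// Stop stops the monitoring loop. The lock is released before waiting,
// because the loop goroutine may need s.mu to finish its current round.
func (s *Sentinel) Stop() {
    s.mu.Lock()
    if !s.isRunning {
        s.mu.Unlock()
        return
    }
    s.isRunning = false
    close(s.stopCh)
    s.mu.Unlock()

    s.wg.Wait()
}

// GetStatus returns a snapshot of the Sentinel's status.
func (s *Sentinel) GetStatus() map[string]interface{} {
    s.mu.RLock()
    defer s.mu.RUnlock()

    return map[string]interface{}{
        "id":              s.ID,
        "addr":            s.Addr,
        "state":           s.State,
        "targets_count":   len(s.Targets),
        "sentinels_count": len(s.OtherSentinels),
        "is_running":      s.isRunning,
    }
}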

2. Sentinel Cluster Implementation

// sentinel/sentinel_cluster.go
package sentinel

import (
    "fmt"
    "sync"
    "time"
)

// SentinelCluster manages several Sentinels inside one process (convenient for
// tests; production Sentinels run as separate processes).
type SentinelCluster struct {
    Sentinels map[string]*Sentinel
    mu        sync.RWMutex
    isRunning bool
    stopCh    chan struct{}
    wg        sync.WaitGroup
}

// NewSentinelCluster creates an empty cluster.
func NewSentinelCluster() *SentinelCluster {
    return &SentinelCluster{
        Sentinels: make(map[string]*Sentinel),
        stopCh:    make(chan struct{}),
    }
}

// AddSentinel registers a Sentinel with the cluster.
func (sc *SentinelCluster) AddSentinel(sentinel *Sentinel) {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    sc.Sentinels[sentinel.ID] = sentinel
}

// snapshot copies the Sentinel list so it can be used without holding sc.mu.
func (sc *SentinelCluster) snapshot() []*Sentinel {
    sc.mu.RLock()
    defer sc.mu.RUnlock()

    sentinels := make([]*Sentinel, 0, len(sc.Sentinels))
    for _, sentinel := range sc.Sentinels {
        sentinels = append(sentinels, sentinel)
    }
    return sentinels
}

// Start starts every Sentinel, then the cluster loop.
func (sc *SentinelCluster) Start() error {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    if sc.isRunning {
        return fmt.Errorf("cluster already running")
    }

    for _, sentinel := range sc.Sentinels {
        if err := sentinel.Start(); err != nil {
            return err
        }
    }

    sc.isRunning = true
    sc.wg.Add(1)
    go sc.clusterLoop()
    return nil
}

// clusterLoop runs the cluster-wide operations once per second.
func (sc *SentinelCluster) clusterLoop() {
    defer sc.wg.Done()

    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            sc.performClusterOperations()
        case <-sc.stopCh:
            return
        }
    }
}

// performClusterOperations runs one round of cluster-wide work.
func (sc *SentinelCluster) performClusterOperations() {
    // Collect the state of every Sentinel
    sc.syncSentinelStates()

    // Re-check failover for Sentinels that are failing over
    sc.performFailoverIfNeeded()

    // Propagate configuration
    sc.updateConfigurations()
}

// syncSentinelStates collects the status of every Sentinel.
func (sc *SentinelCluster) syncSentinelStates() {
    for _, sentinel := range sc.snapshot() {
        sc.syncSentinelState(sentinel)
    }
}

// syncSentinelState is simplified: it only prints the Sentinel's status.
func (sc *SentinelCluster) syncSentinelState(sentinel *Sentinel) {
    status := sentinel.GetStatus()
    fmt.Printf("Sentinel %s status: %v\n", sentinel.ID, status)
}

// performFailoverIfNeeded re-checks failover for Sentinels in the FAILOVER
// state. GetStatus reads the state under the Sentinel's own lock.
func (sc *SentinelCluster) performFailoverIfNeeded() {
    for _, sentinel := range sc.snapshot() {
        if sentinel.GetStatus()["state"] == SENTINEL_STATE_FAILOVER {
            sentinel.performFailoverIfNeeded()
        }
    }
}

// updateConfigurations is simplified: it only prints a message.
func (sc *SentinelCluster) updateConfigurations() {
    fmt.Println("Updating configurations...")
}

// Stop stops the cluster loop, then every Sentinel. The lock is released
// before waiting, because the loop itself takes sc.mu.
func (sc *SentinelCluster) Stop() {
    sc.mu.Lock()
    if !sc.isRunning {
        sc.mu.Unlock()
        return
    }
    sc.isRunning = false
    close(sc.stopCh)
    sc.mu.Unlock()

    sc.wg.Wait()

    for _, sentinel := range sc.snapshot() {
        sentinel.Stop()
    }
}

// GetStatus returns a snapshot of the cluster status.
func (sc *SentinelCluster) GetStatus() map[string]interface{} {
    sc.mu.RLock()
    defer sc.mu.RUnlock()

    return map[string]interface{}{
        "sentinels_count": len(sc.Sentinels),
        "is_running":      sc.isRunning,
    }
}

3. Failover Implementation

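// sentinel/failover.go
package sentinel

import (
    "fmt"
    "sync"
    "time"
)

// FailoverManager runs failover checks for one Sentinel on a separate loop.
type FailoverManager struct {
    sentinel  *Sentinel
    isRunning bool
    stopCh    chan struct{}
    wg        sync.WaitGroup
    mu        sync.RWMutex
}

// NewFailoverManager creates a failover manager for the given Sentinel.
func NewFailoverManager(sentinel *Sentinel) *FailoverManager {
    return &FailoverManager{
        sentinel: sentinel,
        stopCh:   make(chan struct{}),
    }
}

// Start launches the failover loop.
func (fm *FailoverManager) Start() error {
    fm.mu.Lock()
    defer fm.mu.Unlock()

    if fm.isRunning {
        return fmt.Errorf("failover manager already running")
    }

    fm.isRunning = true
    fm.wg.Add(1)
    go fm.failoverLoop()
    return nil
}

// failoverLoop checks the failover conditions once per second.
func (fm *FailoverManager) failoverLoop() {
    defer fm.wg.Done()

    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            fm.checkFailoverConditions()
        case <-fm.stopCh:
            return
        }
    }
}

// targets copies the Sentinel's targets under the Sentinel's lock.
func (fm *FailoverManager) targets() []*MonitoredTarget {
    fm.sentinel.mu.RLock()
    defer fm.sentinel.mu.RUnlock()

    targets := make([]*MonitoredTarget, 0, len(fm.sentinel.Targets))
    for _, target := range fm.sentinel.Targets {
        targets = append(targets, target)
    }
    return targets
}

// setSentinelState updates the Sentinel's state under its lock.
func (fm *FailoverManager) setSentinelState(state SentinelState) {
    fm.sentinel.mu.Lock()
    fm.sentinel.State = state
    fm.sentinel.mu.Unlock()
}

// checkFailoverConditions starts a failover for any master marked down.
// Simplified: a real Sentinel also requires ODOWN (quorum agreement) and must
// win the leader election first.
func (fm *FailoverManager) checkFailoverConditions() {
    for _, target := range fm.targets() {
        target.mu.RLock()
        down := target.Role == "master" && target.State == "down"
        target.mu.RUnlock()

        if down && fm.shouldStartFailover(target) {
            fm.startFailover(target)
        }
    }
}

// shouldStartFailover allows the first attempt immediately and a retry only
// after 2 * failover-timeout, which is Redis Sentinel's retry rule.
func (fm *FailoverManager) shouldStartFailover(master *MonitoredTarget) bool {
    master.mu.RLock()
    defer master.mu.RUnlock()

    if master.FailoverStartTime.IsZero() {
        return true
    }
    retryAfter := 2 * time.Duration(fm.sentinel.config.FailoverTimeout) * time.Millisecond
    return time.Since(master.FailoverStartTime) > retryAfter
}

// startFailover records the attempt, then promotes a replica.
func (fm *FailoverManager) startFailover(master *MonitoredTarget) {
    fmt.Printf("Starting failover for master %s\n", master.Name)

    master.mu.Lock()
    master.FailoverStartTime = time.Now()
    master.mu.Unlock()
    fm.setSentinelState(SENTINEL_STATE_FAILOVER)

    newMaster := fm.selectNewMaster()
    if newMaster == nil {
        fmt.Println("No suitable slave found for failover")
        fm.setSentinelState(SENTINEL_STATE_MONITORING)
        return
    }
    fm.executeFailover(master, newMaster)
}

// selectNewMaster picks the available replica with the smallest name, a
// deterministic simplification of Redis's priority/offset/run ID ranking.
func (fm *FailoverManager) selectNewMaster() *MonitoredTarget {
    var best *MonitoredTarget
    for _, target := range fm.targets() {
        target.mu.RLock()
        ok := target.Role == "slave" && target.State == "up"
        target.mu.RUnlock()
        if ok && (best == nil || target.Name < best.Name) {
            best = target
        }
    }
    return best
}

// executeFailover promotes newMaster and reconfigures the other nodes.
func (fm *FailoverManager) executeFailover(oldMaster, newMaster *MonitoredTarget) {
    fmt.Printf("Executing failover from %s to %s\n", oldMaster.Name, newMaster.Name)

    // 1. Promote the chosen replica
    fm.promoteSlaveToMaster(newMaster)

    // 2. Point the remaining replicas at the new master
    fm.setSentinelState(SENTINEL_STATE_RECONFIG)
    fm.updateOtherSlaves(newMaster)

    // 3. Demote the old master
    fm.demoteOldMaster(oldMaster)

    // 4. Update the Sentinel configuration
    fm.updateSentinelConfig(newMaster)

    // 5. Finish the failover
    fm.completeFailover(newMaster)
}

// demoteOldMaster marks the old master as a replica that is still down.
// Sentinel never shuts the old master down; a real Sentinel sends it
// REPLICAOF once it is reachable again.
func (fm *FailoverManager) demoteOldMaster(master *MonitoredTarget) {
    fmt.Printf("Demoting old master %s to slave\n", master.Name)

    master.mu.Lock()
    master.Role = "slave"
    master.State = "down"
    master.mu.Unlock()
}

// promoteSlaveToMaster is simplified: a real implementation sends
// REPLICAOF NO ONE (SLAVEOF NO ONE before Redis 5) to the replica.
func (fm *FailoverManager) promoteSlaveToMaster(slave *MonitoredTarget) {
    fmt.Printf("Promoting slave %s to master\n", slave.Name)

    slave.mu.Lock()
    slave.Role = "master"
    slave.State = "up"
    slave.mu.Unlock()
}

// updateOtherSlaves reconfigures every other replica.
func (fm *FailoverManager) updateOtherSlaves(newMaster *MonitoredTarget) {
    for _, target := range fm.targets() {
        target.mu.RLock()
        isSlave := target.Role == "slave"
        target.mu.RUnlock()
        if isSlave && target != newMaster {
            fm.sendSlaveOfCommand(target, newMaster)
        }
    }
}

// sendSlaveOfCommand is simplified: a real implementation sends
// REPLICAOF <host> <port> to the replica.
func (fm *FailoverManager) sendSlaveOfCommand(slave, master *MonitoredTarget) {
    fmt.Printf("Updating slave %s to follow master %s\n", slave.Name, master.Name)
}

// updateSentinelConfig is simplified: a real Sentinel rewrites its config file.
func (fm *FailoverManager) updateSentinelConfig(newMaster *MonitoredTarget) {
    fmt.Printf("Updating sentinel config to point to new master %s\n", newMaster.Name)
}

// completeFailover returns the Sentinel to the monitoring state.
func (fm *FailoverManager) completeFailover(newMaster *MonitoredTarget) {
    fmt.Printf("Failover completed successfully, new master is %s\n", newMaster.Name)
    fm.setSentinelState(SENTINEL_STATE_MONITORING)
}

// Stop stops the failover loop; the lock is released before waiting.
func (fm *FailoverManager) Stop() {
    fm.mu.Lock()
    if !fm.isRunning {
        fm.mu.Unlock()
        return
    }
    fm.isRunning = false
    close(fm.stopCh)
    fm.mu.Unlock()

    fm.wg.Wait()
}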
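Both `selectNewMaster` implementations pick an available replica without ranking the candidates. Redis Sentinel first filters out unsuitable replicas (for example ones that are down or disconnected, or whose `replica-priority` is 0, which means "never promote") and then prefers the lowest priority value, then the largest replication offset (the replica with the most data), then the lexicographically smaller run ID. A sketch of that ranking with `sort.Slice`, assuming a hypothetical `Candidate` type that already carries those fields:

```go
package main

import (
    "fmt"
    "sort"
)

// Candidate is a hypothetical replica record gathered from INFO output.
type Candidate struct {
    Name     string
    RunID    string
    Priority int   // replica-priority; 0 means "never promote"
    Offset   int64 // replication offset processed so far
    Up       bool
}

// selectPromotable returns the best replica, or nil if none qualifies.
func selectPromotable(cands []Candidate) *Candidate {
    var eligible []Candidate
    for _, c := range cands {
        if c.Up && c.Priority > 0 {
            eligible = append(eligible, c)
        }
    }
    if len(eligible) == 0 {
        return nil
    }
    sort.Slice(eligible, func(i, j int) bool {
        a, b := eligible[i], eligible[j]
        if a.Priority != b.Priority {
            return a.Priority < b.Priority // lower priority value wins
        }
        if a.Offset != b.Offset {
            return a.Offset > b.Offset // more replicated data wins
        }
        return a.RunID < b.RunID // deterministic tie-break
    })
    return &eligible[0]
}

func main() {
    best := selectPromotable([]Candidate{
        {Name: "replica-a", RunID: "b", Priority: 100, Offset: 900, Up: true},
        {Name: "replica-b", RunID: "a", Priority: 100, Offset: 1200, Up: true},
        {Name: "replica-c", RunID: "c", Priority: 0, Offset: 5000, Up: true},
        {Name: "replica-d", RunID: "d", Priority: 10, Offset: 100, Up: false},
    })
    if best != nil {
        fmt.Println("promote", best.Name) // prints: promote replica-b
    }
}
```

Here `replica-c` is skipped despite having the most data because its priority is 0, and `replica-d` is skipped because it is down.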

4. Configuration Management Implementation

// sentinel/config.go
package sentinel

import (
    "encoding/json"
    "errors"
    "fmt"
    "os"
    "sync"
)

// ConfigManager loads and persists a SentinelConfig as JSON.
type ConfigManager struct {
    configFile string
    config     *SentinelConfig
    mu         sync.RWMutex
}

// NewConfigManager creates a configuration manager for the given file.
func NewConfigManager(configFile string) *ConfigManager {
    return &ConfigManager{
        configFile: configFile,
        config:     &SentinelConfig{},
    }
}

// LoadConfig reads the config file, creating a default one if it is missing.
func (cm *ConfigManager) LoadConfig() error {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    data, err := os.ReadFile(cm.configFile)
    if errors.Is(err, os.ErrNotExist) {
        return cm.createDefaultConfig()
    }
    if err != nil {
        return err
    }

    config := &SentinelConfig{}
    if err := json.Unmarshal(data, config); err != nil {
        return err
    }
    cm.config = config
    return nil
}

// SaveConfig writes the current configuration to disk.
func (cm *ConfigManager) SaveConfig() error {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    return cm.saveLocked()
}

// saveLocked serializes the configuration to disk. The caller must already
// hold cm.mu; sync.RWMutex is not reentrant, so this must not lock again.
func (cm *ConfigManager) saveLocked() error {
    data, err := json.MarshalIndent(cm.config, "", "  ")
    if err != nil {
        return err
    }
    return os.WriteFile(cm.configFile, data, 0644)
}

// createDefaultConfig installs and saves the defaults; the caller holds cm.mu.
func (cm *ConfigManager) createDefaultConfig() error {
    cm.config = &SentinelConfig{
        MasterName:      "mymaster",
        MasterHost:      "127.0.0.1",
        MasterPort:      6379,
        DownAfterMs:     30000,
        FailoverTimeout: 180000,
        ParallelSyncs:   1,
    }
    return cm.saveLocked()
}

// GetConfig returns a copy, so callers cannot change the stored configuration
// without going through UpdateConfig.
func (cm *ConfigManager) GetConfig() *SentinelConfig {
    cm.mu.RLock()
    defer cm.mu.RUnlock()

    config := *cm.config
    return &config
}

// UpdateConfig replaces the configuration and saves it.
func (cm *ConfigManager) UpdateConfig(config *SentinelConfig) error {
    if config == nil {
        return fmt.Errorf("config must not be nil")
    }

    cm.mu.Lock()
    defer cm.mu.Unlock()

    copied := *config
    cm.config = &copied
    return cm.saveLocked()
}

// UpdateMasterConfig changes the master address and saves it.
func (cm *ConfigManager) UpdateMasterConfig(host string, port int) error {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    cm.config.MasterHost = host
    cm.config.MasterPort = port
    return cm.saveLocked()
}

// UpdateTimeoutConfig changes the timeouts (in milliseconds) and saves them.
func (cm *ConfigManager) UpdateTimeoutConfig(downAfterMs, failoverTimeout int) error {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    cm.config.DownAfterMs = downAfterMs
    cm.config.FailoverTimeout = failoverTimeout
    return cm.saveLocked()
}

// GetConfigStatus returns the configuration as a flat map.
func (cm *ConfigManager) GetConfigStatus() map[string]interface{} {
    cm.mu.RLock()
    defer cm.mu.RUnlock()

    return map[string]interface{}{
        "master_name":      cm.config.MasterName,
        "master_host":      cm.config.MasterHost,
        "master_port":      cm.config.MasterPort,
        "down_after_ms":    cm.config.DownAfterMs,
        "failover_timeout": cm.config.FailoverTimeout,
        "parallel_syncs":   cm.config.ParallelSyncs,
    }
}
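`ConfigManager` persists whatever configuration it is handed. In Redis Sentinel, which master address a Sentinel should persist after a failover is decided by the configuration epoch: the leader stamps the new master address with the epoch in which it won the election, and every other Sentinel adopts an advertised configuration (for example from a hello message) only if its epoch is strictly greater than the one it already holds. That lets all Sentinels converge on the newest configuration even if some were partitioned during the failover. A minimal sketch, with a hypothetical `MasterConfig` type and `applyIfNewer` helper:

```go
package main

import "fmt"

// MasterConfig is a hypothetical record of where a named master lives.
type MasterConfig struct {
    Name        string
    Host        string
    Port        int
    ConfigEpoch uint64
}

// applyIfNewer adopts remote only when it describes the same master and has
// a strictly greater config epoch; it reports whether local changed.
func applyIfNewer(local *MasterConfig, remote MasterConfig) bool {
    if remote.Name != local.Name || remote.ConfigEpoch <= local.ConfigEpoch {
        return false
    }
    *local = remote
    return true
}

func main() {
    local := MasterConfig{Name: "mymaster", Host: "127.0.0.1", Port: 6379, ConfigEpoch: 1}
    stale := MasterConfig{Name: "mymaster", Host: "127.0.0.1", Port: 6381, ConfigEpoch: 1}
    newer := MasterConfig{Name: "mymaster", Host: "127.0.0.1", Port: 6380, ConfigEpoch: 2}

    changed := applyIfNewer(&local, stale)
    fmt.Println(changed, local.Port) // false 6379

    changed = applyIfNewer(&local, newer)
    fmt.Println(changed, local.Port) // true 6380
}
```

After an accepted update, the new address is what a `ConfigManager`-style component would write back to disk.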

Testing and Verification

1. Unit Tests

// sentinel/sentinel_test.go
package sentinel

import (
    "path/filepath"
    "testing"
    "time"
)

func TestSentinel(t *testing.T) {
    // Sentinel configuration
    config := &SentinelConfig{
        MasterName:      "mymaster",
        MasterHost:      "127.0.0.1",
        MasterPort:      6379,
        DownAfterMs:     30000,
        FailoverTimeout: 180000,
        ParallelSyncs:   1,
    }

    // Create and start the Sentinel
    sentinel := NewSentinel("sentinel1", ":26379", config)
    if err := sentinel.Start(); err != nil {
        t.Fatalf("Failed to start sentinel: %v", err)
    }
    defer sentinel.Stop()

    // Let the monitoring loop finish its first pass
    time.Sleep(time.Second)

    // Check the status
    status := sentinel.GetStatus()
    if !status["is_running"].(bool) {
        t.Error("Sentinel should be running")
    }

    // Only the master is registered when nothing listens on 127.0.0.1:6379;
    // the stubbed replica discovery adds a second target when Redis answers.
    if n := status["targets_count"].(int); n < 1 || n > 2 {
        t.Errorf("Expected 1 or 2 targets, got %d", n)
    }
}

func TestSentinelCluster(t *testing.T) {
    // Create the Sentinel cluster
    cluster := NewSentinelCluster()

    // Sentinel configuration shared by all three Sentinels
    config := &SentinelConfig{
        MasterName:      "mymaster",
        MasterHost:      "127.0.0.1",
        MasterPort:      6379,
        DownAfterMs:     30000,
        FailoverTimeout: 180000,
        ParallelSyncs:   1,
    }

    // Add three Sentinels
    cluster.AddSentinel(NewSentinel("sentinel1", ":26379", config))
    cluster.AddSentinel(NewSentinel("sentinel2", ":26380", config))
    cluster.AddSentinel(NewSentinel("sentinel3", ":26381", config))

    // Start the cluster
    if err := cluster.Start(); err != nil {
        t.Fatalf("Failed to start cluster: %v", err)
    }
    defer cluster.Stop()

    // Give the cluster time to start
    time.Sleep(time.Second)

    // Check the status
    status := cluster.GetStatus()
    if status["sentinels_count"].(int) != 3 {
        t.Errorf("Expected 3 sentinels, got %d", status["sentinels_count"])
    }
    if !status["is_running"].(bool) {
        t.Error("Cluster should be running")
    }
}

func TestFailoverManager(t *testing.T) {
    // Sentinel configuration
    config := &SentinelConfig{
        MasterName:      "mymaster",
        MasterHost:      "127.0.0.1",
        MasterPort:      6379,
        DownAfterMs:     30000,
        FailoverTimeout: 180000,
        ParallelSyncs:   1,
    }

    // Create the Sentinel and its failover manager
    sentinel := NewSentinel("sentinel1", ":26379", config)
    manager := NewFailoverManager(sentinel)

    // Start the manager; this test only checks that it starts and stops cleanly
    if err := manager.Start(); err != nil {
        t.Fatalf("Failed to start failover manager: %v", err)
    }
    defer manager.Stop()

    // Let the loop run at least once
    time.Sleep(time.Second)
}

func TestConfigManager(t *testing.T) {
    // Use a temporary directory so a leftover file cannot affect the test
    manager := NewConfigManager(filepath.Join(t.TempDir(), "test_config.json"))

    // Loading a missing file creates the default configuration
    if err := manager.LoadConfig(); err != nil {
        t.Fatalf("Failed to load config: %v", err)
    }

    // Check the defaults
    config := manager.GetConfig()
    if config.MasterName != "mymaster" {
        t.Errorf("Expected master name 'mymaster', got %s", config.MasterName)
    }
    if config.MasterHost != "127.0.0.1" {
        t.Errorf("Expected master host '127.0.0.1', got %s", config.MasterHost)
    }
    if config.MasterPort != 6379 {
        t.Errorf("Expected master port 6379, got %d", config.MasterPort)
    }

    // Update the configuration
    config.MasterHost = "192.168.1.100"
    config.MasterPort = 6380
    if err := manager.UpdateConfig(config); err != nil {
        t.Fatalf("Failed to update config: %v", err)
    }

    // Check the updated configuration
    updatedConfig := manager.GetConfig()
    if updatedConfig.MasterHost != "192.168.1.100" {
        t.Errorf("Expected master host '192.168.1.100', got %s", updatedConfig.MasterHost)
    }
    if updatedConfig.MasterPort != 6380 {
        t.Errorf("Expected master port 6380, got %d", updatedConfig.MasterPort)
    }
}

2. Benchmarks

// sentinel/benchmark_test.go
package sentinel

import (
    "testing"
    "time"
)

func BenchmarkSentinelMonitoring(b *testing.B) {
    // 创建哨兵配置
    config := &SentinelConfig{
        MasterName:      "mymaster",
        MasterHost:      "127.0.0.1",
        MasterPort:      6379,
        DownAfterMs:     30000,
        FailoverTimeout: 180000,
        ParallelSyncs:   1,
    }
    
    // 创建哨兵
    sentinel := NewSentinel("sentinel1", ":26400", config)
    
    // 启动哨兵
    sentinel.Start()
    defer sentinel.Stop()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // 模拟监控操作
        sentinel.performMonitoring()
    }
}

func BenchmarkSentinelCluster(b *testing.B) {
    // Create the Sentinel cluster
    cluster := NewSentinelCluster()
    
    // Sentinel configuration
    config := &SentinelConfig{
        MasterName:      "mymaster",
        MasterHost:      "127.0.0.1",
        MasterPort:      6379,
        DownAfterMs:     30000,
        FailoverTimeout: 180000,
        ParallelSyncs:   1,
    }
    
    // Add three Sentinels
    for i := 0; i < 3; i++ {
        sentinel := NewSentinel(fmt.Sprintf("sentinel%d", i), fmt.Sprintf(":2640%d", i), config)
        cluster.AddSentinel(sentinel)
    }
    
    // Start the cluster
    if err := cluster.Start(); err != nil {
        b.Fatalf("Failed to start cluster: %v", err)
    }
    defer cluster.Stop()
    
    // Exclude setup cost from the measurement
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // One round of cluster operations
        cluster.performClusterOperations()
    }
}

func BenchmarkFailoverManager(b *testing.B) {
    // Sentinel configuration
    config := &SentinelConfig{
        MasterName:      "mymaster",
        MasterHost:      "127.0.0.1",
        MasterPort:      6379,
        DownAfterMs:     30000,
        FailoverTimeout: 180000,
        ParallelSyncs:   1,
    }
    
    // Create the Sentinel
    sentinel := NewSentinel("sentinel1", ":26410", config)
    
    // Create the failover manager
    manager := NewFailoverManager(sentinel)
    
    // Start the manager
    if err := manager.Start(); err != nil {
        b.Fatalf("Failed to start failover manager: %v", err)
    }
    defer manager.Stop()
    
    // Exclude setup cost from the measurement
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // One failover-condition check
        manager.checkFailoverConditions()
    }
}

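func BenchmarkConfigManager(b *testing.B) {
    // Create a config manager backed by benchmark_config.json (removed afterwards)
    manager := NewConfigManager("benchmark_config.json")
    defer os.Remove("benchmark_config.json")
    
    // Load the configuration
    manager.LoadConfig()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // Read, modify, and write back the configuration
        config := manager.GetConfig()
        config.MasterHost = fmt.Sprintf("192.168.1.%d", i%255)
        if err := manager.UpdateConfig(config); err != nil {
            b.Fatalf("Failed to update config: %v", err)
        }
    }
}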

3. Concurrency Tests

// sentinel/concurrent_test.go
package sentinel

import (
    "fmt"
    "sync"
    "testing"
    "time"
)

func TestSentinelConcurrent(t *testing.T) {
    // Sentinel configuration
    config := &SentinelConfig{
        MasterName:      "mymaster",
        MasterHost:      "127.0.0.1",
        MasterPort:      6379,
        DownAfterMs:     30000,
        FailoverTimeout: 180000,
        ParallelSyncs:   1,
    }
    
    // Create the Sentinel
    sentinel := NewSentinel("sentinel1", ":26500", config)
    
    // Start the Sentinel
    if err := sentinel.Start(); err != nil {
        t.Fatalf("Failed to start sentinel: %v", err)
    }
    defer sentinel.Stop()
    
    // Give the monitoring loop time to start
    time.Sleep(time.Second)
    
    // Run monitoring passes from several goroutines at once
    // (run with go test -race to surface data races)
    const numGoroutines = 10
    const numOperations = 100
    
    var wg sync.WaitGroup
    
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            
            for j := 0; j < numOperations; j++ {
                // One monitoring pass
                sentinel.performMonitoring()
            }
        }()
    }
    
    wg.Wait()
    
    // The Sentinel must still be running after the concurrent load
    status := sentinel.GetStatus()
    if running, ok := status["is_running"].(bool); !ok || !running {
        t.Error("Sentinel should be running")
    }
}

func TestSentinelClusterConcurrent(t *testing.T) {
    // Create the Sentinel cluster
    cluster := NewSentinelCluster()
    
    // Sentinel configuration
    config := &SentinelConfig{
        MasterName:      "mymaster",
        MasterHost:      "127.0.0.1",
        MasterPort:      6379,
        DownAfterMs:     30000,
        FailoverTimeout: 180000,
        ParallelSyncs:   1,
    }
    
    // Add three Sentinels
    for i := 0; i < 3; i++ {
        sentinel := NewSentinel(fmt.Sprintf("sentinel%d", i), fmt.Sprintf(":2650%d", i), config)
        cluster.AddSentinel(sentinel)
    }
    
    // Start the cluster
    if err := cluster.Start(); err != nil {
        t.Fatalf("Failed to start cluster: %v", err)
    }
    defer cluster.Stop()
    
    // Give the cluster time to start
    time.Sleep(time.Second)
    
    // Run cluster operations from several goroutines at once
    const numGoroutines = 10
    const numOperations = 100
    
    var wg sync.WaitGroup
    
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            
            for j := 0; j < numOperations; j++ {
                // One round of cluster operations
                cluster.performClusterOperations()
            }
        }()
    }
    
    wg.Wait()
    
    // The cluster must still be running after the concurrent load
    status := cluster.GetStatus()
    if running, ok := status["is_running"].(bool); !ok || !running {
        t.Error("Cluster should be running")
    }
}

func TestFailoverManagerConcurrent(t *testing.T) {
    // Sentinel configuration
    config := &SentinelConfig{
        MasterName:      "mymaster",
        MasterHost:      "127.0.0.1",
        MasterPort:      6379,
        DownAfterMs:     30000,
        FailoverTimeout: 180000,
        ParallelSyncs:   1,
    }
    
    // Create the Sentinel
    sentinel := NewSentinel("sentinel1", ":26600", config)
    
    // Create the failover manager
    manager := NewFailoverManager(sentinel)
    
    // Start the manager
    if err := manager.Start(); err != nil {
        t.Fatalf("Failed to start failover manager: %v", err)
    }
    defer manager.Stop()
    
    // Give the manager time to start
    time.Sleep(time.Second)
    
    // Run failover-condition checks from several goroutines at once
    const numGoroutines = 10
    const numOperations = 100
    
    var wg sync.WaitGroup
    
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            
            for j := 0; j < numOperations; j++ {
                // One failover-condition check
                manager.checkFailoverConditions()
            }
        }()
    }
    
    // The test passes if no goroutine panics or deadlocks
    wg.Wait()
}

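Performance Comparison

1. Sentinel Deployment Comparison

| Aspect | Single Sentinel | Multiple Sentinels | Sentinel cluster | Notes |
|---|---|---|---|---|
| Availability | Low | Medium | High | More Sentinels tolerate more Sentinel failures; a single Sentinel is itself a single point of failure |
| Performance | High | Medium | Low | Every Sentinel sends its own PING, INFO, and hello traffic to the monitored nodes, so overhead grows with the Sentinel count |
| Complexity | Low | Medium | High | More Sentinels mean more processes to deploy, configure, and keep consistent |
| Cost | Low | Medium | High | Each Sentinel is an extra process, ideally on a separate host |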
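The availability row follows from Sentinel's voting rule: a failover must be authorized by a majority of the known Sentinels (or by quorum Sentinels, if quorum is larger than the majority). The sketch below uses hypothetical helpers `majority` and `tolerated` to work out how many Sentinel failures each deployment size can survive; it models only the majority rule, not a quorum set above it.

```go
package main

import "fmt"

// majority returns the votes needed to authorize a failover when n
// Sentinels are known: strictly more than half of them.
// (A quorum configured above the majority is not modeled here.)
func majority(n int) int {
	return n/2 + 1
}

// tolerated returns how many Sentinels can be unreachable while the
// remaining ones can still reach a majority and run a failover.
func tolerated(n int) int {
	return n - majority(n)
}

func main() {
	for n := 1; n <= 5; n++ {
		fmt.Printf("sentinels=%d majority=%d tolerated=%d\n", n, majority(n), tolerated(n))
	}
}
```

Running it prints tolerated=0 for one and two Sentinels, 1 for three and four, and 2 for five. This is why three Sentinels is the usual minimum, and why an even count adds no failure tolerance over the odd count just below it.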

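2. Failover Mechanism Comparison

| Mechanism | Speed | Accuracy | Complexity | Use case |
|---|---|---|---|---|
| Subjective down (SDOWN) | Fast | Low | Low | A single Sentinel's local view |
| Objective down (ODOWN) | Medium | High | Medium | Agreement among multiple Sentinels |
| Automatic failover | Slow | High | High | Production environments |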

3. Comparative Benchmark

// Example: the three setups measured side by side as sub-benchmarks
func BenchmarkComparison(b *testing.B) {
    // Single Sentinel
    b.Run("SingleSentinel", func(b *testing.B) {
        config := &SentinelConfig{
            MasterName:      "mymaster",
            MasterHost:      "127.0.0.1",
            MasterPort:      6379,
            DownAfterMs:     30000,
            FailoverTimeout: 180000,
            ParallelSyncs:   1,
        }
        
        sentinel := NewSentinel("sentinel1", ":26700", config)
        if err := sentinel.Start(); err != nil {
            b.Fatalf("Failed to start sentinel: %v", err)
        }
        defer sentinel.Stop()
        
        // Exclude setup cost from the measurement
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            sentinel.performMonitoring()
        }
    })
    
    // Three Sentinels (ports chosen not to collide with :26700 above)
    b.Run("MultipleSentinels", func(b *testing.B) {
        cluster := NewSentinelCluster()
        
        config := &SentinelConfig{
            MasterName:      "mymaster",
            MasterHost:      "127.0.0.1",
            MasterPort:      6379,
            DownAfterMs:     30000,
            FailoverTimeout: 180000,
            ParallelSyncs:   1,
        }
        
        for i := 0; i < 3; i++ {
            sentinel := NewSentinel(fmt.Sprintf("sentinel%d", i), fmt.Sprintf(":2671%d", i), config)
            cluster.AddSentinel(sentinel)
        }
        
        if err := cluster.Start(); err != nil {
            b.Fatalf("Failed to start cluster: %v", err)
        }
        defer cluster.Stop()
        
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            cluster.performClusterOperations()
        }
    })
    
    // Failover-condition checks
    b.Run("Failover", func(b *testing.B) {
        config := &SentinelConfig{
            MasterName:      "mymaster",
            MasterHost:      "127.0.0.1",
            MasterPort:      6379,
            DownAfterMs:     30000,
            FailoverTimeout: 180000,
            ParallelSyncs:   1,
        }
        
        sentinel := NewSentinel("sentinel1", ":26800", config)
        manager := NewFailoverManager(sentinel)
        
        if err := manager.Start(); err != nil {
            b.Fatalf("Failed to start failover manager: %v", err)
        }
        defer manager.Stop()
        
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            manager.checkFailoverConditions()
        }
    })
}

Interview Key Points

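1. Advantages of Sentinel mode

Key points:

  • Automatic failover: when the master fails, a replica is promoted without manual intervention
  • High availability: several Sentinels watch the same master, so losing one Sentinel does not stop monitoring or failover
  • Configuration provider: clients ask Sentinel for the current master address, and Sentinels update their own configuration after a failover
  • Monitoring and alerting: the state of the master and replicas is checked continuously, and state changes can be reported as notifications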

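2. Subjective down vs. objective down

Key points:

  • Subjective down (SDOWN): a single Sentinel considers the master down because it received no valid reply within down-after-milliseconds
  • Objective down (ODOWN): at least quorum Sentinels agree that the master is down
  • Criteria: the timeout (down-after-milliseconds) decides SDOWN; the number of agreeing Sentinels (quorum) decides ODOWN
  • Handling: after SDOWN, the Sentinel asks the other Sentinels whether they also see the master as down; once ODOWN is reached, a leader Sentinel is elected and performs the failover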
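The two checks can be sketched as follows. The function names are hypothetical, and peer answers are modeled as a map from Sentinel ID to its reply; in Redis those answers come from `SENTINEL IS-MASTER-DOWN-BY-ADDR`. The second function adds the leader-election rule: reaching ODOWN is not enough to start a failover, because the leader also needs votes from a majority of the Sentinels, or quorum votes if quorum is larger.

```go
package main

import "fmt"

// isObjectivelyDown reports ODOWN: this Sentinel must see the master as
// SDOWN itself, and together with agreeing peers the count must reach quorum.
// peerReports maps a peer Sentinel ID to whether it reported the master down.
func isObjectivelyDown(selfSDown bool, peerReports map[string]bool, quorum int) bool {
	if !selfSDown {
		return false
	}
	agree := 1 // this Sentinel's own SDOWN counts as one
	for _, down := range peerReports {
		if down {
			agree++
		}
	}
	return agree >= quorum
}

// canStartFailover reports whether a candidate leader has enough votes:
// a majority of all known Sentinels, or quorum if quorum is larger.
func canStartFailover(votes, totalSentinels, quorum int) bool {
	needed := totalSentinels/2 + 1
	if quorum > needed {
		needed = quorum
	}
	return votes >= needed
}

func main() {
	reports := map[string]bool{"sentinel2": true, "sentinel3": false}
	fmt.Println(isObjectivelyDown(true, reports, 2)) // true: self + sentinel2 reach quorum 2
	fmt.Println(canStartFailover(2, 3, 2))           // true: 2 of 3 is a majority
	fmt.Println(canStartFailover(2, 5, 2))           // false: 5 Sentinels need 3 votes
}
```

Separating the two rules shows why a small quorum speeds up detection without letting a minority partition of Sentinels start a failover on its own.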

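3. Failover process

Key points:

  • Failure detection: periodic PING heartbeats and timeout checks
  • Replica selection: lower replica-priority first, then the larger replication offset, then the lexicographically smaller run ID
  • Data consistency: choosing the replica with the largest replication offset minimizes data loss, but replication is asynchronous, so the most recent writes acknowledged by the old master can still be lost
  • Switchover: the chosen replica is promoted with REPLICAOF NO ONE, the remaining replicas are reconfigured to replicate from it, and clients discover the new master through Sentinel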
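The replica-selection order can be expressed as a sort. This is a sketch with a hypothetical `replica` type; it follows the ranking described in the Redis Sentinel documentation (replicas with priority 0 are never promoted, then lower `replica-priority`, larger processed replication offset, and lexicographically smaller run ID) and leaves out the earlier filters that skip disconnected or unhealthy replicas.

```go
package main

import (
	"fmt"
	"sort"
)

// replica is a hypothetical candidate record for promotion.
type replica struct {
	runID    string
	priority int   // replica-priority; 0 means "never promote"
	offset   int64 // replication offset processed by this replica
}

// pickReplica returns the best promotion candidate, or false if none qualifies.
func pickReplica(candidates []replica) (replica, bool) {
	eligible := make([]replica, 0, len(candidates))
	for _, r := range candidates {
		if r.priority > 0 {
			eligible = append(eligible, r)
		}
	}
	if len(eligible) == 0 {
		return replica{}, false
	}
	sort.Slice(eligible, func(i, j int) bool {
		a, b := eligible[i], eligible[j]
		if a.priority != b.priority {
			return a.priority < b.priority // lower priority value wins
		}
		if a.offset != b.offset {
			return a.offset > b.offset // more replicated data wins
		}
		return a.runID < b.runID // deterministic tie-break
	})
	return eligible[0], true
}

func main() {
	candidates := []replica{
		{runID: "c3", priority: 100, offset: 5000},
		{runID: "a1", priority: 100, offset: 5200},
		{runID: "b2", priority: 0, offset: 9000}, // priority 0: never promoted
		{runID: "a0", priority: 100, offset: 5200},
	}
	if best, ok := pickReplica(candidates); ok {
		fmt.Println("promote", best.runID) // promote a0
	}
}
```

The run ID comparison carries no meaning about data freshness; it only guarantees that every Sentinel ranking the same candidates picks the same one.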

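4. Communication within the Sentinel cluster

Key points:

  • Publish/subscribe: each Sentinel publishes hello messages to the __sentinel__:hello channel of every monitored master and replica, which is how Sentinels discover one another
  • Direct commands: Sentinels query each other with SENTINEL IS-MASTER-DOWN-BY-ADDR to collect down reports and leader-election votes
  • State synchronization: hello messages carry the current master address and its configuration epoch, and every Sentinel adopts the configuration with the highest epoch
  • Failure detection: Sentinels also PING each other, so an unreachable Sentinel is detected like any other instance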
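Hello messages are plain comma-separated strings. The sketch below assumes the eight-field layout described in the Redis Sentinel documentation (`sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch`) and uses hypothetical names `parseHello` and `applyHello`. It illustrates the epoch rule: a Sentinel switches to an advertised master address only when that configuration carries a higher config epoch than the one it already holds.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// hello holds the fields of one hello message (hypothetical type).
type hello struct {
	SentinelAddr string
	RunID        string
	CurrentEpoch uint64
	MasterName   string
	MasterAddr   string
	ConfigEpoch  uint64
}

// parseHello splits the assumed 8-field payload:
// sentinel_ip,sentinel_port,sentinel_runid,current_epoch,
// master_name,master_ip,master_port,master_config_epoch
func parseHello(payload string) (hello, error) {
	f := strings.Split(payload, ",")
	if len(f) != 8 {
		return hello{}, fmt.Errorf("expected 8 fields, got %d", len(f))
	}
	cur, err := strconv.ParseUint(f[3], 10, 64)
	if err != nil {
		return hello{}, fmt.Errorf("current epoch: %w", err)
	}
	cfg, err := strconv.ParseUint(f[7], 10, 64)
	if err != nil {
		return hello{}, fmt.Errorf("config epoch: %w", err)
	}
	return hello{
		SentinelAddr: net.JoinHostPort(f[0], f[1]),
		RunID:        f[2],
		CurrentEpoch: cur,
		MasterName:   f[4],
		MasterAddr:   net.JoinHostPort(f[5], f[6]),
		ConfigEpoch:  cfg,
	}, nil
}

// applyHello keeps the current master address unless the hello message
// advertises a configuration with a strictly higher config epoch.
func applyHello(curAddr string, curEpoch uint64, h hello) (string, uint64) {
	if h.ConfigEpoch > curEpoch {
		return h.MasterAddr, h.ConfigEpoch
	}
	return curAddr, curEpoch
}

func main() {
	h, err := parseHello("10.0.0.5,26379,abc123,7,mymaster,10.0.0.2,6380,7")
	if err != nil {
		fmt.Println("bad hello:", err)
		return
	}
	addr, epoch := applyHello("127.0.0.1:6379", 6, h)
	fmt.Println(addr, epoch) // 10.0.0.2:6380 7
}
```

Comparing epochs instead of arrival order lets Sentinels that missed a failover (for example, during a partition) converge on the latest master once they hear a newer hello message.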

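Summary

In this chapter we covered:

  1. How Redis Sentinel monitors nodes and detects failures
  2. The rules for subjective down and objective down
  3. The automatic failover process and its election algorithm
  4. The communication protocol among Sentinels and how configuration changes propagate
  5. How Sentinel mode provides high availability

Sentinel mode gives Redis automatic failover and high availability. The next chapter covers memory optimization and GC tuning.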
