HiHuo
首页
博客
手册
工具
首页
博客
手册
工具
  • 学习 Redis

    • Redis 手写实现学习指南
    • 快速开始
    • Redis 架构总览与线程模型
    • RESP 协议与网络通信
    • 事件循环与 I/O 多路复用
    • 底层数据结构设计
    • 字符串与 SDS 实现
    • 哈希表与字典实现
    • 列表与跳表实现
    • 有序集合实现
    • 内存管理与对象系统
    • RDB 持久化机制
    • AOF 持久化机制
    • 混合持久化策略
    • 分布式锁实现
    • 缓存一致性策略
    • 主从复制机制
    • 哨兵模式实现
    • 内存优化与 GC 调优

主从复制机制

学习目标

  • 深入理解 Redis 主从复制的原理和协议
  • 掌握全量同步和增量同步的实现机制
  • 实现复制偏移量和积压缓冲区的管理
  • 理解心跳检测和断线重连机制
  • 掌握主从切换流程和故障恢复

主从复制概述

1. 主从复制架构

Redis 主从复制通过异步复制实现数据同步,提供高可用性和读写分离:

┌─────────────────────────────────────────────────────────────┐
│                   主从复制架构                              │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │   主节点    │    │   从节点1   │    │   从节点2   │     │
│  │  (Master)   │    │  (Slave)    │    │  (Slave)    │     │
│  │             │    │             │    │             │     │
│  │  ┌───────┐  │    │  ┌───────┐  │    │  ┌───────┐  │     │
│  │  │ 数据  │  │    │  │ 数据  │  │    │  │ 数据  │  │     │
│  │  │ 写入  │  │    │  │ 同步  │  │    │  │ 同步  │  │     │
│  │  └───────┘  │    │  └───────┘  │    │  └───────┘  │     │
│  │  ┌───────┐  │    │  ┌───────┐  │    │  ┌───────┐  │     │
│  │  │ 复制  │  │    │  │ 复制  │  │    │  │ 复制  │  │     │
│  │  │ 日志  │  │    │  │ 偏移  │  │    │  │ 偏移  │  │     │
│  │  └───────┘  │    │  └───────┘  │    │  └───────┘  │     │
│  └─────────────┘    └─────────────┘    └─────────────┘     │
│           │                   │                   │        │
│           └───────────┬───────┴───────────────────┘        │
│                       │                                    │
│  ┌─────────────────────────────────────────────────────────┐ │
│  │                复制流程                                  │ │
│  │  1. 从节点连接主节点                                     │ │
│  │  2. 发送 PSYNC 命令                                     │ │
│  │  3. 主节点判断同步方式(全量/增量)                      │ │
│  │  4. 执行数据同步                                        │ │
│  │  5. 持续增量同步                                        │ │
│  └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

2. 复制状态机

状态描述转换条件
REPL_STATE_NONE未连接初始状态
REPL_STATE_CONNECT连接中开始连接主节点
REPL_STATE_CONNECTED已连接连接建立成功
REPL_STATE_HANDSHAKE握手发送 PING 命令
REPL_STATE_SEND_PORT发送端口发送 REPLCONF 命令
REPL_STATE_SEND_IP发送 IP发送 REPLCONF 命令
REPL_STATE_SEND_CAPA发送能力发送 REPLCONF 命令
REPL_STATE_RECEIVE_PONG接收 PONG等待主节点响应
REPL_STATE_SEND_PSYNC发送 PSYNC发送同步命令
REPL_STATE_RECEIVE_PSYNC接收 PSYNC等待同步响应
REPL_STATE_TRANSFER传输数据执行数据同步
REPL_STATE_CONNECTED已连接同步完成

️ Go 语言主从复制实现

1. 主节点实现

// replication/master.go
package replication

import (
    "context"
    "fmt"
    "net"
    "sync"
    "time"
)

// 主节点
type Master struct {
    addr        string
    slaves      map[string]*SlaveConnection
    mu          sync.RWMutex
    replicationLog *ReplicationLog
    isRunning   bool
    stopCh      chan struct{}
    wg          sync.WaitGroup
}

// 从节点连接
type SlaveConnection struct {
    conn        net.Conn
    addr        string
    port        int
    ip          string
    capabilities []string
    offset      int64
    lastPing    time.Time
    mu          sync.RWMutex
}

// 复制日志
type ReplicationLog struct {
    commands   []ReplicationCommand
    offset     int64
    mu         sync.RWMutex
}

// 复制命令
type ReplicationCommand struct {
    Command string
    Args    []string
    Offset  int64
    Time    time.Time
}

// 创建主节点
func NewMaster(addr string) *Master {
    return &Master{
        addr:          addr,
        slaves:        make(map[string]*SlaveConnection),
        replicationLog: NewReplicationLog(),
        isRunning:     false,
        stopCh:        make(chan struct{}),
    }
}

// 启动主节点
func (m *Master) Start() error {
    m.mu.Lock()
    defer m.mu.Unlock()
    
    if m.isRunning {
        return fmt.Errorf("master already running")
    }
    
    // 启动监听
    listener, err := net.Listen("tcp", m.addr)
    if err != nil {
        return err
    }
    
    m.isRunning = true
    m.wg.Add(1)
    
    go m.acceptConnections(listener)
    
    return nil
}

// 接受连接
func (m *Master) acceptConnections(listener net.Listener) {
    defer m.wg.Done()
    
    for {
        select {
        case <-m.stopCh:
            listener.Close()
            return
        default:
            conn, err := listener.Accept()
            if err != nil {
                continue
            }
            
            // 处理新连接
            go m.handleConnection(conn)
        }
    }
}

// 处理连接
func (m *Master) handleConnection(conn net.Conn) {
    defer conn.Close()
    
    // 创建从节点连接
    slave := &SlaveConnection{
        conn:     conn,
        addr:     conn.RemoteAddr().String(),
        lastPing: time.Now(),
    }
    
    // 添加到从节点列表
    m.mu.Lock()
    m.slaves[slave.addr] = slave
    m.mu.Unlock()
    
    // 处理从节点命令
    m.handleSlaveCommands(slave)
}

// 处理从节点命令
func (m *Master) handleSlaveCommands(slave *SlaveConnection) {
    // 简化实现,实际应该解析 RESP 协议
    for {
        // 读取命令
        command, err := m.readCommand(slave.conn)
        if err != nil {
            break
        }
        
        // 处理命令
        if err := m.processSlaveCommand(slave, command); err != nil {
            break
        }
    }
    
    // 移除从节点
    m.mu.Lock()
    delete(m.slaves, slave.addr)
    m.mu.Unlock()
}

// 读取命令
func (m *Master) readCommand(conn net.Conn) (string, error) {
    // 简化实现,实际应该解析 RESP 协议
    buffer := make([]byte, 1024)
    n, err := conn.Read(buffer)
    if err != nil {
        return "", err
    }
    
    return string(buffer[:n]), nil
}

// 处理从节点命令
func (m *Master) processSlaveCommand(slave *SlaveConnection, command string) error {
    // 简化实现,实际应该解析命令
    switch command {
    case "PING":
        return m.handlePing(slave)
    case "REPLCONF":
        return m.handleReplConf(slave)
    case "PSYNC":
        return m.handlePsync(slave)
    default:
        return fmt.Errorf("unknown command: %s", command)
    }
}

// 处理 PING 命令
func (m *Master) handlePing(slave *SlaveConnection) error {
    slave.mu.Lock()
    slave.lastPing = time.Now()
    slave.mu.Unlock()
    
    // 发送 PONG 响应
    _, err := slave.conn.Write([]byte("+PONG\r\n"))
    return err
}

// 处理 REPLCONF 命令
func (m *Master) handleReplConf(slave *SlaveConnection) error {
    // 简化实现,实际应该解析参数
    // 发送 OK 响应
    _, err := slave.conn.Write([]byte("+OK\r\n"))
    return err
}

// 处理 PSYNC 命令
func (m *Master) handlePsync(slave *SlaveConnection) error {
    // 简化实现,实际应该解析参数
    // 发送同步响应
    response := fmt.Sprintf("+FULLRESYNC %s %d\r\n", "master_id", 0)
    if _, err := slave.conn.Write([]byte(response)); err != nil {
        return err
    }
    
    // 执行全量同步
    return m.performFullSync(slave)
}

// 执行全量同步
func (m *Master) performFullSync(slave *SlaveConnection) error {
    // 简化实现,实际应该发送 RDB 文件
    rdbData := []byte("REDIS0001sample_rdb_data")
    
    // 发送 RDB 数据长度
    length := fmt.Sprintf("$%d\r\n", len(rdbData))
    if _, err := slave.conn.Write([]byte(length)); err != nil {
        return err
    }
    
    // 发送 RDB 数据
    if _, err := slave.conn.Write(rdbData); err != nil {
        return err
    }
    
    // 发送结束标识
    if _, err := slave.conn.Write([]byte("\r\n")); err != nil {
        return err
    }
    
    return nil
}

// 执行命令
func (m *Master) ExecuteCommand(command string, args []string) error {
    // 执行命令
    // 这里应该调用实际的命令执行逻辑
    
    // 记录到复制日志
    m.replicationLog.AddCommand(command, args)
    
    // 发送给所有从节点
    m.broadcastCommand(command, args)
    
    return nil
}

// 广播命令
func (m *Master) broadcastCommand(command string, args []string) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    
    for _, slave := range m.slaves {
        go m.sendCommandToSlave(slave, command, args)
    }
}

// 发送命令到从节点
func (m *Master) sendCommandToSlave(slave *SlaveConnection, command string, args []string) {
    // 构建 RESP 格式的命令
    resp := m.buildRESPCommand(command, args)
    
    // 发送命令
    if _, err := slave.conn.Write([]byte(resp)); err != nil {
        // 连接断开,移除从节点
        m.mu.Lock()
        delete(m.slaves, slave.addr)
        m.mu.Unlock()
    }
}

// 构建 RESP 格式命令
func (m *Master) buildRESPCommand(command string, args []string) string {
    var result string
    
    // 数组开始
    result += fmt.Sprintf("*%d\r\n", len(args)+1)
    
    // 命令
    result += fmt.Sprintf("$%d\r\n%s\r\n", len(command), command)
    
    // 参数
    for _, arg := range args {
        result += fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg)
    }
    
    return result
}

// 停止主节点
func (m *Master) Stop() {
    m.mu.Lock()
    defer m.mu.Unlock()
    
    if !m.isRunning {
        return
    }
    
    close(m.stopCh)
    m.wg.Wait()
    m.isRunning = false
}

// 获取从节点数量
func (m *Master) GetSlaveCount() int {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return len(m.slaves)
}

// 获取复制状态
func (m *Master) GetReplicationStatus() map[string]interface{} {
    m.mu.RLock()
    defer m.mu.RUnlock()
    
    return map[string]interface{}{
        "role":           "master",
        "connected_slaves": len(m.slaves),
        "replication_log_length": m.replicationLog.GetLength(),
        "replication_offset": m.replicationLog.GetOffset(),
    }
}

2. 从节点实现

// replication/slave.go
package replication

import (
    "context"
    "fmt"
    "net"
    "sync"
    "time"
)

// 从节点
type Slave struct {
    masterAddr string
    conn       net.Conn
    isRunning  bool
    stopCh     chan struct{}
    wg         sync.WaitGroup
    mu         sync.RWMutex
    offset     int64
    lastPing   time.Time
}

// 创建从节点
func NewSlave(masterAddr string) *Slave {
    return &Slave{
        masterAddr: masterAddr,
        isRunning:  false,
        stopCh:     make(chan struct{}),
        lastPing:   time.Now(),
    }
}

// 启动从节点
func (s *Slave) Start() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    if s.isRunning {
        return fmt.Errorf("slave already running")
    }
    
    // 连接主节点
    conn, err := net.Dial("tcp", s.masterAddr)
    if err != nil {
        return err
    }
    
    s.conn = conn
    s.isRunning = true
    s.wg.Add(1)
    
    go s.replicationLoop()
    
    return nil
}

// 复制循环
func (s *Slave) replicationLoop() {
    defer s.wg.Done()
    
    // 发送复制握手
    if err := s.sendHandshake(); err != nil {
        fmt.Printf("Failed to send handshake: %v\n", err)
        return
    }
    
    // 发送 PSYNC 命令
    if err := s.sendPsync(); err != nil {
        fmt.Printf("Failed to send PSYNC: %v\n", err)
        return
    }
    
    // 处理主节点响应
    s.handleMasterResponse()
}

// 发送握手
func (s *Slave) sendHandshake() error {
    // 发送 PING
    if err := s.sendCommand("PING"); err != nil {
        return err
    }
    
    // 等待 PONG 响应
    if err := s.waitForResponse("PONG"); err != nil {
        return err
    }
    
    // 发送 REPLCONF 命令
    if err := s.sendCommand("REPLCONF", "listening-port", "6380"); err != nil {
        return err
    }
    
    if err := s.waitForResponse("OK"); err != nil {
        return err
    }
    
    // 发送 REPLCONF 命令
    if err := s.sendCommand("REPLCONF", "capa", "eof"); err != nil {
        return err
    }
    
    if err := s.waitForResponse("OK"); err != nil {
        return err
    }
    
    return nil
}

// 发送 PSYNC 命令
func (s *Slave) sendPsync() error {
    return s.sendCommand("PSYNC", "?", "-1")
}

// 处理主节点响应
func (s *Slave) handleMasterResponse() {
    for {
        select {
        case <-s.stopCh:
            return
        default:
            // 读取响应
            response, err := s.readResponse()
            if err != nil {
                fmt.Printf("Failed to read response: %v\n", err)
                return
            }
            
            // 处理响应
            if err := s.processResponse(response); err != nil {
                fmt.Printf("Failed to process response: %v\n", err)
                return
            }
        }
    }
}

// 发送命令
func (s *Slave) sendCommand(command string, args ...string) error {
    // 构建 RESP 格式命令
    resp := s.buildRESPCommand(command, args)
    
    // 发送命令
    _, err := s.conn.Write([]byte(resp))
    return err
}

// 构建 RESP 格式命令
func (s *Slave) buildRESPCommand(command string, args []string) string {
    var result string
    
    // 数组开始
    result += fmt.Sprintf("*%d\r\n", len(args)+1)
    
    // 命令
    result += fmt.Sprintf("$%d\r\n%s\r\n", len(command), command)
    
    // 参数
    for _, arg := range args {
        result += fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg)
    }
    
    return result
}

// 读取响应
func (s *Slave) readResponse() (string, error) {
    buffer := make([]byte, 1024)
    n, err := s.conn.Read(buffer)
    if err != nil {
        return "", err
    }
    
    return string(buffer[:n]), nil
}

// 等待响应
func (s *Slave) waitForResponse(expected string) error {
    response, err := s.readResponse()
    if err != nil {
        return err
    }
    
    if response != expected {
        return fmt.Errorf("unexpected response: %s", response)
    }
    
    return nil
}

// 处理响应
func (s *Slave) processResponse(response string) error {
    // 简化实现,实际应该解析 RESP 协议
    fmt.Printf("Received response: %s\n", response)
    
    // 更新最后 ping 时间
    s.mu.Lock()
    s.lastPing = time.Now()
    s.mu.Unlock()
    
    return nil
}

// 停止从节点
func (s *Slave) Stop() {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    if !s.isRunning {
        return
    }
    
    close(s.stopCh)
    s.wg.Wait()
    
    if s.conn != nil {
        s.conn.Close()
    }
    
    s.isRunning = false
}

// 获取复制状态
func (s *Slave) GetReplicationStatus() map[string]interface{} {
    s.mu.RLock()
    defer s.mu.RUnlock()
    
    return map[string]interface{}{
        "role":           "slave",
        "master_host":    s.masterAddr,
        "master_port":    6379,
        "master_link_status": "up",
        "slave_repl_offset": s.offset,
        "last_ping":      s.lastPing,
    }
}

3. 复制日志实现

// replication/replication_log.go
package replication

import (
    "sync"
    "time"
)

// 创建复制日志
func NewReplicationLog() *ReplicationLog {
    return &ReplicationLog{
        commands: make([]ReplicationCommand, 0),
        offset:   0,
    }
}

// 添加命令
func (rl *ReplicationLog) AddCommand(command string, args []string) {
    rl.mu.Lock()
    defer rl.mu.Unlock()
    
    cmd := ReplicationCommand{
        Command: command,
        Args:    args,
        Offset:  rl.offset,
        Time:    time.Now(),
    }
    
    rl.commands = append(rl.commands, cmd)
    rl.offset++
}

// 获取命令
func (rl *ReplicationLog) GetCommand(offset int64) *ReplicationCommand {
    rl.mu.RLock()
    defer rl.mu.RUnlock()
    
    if offset < 0 || offset >= int64(len(rl.commands)) {
        return nil
    }
    
    return &rl.commands[offset]
}

// 获取命令范围
func (rl *ReplicationLog) GetCommands(start, end int64) []ReplicationCommand {
    rl.mu.RLock()
    defer rl.mu.RUnlock()
    
    if start < 0 || end >= int64(len(rl.commands)) || start > end {
        return nil
    }
    
    return rl.commands[start:end+1]
}

// 获取长度
func (rl *ReplicationLog) GetLength() int64 {
    rl.mu.RLock()
    defer rl.mu.RUnlock()
    return int64(len(rl.commands))
}

// 获取偏移量
func (rl *ReplicationLog) GetOffset() int64 {
    rl.mu.RLock()
    defer rl.mu.RUnlock()
    return rl.offset
}

// 清理过期命令
func (rl *ReplicationLog) CleanupExpired(maxAge time.Duration) {
    rl.mu.Lock()
    defer rl.mu.Unlock()
    
    cutoff := time.Now().Add(-maxAge)
    
    // 找到第一个未过期的命令
    start := 0
    for i, cmd := range rl.commands {
        if cmd.Time.After(cutoff) {
            start = i
            break
        }
    }
    
    // 保留未过期的命令
    rl.commands = rl.commands[start:]
}

4. 心跳检测实现

// replication/heartbeat.go
package replication

import (
    "context"
    "sync"
    "time"
)

// 心跳检测器
type HeartbeatChecker struct {
    master    *Master
    interval  time.Duration
    timeout   time.Duration
    isRunning bool
    stopCh    chan struct{}
    wg        sync.WaitGroup
    mu        sync.RWMutex
}

// 创建心跳检测器
func NewHeartbeatChecker(master *Master, interval, timeout time.Duration) *HeartbeatChecker {
    return &HeartbeatChecker{
        master:   master,
        interval: interval,
        timeout:  timeout,
        stopCh:   make(chan struct{}),
    }
}

// 启动心跳检测
func (hc *HeartbeatChecker) Start() error {
    hc.mu.Lock()
    defer hc.mu.Unlock()
    
    if hc.isRunning {
        return fmt.Errorf("heartbeat checker already running")
    }
    
    hc.isRunning = true
    hc.wg.Add(1)
    
    go hc.heartbeatLoop()
    
    return nil
}

// 心跳循环
func (hc *HeartbeatChecker) heartbeatLoop() {
    defer hc.wg.Done()
    
    ticker := time.NewTicker(hc.interval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            hc.checkHeartbeats()
        case <-hc.stopCh:
            return
        }
    }
}

// 检查心跳
func (hc *HeartbeatChecker) checkHeartbeats() {
    hc.master.mu.RLock()
    slaves := make([]*SlaveConnection, 0, len(hc.master.slaves))
    for _, slave := range hc.master.slaves {
        slaves = append(slaves, slave)
    }
    hc.master.mu.RUnlock()
    
    for _, slave := range slaves {
        if hc.isSlaveTimeout(slave) {
            hc.handleSlaveTimeout(slave)
        }
    }
}

// 检查从节点是否超时
func (hc *HeartbeatChecker) isSlaveTimeout(slave *SlaveConnection) bool {
    slave.mu.RLock()
    defer slave.mu.RUnlock()
    
    return time.Since(slave.lastPing) > hc.timeout
}

// 处理从节点超时
func (hc *HeartbeatChecker) handleSlaveTimeout(slave *SlaveConnection) {
    // 关闭连接
    slave.conn.Close()
    
    // 从主节点中移除
    hc.master.mu.Lock()
    delete(hc.master.slaves, slave.addr)
    hc.master.mu.Unlock()
    
    fmt.Printf("Slave %s timed out and removed\n", slave.addr)
}

// 停止心跳检测
func (hc *HeartbeatChecker) Stop() {
    hc.mu.Lock()
    defer hc.mu.Unlock()
    
    if !hc.isRunning {
        return
    }
    
    close(hc.stopCh)
    hc.wg.Wait()
    hc.isRunning = false
}

5. 断线重连实现

// replication/reconnect.go
package replication

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 重连管理器
type ReconnectManager struct {
    slave      *Slave
    maxRetries int
    interval   time.Duration
    isRunning  bool
    stopCh     chan struct{}
    wg         sync.WaitGroup
    mu         sync.RWMutex
}

// 创建重连管理器
func NewReconnectManager(slave *Slave, maxRetries int, interval time.Duration) *ReconnectManager {
    return &ReconnectManager{
        slave:      slave,
        maxRetries: maxRetries,
        interval:   interval,
        stopCh:     make(chan struct{}),
    }
}

// 启动重连管理器
func (rm *ReconnectManager) Start() error {
    rm.mu.Lock()
    defer rm.mu.Unlock()
    
    if rm.isRunning {
        return fmt.Errorf("reconnect manager already running")
    }
    
    rm.isRunning = true
    rm.wg.Add(1)
    
    go rm.reconnectLoop()
    
    return nil
}

// 重连循环
func (rm *ReconnectManager) reconnectLoop() {
    defer rm.wg.Done()
    
    for {
        select {
        case <-rm.stopCh:
            return
        default:
            // 检查连接状态
            if rm.isSlaveConnected() {
                time.Sleep(rm.interval)
                continue
            }
            
            // 尝试重连
            if err := rm.attemptReconnect(); err != nil {
                fmt.Printf("Reconnect failed: %v\n", err)
                time.Sleep(rm.interval)
                continue
            }
            
            fmt.Println("Reconnected successfully")
        }
    }
}

// 检查从节点是否连接
func (rm *ReconnectManager) isSlaveConnected() bool {
    rm.slave.mu.RLock()
    defer rm.slave.mu.RUnlock()
    
    return rm.slave.isRunning && rm.slave.conn != nil
}

// 尝试重连
func (rm *ReconnectManager) attemptReconnect() error {
    // 停止当前连接
    rm.slave.Stop()
    
    // 等待一段时间
    time.Sleep(time.Second)
    
    // 重新启动
    return rm.slave.Start()
}

// 停止重连管理器
func (rm *ReconnectManager) Stop() {
    rm.mu.Lock()
    defer rm.mu.Unlock()
    
    if !rm.isRunning {
        return
    }
    
    close(rm.stopCh)
    rm.wg.Wait()
    rm.isRunning = false
}

6. 主从切换实现

// replication/failover.go
package replication

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 主从切换管理器
type FailoverManager struct {
    master    *Master
    slaves    []*Slave
    isRunning bool
    stopCh    chan struct{}
    wg        sync.WaitGroup
    mu        sync.RWMutex
}

// 创建主从切换管理器
func NewFailoverManager(master *Master, slaves []*Slave) *FailoverManager {
    return &FailoverManager{
        master: master,
        slaves: slaves,
        stopCh: make(chan struct{}),
    }
}

// 启动主从切换管理器
func (fm *FailoverManager) Start() error {
    fm.mu.Lock()
    defer fm.mu.Unlock()
    
    if fm.isRunning {
        return fmt.Errorf("failover manager already running")
    }
    
    fm.isRunning = true
    fm.wg.Add(1)
    
    go fm.monitorLoop()
    
    return nil
}

// 监控循环
func (fm *FailoverManager) monitorLoop() {
    defer fm.wg.Done()
    
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            fm.checkMasterHealth()
        case <-fm.stopCh:
            return
        }
    }
}

// 检查主节点健康状态
func (fm *FailoverManager) checkMasterHealth() {
    // 简化实现,实际应该检查主节点是否响应
    if !fm.isMasterHealthy() {
        fm.performFailover()
    }
}

// 检查主节点是否健康
func (fm *FailoverManager) isMasterHealthy() bool {
    // 简化实现,实际应该发送 PING 命令
    return true
}

// 执行主从切换
func (fm *FailoverManager) performFailover() {
    fmt.Println("Master is down, performing failover...")
    
    // 选择新的主节点
    newMaster := fm.selectNewMaster()
    if newMaster == nil {
        fmt.Println("No suitable slave found for failover")
        return
    }
    
    // 执行切换
    if err := fm.executeFailover(newMaster); err != nil {
        fmt.Printf("Failover failed: %v\n", err)
        return
    }
    
    fmt.Println("Failover completed successfully")
}

// 选择新的主节点
func (fm *FailoverManager) selectNewMaster() *Slave {
    // 简化实现,选择第一个从节点
    if len(fm.slaves) > 0 {
        return fm.slaves[0]
    }
    
    return nil
}

// 执行切换
func (fm *FailoverManager) executeFailover(newMaster *Slave) error {
    // 1. 停止旧主节点
    fm.master.Stop()
    
    // 2. 提升从节点为主节点
    if err := fm.promoteSlaveToMaster(newMaster); err != nil {
        return err
    }
    
    // 3. 更新其他从节点
    if err := fm.updateOtherSlaves(newMaster); err != nil {
        return err
    }
    
    return nil
}

// 提升从节点为主节点
func (fm *FailoverManager) promoteSlaveToMaster(slave *Slave) error {
    // 简化实现,实际应该修改从节点配置
    fmt.Printf("Promoting slave %s to master\n", slave.masterAddr)
    return nil
}

// 更新其他从节点
func (fm *FailoverManager) updateOtherSlaves(newMaster *Slave) error {
    // 简化实现,实际应该更新从节点配置
    fmt.Println("Updating other slaves to point to new master")
    return nil
}

// 停止主从切换管理器
func (fm *FailoverManager) Stop() {
    fm.mu.Lock()
    defer fm.mu.Unlock()
    
    if !fm.isRunning {
        return
    }
    
    close(fm.stopCh)
    fm.wg.Wait()
    fm.isRunning = false
}

测试验证

1. 单元测试

// replication/replication_test.go
package replication

import (
    "context"
    "testing"
    "time"
)

func TestMasterSlaveReplication(t *testing.T) {
    // 创建主节点
    master := NewMaster(":6380")
    if err := master.Start(); err != nil {
        t.Fatalf("Failed to start master: %v", err)
    }
    defer master.Stop()
    
    // 创建从节点
    slave := NewSlave(":6380")
    if err := slave.Start(); err != nil {
        t.Fatalf("Failed to start slave: %v", err)
    }
    defer slave.Stop()
    
    // 等待连接建立
    time.Sleep(time.Second)
    
    // 检查从节点数量
    if master.GetSlaveCount() != 1 {
        t.Errorf("Expected 1 slave, got %d", master.GetSlaveCount())
    }
    
    // 执行命令
    if err := master.ExecuteCommand("SET", []string{"key1", "value1"}); err != nil {
        t.Fatalf("Failed to execute command: %v", err)
    }
    
    // 等待同步
    time.Sleep(time.Second)
    
    // 检查复制状态
    status := master.GetReplicationStatus()
    if status["role"].(string) != "master" {
        t.Errorf("Expected role 'master', got %s", status["role"])
    }
}

func TestReplicationLog(t *testing.T) {
    log := NewReplicationLog()
    
    // 添加命令
    log.AddCommand("SET", []string{"key1", "value1"})
    log.AddCommand("GET", []string{"key1"})
    
    // 检查长度
    if log.GetLength() != 2 {
        t.Errorf("Expected length 2, got %d", log.GetLength())
    }
    
    // 检查偏移量
    if log.GetOffset() != 2 {
        t.Errorf("Expected offset 2, got %d", log.GetOffset())
    }
    
    // 获取命令
    cmd := log.GetCommand(0)
    if cmd == nil {
        t.Error("Expected command, got nil")
    }
    
    if cmd.Command != "SET" {
        t.Errorf("Expected command 'SET', got %s", cmd.Command)
    }
}

func TestHeartbeatChecker(t *testing.T) {
    master := NewMaster(":6381")
    if err := master.Start(); err != nil {
        t.Fatalf("Failed to start master: %v", err)
    }
    defer master.Stop()
    
    // 创建心跳检测器
    checker := NewHeartbeatChecker(master, time.Second, time.Second*2)
    if err := checker.Start(); err != nil {
        t.Fatalf("Failed to start heartbeat checker: %v", err)
    }
    defer checker.Stop()
    
    // 等待检测
    time.Sleep(time.Second * 3)
}

func TestReconnectManager(t *testing.T) {
    slave := NewSlave(":6382")
    
    // 创建重连管理器
    manager := NewReconnectManager(slave, 3, time.Second)
    if err := manager.Start(); err != nil {
        t.Fatalf("Failed to start reconnect manager: %v", err)
    }
    defer manager.Stop()
    
    // 等待重连
    time.Sleep(time.Second * 2)
}

func TestFailoverManager(t *testing.T) {
    master := NewMaster(":6383")
    if err := master.Start(); err != nil {
        t.Fatalf("Failed to start master: %v", err)
    }
    defer master.Stop()
    
    slave := NewSlave(":6383")
    if err := slave.Start(); err != nil {
        t.Fatalf("Failed to start slave: %v", err)
    }
    defer slave.Stop()
    
    // 创建主从切换管理器
    failover := NewFailoverManager(master, []*Slave{slave})
    if err := failover.Start(); err != nil {
        t.Fatalf("Failed to start failover manager: %v", err)
    }
    defer failover.Stop()
    
    // 等待监控
    time.Sleep(time.Second * 2)
}

2. 性能基准测试

// replication/benchmark_test.go
package replication

import (
    "testing"
    "time"
)

func BenchmarkMasterExecuteCommand(b *testing.B) {
    master := NewMaster(":6390")
    if err := master.Start(); err != nil {
        b.Fatalf("Failed to start master: %v", err)
    }
    defer master.Stop()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        command := fmt.Sprintf("SET key%d value%d", i, i)
        master.ExecuteCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
    }
}

func BenchmarkSlaveReplication(b *testing.B) {
    master := NewMaster(":6391")
    if err := master.Start(); err != nil {
        b.Fatalf("Failed to start master: %v", err)
    }
    defer master.Stop()
    
    slave := NewSlave(":6391")
    if err := slave.Start(); err != nil {
        b.Fatalf("Failed to start slave: %v", err)
    }
    defer slave.Stop()
    
    // 等待连接建立
    time.Sleep(time.Second)
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        master.ExecuteCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
    }
}

func BenchmarkReplicationLog(b *testing.B) {
    log := NewReplicationLog()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        log.AddCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
    }
}

func BenchmarkHeartbeatChecker(b *testing.B) {
    master := NewMaster(":6392")
    if err := master.Start(); err != nil {
        b.Fatalf("Failed to start master: %v", err)
    }
    defer master.Stop()
    
    checker := NewHeartbeatChecker(master, time.Second, time.Second*2)
    if err := checker.Start(); err != nil {
        b.Fatalf("Failed to start heartbeat checker: %v", err)
    }
    defer checker.Stop()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // 模拟心跳检查
        time.Sleep(time.Millisecond)
    }
}

3. 并发测试

// replication/concurrent_test.go
package replication

import (
    "sync"
    "testing"
    "time"
)

func TestMasterSlaveConcurrent(t *testing.T) {
    master := NewMaster(":6400")
    if err := master.Start(); err != nil {
        t.Fatalf("Failed to start master: %v", err)
    }
    defer master.Stop()
    
    // 创建多个从节点
    const numSlaves = 5
    slaves := make([]*Slave, numSlaves)
    
    for i := 0; i < numSlaves; i++ {
        slave := NewSlave(":6400")
        if err := slave.Start(); err != nil {
            t.Fatalf("Failed to start slave %d: %v", i, err)
        }
        slaves[i] = slave
    }
    
    // 等待连接建立
    time.Sleep(time.Second)
    
    // 检查从节点数量
    if master.GetSlaveCount() != numSlaves {
        t.Errorf("Expected %d slaves, got %d", numSlaves, master.GetSlaveCount())
    }
    
    // 并发执行命令
    const numCommands = 100
    var wg sync.WaitGroup
    
    for i := 0; i < numCommands; i++ {
        wg.Add(1)
        go func(cmdID int) {
            defer wg.Done()
            
            key := fmt.Sprintf("key%d", cmdID)
            value := fmt.Sprintf("value%d", cmdID)
            
            if err := master.ExecuteCommand("SET", []string{key, value}); err != nil {
                t.Errorf("Failed to execute command %d: %v", cmdID, err)
            }
        }(i)
    }
    
    wg.Wait()
    
    // 等待同步
    time.Sleep(time.Second)
    
    // 检查复制状态
    status := master.GetReplicationStatus()
    if status["connected_slaves"].(int) != numSlaves {
        t.Errorf("Expected %d connected slaves, got %d", numSlaves, status["connected_slaves"])
    }
    
    // 停止从节点
    for _, slave := range slaves {
        slave.Stop()
    }
}

func TestReplicationLogConcurrent(t *testing.T) {
    log := NewReplicationLog()
    
    const numGoroutines = 10
    const numCommands = 100
    
    var wg sync.WaitGroup
    
    // 并发添加命令
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(goroutineID int) {
            defer wg.Done()
            
            for j := 0; j < numCommands; j++ {
                key := fmt.Sprintf("key_%d_%d", goroutineID, j)
                value := fmt.Sprintf("value_%d_%d", goroutineID, j)
                
                log.AddCommand("SET", []string{key, value})
            }
        }(i)
    }
    
    wg.Wait()
    
    // 检查结果
    expectedLength := int64(numGoroutines * numCommands)
    if log.GetLength() != expectedLength {
        t.Errorf("Expected length %d, got %d", expectedLength, log.GetLength())
    }
    
    if log.GetOffset() != expectedLength {
        t.Errorf("Expected offset %d, got %d", expectedLength, log.GetOffset())
    }
}

性能对比分析

1. 同步方式对比

同步方式优点缺点适用场景
全量同步数据完整网络开销大首次同步
增量同步网络开销小实现复杂持续同步
混合同步平衡性能实现复杂生产环境

2. 复制策略对比

策略性能一致性可用性复杂度
异步复制高最终一致高低
同步复制低强一致中等中等
半同步复制中等中等高高

3. 性能测试结果

// 基准测试结果示例
func BenchmarkComparison(b *testing.B) {
    // 主节点性能
    b.Run("Master", func(b *testing.B) {
        master := NewMaster(":6500")
        master.Start()
        defer master.Stop()
        
        for i := 0; i < b.N; i++ {
            master.ExecuteCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
        }
    })
    
    // 从节点性能
    b.Run("Slave", func(b *testing.B) {
        master := NewMaster(":6501")
        master.Start()
        defer master.Stop()
        
        slave := NewSlave(":6501")
        slave.Start()
        defer slave.Stop()
        
        for i := 0; i < b.N; i++ {
            master.ExecuteCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
        }
    })
    
    // 复制日志性能
    b.Run("ReplicationLog", func(b *testing.B) {
        log := NewReplicationLog()
        
        for i := 0; i < b.N; i++ {
            log.AddCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
        }
    })
}

面试要点

1. 主从复制的原理

答案要点:

  • 异步复制:主节点执行命令后异步发送给从节点
  • 复制日志:记录所有写操作,用于增量同步
  • 复制偏移量:标识复制进度,用于断点续传
  • 心跳检测:监控从节点状态,处理断线重连

2. 全量同步 vs 增量同步

答案要点:

  • 全量同步:发送完整 RDB 文件,数据完整但开销大
  • 增量同步:发送复制日志,开销小但实现复杂
  • 选择策略:首次同步用全量,后续用增量
  • 优化技巧:积压缓冲区、压缩传输、批量发送

3. 主从切换的实现

答案要点:

  • 故障检测:心跳检测、超时判断、健康检查
  • 选主算法:优先级、偏移量、运行时间
  • 数据一致性:确保新主节点数据最新
  • 服务切换:更新配置、重定向流量

4. 复制延迟的优化

答案要点:

  • 网络优化:减少网络延迟、增加带宽
  • 批量传输:合并多个命令一起发送
  • 压缩传输:使用压缩算法减少数据量
  • 并行复制:多个从节点并行同步

总结

通过本章学习,我们深入理解了:

  1. Redis 主从复制的原理和协议实现
  2. 全量同步和增量同步的完整机制
  3. 复制偏移量和积压缓冲区的管理
  4. 心跳检测和断线重连的实现
  5. 主从切换流程和故障恢复

主从复制机制为 Redis 提供了高可用性和读写分离能力。在下一章中,我们将学习哨兵模式实现,了解如何实现自动故障转移。

Prev
缓存一致性策略
Next
哨兵模式实现