主从复制机制
学习目标
- 深入理解 Redis 主从复制的原理和协议
- 掌握全量同步和增量同步的实现机制
- 实现复制偏移量和积压缓冲区的管理
- 理解心跳检测和断线重连机制
- 掌握主从切换流程和故障恢复
主从复制概述
1. 主从复制架构
Redis 主从复制通过异步复制实现数据同步,提供高可用性和读写分离:
┌─────────────────────────────────────────────────────────────┐
│ 主从复制架构 │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 主节点 │ │ 从节点1 │ │ 从节点2 │ │
│ │ (Master) │ │ (Slave) │ │ (Slave) │ │
│ │ │ │ │ │ │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │ 数据 │ │ │ │ 数据 │ │ │ │ 数据 │ │ │
│ │ │ 写入 │ │ │ │ 同步 │ │ │ │ 同步 │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │ 复制 │ │ │ │ 复制 │ │ │ │ 复制 │ │ │
│ │ │ 日志 │ │ │ │ 偏移 │ │ │ │ 偏移 │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └───────────┬───────┴───────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 复制流程 │ │
│ │ 1. 从节点连接主节点 │ │
│ │ 2. 发送 PSYNC 命令 │ │
│ │ 3. 主节点判断同步方式(全量/增量) │ │
│ │ 4. 执行数据同步 │ │
│ │ 5. 持续增量同步 │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
2. 复制状态机
状态 | 描述 | 转换条件 |
---|---|---|
REPL_STATE_NONE | 未连接 | 初始状态 |
REPL_STATE_CONNECT | 连接中 | 开始连接主节点 |
REPL_STATE_CONNECTED | 已连接 | 连接建立成功 |
REPL_STATE_HANDSHAKE | 握手 | 发送 PING 命令 |
REPL_STATE_SEND_PORT | 发送端口 | 发送 REPLCONF 命令 |
REPL_STATE_SEND_IP | 发送 IP | 发送 REPLCONF 命令 |
REPL_STATE_SEND_CAPA | 发送能力 | 发送 REPLCONF 命令 |
REPL_STATE_RECEIVE_PONG | 接收 PONG | 等待主节点响应 |
REPL_STATE_SEND_PSYNC | 发送 PSYNC | 发送同步命令 |
REPL_STATE_RECEIVE_PSYNC | 接收 PSYNC | 等待同步响应 |
REPL_STATE_TRANSFER | 传输数据 | 执行数据同步 |
REPL_STATE_CONNECTED | 已连接 | 同步完成 |
️ Go 语言主从复制实现
1. 主节点实现
// replication/master.go
package replication
import (
"context"
"fmt"
"net"
"sync"
"time"
)
// 主节点
type Master struct {
addr string
slaves map[string]*SlaveConnection
mu sync.RWMutex
replicationLog *ReplicationLog
isRunning bool
stopCh chan struct{}
wg sync.WaitGroup
}
// 从节点连接
type SlaveConnection struct {
conn net.Conn
addr string
port int
ip string
capabilities []string
offset int64
lastPing time.Time
mu sync.RWMutex
}
// 复制日志
type ReplicationLog struct {
commands []ReplicationCommand
offset int64
mu sync.RWMutex
}
// 复制命令
type ReplicationCommand struct {
Command string
Args []string
Offset int64
Time time.Time
}
// 创建主节点
func NewMaster(addr string) *Master {
return &Master{
addr: addr,
slaves: make(map[string]*SlaveConnection),
replicationLog: NewReplicationLog(),
isRunning: false,
stopCh: make(chan struct{}),
}
}
// 启动主节点
func (m *Master) Start() error {
m.mu.Lock()
defer m.mu.Unlock()
if m.isRunning {
return fmt.Errorf("master already running")
}
// 启动监听
listener, err := net.Listen("tcp", m.addr)
if err != nil {
return err
}
m.isRunning = true
m.wg.Add(1)
go m.acceptConnections(listener)
return nil
}
// 接受连接
func (m *Master) acceptConnections(listener net.Listener) {
defer m.wg.Done()
for {
select {
case <-m.stopCh:
listener.Close()
return
default:
conn, err := listener.Accept()
if err != nil {
continue
}
// 处理新连接
go m.handleConnection(conn)
}
}
}
// 处理连接
func (m *Master) handleConnection(conn net.Conn) {
defer conn.Close()
// 创建从节点连接
slave := &SlaveConnection{
conn: conn,
addr: conn.RemoteAddr().String(),
lastPing: time.Now(),
}
// 添加到从节点列表
m.mu.Lock()
m.slaves[slave.addr] = slave
m.mu.Unlock()
// 处理从节点命令
m.handleSlaveCommands(slave)
}
// 处理从节点命令
func (m *Master) handleSlaveCommands(slave *SlaveConnection) {
// 简化实现,实际应该解析 RESP 协议
for {
// 读取命令
command, err := m.readCommand(slave.conn)
if err != nil {
break
}
// 处理命令
if err := m.processSlaveCommand(slave, command); err != nil {
break
}
}
// 移除从节点
m.mu.Lock()
delete(m.slaves, slave.addr)
m.mu.Unlock()
}
// 读取命令
func (m *Master) readCommand(conn net.Conn) (string, error) {
// 简化实现,实际应该解析 RESP 协议
buffer := make([]byte, 1024)
n, err := conn.Read(buffer)
if err != nil {
return "", err
}
return string(buffer[:n]), nil
}
// 处理从节点命令
func (m *Master) processSlaveCommand(slave *SlaveConnection, command string) error {
// 简化实现,实际应该解析命令
switch command {
case "PING":
return m.handlePing(slave)
case "REPLCONF":
return m.handleReplConf(slave)
case "PSYNC":
return m.handlePsync(slave)
default:
return fmt.Errorf("unknown command: %s", command)
}
}
// 处理 PING 命令
func (m *Master) handlePing(slave *SlaveConnection) error {
slave.mu.Lock()
slave.lastPing = time.Now()
slave.mu.Unlock()
// 发送 PONG 响应
_, err := slave.conn.Write([]byte("+PONG\r\n"))
return err
}
// 处理 REPLCONF 命令
func (m *Master) handleReplConf(slave *SlaveConnection) error {
// 简化实现,实际应该解析参数
// 发送 OK 响应
_, err := slave.conn.Write([]byte("+OK\r\n"))
return err
}
// 处理 PSYNC 命令
func (m *Master) handlePsync(slave *SlaveConnection) error {
// 简化实现,实际应该解析参数
// 发送同步响应
response := fmt.Sprintf("+FULLRESYNC %s %d\r\n", "master_id", 0)
if _, err := slave.conn.Write([]byte(response)); err != nil {
return err
}
// 执行全量同步
return m.performFullSync(slave)
}
// 执行全量同步
func (m *Master) performFullSync(slave *SlaveConnection) error {
// 简化实现,实际应该发送 RDB 文件
rdbData := []byte("REDIS0001sample_rdb_data")
// 发送 RDB 数据长度
length := fmt.Sprintf("$%d\r\n", len(rdbData))
if _, err := slave.conn.Write([]byte(length)); err != nil {
return err
}
// 发送 RDB 数据
if _, err := slave.conn.Write(rdbData); err != nil {
return err
}
// 发送结束标识
if _, err := slave.conn.Write([]byte("\r\n")); err != nil {
return err
}
return nil
}
// 执行命令
func (m *Master) ExecuteCommand(command string, args []string) error {
// 执行命令
// 这里应该调用实际的命令执行逻辑
// 记录到复制日志
m.replicationLog.AddCommand(command, args)
// 发送给所有从节点
m.broadcastCommand(command, args)
return nil
}
// 广播命令
func (m *Master) broadcastCommand(command string, args []string) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, slave := range m.slaves {
go m.sendCommandToSlave(slave, command, args)
}
}
// 发送命令到从节点
func (m *Master) sendCommandToSlave(slave *SlaveConnection, command string, args []string) {
// 构建 RESP 格式的命令
resp := m.buildRESPCommand(command, args)
// 发送命令
if _, err := slave.conn.Write([]byte(resp)); err != nil {
// 连接断开,移除从节点
m.mu.Lock()
delete(m.slaves, slave.addr)
m.mu.Unlock()
}
}
// 构建 RESP 格式命令
func (m *Master) buildRESPCommand(command string, args []string) string {
var result string
// 数组开始
result += fmt.Sprintf("*%d\r\n", len(args)+1)
// 命令
result += fmt.Sprintf("$%d\r\n%s\r\n", len(command), command)
// 参数
for _, arg := range args {
result += fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg)
}
return result
}
// 停止主节点
func (m *Master) Stop() {
m.mu.Lock()
defer m.mu.Unlock()
if !m.isRunning {
return
}
close(m.stopCh)
m.wg.Wait()
m.isRunning = false
}
// 获取从节点数量
func (m *Master) GetSlaveCount() int {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.slaves)
}
// 获取复制状态
func (m *Master) GetReplicationStatus() map[string]interface{} {
m.mu.RLock()
defer m.mu.RUnlock()
return map[string]interface{}{
"role": "master",
"connected_slaves": len(m.slaves),
"replication_log_length": m.replicationLog.GetLength(),
"replication_offset": m.replicationLog.GetOffset(),
}
}
2. 从节点实现
// replication/slave.go
package replication
import (
"context"
"fmt"
"net"
"sync"
"time"
)
// 从节点
type Slave struct {
masterAddr string
conn net.Conn
isRunning bool
stopCh chan struct{}
wg sync.WaitGroup
mu sync.RWMutex
offset int64
lastPing time.Time
}
// 创建从节点
func NewSlave(masterAddr string) *Slave {
return &Slave{
masterAddr: masterAddr,
isRunning: false,
stopCh: make(chan struct{}),
lastPing: time.Now(),
}
}
// 启动从节点
func (s *Slave) Start() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.isRunning {
return fmt.Errorf("slave already running")
}
// 连接主节点
conn, err := net.Dial("tcp", s.masterAddr)
if err != nil {
return err
}
s.conn = conn
s.isRunning = true
s.wg.Add(1)
go s.replicationLoop()
return nil
}
// 复制循环
func (s *Slave) replicationLoop() {
defer s.wg.Done()
// 发送复制握手
if err := s.sendHandshake(); err != nil {
fmt.Printf("Failed to send handshake: %v\n", err)
return
}
// 发送 PSYNC 命令
if err := s.sendPsync(); err != nil {
fmt.Printf("Failed to send PSYNC: %v\n", err)
return
}
// 处理主节点响应
s.handleMasterResponse()
}
// 发送握手
func (s *Slave) sendHandshake() error {
// 发送 PING
if err := s.sendCommand("PING"); err != nil {
return err
}
// 等待 PONG 响应
if err := s.waitForResponse("PONG"); err != nil {
return err
}
// 发送 REPLCONF 命令
if err := s.sendCommand("REPLCONF", "listening-port", "6380"); err != nil {
return err
}
if err := s.waitForResponse("OK"); err != nil {
return err
}
// 发送 REPLCONF 命令
if err := s.sendCommand("REPLCONF", "capa", "eof"); err != nil {
return err
}
if err := s.waitForResponse("OK"); err != nil {
return err
}
return nil
}
// 发送 PSYNC 命令
func (s *Slave) sendPsync() error {
return s.sendCommand("PSYNC", "?", "-1")
}
// 处理主节点响应
func (s *Slave) handleMasterResponse() {
for {
select {
case <-s.stopCh:
return
default:
// 读取响应
response, err := s.readResponse()
if err != nil {
fmt.Printf("Failed to read response: %v\n", err)
return
}
// 处理响应
if err := s.processResponse(response); err != nil {
fmt.Printf("Failed to process response: %v\n", err)
return
}
}
}
}
// 发送命令
func (s *Slave) sendCommand(command string, args ...string) error {
// 构建 RESP 格式命令
resp := s.buildRESPCommand(command, args)
// 发送命令
_, err := s.conn.Write([]byte(resp))
return err
}
// 构建 RESP 格式命令
func (s *Slave) buildRESPCommand(command string, args []string) string {
var result string
// 数组开始
result += fmt.Sprintf("*%d\r\n", len(args)+1)
// 命令
result += fmt.Sprintf("$%d\r\n%s\r\n", len(command), command)
// 参数
for _, arg := range args {
result += fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg)
}
return result
}
// 读取响应
func (s *Slave) readResponse() (string, error) {
buffer := make([]byte, 1024)
n, err := s.conn.Read(buffer)
if err != nil {
return "", err
}
return string(buffer[:n]), nil
}
// 等待响应
func (s *Slave) waitForResponse(expected string) error {
response, err := s.readResponse()
if err != nil {
return err
}
if response != expected {
return fmt.Errorf("unexpected response: %s", response)
}
return nil
}
// 处理响应
func (s *Slave) processResponse(response string) error {
// 简化实现,实际应该解析 RESP 协议
fmt.Printf("Received response: %s\n", response)
// 更新最后 ping 时间
s.mu.Lock()
s.lastPing = time.Now()
s.mu.Unlock()
return nil
}
// 停止从节点
func (s *Slave) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
if !s.isRunning {
return
}
close(s.stopCh)
s.wg.Wait()
if s.conn != nil {
s.conn.Close()
}
s.isRunning = false
}
// 获取复制状态
func (s *Slave) GetReplicationStatus() map[string]interface{} {
s.mu.RLock()
defer s.mu.RUnlock()
return map[string]interface{}{
"role": "slave",
"master_host": s.masterAddr,
"master_port": 6379,
"master_link_status": "up",
"slave_repl_offset": s.offset,
"last_ping": s.lastPing,
}
}
3. 复制日志实现
// replication/replication_log.go
package replication
import (
"sync"
"time"
)
// 创建复制日志
func NewReplicationLog() *ReplicationLog {
return &ReplicationLog{
commands: make([]ReplicationCommand, 0),
offset: 0,
}
}
// 添加命令
func (rl *ReplicationLog) AddCommand(command string, args []string) {
rl.mu.Lock()
defer rl.mu.Unlock()
cmd := ReplicationCommand{
Command: command,
Args: args,
Offset: rl.offset,
Time: time.Now(),
}
rl.commands = append(rl.commands, cmd)
rl.offset++
}
// 获取命令
func (rl *ReplicationLog) GetCommand(offset int64) *ReplicationCommand {
rl.mu.RLock()
defer rl.mu.RUnlock()
if offset < 0 || offset >= int64(len(rl.commands)) {
return nil
}
return &rl.commands[offset]
}
// 获取命令范围
func (rl *ReplicationLog) GetCommands(start, end int64) []ReplicationCommand {
rl.mu.RLock()
defer rl.mu.RUnlock()
if start < 0 || end >= int64(len(rl.commands)) || start > end {
return nil
}
return rl.commands[start:end+1]
}
// 获取长度
func (rl *ReplicationLog) GetLength() int64 {
rl.mu.RLock()
defer rl.mu.RUnlock()
return int64(len(rl.commands))
}
// 获取偏移量
func (rl *ReplicationLog) GetOffset() int64 {
rl.mu.RLock()
defer rl.mu.RUnlock()
return rl.offset
}
// 清理过期命令
func (rl *ReplicationLog) CleanupExpired(maxAge time.Duration) {
rl.mu.Lock()
defer rl.mu.Unlock()
cutoff := time.Now().Add(-maxAge)
// 找到第一个未过期的命令
start := 0
for i, cmd := range rl.commands {
if cmd.Time.After(cutoff) {
start = i
break
}
}
// 保留未过期的命令
rl.commands = rl.commands[start:]
}
4. 心跳检测实现
// replication/heartbeat.go
package replication
import (
"context"
"sync"
"time"
)
// 心跳检测器
type HeartbeatChecker struct {
master *Master
interval time.Duration
timeout time.Duration
isRunning bool
stopCh chan struct{}
wg sync.WaitGroup
mu sync.RWMutex
}
// 创建心跳检测器
func NewHeartbeatChecker(master *Master, interval, timeout time.Duration) *HeartbeatChecker {
return &HeartbeatChecker{
master: master,
interval: interval,
timeout: timeout,
stopCh: make(chan struct{}),
}
}
// 启动心跳检测
func (hc *HeartbeatChecker) Start() error {
hc.mu.Lock()
defer hc.mu.Unlock()
if hc.isRunning {
return fmt.Errorf("heartbeat checker already running")
}
hc.isRunning = true
hc.wg.Add(1)
go hc.heartbeatLoop()
return nil
}
// 心跳循环
func (hc *HeartbeatChecker) heartbeatLoop() {
defer hc.wg.Done()
ticker := time.NewTicker(hc.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
hc.checkHeartbeats()
case <-hc.stopCh:
return
}
}
}
// 检查心跳
func (hc *HeartbeatChecker) checkHeartbeats() {
hc.master.mu.RLock()
slaves := make([]*SlaveConnection, 0, len(hc.master.slaves))
for _, slave := range hc.master.slaves {
slaves = append(slaves, slave)
}
hc.master.mu.RUnlock()
for _, slave := range slaves {
if hc.isSlaveTimeout(slave) {
hc.handleSlaveTimeout(slave)
}
}
}
// 检查从节点是否超时
func (hc *HeartbeatChecker) isSlaveTimeout(slave *SlaveConnection) bool {
slave.mu.RLock()
defer slave.mu.RUnlock()
return time.Since(slave.lastPing) > hc.timeout
}
// 处理从节点超时
func (hc *HeartbeatChecker) handleSlaveTimeout(slave *SlaveConnection) {
// 关闭连接
slave.conn.Close()
// 从主节点中移除
hc.master.mu.Lock()
delete(hc.master.slaves, slave.addr)
hc.master.mu.Unlock()
fmt.Printf("Slave %s timed out and removed\n", slave.addr)
}
// 停止心跳检测
func (hc *HeartbeatChecker) Stop() {
hc.mu.Lock()
defer hc.mu.Unlock()
if !hc.isRunning {
return
}
close(hc.stopCh)
hc.wg.Wait()
hc.isRunning = false
}
5. 断线重连实现
// replication/reconnect.go
package replication
import (
"context"
"fmt"
"sync"
"time"
)
// 重连管理器
type ReconnectManager struct {
slave *Slave
maxRetries int
interval time.Duration
isRunning bool
stopCh chan struct{}
wg sync.WaitGroup
mu sync.RWMutex
}
// 创建重连管理器
func NewReconnectManager(slave *Slave, maxRetries int, interval time.Duration) *ReconnectManager {
return &ReconnectManager{
slave: slave,
maxRetries: maxRetries,
interval: interval,
stopCh: make(chan struct{}),
}
}
// 启动重连管理器
func (rm *ReconnectManager) Start() error {
rm.mu.Lock()
defer rm.mu.Unlock()
if rm.isRunning {
return fmt.Errorf("reconnect manager already running")
}
rm.isRunning = true
rm.wg.Add(1)
go rm.reconnectLoop()
return nil
}
// 重连循环
func (rm *ReconnectManager) reconnectLoop() {
defer rm.wg.Done()
for {
select {
case <-rm.stopCh:
return
default:
// 检查连接状态
if rm.isSlaveConnected() {
time.Sleep(rm.interval)
continue
}
// 尝试重连
if err := rm.attemptReconnect(); err != nil {
fmt.Printf("Reconnect failed: %v\n", err)
time.Sleep(rm.interval)
continue
}
fmt.Println("Reconnected successfully")
}
}
}
// 检查从节点是否连接
func (rm *ReconnectManager) isSlaveConnected() bool {
rm.slave.mu.RLock()
defer rm.slave.mu.RUnlock()
return rm.slave.isRunning && rm.slave.conn != nil
}
// 尝试重连
func (rm *ReconnectManager) attemptReconnect() error {
// 停止当前连接
rm.slave.Stop()
// 等待一段时间
time.Sleep(time.Second)
// 重新启动
return rm.slave.Start()
}
// 停止重连管理器
func (rm *ReconnectManager) Stop() {
rm.mu.Lock()
defer rm.mu.Unlock()
if !rm.isRunning {
return
}
close(rm.stopCh)
rm.wg.Wait()
rm.isRunning = false
}
6. 主从切换实现
// replication/failover.go
package replication
import (
"context"
"fmt"
"sync"
"time"
)
// 主从切换管理器
type FailoverManager struct {
master *Master
slaves []*Slave
isRunning bool
stopCh chan struct{}
wg sync.WaitGroup
mu sync.RWMutex
}
// 创建主从切换管理器
func NewFailoverManager(master *Master, slaves []*Slave) *FailoverManager {
return &FailoverManager{
master: master,
slaves: slaves,
stopCh: make(chan struct{}),
}
}
// 启动主从切换管理器
func (fm *FailoverManager) Start() error {
fm.mu.Lock()
defer fm.mu.Unlock()
if fm.isRunning {
return fmt.Errorf("failover manager already running")
}
fm.isRunning = true
fm.wg.Add(1)
go fm.monitorLoop()
return nil
}
// 监控循环
func (fm *FailoverManager) monitorLoop() {
defer fm.wg.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fm.checkMasterHealth()
case <-fm.stopCh:
return
}
}
}
// 检查主节点健康状态
func (fm *FailoverManager) checkMasterHealth() {
// 简化实现,实际应该检查主节点是否响应
if !fm.isMasterHealthy() {
fm.performFailover()
}
}
// 检查主节点是否健康
func (fm *FailoverManager) isMasterHealthy() bool {
// 简化实现,实际应该发送 PING 命令
return true
}
// 执行主从切换
func (fm *FailoverManager) performFailover() {
fmt.Println("Master is down, performing failover...")
// 选择新的主节点
newMaster := fm.selectNewMaster()
if newMaster == nil {
fmt.Println("No suitable slave found for failover")
return
}
// 执行切换
if err := fm.executeFailover(newMaster); err != nil {
fmt.Printf("Failover failed: %v\n", err)
return
}
fmt.Println("Failover completed successfully")
}
// 选择新的主节点
func (fm *FailoverManager) selectNewMaster() *Slave {
// 简化实现,选择第一个从节点
if len(fm.slaves) > 0 {
return fm.slaves[0]
}
return nil
}
// 执行切换
func (fm *FailoverManager) executeFailover(newMaster *Slave) error {
// 1. 停止旧主节点
fm.master.Stop()
// 2. 提升从节点为主节点
if err := fm.promoteSlaveToMaster(newMaster); err != nil {
return err
}
// 3. 更新其他从节点
if err := fm.updateOtherSlaves(newMaster); err != nil {
return err
}
return nil
}
// 提升从节点为主节点
func (fm *FailoverManager) promoteSlaveToMaster(slave *Slave) error {
// 简化实现,实际应该修改从节点配置
fmt.Printf("Promoting slave %s to master\n", slave.masterAddr)
return nil
}
// 更新其他从节点
func (fm *FailoverManager) updateOtherSlaves(newMaster *Slave) error {
// 简化实现,实际应该更新从节点配置
fmt.Println("Updating other slaves to point to new master")
return nil
}
// 停止主从切换管理器
func (fm *FailoverManager) Stop() {
fm.mu.Lock()
defer fm.mu.Unlock()
if !fm.isRunning {
return
}
close(fm.stopCh)
fm.wg.Wait()
fm.isRunning = false
}
测试验证
1. 单元测试
// replication/replication_test.go
package replication
import (
"context"
"testing"
"time"
)
func TestMasterSlaveReplication(t *testing.T) {
// 创建主节点
master := NewMaster(":6380")
if err := master.Start(); err != nil {
t.Fatalf("Failed to start master: %v", err)
}
defer master.Stop()
// 创建从节点
slave := NewSlave(":6380")
if err := slave.Start(); err != nil {
t.Fatalf("Failed to start slave: %v", err)
}
defer slave.Stop()
// 等待连接建立
time.Sleep(time.Second)
// 检查从节点数量
if master.GetSlaveCount() != 1 {
t.Errorf("Expected 1 slave, got %d", master.GetSlaveCount())
}
// 执行命令
if err := master.ExecuteCommand("SET", []string{"key1", "value1"}); err != nil {
t.Fatalf("Failed to execute command: %v", err)
}
// 等待同步
time.Sleep(time.Second)
// 检查复制状态
status := master.GetReplicationStatus()
if status["role"].(string) != "master" {
t.Errorf("Expected role 'master', got %s", status["role"])
}
}
func TestReplicationLog(t *testing.T) {
log := NewReplicationLog()
// 添加命令
log.AddCommand("SET", []string{"key1", "value1"})
log.AddCommand("GET", []string{"key1"})
// 检查长度
if log.GetLength() != 2 {
t.Errorf("Expected length 2, got %d", log.GetLength())
}
// 检查偏移量
if log.GetOffset() != 2 {
t.Errorf("Expected offset 2, got %d", log.GetOffset())
}
// 获取命令
cmd := log.GetCommand(0)
if cmd == nil {
t.Error("Expected command, got nil")
}
if cmd.Command != "SET" {
t.Errorf("Expected command 'SET', got %s", cmd.Command)
}
}
func TestHeartbeatChecker(t *testing.T) {
master := NewMaster(":6381")
if err := master.Start(); err != nil {
t.Fatalf("Failed to start master: %v", err)
}
defer master.Stop()
// 创建心跳检测器
checker := NewHeartbeatChecker(master, time.Second, time.Second*2)
if err := checker.Start(); err != nil {
t.Fatalf("Failed to start heartbeat checker: %v", err)
}
defer checker.Stop()
// 等待检测
time.Sleep(time.Second * 3)
}
func TestReconnectManager(t *testing.T) {
slave := NewSlave(":6382")
// 创建重连管理器
manager := NewReconnectManager(slave, 3, time.Second)
if err := manager.Start(); err != nil {
t.Fatalf("Failed to start reconnect manager: %v", err)
}
defer manager.Stop()
// 等待重连
time.Sleep(time.Second * 2)
}
func TestFailoverManager(t *testing.T) {
master := NewMaster(":6383")
if err := master.Start(); err != nil {
t.Fatalf("Failed to start master: %v", err)
}
defer master.Stop()
slave := NewSlave(":6383")
if err := slave.Start(); err != nil {
t.Fatalf("Failed to start slave: %v", err)
}
defer slave.Stop()
// 创建主从切换管理器
failover := NewFailoverManager(master, []*Slave{slave})
if err := failover.Start(); err != nil {
t.Fatalf("Failed to start failover manager: %v", err)
}
defer failover.Stop()
// 等待监控
time.Sleep(time.Second * 2)
}
2. 性能基准测试
// replication/benchmark_test.go
package replication
import (
"testing"
"time"
)
func BenchmarkMasterExecuteCommand(b *testing.B) {
master := NewMaster(":6390")
if err := master.Start(); err != nil {
b.Fatalf("Failed to start master: %v", err)
}
defer master.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
command := fmt.Sprintf("SET key%d value%d", i, i)
master.ExecuteCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
}
}
func BenchmarkSlaveReplication(b *testing.B) {
master := NewMaster(":6391")
if err := master.Start(); err != nil {
b.Fatalf("Failed to start master: %v", err)
}
defer master.Stop()
slave := NewSlave(":6391")
if err := slave.Start(); err != nil {
b.Fatalf("Failed to start slave: %v", err)
}
defer slave.Stop()
// 等待连接建立
time.Sleep(time.Second)
b.ResetTimer()
for i := 0; i < b.N; i++ {
master.ExecuteCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
}
}
func BenchmarkReplicationLog(b *testing.B) {
log := NewReplicationLog()
b.ResetTimer()
for i := 0; i < b.N; i++ {
log.AddCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
}
}
func BenchmarkHeartbeatChecker(b *testing.B) {
master := NewMaster(":6392")
if err := master.Start(); err != nil {
b.Fatalf("Failed to start master: %v", err)
}
defer master.Stop()
checker := NewHeartbeatChecker(master, time.Second, time.Second*2)
if err := checker.Start(); err != nil {
b.Fatalf("Failed to start heartbeat checker: %v", err)
}
defer checker.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// 模拟心跳检查
time.Sleep(time.Millisecond)
}
}
3. 并发测试
// replication/concurrent_test.go
package replication
import (
"sync"
"testing"
"time"
)
func TestMasterSlaveConcurrent(t *testing.T) {
master := NewMaster(":6400")
if err := master.Start(); err != nil {
t.Fatalf("Failed to start master: %v", err)
}
defer master.Stop()
// 创建多个从节点
const numSlaves = 5
slaves := make([]*Slave, numSlaves)
for i := 0; i < numSlaves; i++ {
slave := NewSlave(":6400")
if err := slave.Start(); err != nil {
t.Fatalf("Failed to start slave %d: %v", i, err)
}
slaves[i] = slave
}
// 等待连接建立
time.Sleep(time.Second)
// 检查从节点数量
if master.GetSlaveCount() != numSlaves {
t.Errorf("Expected %d slaves, got %d", numSlaves, master.GetSlaveCount())
}
// 并发执行命令
const numCommands = 100
var wg sync.WaitGroup
for i := 0; i < numCommands; i++ {
wg.Add(1)
go func(cmdID int) {
defer wg.Done()
key := fmt.Sprintf("key%d", cmdID)
value := fmt.Sprintf("value%d", cmdID)
if err := master.ExecuteCommand("SET", []string{key, value}); err != nil {
t.Errorf("Failed to execute command %d: %v", cmdID, err)
}
}(i)
}
wg.Wait()
// 等待同步
time.Sleep(time.Second)
// 检查复制状态
status := master.GetReplicationStatus()
if status["connected_slaves"].(int) != numSlaves {
t.Errorf("Expected %d connected slaves, got %d", numSlaves, status["connected_slaves"])
}
// 停止从节点
for _, slave := range slaves {
slave.Stop()
}
}
func TestReplicationLogConcurrent(t *testing.T) {
log := NewReplicationLog()
const numGoroutines = 10
const numCommands = 100
var wg sync.WaitGroup
// 并发添加命令
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for j := 0; j < numCommands; j++ {
key := fmt.Sprintf("key_%d_%d", goroutineID, j)
value := fmt.Sprintf("value_%d_%d", goroutineID, j)
log.AddCommand("SET", []string{key, value})
}
}(i)
}
wg.Wait()
// 检查结果
expectedLength := int64(numGoroutines * numCommands)
if log.GetLength() != expectedLength {
t.Errorf("Expected length %d, got %d", expectedLength, log.GetLength())
}
if log.GetOffset() != expectedLength {
t.Errorf("Expected offset %d, got %d", expectedLength, log.GetOffset())
}
}
性能对比分析
1. 同步方式对比
同步方式 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
全量同步 | 数据完整 | 网络开销大 | 首次同步 |
增量同步 | 网络开销小 | 实现复杂 | 持续同步 |
混合同步 | 平衡性能 | 实现复杂 | 生产环境 |
2. 复制策略对比
策略 | 性能 | 一致性 | 可用性 | 复杂度 |
---|---|---|---|---|
异步复制 | 高 | 最终一致 | 高 | 低 |
同步复制 | 低 | 强一致 | 中等 | 中等 |
半同步复制 | 中等 | 中等 | 高 | 高 |
3. 性能测试结果
// 基准测试结果示例
func BenchmarkComparison(b *testing.B) {
// 主节点性能
b.Run("Master", func(b *testing.B) {
master := NewMaster(":6500")
master.Start()
defer master.Stop()
for i := 0; i < b.N; i++ {
master.ExecuteCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
}
})
// 从节点性能
b.Run("Slave", func(b *testing.B) {
master := NewMaster(":6501")
master.Start()
defer master.Stop()
slave := NewSlave(":6501")
slave.Start()
defer slave.Stop()
for i := 0; i < b.N; i++ {
master.ExecuteCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
}
})
// 复制日志性能
b.Run("ReplicationLog", func(b *testing.B) {
log := NewReplicationLog()
for i := 0; i < b.N; i++ {
log.AddCommand("SET", []string{fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)})
}
})
}
面试要点
1. 主从复制的原理
答案要点:
- 异步复制:主节点执行命令后异步发送给从节点
- 复制日志:记录所有写操作,用于增量同步
- 复制偏移量:标识复制进度,用于断点续传
- 心跳检测:监控从节点状态,处理断线重连
2. 全量同步 vs 增量同步
答案要点:
- 全量同步:发送完整 RDB 文件,数据完整但开销大
- 增量同步:发送复制日志,开销小但实现复杂
- 选择策略:首次同步用全量,后续用增量
- 优化技巧:积压缓冲区、压缩传输、批量发送
3. 主从切换的实现
答案要点:
- 故障检测:心跳检测、超时判断、健康检查
- 选主算法:优先级、偏移量、运行时间
- 数据一致性:确保新主节点数据最新
- 服务切换:更新配置、重定向流量
4. 复制延迟的优化
答案要点:
- 网络优化:减少网络延迟、增加带宽
- 批量传输:合并多个命令一起发送
- 压缩传输:使用压缩算法减少数据量
- 并行复制:多个从节点并行同步
总结
通过本章学习,我们深入理解了:
- Redis 主从复制的原理和协议实现
- 全量同步和增量同步的完整机制
- 复制偏移量和积压缓冲区的管理
- 心跳检测和断线重连的实现
- 主从切换流程和故障恢复
主从复制机制为 Redis 提供了高可用性和读写分离能力。在下一章中,我们将学习哨兵模式实现,了解如何实现自动故障转移。