哨兵模式实现
学习目标
- 深入理解 Redis 哨兵的监控机制和故障检测
- 掌握主观下线和客观下线的判断逻辑
- 实现自动故障转移流程和选举算法
- 理解哨兵集群的通信协议和配置传播
- 掌握哨兵模式的高可用性保证
哨兵模式概述
1. 哨兵架构设计
Redis 哨兵模式通过多个哨兵节点监控主从节点,实现自动故障转移和高可用性:
┌─────────────────────────────────────────────────────────────┐
│ 哨兵模式架构 │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 哨兵1 │ │ 哨兵2 │ │ 哨兵3 │ │
│ │ (Sentinel) │ │ (Sentinel) │ │ (Sentinel) │ │
│ │ │ │ │ │ │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │ 监控 │ │ │ │ 监控 │ │ │ │ 监控 │ │ │
│ │ │ 主节点 │ │ │ │ 主节点 │ │ │ │ 主节点 │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │ 监控 │ │ │ │ 监控 │ │ │ │ 监控 │ │ │
│ │ │ 从节点 │ │ │ │ 从节点 │ │ │ │ 从节点 │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └───────────┬───────┴───────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 监控目标 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 主节点 │ │ 从节点1 │ │ 从节点2 │ │ │
│ │ │ (Master) │ │ (Slave) │ │ (Slave) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
2. 哨兵状态机
状态 | 描述 | 转换条件 |
---|---|---|
SENTINEL_STATE_NONE | 未初始化 | 初始状态 |
SENTINEL_STATE_CONNECT | 连接中 | 开始连接主节点 |
SENTINEL_STATE_CONNECTED | 已连接 | 连接建立成功 |
SENTINEL_STATE_SUBSCRIBE | 订阅中 | 订阅主节点事件 |
SENTINEL_STATE_MONITORING | 监控中 | 开始监控主节点 |
SENTINEL_STATE_FAILOVER | 故障转移 | 执行故障转移 |
SENTINEL_STATE_RECONFIG | 重新配置 | 更新配置 |
Go 语言哨兵实现
1. 哨兵节点实现
// sentinel/sentinel.go
package sentinel
import (
"context"
"fmt"
"net"
"sync"
"time"
)
// SentinelState identifies the phase a Sentinel node is currently in.
type SentinelState int
// Sentinel lifecycle states; iota numbers them from 0 in declaration order.
const (
// SENTINEL_STATE_NONE is the uninitialised state assigned by NewSentinel.
SENTINEL_STATE_NONE SentinelState = iota
// SENTINEL_STATE_CONNECT: connecting to the master (not assigned anywhere in the visible code).
SENTINEL_STATE_CONNECT
// SENTINEL_STATE_CONNECTED: connection established (not assigned anywhere in the visible code).
SENTINEL_STATE_CONNECTED
// SENTINEL_STATE_SUBSCRIBE: subscribing to master events (not assigned anywhere in the visible code).
SENTINEL_STATE_SUBSCRIBE
// SENTINEL_STATE_MONITORING is set when monitoring starts and again once a failover completes.
SENTINEL_STATE_MONITORING
// SENTINEL_STATE_FAILOVER is set while a failover is being carried out.
SENTINEL_STATE_FAILOVER
// SENTINEL_STATE_RECONFIG: updating configuration (not assigned anywhere in the visible code).
SENTINEL_STATE_RECONFIG
)
// MonitoredTarget describes one Redis node (master or replica) that a
// Sentinel health-checks.
type MonitoredTarget struct {
// Name is the key under which the target is stored in Sentinel.Targets.
Name string
// Host and Port form the TCP address that health checks dial.
Host string
Port int
Role string // "master" or "slave"
State string // "up" or "down"
// LastPing is refreshed with time.Now() after every successful PING.
LastPing time.Time
// DownTime is stamped when the target is marked "down".
DownTime time.Time
// FailoverStartTime is stamped when a failover for this target begins.
FailoverStartTime time.Time
// mu is taken around updates to State, LastPing, DownTime and
// FailoverStartTime; several readers in this file access Role and State
// without it.
mu sync.RWMutex
}
// Sentinel is a single monitoring node: it pings a master and its replicas
// once per second and drives a failover when the master is seen as down.
type Sentinel struct {
// ID and Addr identify this node; Addr is what peers dial.
ID string
Addr string
// State is the current lifecycle phase; writers take mu.
State SentinelState
// Targets maps a target name to the node being monitored.
Targets map[string]*MonitoredTarget
// OtherSentinels maps a peer ID to the peer; discovered peers only have
// ID and Addr populated.
OtherSentinels map[string]*Sentinel
// isRunning is true between Start and Stop.
isRunning bool
// stopCh is closed by Stop to end sentinelLoop.
stopCh chan struct{}
// wg tracks the sentinelLoop goroutine.
wg sync.WaitGroup
mu sync.RWMutex
// config supplies the master address and the timing thresholds.
config *SentinelConfig
}
// SentinelConfig holds the settings of one monitored master. ConfigManager
// serialises it to JSON using the field names as keys.
type SentinelConfig struct {
// MasterName is the key of the master inside Sentinel.Targets.
MasterName string
// MasterHost and MasterPort are the master's TCP address.
MasterHost string
MasterPort int
// DownAfterMs is the subjective-down threshold, in milliseconds.
DownAfterMs int
// FailoverTimeout is the minimum gap, in milliseconds, between two
// failover attempts for the same master.
FailoverTimeout int
// ParallelSyncs is not read anywhere in the visible code; presumably the
// number of replicas resynchronised at once — TODO confirm.
ParallelSyncs int
}
// NewSentinel returns a stopped Sentinel with the given identity and
// configuration. Its target and peer tables start empty and its state is
// SENTINEL_STATE_NONE.
func NewSentinel(id, addr string, config *SentinelConfig) *Sentinel {
	node := new(Sentinel)
	node.ID = id
	node.Addr = addr
	node.State = SENTINEL_STATE_NONE
	node.Targets = map[string]*MonitoredTarget{}
	node.OtherSentinels = map[string]*Sentinel{}
	node.stopCh = make(chan struct{})
	node.config = config
	return node
}
// Start launches the monitoring loop in a background goroutine.
// It returns an error if the sentinel is already running.
func (s *Sentinel) Start() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.isRunning {
		s.isRunning = true
		s.wg.Add(1)
		go s.sentinelLoop()
		return nil
	}
	return fmt.Errorf("sentinel already running")
}
// sentinelLoop is the body of the background goroutine: it registers the
// master, performs the initial discovery, then runs one monitoring pass per
// second until stopCh is closed.
func (s *Sentinel) sentinelLoop() {
	defer s.wg.Done()

	s.initializeTargets()
	s.startMonitoring()

	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		select {
		case <-s.stopCh:
			return
		case <-tick.C:
			s.performMonitoring()
		}
	}
}
// initializeTargets registers the configured master as the first monitored
// target, initially "up" with LastPing set to now.
//
// The map write happens under s.mu because GetStatus reads s.Targets under
// the same lock from other goroutines while the loop goroutine runs this.
func (s *Sentinel) initializeTargets() {
	master := &MonitoredTarget{
		Name:     s.config.MasterName,
		Host:     s.config.MasterHost,
		Port:     s.config.MasterPort,
		Role:     "master",
		State:    "up",
		LastPing: time.Now(),
	}

	s.mu.Lock()
	s.Targets[s.config.MasterName] = master
	s.mu.Unlock()
}
// startMonitoring switches the sentinel to the monitoring state, verifies
// that the master answers, and then discovers replicas and peer sentinels.
// If the master cannot be reached the failure is printed and discovery is
// skipped.
func (s *Sentinel) startMonitoring() {
	func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.State = SENTINEL_STATE_MONITORING
	}()

	err := s.connectToMaster()
	if err != nil {
		fmt.Printf("Failed to connect to master: %v\n", err)
		return
	}

	s.discoverSlaves()
	s.discoverOtherSentinels()
}
// connectToMaster opens a short-lived TCP connection to the configured
// master and PINGs it. On success the master is marked "up" and its LastPing
// refreshed; any dial or ping error is returned unchanged.
func (s *Sentinel) connectToMaster() error {
	master := s.Targets[s.config.MasterName]
	addr := fmt.Sprintf("%s:%d", master.Host, master.Port)

	conn, dialErr := net.Dial("tcp", addr)
	if dialErr != nil {
		return dialErr
	}
	defer conn.Close()

	if pingErr := s.sendPing(conn); pingErr != nil {
		return pingErr
	}

	master.mu.Lock()
	defer master.mu.Unlock()
	master.State = "up"
	master.LastPing = time.Now()
	return nil
}
// sendPing writes a RESP-encoded PING to conn and expects exactly
// "+PONG\r\n" back.
//
// A deadline is placed on the exchange: without one, a peer that accepts the
// TCP connection but never answers would block the monitoring loop
// indefinitely, so the outage would never be detected and Stop could not
// return.
//
// It returns the deadline, write or read error, or an "unexpected response"
// error when the reply is anything other than +PONG.
func (s *Sentinel) sendPing(conn net.Conn) error {
	// Same period as the monitoring tick, so one slow peer costs at most
	// one cycle.
	const pingTimeout = time.Second

	if err := conn.SetDeadline(time.Now().Add(pingTimeout)); err != nil {
		return err
	}

	pingCmd := "*1\r\n$4\r\nPING\r\n"
	if _, err := conn.Write([]byte(pingCmd)); err != nil {
		return err
	}

	buffer := make([]byte, 1024)
	n, err := conn.Read(buffer)
	if err != nil {
		return err
	}

	response := string(buffer[:n])
	if response != "+PONG\r\n" {
		return fmt.Errorf("unexpected response: %s", response)
	}
	return nil
}
// discoverSlaves asks the master for "INFO replication" and hands the raw
// reply to parseSlaveInfo. Discovery is best-effort: any network error ends
// it silently.
func (s *Sentinel) discoverSlaves() {
	master := s.Targets[s.config.MasterName]

	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", master.Host, master.Port))
	if err != nil {
		return
	}
	defer conn.Close()

	// RESP bulk strings are prefixed with their byte length; "replication"
	// is 11 bytes (the previous "$10" made the command malformed).
	infoCmd := "*2\r\n$4\r\nINFO\r\n$11\r\nreplication\r\n"
	if _, err := conn.Write([]byte(infoCmd)); err != nil {
		return
	}

	// A single Read is used; a reply larger than the buffer or split across
	// TCP segments is truncated.
	buffer := make([]byte, 4096)
	n, err := conn.Read(buffer)
	if err != nil {
		return
	}

	s.parseSlaveInfo(string(buffer[:n]))
}
// parseSlaveInfo registers the replicas reported by the master.
//
// Simplified implementation: info is not parsed; a single replica "slave1"
// at 127.0.0.1:6380 is assumed. The map write happens under s.mu because
// GetStatus reads s.Targets under the same lock from other goroutines.
func (s *Sentinel) parseSlaveInfo(info string) {
	slave := &MonitoredTarget{
		Name:     "slave1",
		Host:     "127.0.0.1",
		Port:     6380,
		Role:     "slave",
		State:    "up",
		LastPing: time.Now(),
	}

	s.mu.Lock()
	s.Targets["slave1"] = slave
	s.mu.Unlock()
}
// discoverOtherSentinels publishes a hello message on the master's
// "__sentinel__:hello" channel and hands the reply to parseSentinelInfo.
// Discovery is best-effort: any network error ends it silently.
func (s *Sentinel) discoverOtherSentinels() {
	master := s.Targets[s.config.MasterName]

	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", master.Host, master.Port))
	if err != nil {
		return
	}
	defer conn.Close()

	// NOTE(review): the payload is a fixed string rather than this node's
	// ID; kept as-is because parseSentinelInfo does not parse it yet.
	const (
		helloChannel = "__sentinel__:hello"
		helloPayload = "sentinel1"
	)
	// RESP bulk strings carry their byte length; derive the lengths from the
	// strings themselves (the previous hand-written "$8" and "$10" did not
	// match the 18- and 9-byte values).
	pubCmd := fmt.Sprintf("*3\r\n$7\r\nPUBLISH\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n",
		len(helloChannel), helloChannel, len(helloPayload), helloPayload)
	if _, err := conn.Write([]byte(pubCmd)); err != nil {
		return
	}

	buffer := make([]byte, 1024)
	n, err := conn.Read(buffer)
	if err != nil {
		return
	}

	s.parseSentinelInfo(string(buffer[:n]))
}
// parseSentinelInfo registers the peer sentinels learned from the hello
// channel.
//
// Simplified implementation: info is not parsed; a single peer "sentinel2"
// at 127.0.0.1:26380 is assumed, with only ID and Addr populated. The map
// write happens under s.mu because GetStatus reads s.OtherSentinels under
// the same lock from other goroutines.
func (s *Sentinel) parseSentinelInfo(info string) {
	otherSentinel := &Sentinel{
		ID:   "sentinel2",
		Addr: "127.0.0.1:26380",
	}

	s.mu.Lock()
	s.OtherSentinels["sentinel2"] = otherSentinel
	s.mu.Unlock()
}
// performMonitoring runs one monitoring pass: master health, replica
// health, peer-sentinel health, then the failover check, in that order.
func (s *Sentinel) performMonitoring() {
	steps := []func(){
		s.checkMasterHealth,
		s.checkSlavesHealth,
		s.checkOtherSentinelsHealth,
		s.performFailoverIfNeeded,
	}
	for _, step := range steps {
		step()
	}
}
// checkMasterHealth PINGs the configured master over a fresh connection.
// A dial or ping failure is reported to handleMasterDown; a successful ping
// marks the master "up" and refreshes LastPing.
func (s *Sentinel) checkMasterHealth() {
	// The master is registered by the loop goroutine; a monitoring pass
	// triggered before that has happened must not dereference a nil target.
	master, ok := s.Targets[s.config.MasterName]
	if !ok || master == nil {
		return
	}

	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", master.Host, master.Port))
	if err != nil {
		s.handleMasterDown(master)
		return
	}
	defer conn.Close()

	if err := s.sendPing(conn); err != nil {
		s.handleMasterDown(master)
		return
	}

	master.mu.Lock()
	master.State = "up"
	master.LastPing = time.Now()
	master.mu.Unlock()
}
// handleMasterDown records a failed health check of the master and, once
// the failures have lasted longer than the configured threshold, marks the
// master "down" and starts the subjective-down handling.
//
// DownTime holds the moment of the first failed check of the current
// outage. Previously it was reset to "now" immediately before being compared
// with the threshold, so the comparison could never succeed, and the state
// was flipped to "down" on the very first failed ping.
func (s *Sentinel) handleMasterDown(master *MonitoredTarget) {
	master.mu.Lock()
	if master.State != "up" {
		// Already marked down; nothing new to report.
		master.mu.Unlock()
		return
	}

	// A DownTime that is not later than the last successful ping belongs to
	// an earlier outage (or was never set): this is the first failure of a
	// new one.
	if !master.DownTime.After(master.LastPing) {
		master.DownTime = time.Now()
	}

	subjectivelyDown := s.isSubjectivelyDown(master)
	if subjectivelyDown {
		master.State = "down"
	}
	master.mu.Unlock()

	// Notify outside the lock: the follow-up work only reads immutable
	// fields and spawns goroutines that take master.mu themselves.
	if subjectivelyDown {
		s.handleSubjectiveDown(master)
	}
}
// isSubjectivelyDown reports whether more than DownAfterMs milliseconds
// have passed since master.DownTime. It reads DownTime without locking.
func (s *Sentinel) isSubjectivelyDown(master *MonitoredTarget) bool {
	elapsed := time.Since(master.DownTime)
	threshold := time.Duration(s.config.DownAfterMs) * time.Millisecond
	return elapsed > threshold
}
// handleSubjectiveDown logs that this sentinel considers the master down
// and asks the known peers for their view.
func (s *Sentinel) handleSubjectiveDown(master *MonitoredTarget) {
	name := master.Name
	fmt.Printf("Master %s is subjectively down\n", name)

	s.askOtherSentinels(master)
}
// askOtherSentinels queries every known peer about master, one goroutine
// per peer. The goroutines are not waited for.
func (s *Sentinel) askOtherSentinels(master *MonitoredTarget) {
	for id := range s.OtherSentinels {
		peer := s.OtherSentinels[id]
		go s.askSentinel(peer, master)
	}
}
// askSentinel asks one peer whether it also sees master as down by sending
// "SENTINEL is-master-down-by-addr <host> <port>" and passing the raw reply
// to handleSentinelResponse. Best-effort: network errors end it silently.
func (s *Sentinel) askSentinel(sentinel *Sentinel, master *MonitoredTarget) {
	conn, err := net.Dial("tcp", sentinel.Addr)
	if err != nil {
		return
	}
	defer conn.Close()

	const (
		command    = "SENTINEL"
		subcommand = "is-master-down-by-addr"
	)
	port := fmt.Sprintf("%d", master.Port)
	// Every RESP bulk string is prefixed with its byte length; derive all
	// four from the values (the previous literal "$7" and "$20" did not
	// match the 8- and 22-byte command words).
	cmd := fmt.Sprintf("*4\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n",
		len(command), command,
		len(subcommand), subcommand,
		len(master.Host), master.Host,
		len(port), port)
	if _, err := conn.Write([]byte(cmd)); err != nil {
		return
	}

	buffer := make([]byte, 1024)
	n, err := conn.Read(buffer)
	if err != nil {
		return
	}

	s.handleSentinelResponse(string(buffer[:n]), master)
}
// handleSentinelResponse interprets a peer's reply. Simplified: only the
// exact reply ":1\r\n" counts as agreement, and a single agreeing peer is
// enough to declare the master objectively down.
func (s *Sentinel) handleSentinelResponse(response string, master *MonitoredTarget) {
	if response != ":1\r\n" {
		return
	}
	s.handleObjectiveDown(master)
}
// handleObjectiveDown logs that the master is objectively down and starts
// a failover for it.
func (s *Sentinel) handleObjectiveDown(master *MonitoredTarget) {
	name := master.Name
	fmt.Printf("Master %s is objectively down\n", name)

	s.startFailover(master)
}
// startFailover runs a failover for master: it moves the sentinel to the
// FAILOVER state, stamps master.FailoverStartTime, picks a replica and
// promotes it.
//
// Two defects of the earlier version are handled:
//   - several goroutines (one per peer reply, plus the monitoring loop) can
//     reach this function at once; a call that finds a failover already in
//     progress now returns immediately instead of running a second one;
//   - when no replica qualifies, the state is restored to MONITORING
//     instead of being left at FAILOVER forever.
func (s *Sentinel) startFailover(master *MonitoredTarget) {
	s.mu.Lock()
	if s.State == SENTINEL_STATE_FAILOVER {
		s.mu.Unlock()
		return
	}
	s.State = SENTINEL_STATE_FAILOVER
	s.mu.Unlock()

	master.mu.Lock()
	master.FailoverStartTime = time.Now()
	master.mu.Unlock()

	newMaster := s.selectNewMaster()
	if newMaster == nil {
		fmt.Println("No suitable slave found for failover")
		s.mu.Lock()
		s.State = SENTINEL_STATE_MONITORING
		s.mu.Unlock()
		return
	}

	// executeFailover ends in completeFailover, which resets the state.
	s.executeFailover(master, newMaster)
}
// selectNewMaster returns a replica that is currently "up", or nil when
// there is none. Simplified: the first match in map iteration order wins, so
// the choice among several healthy replicas is arbitrary.
func (s *Sentinel) selectNewMaster() *MonitoredTarget {
	for _, candidate := range s.Targets {
		if candidate.Role != "slave" || candidate.State != "up" {
			continue
		}
		return candidate
	}
	return nil
}
// executeFailover carries out the failover steps in order: stop the old
// master, promote the chosen replica, repoint the remaining replicas, update
// the sentinel configuration, and mark the failover complete.
func (s *Sentinel) executeFailover(oldMaster, newMaster *MonitoredTarget) {
	fmt.Printf("Executing failover from %s to %s\n", oldMaster.Name, newMaster.Name)

	s.stopMaster(oldMaster)

	// The remaining steps all operate on the new master.
	followUps := []func(*MonitoredTarget){
		s.promoteSlaveToMaster,
		s.updateOtherSlaves,
		s.updateSentinelConfig,
		s.completeFailover,
	}
	for _, step := range followUps {
		step(newMaster)
	}
}
// stopMaster is a placeholder for shutting the old master down. It only
// logs; a real implementation would send SHUTDOWN.
func (s *Sentinel) stopMaster(master *MonitoredTarget) {
	name := master.Name
	fmt.Printf("Stopping master %s\n", name)
}
// promoteSlaveToMaster logs the promotion and flips the target's Role to
// "master". Simplified: no SLAVEOF NO ONE command is sent to the node.
func (s *Sentinel) promoteSlaveToMaster(slave *MonitoredTarget) {
	fmt.Printf("Promoting slave %s to master\n", slave.Name)

	slave.mu.Lock()
	defer slave.mu.Unlock()
	slave.Role = "master"
}
// updateOtherSlaves repoints every remaining replica at newMaster.
func (s *Sentinel) updateOtherSlaves(newMaster *MonitoredTarget) {
	for _, replica := range s.Targets {
		if replica == newMaster || replica.Role != "slave" {
			continue
		}
		s.sendSlaveOfCommand(replica, newMaster)
	}
}
// sendSlaveOfCommand is a placeholder for reconfiguring a replica. It only
// logs; a real implementation would send SLAVEOF to the replica.
func (s *Sentinel) sendSlaveOfCommand(slave, master *MonitoredTarget) {
	replicaName, masterName := slave.Name, master.Name
	fmt.Printf("Updating slave %s to follow master %s\n", replicaName, masterName)
}
// updateSentinelConfig is a placeholder for persisting the new master. It
// only logs; a real implementation would rewrite the configuration file.
func (s *Sentinel) updateSentinelConfig(newMaster *MonitoredTarget) {
	name := newMaster.Name
	fmt.Printf("Updating sentinel config to point to new master %s\n", name)
}
// completeFailover logs the outcome and returns the sentinel to the
// MONITORING state.
func (s *Sentinel) completeFailover(newMaster *MonitoredTarget) {
	fmt.Printf("Failover completed successfully, new master is %s\n", newMaster.Name)

	s.mu.Lock()
	defer s.mu.Unlock()
	s.State = SENTINEL_STATE_MONITORING
}
// checkSlavesHealth health-checks every target whose Role is "slave".
func (s *Sentinel) checkSlavesHealth() {
	for _, node := range s.Targets {
		if node.Role != "slave" {
			continue
		}
		s.checkTargetHealth(node)
	}
}
// checkOtherSentinelsHealth health-checks every known peer sentinel.
func (s *Sentinel) checkOtherSentinelsHealth() {
	for id := range s.OtherSentinels {
		s.checkSentinelHealth(s.OtherSentinels[id])
	}
}
// checkTargetHealth PINGs target over a fresh connection. A dial or ping
// failure is reported to handleTargetDown; a successful ping marks the
// target "up" and refreshes LastPing.
func (s *Sentinel) checkTargetHealth(target *MonitoredTarget) {
	addr := fmt.Sprintf("%s:%d", target.Host, target.Port)

	conn, dialErr := net.Dial("tcp", addr)
	if dialErr != nil {
		s.handleTargetDown(target)
		return
	}
	defer conn.Close()

	if pingErr := s.sendPing(conn); pingErr != nil {
		s.handleTargetDown(target)
		return
	}

	target.mu.Lock()
	defer target.mu.Unlock()
	target.State = "up"
	target.LastPing = time.Now()
}
// handleTargetDown marks an "up" target as "down", stamps DownTime and logs
// the transition. A target that is not "up" is left untouched.
func (s *Sentinel) handleTargetDown(target *MonitoredTarget) {
	target.mu.Lock()
	defer target.mu.Unlock()

	if target.State != "up" {
		return
	}
	target.State = "down"
	target.DownTime = time.Now()
	fmt.Printf("Target %s is down\n", target.Name)
}
// checkSentinelHealth PINGs a peer sentinel and prints whether it is up or
// down. Nothing is recorded beyond the log line.
func (s *Sentinel) checkSentinelHealth(sentinel *Sentinel) {
	conn, err := net.Dial("tcp", sentinel.Addr)
	if err == nil {
		defer conn.Close()
		err = s.sendPing(conn)
	}

	if err != nil {
		fmt.Printf("Sentinel %s is down\n", sentinel.ID)
		return
	}
	fmt.Printf("Sentinel %s is up\n", sentinel.ID)
}
// performFailoverIfNeeded starts a failover for every master that is marked
// "down" and for which shouldStartFailover agrees. Role and State are read
// without taking the target's lock.
func (s *Sentinel) performFailoverIfNeeded() {
	for _, node := range s.Targets {
		if node.Role != "master" || node.State != "down" {
			continue
		}
		if !s.shouldStartFailover(node) {
			continue
		}
		s.startFailover(node)
	}
}
// shouldStartFailover reports whether more than FailoverTimeout
// milliseconds have passed since the last failover attempt for master. A
// master that has never been failed over has a zero FailoverStartTime and
// therefore always qualifies.
func (s *Sentinel) shouldStartFailover(master *MonitoredTarget) bool {
	master.mu.RLock()
	defer master.mu.RUnlock()

	sinceLastAttempt := time.Since(master.FailoverStartTime)
	timeout := time.Duration(s.config.FailoverTimeout) * time.Millisecond
	return sinceLastAttempt > timeout
}
// Stop signals the monitoring loop to exit and waits for it to finish.
// Calling Stop on a sentinel that is not running is a no-op.
//
// The lock is released before waiting: the loop goroutine takes s.mu itself
// (for example when changing State), so waiting for it while holding the
// lock could deadlock. isRunning is cleared before the lock is released so a
// concurrent Stop cannot close stopCh twice.
func (s *Sentinel) Stop() {
	s.mu.Lock()
	if !s.isRunning {
		s.mu.Unlock()
		return
	}
	s.isRunning = false
	close(s.stopCh)
	s.mu.Unlock()

	s.wg.Wait()
}
// GetStatus returns a snapshot of the sentinel under the keys "id", "addr",
// "state", "targets_count", "sentinels_count" and "is_running".
func (s *Sentinel) GetStatus() map[string]interface{} {
	s.mu.RLock()
	defer s.mu.RUnlock()

	status := make(map[string]interface{}, 6)
	status["id"] = s.ID
	status["addr"] = s.Addr
	status["state"] = s.State
	status["targets_count"] = len(s.Targets)
	status["sentinels_count"] = len(s.OtherSentinels)
	status["is_running"] = s.isRunning
	return status
}
2. 哨兵集群实现
// sentinel/sentinel_cluster.go
package sentinel
import (
"context"
"fmt"
"sync"
"time"
)
// SentinelCluster groups several Sentinel nodes and runs a once-per-second
// coordination loop over them.
type SentinelCluster struct {
// Sentinels maps a sentinel ID to the node.
Sentinels map[string]*Sentinel
// mu is taken around isRunning and, in most methods, around Sentinels.
mu sync.RWMutex
// isRunning is true between Start and Stop.
isRunning bool
// stopCh is closed by Stop to end clusterLoop.
stopCh chan struct{}
// wg tracks the clusterLoop goroutine.
wg sync.WaitGroup
}
// NewSentinelCluster returns an empty, stopped cluster.
func NewSentinelCluster() *SentinelCluster {
	cluster := new(SentinelCluster)
	cluster.Sentinels = map[string]*Sentinel{}
	cluster.stopCh = make(chan struct{})
	return cluster
}
// AddSentinel registers sentinel under its ID, replacing any node already
// stored under that ID.
func (sc *SentinelCluster) AddSentinel(sentinel *Sentinel) {
	id := sentinel.ID

	sc.mu.Lock()
	sc.Sentinels[id] = sentinel
	sc.mu.Unlock()
}
// Start starts every registered sentinel and then the cluster loop.
//
// It returns an error if the cluster is already running or if any sentinel
// fails to start. In the latter case the sentinels started by this call are
// stopped again and the cluster stays stopped; previously the cluster was
// left marked as running, with its loop goroutine and some of the sentinels
// still active.
func (sc *SentinelCluster) Start() error {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	if sc.isRunning {
		return fmt.Errorf("cluster already running")
	}

	started := make([]*Sentinel, 0, len(sc.Sentinels))
	for _, sentinel := range sc.Sentinels {
		if err := sentinel.Start(); err != nil {
			// Roll back so a failed Start leaves nothing running.
			for _, running := range started {
				running.Stop()
			}
			return fmt.Errorf("starting sentinel %s: %w", sentinel.ID, err)
		}
		started = append(started, sentinel)
	}

	sc.isRunning = true
	sc.wg.Add(1)
	go sc.clusterLoop()
	return nil
}
// clusterLoop runs the cluster operations once per second until stopCh is
// closed.
func (sc *SentinelCluster) clusterLoop() {
	defer sc.wg.Done()

	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		select {
		case <-sc.stopCh:
			return
		case <-tick.C:
			sc.performClusterOperations()
		}
	}
}
// performClusterOperations runs one coordination pass: state sync, failover
// check, configuration update, in that order.
func (sc *SentinelCluster) performClusterOperations() {
	passes := []func(){
		sc.syncSentinelStates,
		sc.performFailoverIfNeeded,
		sc.updateConfigurations,
	}
	for _, pass := range passes {
		pass()
	}
}
// syncSentinelStates snapshots the registered sentinels under the read lock
// and then syncs each one in its own goroutine. The goroutines are not
// waited for.
func (sc *SentinelCluster) syncSentinelStates() {
	snapshot := func() []*Sentinel {
		sc.mu.RLock()
		defer sc.mu.RUnlock()

		nodes := make([]*Sentinel, 0, len(sc.Sentinels))
		for _, node := range sc.Sentinels {
			nodes = append(nodes, node)
		}
		return nodes
	}()

	for i := range snapshot {
		go sc.syncSentinelState(snapshot[i])
	}
}
// syncSentinelState is a placeholder for state synchronisation: it only
// prints the sentinel's current status.
func (sc *SentinelCluster) syncSentinelState(sentinel *Sentinel) {
	fmt.Printf("Sentinel %s status: %v\n", sentinel.ID, sentinel.GetStatus())
}
// performFailoverIfNeeded triggers the failover check, in a new goroutine,
// on every sentinel that is currently in the FAILOVER state.
//
// The sentinel list is snapshotted under sc.mu (as syncSentinelStates does)
// and each State is read under the sentinel's own lock; both were previously
// read unsynchronised while other goroutines write them.
func (sc *SentinelCluster) performFailoverIfNeeded() {
	sc.mu.RLock()
	sentinels := make([]*Sentinel, 0, len(sc.Sentinels))
	for _, sentinel := range sc.Sentinels {
		sentinels = append(sentinels, sentinel)
	}
	sc.mu.RUnlock()

	for _, sentinel := range sentinels {
		sentinel.mu.RLock()
		inFailover := sentinel.State == SENTINEL_STATE_FAILOVER
		sentinel.mu.RUnlock()

		if inFailover {
			go sentinel.performFailoverIfNeeded()
		}
	}
}
// updateConfigurations is a placeholder for propagating configuration
// changes across the cluster; it only logs.
func (sc *SentinelCluster) updateConfigurations() {
	const notice = "Updating configurations..."
	fmt.Println(notice)
}
// Stop ends the cluster loop, waits for it to exit and then stops every
// registered sentinel. Calling Stop on a stopped cluster is a no-op.
//
// The lock is released before waiting: clusterLoop takes sc.mu.RLock during
// each pass, so waiting for it while holding the write lock deadlocks when
// Stop arrives in the middle of a pass. isRunning is cleared before the lock
// is released so a concurrent Stop cannot close stopCh twice.
func (sc *SentinelCluster) Stop() {
	sc.mu.Lock()
	if !sc.isRunning {
		sc.mu.Unlock()
		return
	}
	sc.isRunning = false
	close(sc.stopCh)

	// Snapshot the sentinels so they can be stopped without the lock held.
	sentinels := make([]*Sentinel, 0, len(sc.Sentinels))
	for _, sentinel := range sc.Sentinels {
		sentinels = append(sentinels, sentinel)
	}
	sc.mu.Unlock()

	sc.wg.Wait()

	for _, sentinel := range sentinels {
		sentinel.Stop()
	}
}
// GetStatus returns a snapshot of the cluster under the keys
// "sentinels_count" and "is_running".
func (sc *SentinelCluster) GetStatus() map[string]interface{} {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	status := make(map[string]interface{}, 2)
	status["sentinels_count"] = len(sc.Sentinels)
	status["is_running"] = sc.isRunning
	return status
}
3. 故障转移实现
// sentinel/failover.go
package sentinel
import (
"context"
"fmt"
"sync"
"time"
)
// FailoverManager watches the targets of one Sentinel once per second and
// runs a failover when a master is marked "down".
type FailoverManager struct {
// sentinel is the node whose Targets and config are inspected.
sentinel *Sentinel
// isRunning is true between Start and Stop.
isRunning bool
// stopCh is closed by Stop to end failoverLoop.
stopCh chan struct{}
// wg tracks the failoverLoop goroutine.
wg sync.WaitGroup
// mu is taken by Start and Stop around isRunning.
mu sync.RWMutex
}
// NewFailoverManager returns a stopped manager bound to sentinel.
func NewFailoverManager(sentinel *Sentinel) *FailoverManager {
	manager := new(FailoverManager)
	manager.sentinel = sentinel
	manager.stopCh = make(chan struct{})
	return manager
}
// Start launches the failover loop in a background goroutine.
// It returns an error if the manager is already running.
func (fm *FailoverManager) Start() error {
	fm.mu.Lock()
	defer fm.mu.Unlock()

	if !fm.isRunning {
		fm.isRunning = true
		fm.wg.Add(1)
		go fm.failoverLoop()
		return nil
	}
	return fmt.Errorf("failover manager already running")
}
// failoverLoop checks the failover conditions once per second until stopCh
// is closed.
func (fm *FailoverManager) failoverLoop() {
	defer fm.wg.Done()

	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		select {
		case <-fm.stopCh:
			return
		case <-tick.C:
			fm.checkFailoverConditions()
		}
	}
}
// checkFailoverConditions starts a failover for every master of the bound
// sentinel that is marked "down" and for which shouldStartFailover agrees.
// Role and State are read without taking the target's lock.
func (fm *FailoverManager) checkFailoverConditions() {
	for _, node := range fm.sentinel.Targets {
		if node.Role != "master" || node.State != "down" {
			continue
		}
		if !fm.shouldStartFailover(node) {
			continue
		}
		fm.startFailover(node)
	}
}
// shouldStartFailover reports whether more than FailoverTimeout
// milliseconds have passed since master.FailoverStartTime. A zero
// FailoverStartTime therefore always qualifies.
func (fm *FailoverManager) shouldStartFailover(master *MonitoredTarget) bool {
	master.mu.RLock()
	defer master.mu.RUnlock()

	sinceLastAttempt := time.Since(master.FailoverStartTime)
	timeout := time.Duration(fm.sentinel.config.FailoverTimeout) * time.Millisecond
	return sinceLastAttempt > timeout
}
// startFailover runs a failover for master: it stamps the attempt time,
// picks a replica and promotes it.
//
// Stamping FailoverStartTime is what makes shouldStartFailover throttle
// retries. Without it the timestamp stayed at its zero value, so a down
// master triggered a new failover on every tick of the loop.
func (fm *FailoverManager) startFailover(master *MonitoredTarget) {
	fmt.Printf("Starting failover for master %s\n", master.Name)

	master.mu.Lock()
	master.FailoverStartTime = time.Now()
	master.mu.Unlock()

	newMaster := fm.selectNewMaster()
	if newMaster == nil {
		fmt.Println("No suitable slave found for failover")
		return
	}

	fm.executeFailover(master, newMaster)
}
// selectNewMaster returns a replica of the bound sentinel that is currently
// "up", or nil when there is none. Simplified: the first match in map
// iteration order wins, so the choice among several replicas is arbitrary.
func (fm *FailoverManager) selectNewMaster() *MonitoredTarget {
	for _, candidate := range fm.sentinel.Targets {
		if candidate.Role != "slave" || candidate.State != "up" {
			continue
		}
		return candidate
	}
	return nil
}
// executeFailover carries out the failover steps in order: stop the old
// master, promote the chosen replica, repoint the remaining replicas, update
// the sentinel configuration, and mark the failover complete.
func (fm *FailoverManager) executeFailover(oldMaster, newMaster *MonitoredTarget) {
	fmt.Printf("Executing failover from %s to %s\n", oldMaster.Name, newMaster.Name)

	fm.stopMaster(oldMaster)

	// The remaining steps all operate on the new master.
	followUps := []func(*MonitoredTarget){
		fm.promoteSlaveToMaster,
		fm.updateOtherSlaves,
		fm.updateSentinelConfig,
		fm.completeFailover,
	}
	for _, step := range followUps {
		step(newMaster)
	}
}
// stopMaster logs the shutdown and marks the old master "down".
// Simplified: no SHUTDOWN command is sent to the node.
func (fm *FailoverManager) stopMaster(master *MonitoredTarget) {
	fmt.Printf("Stopping master %s\n", master.Name)

	master.mu.Lock()
	defer master.mu.Unlock()
	master.State = "down"
}
// promoteSlaveToMaster logs the promotion and sets the target's Role to
// "master" and State to "up". Simplified: no SLAVEOF NO ONE command is sent.
func (fm *FailoverManager) promoteSlaveToMaster(slave *MonitoredTarget) {
	fmt.Printf("Promoting slave %s to master\n", slave.Name)

	slave.mu.Lock()
	defer slave.mu.Unlock()
	slave.Role, slave.State = "master", "up"
}
// updateOtherSlaves repoints every remaining replica of the bound sentinel
// at newMaster.
func (fm *FailoverManager) updateOtherSlaves(newMaster *MonitoredTarget) {
	for _, replica := range fm.sentinel.Targets {
		if replica == newMaster || replica.Role != "slave" {
			continue
		}
		fm.sendSlaveOfCommand(replica, newMaster)
	}
}
// sendSlaveOfCommand is a placeholder for reconfiguring a replica. It only
// logs; a real implementation would send SLAVEOF to the replica.
func (fm *FailoverManager) sendSlaveOfCommand(slave, master *MonitoredTarget) {
	replicaName, masterName := slave.Name, master.Name
	fmt.Printf("Updating slave %s to follow master %s\n", replicaName, masterName)
}
// updateSentinelConfig is a placeholder for persisting the new master. It
// only logs; a real implementation would rewrite the configuration file.
func (fm *FailoverManager) updateSentinelConfig(newMaster *MonitoredTarget) {
	name := newMaster.Name
	fmt.Printf("Updating sentinel config to point to new master %s\n", name)
}
// completeFailover logs the outcome and returns the bound sentinel to the
// MONITORING state.
func (fm *FailoverManager) completeFailover(newMaster *MonitoredTarget) {
	fmt.Printf("Failover completed successfully, new master is %s\n", newMaster.Name)

	owner := fm.sentinel
	owner.mu.Lock()
	defer owner.mu.Unlock()
	owner.State = SENTINEL_STATE_MONITORING
}
// Stop signals the failover loop to exit and waits for it to finish.
// Calling Stop on a manager that is not running is a no-op.
func (fm *FailoverManager) Stop() {
	fm.mu.Lock()
	defer fm.mu.Unlock()

	if fm.isRunning {
		close(fm.stopCh)
		fm.wg.Wait()
		fm.isRunning = false
	}
}
4. 配置管理实现
// sentinel/config.go
package sentinel
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sync"
)
// ConfigManager loads, holds and persists a SentinelConfig as a JSON file.
type ConfigManager struct {
// configFile is the path of the JSON file.
configFile string
// config is the in-memory configuration; GetConfig hands out this pointer.
config *SentinelConfig
// mu is taken by every exported method around config.
mu sync.RWMutex
}
// NewConfigManager returns a manager for configFile holding an empty
// configuration; nothing is read from disk until LoadConfig is called.
func NewConfigManager(configFile string) *ConfigManager {
	manager := new(ConfigManager)
	manager.configFile = configFile
	manager.config = new(SentinelConfig)
	return manager
}
// LoadConfig reads the configuration from the manager's file. If the file
// does not exist, the default configuration is installed and written to
// that path instead.
//
// It returns any read, JSON-decoding or write error.
func (cm *ConfigManager) LoadConfig() error {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	if _, err := os.Stat(cm.configFile); os.IsNotExist(err) {
		return cm.createDefaultConfig()
	}

	data, err := ioutil.ReadFile(cm.configFile)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, cm.config)
}

// SaveConfig writes the current configuration to the manager's file as
// indented JSON. It takes the read lock itself and therefore must not be
// called while cm.mu is already held; methods that hold the lock use
// writeConfigLocked instead.
func (cm *ConfigManager) SaveConfig() error {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	return cm.writeConfigLocked()
}

// writeConfigLocked serialises cm.config and writes it to cm.configFile
// with mode 0644. The caller must hold cm.mu (read or write).
//
// sync.RWMutex is not reentrant: the locked methods used to call SaveConfig,
// which blocked forever trying to re-acquire the mutex they already held.
func (cm *ConfigManager) writeConfigLocked() error {
	data, err := json.MarshalIndent(cm.config, "", " ")
	if err != nil {
		return err
	}
	return ioutil.WriteFile(cm.configFile, data, 0644)
}

// createDefaultConfig installs the default configuration (master "mymaster"
// at 127.0.0.1:6379, 30000 ms down-after, 180000 ms failover timeout, one
// parallel sync) and persists it. The caller must hold cm.mu for writing.
func (cm *ConfigManager) createDefaultConfig() error {
	cm.config = &SentinelConfig{
		MasterName:      "mymaster",
		MasterHost:      "127.0.0.1",
		MasterPort:      6379,
		DownAfterMs:     30000,
		FailoverTimeout: 180000,
		ParallelSyncs:   1,
	}
	return cm.writeConfigLocked()
}

// GetConfig returns the manager's current configuration. The pointer is the
// manager's own instance, not a copy, so changes made through it are visible
// to the manager.
func (cm *ConfigManager) GetConfig() *SentinelConfig {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	return cm.config
}

// UpdateConfig replaces the configuration with config and persists it.
// A nil config is rejected, because every other method dereferences the
// stored pointer.
func (cm *ConfigManager) UpdateConfig(config *SentinelConfig) error {
	if config == nil {
		return fmt.Errorf("config must not be nil")
	}

	cm.mu.Lock()
	defer cm.mu.Unlock()

	cm.config = config
	return cm.writeConfigLocked()
}

// UpdateMasterConfig changes the master address and persists the result.
func (cm *ConfigManager) UpdateMasterConfig(host string, port int) error {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	cm.config.MasterHost = host
	cm.config.MasterPort = port
	return cm.writeConfigLocked()
}

// UpdateTimeoutConfig changes both timing thresholds (in milliseconds) and
// persists the result.
func (cm *ConfigManager) UpdateTimeoutConfig(downAfterMs, failoverTimeout int) error {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	cm.config.DownAfterMs = downAfterMs
	cm.config.FailoverTimeout = failoverTimeout
	return cm.writeConfigLocked()
}
// GetConfigStatus returns the current configuration as a map keyed by
// "master_name", "master_host", "master_port", "down_after_ms",
// "failover_timeout" and "parallel_syncs".
func (cm *ConfigManager) GetConfigStatus() map[string]interface{} {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	cfg := cm.config
	status := make(map[string]interface{}, 6)
	status["master_name"] = cfg.MasterName
	status["master_host"] = cfg.MasterHost
	status["master_port"] = cfg.MasterPort
	status["down_after_ms"] = cfg.DownAfterMs
	status["failover_timeout"] = cfg.FailoverTimeout
	status["parallel_syncs"] = cfg.ParallelSyncs
	return status
}
测试验证
1. 单元测试
// sentinel/sentinel_test.go
package sentinel
import (
"testing"
"time"
)
// TestSentinel starts a single sentinel, lets it run for a second and
// checks that it reports itself as running with exactly one target.
func TestSentinel(t *testing.T) {
	cfg := &SentinelConfig{
		MasterName:      "mymaster",
		MasterHost:      "127.0.0.1",
		MasterPort:      6379,
		DownAfterMs:     30000,
		FailoverTimeout: 180000,
		ParallelSyncs:   1,
	}
	node := NewSentinel("sentinel1", ":26379", cfg)

	if err := node.Start(); err != nil {
		t.Fatalf("Failed to start sentinel: %v", err)
	}
	defer node.Stop()

	// Give the background loop time to register the master.
	time.Sleep(time.Second)

	status := node.GetStatus()
	if running := status["is_running"].(bool); !running {
		t.Error("Sentinel should be running")
	}
	if count := status["targets_count"].(int); count != 1 {
		t.Errorf("Expected 1 target, got %d", status["targets_count"])
	}
}
func TestSentinelCluster(t *testing.T) {
// 创建哨兵集群
cluster := NewSentinelCluster()
// 创建哨兵配置
config := &SentinelConfig{
MasterName: "mymaster",
MasterHost: "127.0.0.1",
MasterPort: 6379,
DownAfterMs: 30000,
FailoverTimeout: 180000,
ParallelSyncs: 1,
}
// 添加哨兵
sentinel1 := NewSentinel("sentinel1", ":26379", config)
sentinel2 := NewSentinel("sentinel2", ":26380", config)
sentinel3 := NewSentinel("sentinel3", ":26381", config)
cluster.AddSentinel(sentinel1)
cluster.AddSentinel(sentinel2)
cluster.AddSentinel(sentinel3)
// 启动集群
if err := cluster.Start(); err != nil {
t.Fatalf("Failed to start cluster: %v", err)
}
defer cluster.Stop()
// 等待集群启动
time.Sleep(time.Second)
// 检查状态
status := cluster.GetStatus()
if status["sentinels_count"].(int) != 3 {
t.Errorf("Expected 3 sentinels, got %d", status["sentinels_count"])
}
if status["is_running"].(bool) != true {
t.Error("Cluster should be running")
}
}
// TestFailoverManager checks that a failover manager bound to a sentinel
// that was never started can be started, run for a second and stopped.
func TestFailoverManager(t *testing.T) {
	cfg := &SentinelConfig{
		MasterName:      "mymaster",
		MasterHost:      "127.0.0.1",
		MasterPort:      6379,
		DownAfterMs:     30000,
		FailoverTimeout: 180000,
		ParallelSyncs:   1,
	}
	fm := NewFailoverManager(NewSentinel("sentinel1", ":26379", cfg))

	if err := fm.Start(); err != nil {
		t.Fatalf("Failed to start failover manager: %v", err)
	}
	defer fm.Stop()

	// Let the loop tick at least once.
	time.Sleep(time.Second)
}
// TestConfigManager checks that loading a missing file yields the default
// configuration and that UpdateConfig makes new values visible through
// GetConfig.
//
// The file lives in t.TempDir(), which the testing package removes
// automatically. The earlier version called os.Remove although this test
// file does not import "os", and it wrote into the working directory.
func TestConfigManager(t *testing.T) {
	manager := NewConfigManager(t.TempDir() + "/test_config.json")

	if err := manager.LoadConfig(); err != nil {
		t.Fatalf("Failed to load config: %v", err)
	}

	config := manager.GetConfig()
	if config.MasterName != "mymaster" {
		t.Errorf("Expected master name 'mymaster', got %s", config.MasterName)
	}
	if config.MasterHost != "127.0.0.1" {
		t.Errorf("Expected master host '127.0.0.1', got %s", config.MasterHost)
	}
	if config.MasterPort != 6379 {
		t.Errorf("Expected master port 6379, got %d", config.MasterPort)
	}

	config.MasterHost = "192.168.1.100"
	config.MasterPort = 6380
	if err := manager.UpdateConfig(config); err != nil {
		t.Fatalf("Failed to update config: %v", err)
	}

	updatedConfig := manager.GetConfig()
	if updatedConfig.MasterHost != "192.168.1.100" {
		t.Errorf("Expected master host '192.168.1.100', got %s", updatedConfig.MasterHost)
	}
	if updatedConfig.MasterPort != 6380 {
		t.Errorf("Expected master port 6380, got %d", updatedConfig.MasterPort)
	}
}
2. 性能基准测试
// sentinel/benchmark_test.go
package sentinel
import (
	"fmt"
	"os"
	"testing"
	"time"
)
// BenchmarkSentinelMonitoring measures one full monitoring pass of a single
// sentinel.
//
// Start registers the master from its background goroutine, so the
// benchmark waits briefly before the first pass; calling performMonitoring
// immediately could run before the master exists and dereference a nil
// target. A failed Start now aborts the benchmark instead of being ignored.
func BenchmarkSentinelMonitoring(b *testing.B) {
	config := &SentinelConfig{
		MasterName:      "mymaster",
		MasterHost:      "127.0.0.1",
		MasterPort:      6379,
		DownAfterMs:     30000,
		FailoverTimeout: 180000,
		ParallelSyncs:   1,
	}
	sentinel := NewSentinel("sentinel1", ":26400", config)

	if err := sentinel.Start(); err != nil {
		b.Fatalf("Failed to start sentinel: %v", err)
	}
	defer sentinel.Stop()

	// Sleep-based, like the tests in this package: long enough for the loop
	// goroutine to register its targets.
	time.Sleep(100 * time.Millisecond)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		sentinel.performMonitoring()
	}
}
func BenchmarkSentinelCluster(b *testing.B) {
// 创建哨兵集群
cluster := NewSentinelCluster()
// 创建哨兵配置
config := &SentinelConfig{
MasterName: "mymaster",
MasterHost: "127.0.0.1",
MasterPort: 6379,
DownAfterMs: 30000,
FailoverTimeout: 180000,
ParallelSyncs: 1,
}
// 添加哨兵
for i := 0; i < 3; i++ {
sentinel := NewSentinel(fmt.Sprintf("sentinel%d", i), fmt.Sprintf(":2640%d", i), config)
cluster.AddSentinel(sentinel)
}
// 启动集群
cluster.Start()
defer cluster.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// 模拟集群操作
cluster.performClusterOperations()
}
}
// BenchmarkFailoverManager measures one failover-condition check on a
// manager whose sentinel was never started.
func BenchmarkFailoverManager(b *testing.B) {
	cfg := &SentinelConfig{
		MasterName:      "mymaster",
		MasterHost:      "127.0.0.1",
		MasterPort:      6379,
		DownAfterMs:     30000,
		FailoverTimeout: 180000,
		ParallelSyncs:   1,
	}
	fm := NewFailoverManager(NewSentinel("sentinel1", ":26410", cfg))

	fm.Start()
	defer fm.Stop()

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		fm.checkFailoverConditions()
	}
}
// BenchmarkConfigManager measures a get-modify-update cycle, which rewrites
// the JSON file on every iteration.
func BenchmarkConfigManager(b *testing.B) {
	const path = "benchmark_config.json"

	cm := NewConfigManager(path)
	defer os.Remove(path)

	cm.LoadConfig()

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		cfg := cm.GetConfig()
		cfg.MasterHost = fmt.Sprintf("192.168.1.%d", n%255)
		cm.UpdateConfig(cfg)
	}
}
3. 并发测试
// sentinel/concurrent_test.go
package sentinel
import (
"sync"
"testing"
"time"
)
// TestSentinelConcurrent runs monitoring passes from ten goroutines, one
// hundred passes each, against a running sentinel and then checks that it
// still reports itself as running.
func TestSentinelConcurrent(t *testing.T) {
	cfg := &SentinelConfig{
		MasterName:      "mymaster",
		MasterHost:      "127.0.0.1",
		MasterPort:      6379,
		DownAfterMs:     30000,
		FailoverTimeout: 180000,
		ParallelSyncs:   1,
	}
	node := NewSentinel("sentinel1", ":26500", cfg)

	if err := node.Start(); err != nil {
		t.Fatalf("Failed to start sentinel: %v", err)
	}
	defer node.Stop()

	// Give the background loop time to register the master.
	time.Sleep(time.Second)

	const (
		numGoroutines = 10
		numOperations = 100
	)
	var workers sync.WaitGroup
	workers.Add(numGoroutines)
	for g := 0; g < numGoroutines; g++ {
		go func(goroutineID int) {
			defer workers.Done()
			for op := 0; op < numOperations; op++ {
				node.performMonitoring()
			}
		}(g)
	}
	workers.Wait()

	if running := node.GetStatus()["is_running"].(bool); !running {
		t.Error("Sentinel should be running")
	}
}
// TestSentinelClusterConcurrent runs cluster passes from ten goroutines,
// one hundred passes each, against a running three-sentinel cluster and
// then checks that it still reports itself as running.
//
// The sentinel IDs and addresses are written out as literals: the earlier
// version built them with fmt.Sprintf although this test file does not
// import "fmt", so it did not compile.
func TestSentinelClusterConcurrent(t *testing.T) {
	cluster := NewSentinelCluster()

	config := &SentinelConfig{
		MasterName:      "mymaster",
		MasterHost:      "127.0.0.1",
		MasterPort:      6379,
		DownAfterMs:     30000,
		FailoverTimeout: 180000,
		ParallelSyncs:   1,
	}

	members := []struct{ id, addr string }{
		{"sentinel0", ":26500"},
		{"sentinel1", ":26501"},
		{"sentinel2", ":26502"},
	}
	for _, m := range members {
		cluster.AddSentinel(NewSentinel(m.id, m.addr, config))
	}

	if err := cluster.Start(); err != nil {
		t.Fatalf("Failed to start cluster: %v", err)
	}
	defer cluster.Stop()

	// Give the cluster time to come up.
	time.Sleep(time.Second)

	const numGoroutines = 10
	const numOperations = 100
	var wg sync.WaitGroup
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func(goroutineID int) {
			defer wg.Done()
			for j := 0; j < numOperations; j++ {
				cluster.performClusterOperations()
			}
		}(i)
	}
	wg.Wait()

	status := cluster.GetStatus()
	if status["is_running"].(bool) != true {
		t.Error("Cluster should be running")
	}
}
// TestFailoverManagerConcurrent runs failover-condition checks from ten
// goroutines, one hundred checks each, against a running manager.
func TestFailoverManagerConcurrent(t *testing.T) {
	cfg := &SentinelConfig{
		MasterName:      "mymaster",
		MasterHost:      "127.0.0.1",
		MasterPort:      6379,
		DownAfterMs:     30000,
		FailoverTimeout: 180000,
		ParallelSyncs:   1,
	}
	fm := NewFailoverManager(NewSentinel("sentinel1", ":26600", cfg))

	if err := fm.Start(); err != nil {
		t.Fatalf("Failed to start failover manager: %v", err)
	}
	defer fm.Stop()

	// Let the loop tick at least once.
	time.Sleep(time.Second)

	const (
		numGoroutines = 10
		numOperations = 100
	)
	var workers sync.WaitGroup
	workers.Add(numGoroutines)
	for g := 0; g < numGoroutines; g++ {
		go func(goroutineID int) {
			defer workers.Done()
			for op := 0; op < numOperations; op++ {
				fm.checkFailoverConditions()
			}
		}(g)
	}
	workers.Wait()
}
性能对比分析
1. 哨兵模式对比
特性 | 单哨兵 | 多哨兵 | 哨兵集群 | 说明 |
---|---|---|---|---|
可用性 | 低 | 中等 | 高 | 哨兵数量越多,可用性越高 |
性能 | 高 | 中等 | 低 | 哨兵数量越多,性能越低 |
复杂度 | 低 | 中等 | 高 | 哨兵数量越多,复杂度越高 |
成本 | 低 | 中等 | 高 | 哨兵数量越多,成本越高 |
2. 故障转移策略对比
策略 | 速度 | 准确性 | 复杂度 | 适用场景 |
---|---|---|---|---|
主观下线 | 快 | 低 | 低 | 单哨兵 |
客观下线 | 中等 | 高 | 中等 | 多哨兵 |
自动故障转移 | 慢 | 高 | 高 | 生产环境 |
3. 性能测试结果
// BenchmarkComparison contrasts three workloads as sub-benchmarks: a
// monitoring pass of one sentinel, a coordination pass of a three-sentinel
// cluster, and a failover-condition check.
func BenchmarkComparison(b *testing.B) {
	// Each sub-benchmark gets its own configuration instance.
	newConfig := func() *SentinelConfig {
		return &SentinelConfig{
			MasterName:      "mymaster",
			MasterHost:      "127.0.0.1",
			MasterPort:      6379,
			DownAfterMs:     30000,
			FailoverTimeout: 180000,
			ParallelSyncs:   1,
		}
	}

	b.Run("SingleSentinel", func(b *testing.B) {
		node := NewSentinel("sentinel1", ":26700", newConfig())
		node.Start()
		defer node.Stop()

		for n := 0; n < b.N; n++ {
			node.performMonitoring()
		}
	})

	b.Run("MultipleSentinels", func(b *testing.B) {
		cluster := NewSentinelCluster()
		cfg := newConfig()
		for idx := 0; idx < 3; idx++ {
			id := fmt.Sprintf("sentinel%d", idx)
			addr := fmt.Sprintf(":2670%d", idx)
			cluster.AddSentinel(NewSentinel(id, addr, cfg))
		}
		cluster.Start()
		defer cluster.Stop()

		for n := 0; n < b.N; n++ {
			cluster.performClusterOperations()
		}
	})

	b.Run("Failover", func(b *testing.B) {
		fm := NewFailoverManager(NewSentinel("sentinel1", ":26800", newConfig()))
		fm.Start()
		defer fm.Stop()

		for n := 0; n < b.N; n++ {
			fm.checkFailoverConditions()
		}
	})
}
面试要点
1. 哨兵模式的优势
答案要点:
- 自动故障转移:无需人工干预
- 高可用性:多个哨兵节点保证服务可用
- 配置管理:自动更新配置
- 监控告警:实时监控主从节点状态
2. 主观下线 vs 客观下线
答案要点:
- 主观下线:单个哨兵认为主节点下线
- 客观下线:认为主节点下线的哨兵数量达到配置的法定数量(quorum)
- 判断条件:超时时间、哨兵数量
- 处理策略:主观下线询问其他哨兵,客观下线开始故障转移
3. 故障转移流程
答案要点:
- 故障检测:心跳检测、超时判断
- 选主算法:依次比较从节点优先级(replica-priority)、复制偏移量、运行 ID(run ID,字典序较小者优先)
- 数据一致性:确保新主节点数据最新
- 服务切换:更新配置、重定向流量
4. 哨兵集群的通信
答案要点:
- 发布订阅:使用 Redis 的发布订阅功能
- 命令传播:通过命令传播配置变更
- 状态同步:定期同步哨兵状态
- 故障检测:相互监控哨兵健康状态
总结
通过本章学习,我们深入理解了:
- Redis 哨兵模式的监控机制和故障检测
- 主观下线和客观下线的判断逻辑
- 自动故障转移流程和选举算法
- 哨兵集群的通信协议和配置传播
- 哨兵模式的高可用性保证
哨兵模式为 Redis 提供了自动故障转移和高可用性保证。在下一章中,我们将学习热点数据隔离,了解如何优化热点数据的访问性能。