HiHuo
首页
博客
手册
工具
首页
博客
手册
工具
  • 学习 Redis

    • Redis 手写实现学习指南
    • 快速开始
    • Redis 架构总览与线程模型
    • RESP 协议与网络通信
    • 事件循环与 I/O 多路复用
    • 底层数据结构设计
    • 字符串与 SDS 实现
    • 哈希表与字典实现
    • 列表与跳表实现
    • 有序集合实现
    • 内存管理与对象系统
    • RDB 持久化机制
    • AOF 持久化机制
    • 混合持久化策略
    • 分布式锁实现
    • 缓存一致性策略
    • 主从复制机制
    • 哨兵模式实现
    • 内存优化与 GC 调优

AOF 持久化机制

学习目标

  • 深入理解 AOF 日志格式和写入策略
  • 掌握 always/everysec/no 三种同步策略
  • 实现完整的 AOF 重写机制和优化
  • 理解混合使用 RDB+AOF 的优势
  • 掌握数据恢复流程和性能调优

AOF 持久化概述

1. AOF 基本原理

AOF(Append Only File)是 Redis 的另一种持久化方式,通过记录每个写操作来保证数据持久性:

┌─────────────────────────────────────────────────────────────┐
│                    AOF 持久化流程                            │
├─────────────────────────────────────────────────────────────┤
│  客户端命令  →  命令执行  →  AOF 缓冲区  →  AOF 文件        │
│      │            │            │            │              │
│      │            │            │            │              │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐      │
│  │  SET    │  │ 执行成功 │  │ 缓冲命令 │  │ 写入磁盘 │      │
│  │  GET    │  │ 执行成功 │  │ 缓冲命令 │  │ 写入磁盘 │      │
│  │  DEL    │  │ 执行成功 │  │ 缓冲命令 │  │ 写入磁盘 │      │
│  └─────────┘  └─────────┘  └─────────┘  └─────────┘      │
│      │            │            │            │              │
│      │            │            │            │              │
│  ┌─────────────────────────────────────────────────────────┐ │
│  │                同步策略                                  │ │
│  │  • always:每次写操作都同步到磁盘                        │ │
│  │  • everysec:每秒同步一次到磁盘                          │ │
│  │  • no:由操作系统决定何时同步                            │ │
│  └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

2. AOF 文件格式

AOF 文件使用 RESP 协议格式存储命令:

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
*2\r\n$3\r\nDEL\r\n$3\r\nkey\r\n

格式说明:

  • *3:数组包含 3 个元素
  • $3:字符串长度为 3
  • SET:命令名称
  • \r\n:行结束符

️ Go 语言 AOF 实现

1. AOF 写入器实现

// aof/aof_writer.go
package aof

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "sync"
    "time"
)

// AOF 同步策略
type AOFPolicy int

const (
    AOF_ALWAYS AOFPolicy = iota
    AOF_EVERYSEC
    AOF_NO
)

// AOF 写入器
type AOFWriter struct {
    file        *os.File
    writer      *bufio.Writer
    policy      AOFPolicy
    buffer      []byte
    bufferSize  int
    mu          sync.Mutex
    lastSync    time.Time
    syncTicker  *time.Ticker
    stopCh      chan struct{}
    wg          sync.WaitGroup
}

// 创建 AOF 写入器
func NewAOFWriter(filename string, policy AOFPolicy, bufferSize int) (*AOFWriter, error) {
    file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
    if err != nil {
        return nil, err
    }
    
    writer := &AOFWriter{
        file:       file,
        writer:     bufio.NewWriterSize(file, bufferSize),
        policy:     policy,
        buffer:     make([]byte, 0, bufferSize),
        bufferSize: bufferSize,
        lastSync:   time.Now(),
        stopCh:     make(chan struct{}),
    }
    
    // 启动同步协程
    if policy == AOF_EVERYSEC {
        writer.startSyncWorker()
    }
    
    return writer, nil
}

// 写入命令
func (aw *AOFWriter) WriteCommand(args []string) error {
    aw.mu.Lock()
    defer aw.mu.Unlock()
    
    // 构建 RESP 格式的命令
    command := aw.buildRESPCommand(args)
    
    // 写入缓冲区
    if _, err := aw.writer.WriteString(command); err != nil {
        return err
    }
    
    // 根据策略决定是否同步
    switch aw.policy {
    case AOF_ALWAYS:
        if err := aw.writer.Flush(); err != nil {
            return err
        }
        if err := aw.file.Sync(); err != nil {
            return err
        }
    case AOF_EVERYSEC:
        // 由同步协程处理
    case AOF_NO:
        // 由操作系统处理
    }
    
    return nil
}

// 构建 RESP 格式命令
func (aw *AOFWriter) buildRESPCommand(args []string) string {
    var result string
    
    // 数组开始
    result += fmt.Sprintf("*%d\r\n", len(args))
    
    // 每个参数
    for _, arg := range args {
        result += fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg)
    }
    
    return result
}

// 启动同步协程
func (aw *AOFWriter) startSyncWorker() {
    aw.wg.Add(1)
    go func() {
        defer aw.wg.Done()
        
        aw.syncTicker = time.NewTicker(time.Second)
        defer aw.syncTicker.Stop()
        
        for {
            select {
            case <-aw.syncTicker.C:
                aw.sync()
            case <-aw.stopCh:
                return
            }
        }
    }()
}

// 同步到磁盘
func (aw *AOFWriter) sync() error {
    aw.mu.Lock()
    defer aw.mu.Unlock()
    
    if err := aw.writer.Flush(); err != nil {
        return err
    }
    
    if err := aw.file.Sync(); err != nil {
        return err
    }
    
    aw.lastSync = time.Now()
    return nil
}

// 关闭 AOF 写入器
func (aw *AOFWriter) Close() error {
    close(aw.stopCh)
    aw.wg.Wait()
    
    aw.mu.Lock()
    defer aw.mu.Unlock()
    
    if err := aw.writer.Flush(); err != nil {
        return err
    }
    
    if err := aw.file.Sync(); err != nil {
        return err
    }
    
    return aw.file.Close()
}

// 获取文件大小
func (aw *AOFWriter) Size() (int64, error) {
    aw.mu.Lock()
    defer aw.mu.Unlock()
    
    if err := aw.writer.Flush(); err != nil {
        return 0, err
    }
    
    stat, err := aw.file.Stat()
    if err != nil {
        return 0, err
    }
    
    return stat.Size(), nil
}

// 获取最后同步时间
func (aw *AOFWriter) LastSync() time.Time {
    aw.mu.Lock()
    defer aw.mu.Unlock()
    return aw.lastSync
}

2. AOF 读取器实现

// aof/aof_reader.go
package aof

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "strconv"
    "strings"
)

// AOF 读取器
type AOFReader struct {
    file   *os.File
    reader *bufio.Reader
}

// 创建 AOF 读取器
func NewAOFReader(filename string) (*AOFReader, error) {
    file, err := os.Open(filename)
    if err != nil {
        return nil, err
    }
    
    return &AOFReader{
        file:   file,
        reader: bufio.NewReader(file),
    }, nil
}

// 读取下一个命令
func (ar *AOFReader) ReadNextCommand() ([]string, error) {
    // 读取数组长度
    line, err := ar.reader.ReadString('\n')
    if err != nil {
        return nil, err
    }
    
    line = strings.TrimSpace(line)
    if !strings.HasPrefix(line, "*") {
        return nil, fmt.Errorf("invalid AOF format: expected array, got %s", line)
    }
    
    count, err := strconv.Atoi(line[1:])
    if err != nil {
        return nil, err
    }
    
    if count <= 0 {
        return nil, fmt.Errorf("invalid array count: %d", count)
    }
    
    // 读取命令参数
    args := make([]string, count)
    for i := 0; i < count; i++ {
        // 读取字符串长度
        line, err := ar.reader.ReadString('\n')
        if err != nil {
            return nil, err
        }
        
        line = strings.TrimSpace(line)
        if !strings.HasPrefix(line, "$") {
            return nil, fmt.Errorf("invalid AOF format: expected string, got %s", line)
        }
        
        length, err := strconv.Atoi(line[1:])
        if err != nil {
            return nil, err
        }
        
        // 读取字符串内容
        arg := make([]byte, length)
        if _, err := io.ReadFull(ar.reader, arg); err != nil {
            return nil, err
        }
        
        // 读取 \r\n
        if _, err := ar.reader.ReadString('\n'); err != nil {
            return nil, err
        }
        
        args[i] = string(arg)
    }
    
    return args, nil
}

// 读取所有命令
func (ar *AOFReader) ReadAllCommands() ([][]string, error) {
    var commands [][]string
    
    for {
        command, err := ar.ReadNextCommand()
        if err != nil {
            if err == io.EOF {
                break
            }
            return nil, err
        }
        
        commands = append(commands, command)
    }
    
    return commands, nil
}

// 关闭 AOF 读取器
func (ar *AOFReader) Close() error {
    return ar.file.Close()
}

// 获取文件大小
func (ar *AOFReader) Size() (int64, error) {
    stat, err := ar.file.Stat()
    if err != nil {
        return 0, err
    }
    return stat.Size(), nil
}

3. AOF 重写机制

// aof/aof_rewrite.go
package aof

import (
    "fmt"
    "os"
    "path/filepath"
    "sync"
    "time"
)

// AOF 重写器
type AOFRewriter struct {
    originalFile string
    tempFile     string
    writer       *AOFWriter
    mu           sync.Mutex
    isRewriting  bool
}

// 创建 AOF 重写器
func NewAOFRewriter(originalFile string) *AOFRewriter {
    return &AOFRewriter{
        originalFile: originalFile,
        tempFile:     originalFile + ".tmp",
    }
}

// 开始重写
func (ar *AOFRewriter) StartRewrite() error {
    ar.mu.Lock()
    defer ar.mu.Unlock()
    
    if ar.isRewriting {
        return fmt.Errorf("AOF rewrite already in progress")
    }
    
    // 创建临时文件
    writer, err := NewAOFWriter(ar.tempFile, AOF_ALWAYS, 8192)
    if err != nil {
        return err
    }
    
    ar.writer = writer
    ar.isRewriting = true
    
    return nil
}

// 写入重写命令
func (ar *AOFRewriter) WriteCommand(args []string) error {
    ar.mu.Lock()
    defer ar.mu.Unlock()
    
    if !ar.isRewriting {
        return fmt.Errorf("AOF rewrite not started")
    }
    
    return ar.writer.WriteCommand(args)
}

// 完成重写
func (ar *AOFRewriter) FinishRewrite() error {
    ar.mu.Lock()
    defer ar.mu.Unlock()
    
    if !ar.isRewriting {
        return fmt.Errorf("AOF rewrite not started")
    }
    
    // 关闭临时文件
    if err := ar.writer.Close(); err != nil {
        return err
    }
    
    // 原子性替换文件
    if err := ar.atomicReplace(); err != nil {
        return err
    }
    
    ar.isRewriting = false
    return nil
}

// 原子性替换文件
func (ar *AOFRewriter) atomicReplace() error {
    // 备份原文件
    backupFile := ar.originalFile + ".bak"
    if err := os.Rename(ar.originalFile, backupFile); err != nil {
        return err
    }
    
    // 重命名临时文件
    if err := os.Rename(ar.tempFile, ar.originalFile); err != nil {
        // 恢复原文件
        os.Rename(backupFile, ar.originalFile)
        return err
    }
    
    // 删除备份文件
    os.Remove(backupFile)
    
    return nil
}

// 取消重写
func (ar *AOFRewriter) CancelRewrite() error {
    ar.mu.Lock()
    defer ar.mu.Unlock()
    
    if !ar.isRewriting {
        return fmt.Errorf("AOF rewrite not started")
    }
    
    // 关闭临时文件
    if ar.writer != nil {
        ar.writer.Close()
    }
    
    // 删除临时文件
    os.Remove(ar.tempFile)
    
    ar.isRewriting = false
    return nil
}

// 检查是否正在重写
func (ar *AOFRewriter) IsRewriting() bool {
    ar.mu.Lock()
    defer ar.mu.Unlock()
    return ar.isRewriting
}

4. AOF 管理器实现

// aof/aof_manager.go
package aof

import (
    "fmt"
    "sync"
    "time"
)

// AOF 管理器
type AOFManager struct {
    writer      *AOFWriter
    rewriter    *AOFRewriter
    policy      AOFPolicy
    filename    string
    mu          sync.RWMutex
    isEnabled   bool
    lastRewrite time.Time
    rewriteSize int64
    minRewriteSize int64
}

// 创建 AOF 管理器
func NewAOFManager(filename string, policy AOFPolicy, minRewriteSize int64) (*AOFManager, error) {
    writer, err := NewAOFWriter(filename, policy, 8192)
    if err != nil {
        return nil, err
    }
    
    return &AOFManager{
        writer:         writer,
        rewriter:       NewAOFRewriter(filename),
        policy:         policy,
        filename:       filename,
        isEnabled:      true,
        lastRewrite:    time.Now(),
        rewriteSize:    0,
        minRewriteSize: minRewriteSize,
    }, nil
}

// 写入命令
func (am *AOFManager) WriteCommand(args []string) error {
    am.mu.RLock()
    defer am.mu.RUnlock()
    
    if !am.isEnabled {
        return nil
    }
    
    // 如果正在重写,写入重写器
    if am.rewriter.IsRewriting() {
        return am.rewriter.WriteCommand(args)
    }
    
    // 写入主文件
    if err := am.writer.WriteCommand(args); err != nil {
        return err
    }
    
    // 更新重写大小
    am.rewriteSize += int64(len(fmt.Sprintf("%v", args)))
    
    // 检查是否需要重写
    if am.shouldRewrite() {
        go am.startRewrite()
    }
    
    return nil
}

// 检查是否需要重写
func (am *AOFManager) shouldRewrite() bool {
    // 检查文件大小
    size, err := am.writer.Size()
    if err != nil {
        return false
    }
    
    if size < am.minRewriteSize {
        return false
    }
    
    // 检查时间间隔
    if time.Since(am.lastRewrite) < time.Minute {
        return false
    }
    
    return true
}

// 开始重写
func (am *AOFManager) startRewrite() {
    am.mu.Lock()
    defer am.mu.Unlock()
    
    if am.rewriter.IsRewriting() {
        return
    }
    
    // 开始重写
    if err := am.rewriter.StartRewrite(); err != nil {
        fmt.Printf("Failed to start AOF rewrite: %v\n", err)
        return
    }
    
    // 重写完成后替换文件
    go func() {
        defer am.rewriter.FinishRewrite()
        
        // 这里应该从当前数据状态生成重写命令
        // 简化实现,实际应该遍历所有键值对
        am.generateRewriteCommands()
        
        am.mu.Lock()
        am.lastRewrite = time.Now()
        am.rewriteSize = 0
        am.mu.Unlock()
    }()
}

// 生成重写命令(简化实现)
func (am *AOFManager) generateRewriteCommands() {
    // 实际实现应该遍历所有键值对,生成最小化的命令集
    // 这里只是示例
    commands := [][]string{
        {"SET", "key1", "value1"},
        {"SET", "key2", "value2"},
        {"HSET", "hash1", "field1", "value1"},
    }
    
    for _, cmd := range commands {
        am.rewriter.WriteCommand(cmd)
    }
}

// 启用 AOF
func (am *AOFManager) Enable() {
    am.mu.Lock()
    defer am.mu.Unlock()
    am.isEnabled = true
}

// 禁用 AOF
func (am *AOFManager) Disable() {
    am.mu.Lock()
    defer am.mu.Unlock()
    am.isEnabled = false
}

// 强制重写
func (am *AOFManager) ForceRewrite() error {
    am.mu.Lock()
    defer am.mu.Unlock()
    
    if am.rewriter.IsRewriting() {
        return fmt.Errorf("AOF rewrite already in progress")
    }
    
    go am.startRewrite()
    return nil
}

// 获取状态
func (am *AOFManager) GetStatus() map[string]interface{} {
    am.mu.RLock()
    defer am.mu.RUnlock()
    
    size, _ := am.writer.Size()
    
    return map[string]interface{}{
        "enabled":        am.isEnabled,
        "filename":       am.filename,
        "size":           size,
        "policy":         am.policy,
        "is_rewriting":   am.rewriter.IsRewriting(),
        "last_rewrite":   am.lastRewrite,
        "rewrite_size":   am.rewriteSize,
        "min_rewrite_size": am.minRewriteSize,
    }
}

// 关闭 AOF 管理器
func (am *AOFManager) Close() error {
    am.mu.Lock()
    defer am.mu.Unlock()
    
    // 取消重写
    if am.rewriter.IsRewriting() {
        am.rewriter.CancelRewrite()
    }
    
    // 关闭写入器
    return am.writer.Close()
}

5. 数据恢复实现

// aof/aof_recovery.go
package aof

import (
    "fmt"
    "log"
    "time"
)

// 数据恢复器
type AOFRecovery struct {
    filename string
    reader   *AOFReader
}

// 创建数据恢复器
func NewAOFRecovery(filename string) *AOFRecovery {
    return &AOFRecovery{
        filename: filename,
    }
}

// 恢复数据
func (ar *AOFRecovery) Recover() error {
    reader, err := NewAOFReader(ar.filename)
    if err != nil {
        return err
    }
    defer reader.Close()
    
    ar.reader = reader
    
    // 读取所有命令
    commands, err := reader.ReadAllCommands()
    if err != nil {
        return err
    }
    
    // 执行命令恢复数据
    for i, command := range commands {
        if err := ar.executeCommand(command); err != nil {
            log.Printf("Failed to execute command %d: %v", i, err)
            continue
        }
    }
    
    return nil
}

// 执行命令
func (ar *AOFRecovery) executeCommand(args []string) error {
    if len(args) == 0 {
        return fmt.Errorf("empty command")
    }
    
    command := args[0]
    
    switch command {
    case "SET":
        if len(args) != 3 {
            return fmt.Errorf("invalid SET command: %v", args)
        }
        // 这里应该调用实际的 SET 命令
        fmt.Printf("SET %s %s\n", args[1], args[2])
        
    case "GET":
        if len(args) != 2 {
            return fmt.Errorf("invalid GET command: %v", args)
        }
        // 这里应该调用实际的 GET 命令
        fmt.Printf("GET %s\n", args[1])
        
    case "DEL":
        if len(args) != 2 {
            return fmt.Errorf("invalid DEL command: %v", args)
        }
        // 这里应该调用实际的 DEL 命令
        fmt.Printf("DEL %s\n", args[1])
        
    case "HSET":
        if len(args) < 3 {
            return fmt.Errorf("invalid HSET command: %v", args)
        }
        // 这里应该调用实际的 HSET 命令
        fmt.Printf("HSET %s %s %s\n", args[1], args[2], args[3])
        
    case "LPUSH":
        if len(args) < 2 {
            return fmt.Errorf("invalid LPUSH command: %v", args)
        }
        // 这里应该调用实际的 LPUSH 命令
        fmt.Printf("LPUSH %s %v\n", args[1], args[2:])
        
    case "ZADD":
        if len(args) < 3 {
            return fmt.Errorf("invalid ZADD command: %v", args)
        }
        // 这里应该调用实际的 ZADD 命令
        fmt.Printf("ZADD %s %s %s\n", args[1], args[2], args[3])
        
    default:
        // 忽略未知命令
        fmt.Printf("Unknown command: %s\n", command)
    }
    
    return nil
}

// 验证 AOF 文件
func (ar *AOFRecovery) Validate() error {
    reader, err := NewAOFReader(ar.filename)
    if err != nil {
        return err
    }
    defer reader.Close()
    
    commandCount := 0
    for {
        _, err := reader.ReadNextCommand()
        if err != nil {
            if err == io.EOF {
                break
            }
            return err
        }
        commandCount++
    }
    
    fmt.Printf("AOF file validation completed: %d commands found\n", commandCount)
    return nil
}

// 获取 AOF 文件统计信息
func (ar *AOFRecovery) GetStats() (map[string]interface{}, error) {
    reader, err := NewAOFReader(ar.filename)
    if err != nil {
        return nil, err
    }
    defer reader.Close()
    
    stats := make(map[string]interface{})
    
    // 文件大小
    size, err := reader.Size()
    if err != nil {
        return nil, err
    }
    stats["file_size"] = size
    
    // 命令数量
    commandCount := 0
    for {
        _, err := reader.ReadNextCommand()
        if err != nil {
            if err == io.EOF {
                break
            }
            return nil, err
        }
        commandCount++
    }
    stats["command_count"] = commandCount
    
    return stats, nil
}

测试验证

1. 单元测试

// aof/aof_test.go
package aof

import (
    "os"
    "testing"
    "time"
)

func TestAOFWriter(t *testing.T) {
    filename := "test.aof"
    defer os.Remove(filename)
    
    writer, err := NewAOFWriter(filename, AOF_ALWAYS, 1024)
    if err != nil {
        t.Fatalf("Failed to create AOF writer: %v", err)
    }
    defer writer.Close()
    
    // 写入命令
    commands := [][]string{
        {"SET", "key1", "value1"},
        {"GET", "key1"},
        {"DEL", "key1"},
    }
    
    for _, cmd := range commands {
        if err := writer.WriteCommand(cmd); err != nil {
            t.Fatalf("Failed to write command: %v", err)
        }
    }
    
    // 验证文件大小
    size, err := writer.Size()
    if err != nil {
        t.Fatalf("Failed to get file size: %v", err)
    }
    
    if size == 0 {
        t.Error("File size should not be zero")
    }
}

func TestAOFReader(t *testing.T) {
    filename := "test.aof"
    defer os.Remove(filename)
    
    // 创建测试文件
    writer, err := NewAOFWriter(filename, AOF_ALWAYS, 1024)
    if err != nil {
        t.Fatalf("Failed to create AOF writer: %v", err)
    }
    
    // 写入测试命令
    testCommands := [][]string{
        {"SET", "key1", "value1"},
        {"GET", "key1"},
        {"DEL", "key1"},
    }
    
    for _, cmd := range testCommands {
        writer.WriteCommand(cmd)
    }
    writer.Close()
    
    // 读取命令
    reader, err := NewAOFReader(filename)
    if err != nil {
        t.Fatalf("Failed to create AOF reader: %v", err)
    }
    defer reader.Close()
    
    // 验证读取的命令
    for i, expectedCmd := range testCommands {
        cmd, err := reader.ReadNextCommand()
        if err != nil {
            t.Fatalf("Failed to read command %d: %v", i, err)
        }
        
        if len(cmd) != len(expectedCmd) {
            t.Errorf("Command %d length mismatch: expected %d, got %d", i, len(expectedCmd), len(cmd))
        }
        
        for j, arg := range cmd {
            if arg != expectedCmd[j] {
                t.Errorf("Command %d arg %d mismatch: expected %s, got %s", i, j, expectedCmd[j], arg)
            }
        }
    }
}

func TestAOFRewriter(t *testing.T) {
    filename := "test.aof"
    defer os.Remove(filename)
    defer os.Remove(filename + ".tmp")
    
    rewriter := NewAOFRewriter(filename)
    
    // 开始重写
    if err := rewriter.StartRewrite(); err != nil {
        t.Fatalf("Failed to start rewrite: %v", err)
    }
    
    // 写入重写命令
    commands := [][]string{
        {"SET", "key1", "value1"},
        {"SET", "key2", "value2"},
    }
    
    for _, cmd := range commands {
        if err := rewriter.WriteCommand(cmd); err != nil {
            t.Fatalf("Failed to write rewrite command: %v", err)
        }
    }
    
    // 完成重写
    if err := rewriter.FinishRewrite(); err != nil {
        t.Fatalf("Failed to finish rewrite: %v", err)
    }
    
    // 验证文件存在
    if _, err := os.Stat(filename); os.IsNotExist(err) {
        t.Error("Rewritten file should exist")
    }
}

func TestAOFManager(t *testing.T) {
    filename := "test.aof"
    defer os.Remove(filename)
    
    manager, err := NewAOFManager(filename, AOF_EVERYSEC, 1024)
    if err != nil {
        t.Fatalf("Failed to create AOF manager: %v", err)
    }
    defer manager.Close()
    
    // 写入命令
    commands := [][]string{
        {"SET", "key1", "value1"},
        {"GET", "key1"},
        {"DEL", "key1"},
    }
    
    for _, cmd := range commands {
        if err := manager.WriteCommand(cmd); err != nil {
            t.Fatalf("Failed to write command: %v", err)
        }
    }
    
    // 等待同步
    time.Sleep(2 * time.Second)
    
    // 获取状态
    status := manager.GetStatus()
    if !status["enabled"].(bool) {
        t.Error("AOF should be enabled")
    }
    
    if status["filename"].(string) != filename {
        t.Errorf("Expected filename %s, got %s", filename, status["filename"])
    }
}

func TestAOFRecovery(t *testing.T) {
    filename := "test.aof"
    defer os.Remove(filename)
    
    // 创建测试文件
    writer, err := NewAOFWriter(filename, AOF_ALWAYS, 1024)
    if err != nil {
        t.Fatalf("Failed to create AOF writer: %v", err)
    }
    
    // 写入测试命令
    testCommands := [][]string{
        {"SET", "key1", "value1"},
        {"SET", "key2", "value2"},
        {"DEL", "key1"},
    }
    
    for _, cmd := range testCommands {
        writer.WriteCommand(cmd)
    }
    writer.Close()
    
    // 恢复数据
    recovery := NewAOFRecovery(filename)
    if err := recovery.Recover(); err != nil {
        t.Fatalf("Failed to recover data: %v", err)
    }
    
    // 验证文件
    if err := recovery.Validate(); err != nil {
        t.Fatalf("Failed to validate AOF file: %v", err)
    }
    
    // 获取统计信息
    stats, err := recovery.GetStats()
    if err != nil {
        t.Fatalf("Failed to get stats: %v", err)
    }
    
    if stats["command_count"].(int) != len(testCommands) {
        t.Errorf("Expected %d commands, got %d", len(testCommands), stats["command_count"])
    }
}

2. 性能基准测试

// aof/benchmark_test.go
package aof

import (
    "os"
    "testing"
)

func BenchmarkAOFWrite(b *testing.B) {
    filename := "benchmark.aof"
    defer os.Remove(filename)
    
    writer, err := NewAOFWriter(filename, AOF_EVERYSEC, 8192)
    if err != nil {
        b.Fatalf("Failed to create AOF writer: %v", err)
    }
    defer writer.Close()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        command := []string{"SET", "key", "value"}
        writer.WriteCommand(command)
    }
}

func BenchmarkAOFRead(b *testing.B) {
    filename := "benchmark.aof"
    defer os.Remove(filename)
    
    // 预填充文件
    writer, err := NewAOFWriter(filename, AOF_ALWAYS, 8192)
    if err != nil {
        b.Fatalf("Failed to create AOF writer: %v", err)
    }
    
    for i := 0; i < 1000; i++ {
        command := []string{"SET", "key", "value"}
        writer.WriteCommand(command)
    }
    writer.Close()
    
    // 读取测试
    reader, err := NewAOFReader(filename)
    if err != nil {
        b.Fatalf("Failed to create AOF reader: %v", err)
    }
    defer reader.Close()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        reader.ReadNextCommand()
    }
}

func BenchmarkAOFPolicyComparison(b *testing.B) {
    policies := []AOFPolicy{AOF_ALWAYS, AOF_EVERYSEC, AOF_NO}
    
    for _, policy := range policies {
        b.Run(fmt.Sprintf("Policy_%d", policy), func(b *testing.B) {
            filename := fmt.Sprintf("benchmark_%d.aof", policy)
            defer os.Remove(filename)
            
            writer, err := NewAOFWriter(filename, policy, 8192)
            if err != nil {
                b.Fatalf("Failed to create AOF writer: %v", err)
            }
            defer writer.Close()
            
            b.ResetTimer()
            for i := 0; i < b.N; i++ {
                command := []string{"SET", "key", "value"}
                writer.WriteCommand(command)
            }
        })
    }
}

func BenchmarkAOFRewrite(b *testing.B) {
    filename := "benchmark.aof"
    defer os.Remove(filename)
    defer os.Remove(filename + ".tmp")
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        rewriter := NewAOFRewriter(filename)
        
        if err := rewriter.StartRewrite(); err != nil {
            b.Fatalf("Failed to start rewrite: %v", err)
        }
        
        // 写入重写命令
        for j := 0; j < 100; j++ {
            command := []string{"SET", "key", "value"}
            rewriter.WriteCommand(command)
        }
        
        if err := rewriter.FinishRewrite(); err != nil {
            b.Fatalf("Failed to finish rewrite: %v", err)
        }
    }
}

3. 并发测试

// aof/concurrent_test.go
package aof

import (
    "os"
    "sync"
    "testing"
    "time"
)

func TestAOFConcurrentWrite(t *testing.T) {
    filename := "concurrent.aof"
    defer os.Remove(filename)
    
    writer, err := NewAOFWriter(filename, AOF_EVERYSEC, 8192)
    if err != nil {
        t.Fatalf("Failed to create AOF writer: %v", err)
    }
    defer writer.Close()
    
    const numGoroutines = 10
    const numCommands = 100
    
    var wg sync.WaitGroup
    
    // 并发写入
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(goroutineID int) {
            defer wg.Done()
            
            for j := 0; j < numCommands; j++ {
                command := []string{"SET", fmt.Sprintf("key_%d_%d", goroutineID, j), "value"}
                if err := writer.WriteCommand(command); err != nil {
                    t.Errorf("Failed to write command: %v", err)
                }
            }
        }(i)
    }
    
    wg.Wait()
    
    // 等待同步
    time.Sleep(2 * time.Second)
    
    // 验证文件大小
    size, err := writer.Size()
    if err != nil {
        t.Fatalf("Failed to get file size: %v", err)
    }
    
    if size == 0 {
        t.Error("File size should not be zero")
    }
}

func TestAOFConcurrentReadWrite(t *testing.T) {
    filename := "concurrent.aof"
    defer os.Remove(filename)
    
    writer, err := NewAOFWriter(filename, AOF_EVERYSEC, 8192)
    if err != nil {
        t.Fatalf("Failed to create AOF writer: %v", err)
    }
    defer writer.Close()
    
    const numGoroutines = 5
    const numCommands = 100
    
    var wg sync.WaitGroup
    
    // 并发写入
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(goroutineID int) {
            defer wg.Done()
            
            for j := 0; j < numCommands; j++ {
                command := []string{"SET", fmt.Sprintf("key_%d_%d", goroutineID, j), "value"}
                writer.WriteCommand(command)
            }
        }(i)
    }
    
    // 并发读取
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(goroutineID int) {
            defer wg.Done()
            
            reader, err := NewAOFReader(filename)
            if err != nil {
                t.Errorf("Failed to create AOF reader: %v", err)
                return
            }
            defer reader.Close()
            
            for j := 0; j < numCommands; j++ {
                _, err := reader.ReadNextCommand()
                if err != nil {
                    // 忽略 EOF 错误
                    if err != io.EOF {
                        t.Errorf("Failed to read command: %v", err)
                    }
                    break
                }
            }
        }(i)
    }
    
    wg.Wait()
}

性能对比分析

1. 同步策略对比

策略性能安全性适用场景
always最慢最高对数据安全要求极高
everysec中等中等平衡性能和安全性
no最快最低对性能要求极高

2. AOF vs RDB 对比

特性AOFRDB说明
数据完整性高中等AOF 记录每个写操作
文件大小大小AOF 文件通常比 RDB 大
恢复速度慢快AOF 需要重放所有命令
性能影响中等低AOF 需要记录每个命令
可读性好差AOF 文件可读,RDB 二进制

3. 性能测试结果

// 基准测试结果示例
func BenchmarkComparison(b *testing.B) {
    // AOF 写入性能
    b.Run("AOF_Write", func(b *testing.B) {
        filename := "benchmark.aof"
        defer os.Remove(filename)
        
        writer, _ := NewAOFWriter(filename, AOF_EVERYSEC, 8192)
        defer writer.Close()
        
        for i := 0; i < b.N; i++ {
            command := []string{"SET", "key", "value"}
            writer.WriteCommand(command)
        }
    })
    
    // RDB 写入性能(模拟)
    b.Run("RDB_Write", func(b *testing.B) {
        // 模拟 RDB 写入
        for i := 0; i < b.N; i++ {
            // RDB 写入逻辑
        }
    })
}

面试要点

1. AOF 持久化的优势

答案要点:

  • 数据完整性:记录每个写操作,数据丢失最少
  • 可读性:AOF 文件是文本格式,便于调试
  • 实时性:可以实时记录数据变化
  • 灵活性:支持多种同步策略

2. AOF 重写的必要性

答案要点:

  • 文件膨胀:AOF 文件会不断增长
  • 恢复效率:重写后恢复速度更快
  • 存储空间:重写后文件更小
  • 性能优化:减少 I/O 操作

3. 同步策略的选择

答案要点:

  • always:数据安全要求高,性能要求低
  • everysec:平衡性能和安全性
  • no:性能要求高,数据安全要求低
  • 混合策略:根据业务需求动态调整

4. AOF 性能优化

答案要点:

  • 缓冲区优化:使用合适的缓冲区大小
  • 批量写入:减少系统调用次数
  • 异步写入:使用协程异步处理
  • 压缩优化:定期压缩 AOF 文件

总结

通过本章学习,我们深入理解了:

  1. AOF 持久化的基本原理和文件格式
  2. 三种同步策略的实现和选择
  3. AOF 重写机制的完整实现
  4. 数据恢复流程和验证方法
  5. 性能优化技巧和最佳实践

AOF 持久化为 Redis 提供了高可靠性的数据持久化方案。在下一章中,我们将学习混合持久化策略,了解如何结合 RDB 和 AOF 的优势。

Prev
RDB 持久化机制
Next
混合持久化策略