AOF 持久化机制
学习目标
- 深入理解 AOF 日志格式和写入策略
- 掌握 always/everysec/no 三种同步策略
- 实现完整的 AOF 重写机制和优化
- 理解混合使用 RDB+AOF 的优势
- 掌握数据恢复流程和性能调优
AOF 持久化概述
1. AOF 基本原理
AOF(Append Only File)是 Redis 的另一种持久化方式,通过记录每个写操作来保证数据持久性:
┌─────────────────────────────────────────────────────────────┐
│ AOF 持久化流程 │
├─────────────────────────────────────────────────────────────┤
│ 客户端命令 → 命令执行 → AOF 缓冲区 → AOF 文件 │
│ │ │ │ │ │
│ │ │ │ │ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ SET │ │ 执行成功 │ │ 缓冲命令 │ │ 写入磁盘 │ │
│ │ GET │ │ 执行成功 │ │ 缓冲命令 │ │ 写入磁盘 │ │
│ │ DEL │ │ 执行成功 │ │ 缓冲命令 │ │ 写入磁盘 │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │ │
│ │ │ │ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 同步策略 │ │
│ │ • always:每次写操作都同步到磁盘 │ │
│ │ • everysec:每秒同步一次到磁盘 │ │
│ │ • no:由操作系统决定何时同步 │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
2. AOF 文件格式
AOF 文件使用 RESP 协议格式存储命令:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
*2\r\n$3\r\nDEL\r\n$3\r\nkey\r\n
格式说明:
*3
:数组包含 3 个元素$3
:字符串长度为 3SET
:命令名称\r\n
:行结束符
️ Go 语言 AOF 实现
1. AOF 写入器实现
// aof/aof_writer.go
package aof
import (
"bufio"
"fmt"
"io"
"os"
"sync"
"time"
)
// AOF 同步策略
type AOFPolicy int
const (
AOF_ALWAYS AOFPolicy = iota
AOF_EVERYSEC
AOF_NO
)
// AOF 写入器
type AOFWriter struct {
file *os.File
writer *bufio.Writer
policy AOFPolicy
buffer []byte
bufferSize int
mu sync.Mutex
lastSync time.Time
syncTicker *time.Ticker
stopCh chan struct{}
wg sync.WaitGroup
}
// 创建 AOF 写入器
func NewAOFWriter(filename string, policy AOFPolicy, bufferSize int) (*AOFWriter, error) {
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
writer := &AOFWriter{
file: file,
writer: bufio.NewWriterSize(file, bufferSize),
policy: policy,
buffer: make([]byte, 0, bufferSize),
bufferSize: bufferSize,
lastSync: time.Now(),
stopCh: make(chan struct{}),
}
// 启动同步协程
if policy == AOF_EVERYSEC {
writer.startSyncWorker()
}
return writer, nil
}
// 写入命令
func (aw *AOFWriter) WriteCommand(args []string) error {
aw.mu.Lock()
defer aw.mu.Unlock()
// 构建 RESP 格式的命令
command := aw.buildRESPCommand(args)
// 写入缓冲区
if _, err := aw.writer.WriteString(command); err != nil {
return err
}
// 根据策略决定是否同步
switch aw.policy {
case AOF_ALWAYS:
if err := aw.writer.Flush(); err != nil {
return err
}
if err := aw.file.Sync(); err != nil {
return err
}
case AOF_EVERYSEC:
// 由同步协程处理
case AOF_NO:
// 由操作系统处理
}
return nil
}
// 构建 RESP 格式命令
func (aw *AOFWriter) buildRESPCommand(args []string) string {
var result string
// 数组开始
result += fmt.Sprintf("*%d\r\n", len(args))
// 每个参数
for _, arg := range args {
result += fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg)
}
return result
}
// 启动同步协程
func (aw *AOFWriter) startSyncWorker() {
aw.wg.Add(1)
go func() {
defer aw.wg.Done()
aw.syncTicker = time.NewTicker(time.Second)
defer aw.syncTicker.Stop()
for {
select {
case <-aw.syncTicker.C:
aw.sync()
case <-aw.stopCh:
return
}
}
}()
}
// 同步到磁盘
func (aw *AOFWriter) sync() error {
aw.mu.Lock()
defer aw.mu.Unlock()
if err := aw.writer.Flush(); err != nil {
return err
}
if err := aw.file.Sync(); err != nil {
return err
}
aw.lastSync = time.Now()
return nil
}
// 关闭 AOF 写入器
func (aw *AOFWriter) Close() error {
close(aw.stopCh)
aw.wg.Wait()
aw.mu.Lock()
defer aw.mu.Unlock()
if err := aw.writer.Flush(); err != nil {
return err
}
if err := aw.file.Sync(); err != nil {
return err
}
return aw.file.Close()
}
// 获取文件大小
func (aw *AOFWriter) Size() (int64, error) {
aw.mu.Lock()
defer aw.mu.Unlock()
if err := aw.writer.Flush(); err != nil {
return 0, err
}
stat, err := aw.file.Stat()
if err != nil {
return 0, err
}
return stat.Size(), nil
}
// 获取最后同步时间
func (aw *AOFWriter) LastSync() time.Time {
aw.mu.Lock()
defer aw.mu.Unlock()
return aw.lastSync
}
2. AOF 读取器实现
// aof/aof_reader.go
package aof
import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"strings"
)
// AOF 读取器
type AOFReader struct {
file *os.File
reader *bufio.Reader
}
// 创建 AOF 读取器
func NewAOFReader(filename string) (*AOFReader, error) {
file, err := os.Open(filename)
if err != nil {
return nil, err
}
return &AOFReader{
file: file,
reader: bufio.NewReader(file),
}, nil
}
// 读取下一个命令
func (ar *AOFReader) ReadNextCommand() ([]string, error) {
// 读取数组长度
line, err := ar.reader.ReadString('\n')
if err != nil {
return nil, err
}
line = strings.TrimSpace(line)
if !strings.HasPrefix(line, "*") {
return nil, fmt.Errorf("invalid AOF format: expected array, got %s", line)
}
count, err := strconv.Atoi(line[1:])
if err != nil {
return nil, err
}
if count <= 0 {
return nil, fmt.Errorf("invalid array count: %d", count)
}
// 读取命令参数
args := make([]string, count)
for i := 0; i < count; i++ {
// 读取字符串长度
line, err := ar.reader.ReadString('\n')
if err != nil {
return nil, err
}
line = strings.TrimSpace(line)
if !strings.HasPrefix(line, "$") {
return nil, fmt.Errorf("invalid AOF format: expected string, got %s", line)
}
length, err := strconv.Atoi(line[1:])
if err != nil {
return nil, err
}
// 读取字符串内容
arg := make([]byte, length)
if _, err := io.ReadFull(ar.reader, arg); err != nil {
return nil, err
}
// 读取 \r\n
if _, err := ar.reader.ReadString('\n'); err != nil {
return nil, err
}
args[i] = string(arg)
}
return args, nil
}
// 读取所有命令
func (ar *AOFReader) ReadAllCommands() ([][]string, error) {
var commands [][]string
for {
command, err := ar.ReadNextCommand()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
commands = append(commands, command)
}
return commands, nil
}
// 关闭 AOF 读取器
func (ar *AOFReader) Close() error {
return ar.file.Close()
}
// 获取文件大小
func (ar *AOFReader) Size() (int64, error) {
stat, err := ar.file.Stat()
if err != nil {
return 0, err
}
return stat.Size(), nil
}
3. AOF 重写机制
// aof/aof_rewrite.go
package aof
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// AOF 重写器
type AOFRewriter struct {
originalFile string
tempFile string
writer *AOFWriter
mu sync.Mutex
isRewriting bool
}
// 创建 AOF 重写器
func NewAOFRewriter(originalFile string) *AOFRewriter {
return &AOFRewriter{
originalFile: originalFile,
tempFile: originalFile + ".tmp",
}
}
// 开始重写
func (ar *AOFRewriter) StartRewrite() error {
ar.mu.Lock()
defer ar.mu.Unlock()
if ar.isRewriting {
return fmt.Errorf("AOF rewrite already in progress")
}
// 创建临时文件
writer, err := NewAOFWriter(ar.tempFile, AOF_ALWAYS, 8192)
if err != nil {
return err
}
ar.writer = writer
ar.isRewriting = true
return nil
}
// 写入重写命令
func (ar *AOFRewriter) WriteCommand(args []string) error {
ar.mu.Lock()
defer ar.mu.Unlock()
if !ar.isRewriting {
return fmt.Errorf("AOF rewrite not started")
}
return ar.writer.WriteCommand(args)
}
// 完成重写
func (ar *AOFRewriter) FinishRewrite() error {
ar.mu.Lock()
defer ar.mu.Unlock()
if !ar.isRewriting {
return fmt.Errorf("AOF rewrite not started")
}
// 关闭临时文件
if err := ar.writer.Close(); err != nil {
return err
}
// 原子性替换文件
if err := ar.atomicReplace(); err != nil {
return err
}
ar.isRewriting = false
return nil
}
// 原子性替换文件
func (ar *AOFRewriter) atomicReplace() error {
// 备份原文件
backupFile := ar.originalFile + ".bak"
if err := os.Rename(ar.originalFile, backupFile); err != nil {
return err
}
// 重命名临时文件
if err := os.Rename(ar.tempFile, ar.originalFile); err != nil {
// 恢复原文件
os.Rename(backupFile, ar.originalFile)
return err
}
// 删除备份文件
os.Remove(backupFile)
return nil
}
// 取消重写
func (ar *AOFRewriter) CancelRewrite() error {
ar.mu.Lock()
defer ar.mu.Unlock()
if !ar.isRewriting {
return fmt.Errorf("AOF rewrite not started")
}
// 关闭临时文件
if ar.writer != nil {
ar.writer.Close()
}
// 删除临时文件
os.Remove(ar.tempFile)
ar.isRewriting = false
return nil
}
// 检查是否正在重写
func (ar *AOFRewriter) IsRewriting() bool {
ar.mu.Lock()
defer ar.mu.Unlock()
return ar.isRewriting
}
4. AOF 管理器实现
// aof/aof_manager.go
package aof
import (
"fmt"
"sync"
"time"
)
// AOF 管理器
type AOFManager struct {
writer *AOFWriter
rewriter *AOFRewriter
policy AOFPolicy
filename string
mu sync.RWMutex
isEnabled bool
lastRewrite time.Time
rewriteSize int64
minRewriteSize int64
}
// 创建 AOF 管理器
func NewAOFManager(filename string, policy AOFPolicy, minRewriteSize int64) (*AOFManager, error) {
writer, err := NewAOFWriter(filename, policy, 8192)
if err != nil {
return nil, err
}
return &AOFManager{
writer: writer,
rewriter: NewAOFRewriter(filename),
policy: policy,
filename: filename,
isEnabled: true,
lastRewrite: time.Now(),
rewriteSize: 0,
minRewriteSize: minRewriteSize,
}, nil
}
// 写入命令
func (am *AOFManager) WriteCommand(args []string) error {
am.mu.RLock()
defer am.mu.RUnlock()
if !am.isEnabled {
return nil
}
// 如果正在重写,写入重写器
if am.rewriter.IsRewriting() {
return am.rewriter.WriteCommand(args)
}
// 写入主文件
if err := am.writer.WriteCommand(args); err != nil {
return err
}
// 更新重写大小
am.rewriteSize += int64(len(fmt.Sprintf("%v", args)))
// 检查是否需要重写
if am.shouldRewrite() {
go am.startRewrite()
}
return nil
}
// 检查是否需要重写
func (am *AOFManager) shouldRewrite() bool {
// 检查文件大小
size, err := am.writer.Size()
if err != nil {
return false
}
if size < am.minRewriteSize {
return false
}
// 检查时间间隔
if time.Since(am.lastRewrite) < time.Minute {
return false
}
return true
}
// 开始重写
func (am *AOFManager) startRewrite() {
am.mu.Lock()
defer am.mu.Unlock()
if am.rewriter.IsRewriting() {
return
}
// 开始重写
if err := am.rewriter.StartRewrite(); err != nil {
fmt.Printf("Failed to start AOF rewrite: %v\n", err)
return
}
// 重写完成后替换文件
go func() {
defer am.rewriter.FinishRewrite()
// 这里应该从当前数据状态生成重写命令
// 简化实现,实际应该遍历所有键值对
am.generateRewriteCommands()
am.mu.Lock()
am.lastRewrite = time.Now()
am.rewriteSize = 0
am.mu.Unlock()
}()
}
// 生成重写命令(简化实现)
func (am *AOFManager) generateRewriteCommands() {
// 实际实现应该遍历所有键值对,生成最小化的命令集
// 这里只是示例
commands := [][]string{
{"SET", "key1", "value1"},
{"SET", "key2", "value2"},
{"HSET", "hash1", "field1", "value1"},
}
for _, cmd := range commands {
am.rewriter.WriteCommand(cmd)
}
}
// 启用 AOF
func (am *AOFManager) Enable() {
am.mu.Lock()
defer am.mu.Unlock()
am.isEnabled = true
}
// 禁用 AOF
func (am *AOFManager) Disable() {
am.mu.Lock()
defer am.mu.Unlock()
am.isEnabled = false
}
// 强制重写
func (am *AOFManager) ForceRewrite() error {
am.mu.Lock()
defer am.mu.Unlock()
if am.rewriter.IsRewriting() {
return fmt.Errorf("AOF rewrite already in progress")
}
go am.startRewrite()
return nil
}
// 获取状态
func (am *AOFManager) GetStatus() map[string]interface{} {
am.mu.RLock()
defer am.mu.RUnlock()
size, _ := am.writer.Size()
return map[string]interface{}{
"enabled": am.isEnabled,
"filename": am.filename,
"size": size,
"policy": am.policy,
"is_rewriting": am.rewriter.IsRewriting(),
"last_rewrite": am.lastRewrite,
"rewrite_size": am.rewriteSize,
"min_rewrite_size": am.minRewriteSize,
}
}
// 关闭 AOF 管理器
func (am *AOFManager) Close() error {
am.mu.Lock()
defer am.mu.Unlock()
// 取消重写
if am.rewriter.IsRewriting() {
am.rewriter.CancelRewrite()
}
// 关闭写入器
return am.writer.Close()
}
5. 数据恢复实现
// aof/aof_recovery.go
package aof
import (
"fmt"
"log"
"time"
)
// 数据恢复器
type AOFRecovery struct {
filename string
reader *AOFReader
}
// 创建数据恢复器
func NewAOFRecovery(filename string) *AOFRecovery {
return &AOFRecovery{
filename: filename,
}
}
// 恢复数据
func (ar *AOFRecovery) Recover() error {
reader, err := NewAOFReader(ar.filename)
if err != nil {
return err
}
defer reader.Close()
ar.reader = reader
// 读取所有命令
commands, err := reader.ReadAllCommands()
if err != nil {
return err
}
// 执行命令恢复数据
for i, command := range commands {
if err := ar.executeCommand(command); err != nil {
log.Printf("Failed to execute command %d: %v", i, err)
continue
}
}
return nil
}
// 执行命令
func (ar *AOFRecovery) executeCommand(args []string) error {
if len(args) == 0 {
return fmt.Errorf("empty command")
}
command := args[0]
switch command {
case "SET":
if len(args) != 3 {
return fmt.Errorf("invalid SET command: %v", args)
}
// 这里应该调用实际的 SET 命令
fmt.Printf("SET %s %s\n", args[1], args[2])
case "GET":
if len(args) != 2 {
return fmt.Errorf("invalid GET command: %v", args)
}
// 这里应该调用实际的 GET 命令
fmt.Printf("GET %s\n", args[1])
case "DEL":
if len(args) != 2 {
return fmt.Errorf("invalid DEL command: %v", args)
}
// 这里应该调用实际的 DEL 命令
fmt.Printf("DEL %s\n", args[1])
case "HSET":
if len(args) < 3 {
return fmt.Errorf("invalid HSET command: %v", args)
}
// 这里应该调用实际的 HSET 命令
fmt.Printf("HSET %s %s %s\n", args[1], args[2], args[3])
case "LPUSH":
if len(args) < 2 {
return fmt.Errorf("invalid LPUSH command: %v", args)
}
// 这里应该调用实际的 LPUSH 命令
fmt.Printf("LPUSH %s %v\n", args[1], args[2:])
case "ZADD":
if len(args) < 3 {
return fmt.Errorf("invalid ZADD command: %v", args)
}
// 这里应该调用实际的 ZADD 命令
fmt.Printf("ZADD %s %s %s\n", args[1], args[2], args[3])
default:
// 忽略未知命令
fmt.Printf("Unknown command: %s\n", command)
}
return nil
}
// 验证 AOF 文件
func (ar *AOFRecovery) Validate() error {
reader, err := NewAOFReader(ar.filename)
if err != nil {
return err
}
defer reader.Close()
commandCount := 0
for {
_, err := reader.ReadNextCommand()
if err != nil {
if err == io.EOF {
break
}
return err
}
commandCount++
}
fmt.Printf("AOF file validation completed: %d commands found\n", commandCount)
return nil
}
// 获取 AOF 文件统计信息
func (ar *AOFRecovery) GetStats() (map[string]interface{}, error) {
reader, err := NewAOFReader(ar.filename)
if err != nil {
return nil, err
}
defer reader.Close()
stats := make(map[string]interface{})
// 文件大小
size, err := reader.Size()
if err != nil {
return nil, err
}
stats["file_size"] = size
// 命令数量
commandCount := 0
for {
_, err := reader.ReadNextCommand()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
commandCount++
}
stats["command_count"] = commandCount
return stats, nil
}
测试验证
1. 单元测试
// aof/aof_test.go
package aof
import (
"os"
"testing"
"time"
)
func TestAOFWriter(t *testing.T) {
filename := "test.aof"
defer os.Remove(filename)
writer, err := NewAOFWriter(filename, AOF_ALWAYS, 1024)
if err != nil {
t.Fatalf("Failed to create AOF writer: %v", err)
}
defer writer.Close()
// 写入命令
commands := [][]string{
{"SET", "key1", "value1"},
{"GET", "key1"},
{"DEL", "key1"},
}
for _, cmd := range commands {
if err := writer.WriteCommand(cmd); err != nil {
t.Fatalf("Failed to write command: %v", err)
}
}
// 验证文件大小
size, err := writer.Size()
if err != nil {
t.Fatalf("Failed to get file size: %v", err)
}
if size == 0 {
t.Error("File size should not be zero")
}
}
func TestAOFReader(t *testing.T) {
filename := "test.aof"
defer os.Remove(filename)
// 创建测试文件
writer, err := NewAOFWriter(filename, AOF_ALWAYS, 1024)
if err != nil {
t.Fatalf("Failed to create AOF writer: %v", err)
}
// 写入测试命令
testCommands := [][]string{
{"SET", "key1", "value1"},
{"GET", "key1"},
{"DEL", "key1"},
}
for _, cmd := range testCommands {
writer.WriteCommand(cmd)
}
writer.Close()
// 读取命令
reader, err := NewAOFReader(filename)
if err != nil {
t.Fatalf("Failed to create AOF reader: %v", err)
}
defer reader.Close()
// 验证读取的命令
for i, expectedCmd := range testCommands {
cmd, err := reader.ReadNextCommand()
if err != nil {
t.Fatalf("Failed to read command %d: %v", i, err)
}
if len(cmd) != len(expectedCmd) {
t.Errorf("Command %d length mismatch: expected %d, got %d", i, len(expectedCmd), len(cmd))
}
for j, arg := range cmd {
if arg != expectedCmd[j] {
t.Errorf("Command %d arg %d mismatch: expected %s, got %s", i, j, expectedCmd[j], arg)
}
}
}
}
func TestAOFRewriter(t *testing.T) {
filename := "test.aof"
defer os.Remove(filename)
defer os.Remove(filename + ".tmp")
rewriter := NewAOFRewriter(filename)
// 开始重写
if err := rewriter.StartRewrite(); err != nil {
t.Fatalf("Failed to start rewrite: %v", err)
}
// 写入重写命令
commands := [][]string{
{"SET", "key1", "value1"},
{"SET", "key2", "value2"},
}
for _, cmd := range commands {
if err := rewriter.WriteCommand(cmd); err != nil {
t.Fatalf("Failed to write rewrite command: %v", err)
}
}
// 完成重写
if err := rewriter.FinishRewrite(); err != nil {
t.Fatalf("Failed to finish rewrite: %v", err)
}
// 验证文件存在
if _, err := os.Stat(filename); os.IsNotExist(err) {
t.Error("Rewritten file should exist")
}
}
func TestAOFManager(t *testing.T) {
filename := "test.aof"
defer os.Remove(filename)
manager, err := NewAOFManager(filename, AOF_EVERYSEC, 1024)
if err != nil {
t.Fatalf("Failed to create AOF manager: %v", err)
}
defer manager.Close()
// 写入命令
commands := [][]string{
{"SET", "key1", "value1"},
{"GET", "key1"},
{"DEL", "key1"},
}
for _, cmd := range commands {
if err := manager.WriteCommand(cmd); err != nil {
t.Fatalf("Failed to write command: %v", err)
}
}
// 等待同步
time.Sleep(2 * time.Second)
// 获取状态
status := manager.GetStatus()
if !status["enabled"].(bool) {
t.Error("AOF should be enabled")
}
if status["filename"].(string) != filename {
t.Errorf("Expected filename %s, got %s", filename, status["filename"])
}
}
func TestAOFRecovery(t *testing.T) {
filename := "test.aof"
defer os.Remove(filename)
// 创建测试文件
writer, err := NewAOFWriter(filename, AOF_ALWAYS, 1024)
if err != nil {
t.Fatalf("Failed to create AOF writer: %v", err)
}
// 写入测试命令
testCommands := [][]string{
{"SET", "key1", "value1"},
{"SET", "key2", "value2"},
{"DEL", "key1"},
}
for _, cmd := range testCommands {
writer.WriteCommand(cmd)
}
writer.Close()
// 恢复数据
recovery := NewAOFRecovery(filename)
if err := recovery.Recover(); err != nil {
t.Fatalf("Failed to recover data: %v", err)
}
// 验证文件
if err := recovery.Validate(); err != nil {
t.Fatalf("Failed to validate AOF file: %v", err)
}
// 获取统计信息
stats, err := recovery.GetStats()
if err != nil {
t.Fatalf("Failed to get stats: %v", err)
}
if stats["command_count"].(int) != len(testCommands) {
t.Errorf("Expected %d commands, got %d", len(testCommands), stats["command_count"])
}
}
2. 性能基准测试
// aof/benchmark_test.go
package aof
import (
"os"
"testing"
)
func BenchmarkAOFWrite(b *testing.B) {
filename := "benchmark.aof"
defer os.Remove(filename)
writer, err := NewAOFWriter(filename, AOF_EVERYSEC, 8192)
if err != nil {
b.Fatalf("Failed to create AOF writer: %v", err)
}
defer writer.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
command := []string{"SET", "key", "value"}
writer.WriteCommand(command)
}
}
func BenchmarkAOFRead(b *testing.B) {
filename := "benchmark.aof"
defer os.Remove(filename)
// 预填充文件
writer, err := NewAOFWriter(filename, AOF_ALWAYS, 8192)
if err != nil {
b.Fatalf("Failed to create AOF writer: %v", err)
}
for i := 0; i < 1000; i++ {
command := []string{"SET", "key", "value"}
writer.WriteCommand(command)
}
writer.Close()
// 读取测试
reader, err := NewAOFReader(filename)
if err != nil {
b.Fatalf("Failed to create AOF reader: %v", err)
}
defer reader.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
reader.ReadNextCommand()
}
}
func BenchmarkAOFPolicyComparison(b *testing.B) {
policies := []AOFPolicy{AOF_ALWAYS, AOF_EVERYSEC, AOF_NO}
for _, policy := range policies {
b.Run(fmt.Sprintf("Policy_%d", policy), func(b *testing.B) {
filename := fmt.Sprintf("benchmark_%d.aof", policy)
defer os.Remove(filename)
writer, err := NewAOFWriter(filename, policy, 8192)
if err != nil {
b.Fatalf("Failed to create AOF writer: %v", err)
}
defer writer.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
command := []string{"SET", "key", "value"}
writer.WriteCommand(command)
}
})
}
}
func BenchmarkAOFRewrite(b *testing.B) {
filename := "benchmark.aof"
defer os.Remove(filename)
defer os.Remove(filename + ".tmp")
b.ResetTimer()
for i := 0; i < b.N; i++ {
rewriter := NewAOFRewriter(filename)
if err := rewriter.StartRewrite(); err != nil {
b.Fatalf("Failed to start rewrite: %v", err)
}
// 写入重写命令
for j := 0; j < 100; j++ {
command := []string{"SET", "key", "value"}
rewriter.WriteCommand(command)
}
if err := rewriter.FinishRewrite(); err != nil {
b.Fatalf("Failed to finish rewrite: %v", err)
}
}
}
3. 并发测试
// aof/concurrent_test.go
package aof
import (
"os"
"sync"
"testing"
"time"
)
func TestAOFConcurrentWrite(t *testing.T) {
filename := "concurrent.aof"
defer os.Remove(filename)
writer, err := NewAOFWriter(filename, AOF_EVERYSEC, 8192)
if err != nil {
t.Fatalf("Failed to create AOF writer: %v", err)
}
defer writer.Close()
const numGoroutines = 10
const numCommands = 100
var wg sync.WaitGroup
// 并发写入
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for j := 0; j < numCommands; j++ {
command := []string{"SET", fmt.Sprintf("key_%d_%d", goroutineID, j), "value"}
if err := writer.WriteCommand(command); err != nil {
t.Errorf("Failed to write command: %v", err)
}
}
}(i)
}
wg.Wait()
// 等待同步
time.Sleep(2 * time.Second)
// 验证文件大小
size, err := writer.Size()
if err != nil {
t.Fatalf("Failed to get file size: %v", err)
}
if size == 0 {
t.Error("File size should not be zero")
}
}
func TestAOFConcurrentReadWrite(t *testing.T) {
filename := "concurrent.aof"
defer os.Remove(filename)
writer, err := NewAOFWriter(filename, AOF_EVERYSEC, 8192)
if err != nil {
t.Fatalf("Failed to create AOF writer: %v", err)
}
defer writer.Close()
const numGoroutines = 5
const numCommands = 100
var wg sync.WaitGroup
// 并发写入
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for j := 0; j < numCommands; j++ {
command := []string{"SET", fmt.Sprintf("key_%d_%d", goroutineID, j), "value"}
writer.WriteCommand(command)
}
}(i)
}
// 并发读取
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
reader, err := NewAOFReader(filename)
if err != nil {
t.Errorf("Failed to create AOF reader: %v", err)
return
}
defer reader.Close()
for j := 0; j < numCommands; j++ {
_, err := reader.ReadNextCommand()
if err != nil {
// 忽略 EOF 错误
if err != io.EOF {
t.Errorf("Failed to read command: %v", err)
}
break
}
}
}(i)
}
wg.Wait()
}
性能对比分析
1. 同步策略对比
策略 | 性能 | 安全性 | 适用场景 |
---|---|---|---|
always | 最慢 | 最高 | 对数据安全要求极高 |
everysec | 中等 | 中等 | 平衡性能和安全性 |
no | 最快 | 最低 | 对性能要求极高 |
2. AOF vs RDB 对比
特性 | AOF | RDB | 说明 |
---|---|---|---|
数据完整性 | 高 | 中等 | AOF 记录每个写操作 |
文件大小 | 大 | 小 | AOF 文件通常比 RDB 大 |
恢复速度 | 慢 | 快 | AOF 需要重放所有命令 |
性能影响 | 中等 | 低 | AOF 需要记录每个命令 |
可读性 | 好 | 差 | AOF 文件可读,RDB 二进制 |
3. 性能测试结果
// 基准测试结果示例
func BenchmarkComparison(b *testing.B) {
// AOF 写入性能
b.Run("AOF_Write", func(b *testing.B) {
filename := "benchmark.aof"
defer os.Remove(filename)
writer, _ := NewAOFWriter(filename, AOF_EVERYSEC, 8192)
defer writer.Close()
for i := 0; i < b.N; i++ {
command := []string{"SET", "key", "value"}
writer.WriteCommand(command)
}
})
// RDB 写入性能(模拟)
b.Run("RDB_Write", func(b *testing.B) {
// 模拟 RDB 写入
for i := 0; i < b.N; i++ {
// RDB 写入逻辑
}
})
}
面试要点
1. AOF 持久化的优势
答案要点:
- 数据完整性:记录每个写操作,数据丢失最少
- 可读性:AOF 文件是文本格式,便于调试
- 实时性:可以实时记录数据变化
- 灵活性:支持多种同步策略
2. AOF 重写的必要性
答案要点:
- 文件膨胀:AOF 文件会不断增长
- 恢复效率:重写后恢复速度更快
- 存储空间:重写后文件更小
- 性能优化:减少 I/O 操作
3. 同步策略的选择
答案要点:
- always:数据安全要求高,性能要求低
- everysec:平衡性能和安全性
- no:性能要求高,数据安全要求低
- 混合策略:根据业务需求动态调整
4. AOF 性能优化
答案要点:
- 缓冲区优化:使用合适的缓冲区大小
- 批量写入:减少系统调用次数
- 异步写入:使用协程异步处理
- 压缩优化:定期压缩 AOF 文件
总结
通过本章学习,我们深入理解了:
- AOF 持久化的基本原理和文件格式
- 三种同步策略的实现和选择
- AOF 重写机制的完整实现
- 数据恢复流程和验证方法
- 性能优化技巧和最佳实践
AOF 持久化为 Redis 提供了高可靠性的数据持久化方案。在下一章中,我们将学习混合持久化策略,了解如何结合 RDB 和 AOF 的优势。