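Cache Consistency Strategies

Learning Objectives

  • Understand the Cache Aside, Write Through, and Write Behind patterns in depth
  • Learn the standard defenses against cache penetration, cache breakdown, and cache avalanche
  • Understand the dual-write consistency problem and implement strategies that address it
  • Understand CDC (change data capture) synchronization and how to implement it
  • Understand distributed transactions and eventual consistency

Cache Consistency Overview

1. Cache Consistency Challenges

Keeping cached data consistent with the database is a hard problem in distributed systems: every write has to reach two independent stores, and the two updates are not atomic.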

┌─────────────────────────────────────────────────────────────┐
│                Cache Consistency Challenges                 │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐      │
│  │ Application │    │ Cache layer │    │  Database   │      │
│  │             │    │             │    │             │      │
│  │  ┌───────┐  │    │  ┌───────┐  │    │  ┌───────┐  │      │
│  │  │ Reads │  │    │  │ Cache │  │    │  │Primary│  │      │
│  │  └───────┘  │    │  └───────┘  │    │  └───────┘  │      │
│  │  ┌───────┐  │    │  ┌───────┐  │    │  ┌───────┐  │      │
│  │  │Writes │  │    │  │ Cache │  │    │  │Replica│  │      │
│  │  └───────┘  │    │  └───────┘  │    │  └───────┘  │      │
│  └─────────────┘    └─────────────┘    └─────────────┘      │
│         │                  │                  │             │
│         └──────────────────┼──────────────────┘             │
│                            │                                │
│  ┌───────────────────────────────────────────────────────┐  │
│  │ Consistency challenges                                │  │
│  │  • Update order: cache first or database first?       │  │
│  │  • Concurrency: many threads touch the same key       │  │
│  │  • Network latency between cache and database         │  │
│  │  • Failure recovery: restoring data after a crash     │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
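
The concurrency challenge is easiest to see with one concrete schedule. The sketch below replays, step by step, an interleaving in which a reader and a writer touch the same key; plain maps stand in for the cache and the database, and the names `store` and `staleReadInterleaving` are hypothetical, chosen only for this illustration.

```go
package main

// store is a hypothetical in-memory stand-in for the cache or the database.
type store map[string]string

// staleReadInterleaving replays one problematic schedule and returns
// what the cache and the database hold when it ends.
func staleReadInterleaving() (cached, persisted string) {
	db := store{"k": "v1"}
	cache := store{}

	// Reader: cache miss, so it reads the current value from the database.
	readerCopy := db["k"]

	// Writer: updates the database, then invalidates the cache entry.
	db["k"] = "v2"
	delete(cache, "k")

	// Reader: resumes and populates the cache with its now-stale copy.
	cache["k"] = readerCopy

	return cache["k"], db["k"]
}
```

At the end of this schedule the cache holds the old value while the database holds the new one, and the two stay out of sync until the cache entry expires or is deleted again.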

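2. Cache Consistency Patterns

| Pattern       | Reads                                              | Writes                                               | Pros                   | Cons                                          | Best for                           |
|---------------|----------------------------------------------------|------------------------------------------------------|------------------------|-----------------------------------------------|------------------------------------|
| Cache Aside   | Read the cache first; on a miss, read the database | Update the database, then delete the cache entry     | Simple and flexible    | Short windows of inconsistency are possible   | Most workloads                     |
| Write Through | Read the cache directly                            | Update the cache and the database in the same call   | Strong consistency     | Slower writes                                 | Workloads that need consistency    |
| Write Behind  | Read the cache directly                            | Update the cache, then write the database asynchronously | Best write performance | Queued writes can be lost                 | Workloads that need raw performance |

Go Implementation of Cache Consistency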

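1. Cache Aside Implementation

// cache/cache_aside.go
package cache

import (
    "context"
    "sync"
    "time"
)

// Cache is the minimal interface the patterns below need from a cache.
// Get returns a non-nil error on a miss.
type Cache interface {
    Get(ctx context.Context, key string) (interface{}, error)
    Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
    Delete(ctx context.Context, key string) error
}

// Database is the minimal interface for the backing store.
type Database interface {
    Get(ctx context.Context, key string) (interface{}, error)
    Set(ctx context.Context, key string, value interface{}) error
    Delete(ctx context.Context, key string) error
}

// CacheAside implements the Cache Aside pattern.
type CacheAside struct {
    cache    Cache
    database Database
    mu       sync.RWMutex // serializes writes within this process only
}

// NewCacheAside creates a CacheAside on top of the given cache and database.
func NewCacheAside(cache Cache, database Database) *CacheAside {
    return &CacheAside{
        cache:    cache,
        database: database,
    }
}

// Get reads through the cache and falls back to the database on a miss.
func (ca *CacheAside) Get(ctx context.Context, key string) (interface{}, error) {
    // 1. Try the cache first. Any error is treated as a miss.
    value, err := ca.cache.Get(ctx, key)
    if err == nil {
        return value, nil
    }

    // 2. Cache miss: read from the database.
    value, err = ca.database.Get(ctx, key)
    if err != nil {
        return nil, err
    }

    // 3. Populate the cache. A failure here only costs a later miss,
    //    so it is deliberately not returned to the caller.
    _ = ca.cache.Set(ctx, key, value, time.Hour)

    return value, nil
}

// Set updates the database and then invalidates the cache entry.
func (ca *CacheAside) Set(ctx context.Context, key string, value interface{}) error {
    ca.mu.Lock()
    defer ca.mu.Unlock()

    // 1. Update the database first.
    if err := ca.database.Set(ctx, key, value); err != nil {
        return err
    }

    // 2. Delete the cache entry. If this fails, the stale value stays
    //    cached until its TTL expires, so the error is surfaced.
    return ca.cache.Delete(ctx, key)
}

// Delete removes the key from the database and then from the cache.
func (ca *CacheAside) Delete(ctx context.Context, key string) error {
    ca.mu.Lock()
    defer ca.mu.Unlock()

    // 1. Delete from the database first.
    if err := ca.database.Delete(ctx, key); err != nil {
        return err
    }

    // 2. Delete the cache entry.
    return ca.cache.Delete(ctx, key)
}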

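2. Write Through Implementation

// cache/write_through.go
package cache

import (
    "context"
    "sync"
    "time"
)

// WriteThrough implements the Write Through pattern.
type WriteThrough struct {
    cache    Cache
    database Database
    mu       sync.RWMutex // serializes writes within this process only
}

// NewWriteThrough creates a WriteThrough on top of the given cache and database.
func NewWriteThrough(cache Cache, database Database) *WriteThrough {
    return &WriteThrough{
        cache:    cache,
        database: database,
    }
}

// Get reads straight from the cache. A key that was never written through,
// or whose entry has expired, is reported as an error.
func (wt *WriteThrough) Get(ctx context.Context, key string) (interface{}, error) {
    return wt.cache.Get(ctx, key)
}

// Set writes the value to the database and the cache in the same call.
func (wt *WriteThrough) Set(ctx context.Context, key string, value interface{}) error {
    wt.mu.Lock()
    defer wt.mu.Unlock()

    // 1. Persist first, so the cache never exposes a value that the
    //    database rejected.
    if err := wt.database.Set(ctx, key, value); err != nil {
        return err
    }

    // 2. Update the cache. If that fails, drop the entry rather than
    //    leave an older value behind.
    if err := wt.cache.Set(ctx, key, value, time.Hour); err != nil {
        _ = wt.cache.Delete(ctx, key)
        return err
    }

    return nil
}

// Delete removes the key from the database and the cache in the same call.
func (wt *WriteThrough) Delete(ctx context.Context, key string) error {
    wt.mu.Lock()
    defer wt.mu.Unlock()

    // 1. Delete from the database first. If this fails, the cache entry is
    //    still valid and nothing has to be rolled back. (Restoring the cache
    //    with a nil value, as a rollback, would cache nil instead of the
    //    previous value.)
    if err := wt.database.Delete(ctx, key); err != nil {
        return err
    }

    // 2. Delete the cache entry.
    return wt.cache.Delete(ctx, key)
}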

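3. Write Behind Implementation

// cache/write_behind.go
package cache

import (
    "context"
    "errors"
    "log"
    "sync"
    "time"
)

// WriteBehind implements the Write Behind (write-back) pattern.
type WriteBehind struct {
    cache      Cache
    database   Database
    writeQueue chan WriteTask
    mu         sync.RWMutex
    stopCh     chan struct{}
    wg         sync.WaitGroup
}

// WriteTask is one queued database write.
type WriteTask struct {
    Key   string
    Value interface{}
    Op    string // "set" or "delete"
}

// NewWriteBehind creates a WriteBehind and starts its background writer.
// queueSize bounds how many database writes may be pending at once.
func NewWriteBehind(cache Cache, database Database, queueSize int) *WriteBehind {
    wb := &WriteBehind{
        cache:      cache,
        database:   database,
        writeQueue: make(chan WriteTask, queueSize),
        stopCh:     make(chan struct{}),
    }

    // Start the background writer goroutine.
    wb.wg.Add(1)
    go wb.backgroundWriter()

    return wb
}

// Get reads straight from the cache.
func (wb *WriteBehind) Get(ctx context.Context, key string) (interface{}, error) {
    return wb.cache.Get(ctx, key)
}

// Set updates the cache and queues the database write.
func (wb *WriteBehind) Set(ctx context.Context, key string, value interface{}) error {
    wb.mu.Lock()
    defer wb.mu.Unlock()

    // 1. Update the cache first.
    if err := wb.cache.Set(ctx, key, value, time.Hour); err != nil {
        return err
    }

    // 2. Queue the database write.
    return wb.enqueue(ctx, WriteTask{Key: key, Value: value, Op: "set"})
}

// Delete removes the cache entry and queues the database delete.
func (wb *WriteBehind) Delete(ctx context.Context, key string) error {
    wb.mu.Lock()
    defer wb.mu.Unlock()

    // 1. Delete the cache entry first.
    if err := wb.cache.Delete(ctx, key); err != nil {
        return err
    }

    // 2. Queue the database delete.
    return wb.enqueue(ctx, WriteTask{Key: key, Op: "delete"})
}

// enqueue blocks while the queue is full. Writing to the database directly
// at that point would let this write overtake older queued writes to the
// same key, and the background writer would later apply them on top of it.
func (wb *WriteBehind) enqueue(ctx context.Context, task WriteTask) error {
    select {
    case wb.writeQueue <- task:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    case <-wb.stopCh:
        return errors.New("write behind: closed")
    }
}

// backgroundWriter applies queued writes to the database until Close is called.
func (wb *WriteBehind) backgroundWriter() {
    defer wb.wg.Done()

    for {
        select {
        case task := <-wb.writeQueue:
            wb.processWriteTask(task)
        case <-wb.stopCh:
            // Flush whatever is still queued before exiting.
            wb.processWriteQueue()
            return
        }
    }
}

// processWriteTask applies one queued write to the database.
func (wb *WriteBehind) processWriteTask(task WriteTask) {
    ctx := context.Background()

    var err error
    switch task.Op {
    case "set":
        err = wb.database.Set(ctx, task.Key, task.Value)
    case "delete":
        err = wb.database.Delete(ctx, task.Key)
    }
    if err != nil {
        // The cache already holds the new state, so a failure here means the
        // database has fallen behind. A production system would retry.
        log.Printf("write behind: %s %q failed: %v", task.Op, task.Key, err)
    }
}

// processWriteQueue drains the queue without blocking.
func (wb *WriteBehind) processWriteQueue() {
    for {
        select {
        case task := <-wb.writeQueue:
            wb.processWriteTask(task)
        default:
            return
        }
    }
}

// Close stops the background writer after flushing the queue.
// It must be called exactly once, and Set and Delete must not be
// called afterwards.
func (wb *WriteBehind) Close() {
    close(wb.stopCh)
    wb.wg.Wait()
}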
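
Because Write Behind defers database writes, several queued writes to the same key can often be collapsed into the last one before they are flushed. The following is a small sketch of that idea, separate from the implementation above; `pendingWrite` and `coalesce` are hypothetical names, and the sketch assumes the flusher collects a batch of tasks before writing.

```go
package main

// pendingWrite is a hypothetical queued write; Delete marks a deletion.
type pendingWrite struct {
	Key    string
	Value  string
	Delete bool
}

// coalesce keeps only the last write for each key. The surviving writes
// stay in the order in which they appeared in the batch.
func coalesce(batch []pendingWrite) []pendingWrite {
	// Record the index of the last write per key.
	last := make(map[string]int, len(batch))
	for i, w := range batch {
		last[w.Key] = i
	}

	out := make([]pendingWrite, 0, len(last))
	for i, w := range batch {
		if last[w.Key] == i {
			out = append(out, w)
		}
	}
	return out
}
```

Coalescing reduces database load for hot keys, at the cost of the database never seeing the intermediate values.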

4. Cache Penetration Protection

// cache/cache_penetration.go
package cache

import (
    "context"
    "crypto/md5"
    "encoding/binary"
    "errors"
    "sync"
    "time"
)

// BloomFilter answers "possibly present" or "definitely absent" for a key.
type BloomFilter interface {
    Add(key string) error
    Contains(key string) (bool, error)
}

// CachePenetrationProtection shields the database from lookups for keys
// that do not exist.
type CachePenetrationProtection struct {
    cache        Cache
    database     Database
    bloomFilter  BloomFilter
    nullCache    map[string]time.Time // key -> expiry of its "not found" marker
    mu           sync.RWMutex
    nullCacheTTL time.Duration
}

// NewCachePenetrationProtection creates the protection layer.
func NewCachePenetrationProtection(cache Cache, database Database, bloomFilter BloomFilter) *CachePenetrationProtection {
    return &CachePenetrationProtection{
        cache:        cache,
        database:     database,
        bloomFilter:  bloomFilter,
        nullCache:    make(map[string]time.Time),
        nullCacheTTL: time.Minute * 5,
    }
}

// Get reads a key with penetration protection.
func (cpp *CachePenetrationProtection) Get(ctx context.Context, key string) (interface{}, error) {
    // 1. Ask the Bloom filter. A negative answer is definitive.
    exists, err := cpp.bloomFilter.Contains(key)
    if err != nil {
        return nil, err
    }

    if !exists {
        return nil, errors.New("key not found")
    }

    // 2. Check the null-value cache for a marker that has not expired.
    cpp.mu.RLock()
    expiresAt, isNull := cpp.nullCache[key]
    cpp.mu.RUnlock()
    if isNull && time.Now().Before(expiresAt) {
        return nil, errors.New("key not found")
    }

    // 3. Read the cache.
    value, err := cpp.cache.Get(ctx, key)
    if err == nil {
        return value, nil
    }

    // 4. Read the database. The Database interface cannot tell "not found"
    //    apart from other failures, so every error is treated as a miss
    //    and recorded in the null-value cache.
    value, err = cpp.database.Get(ctx, key)
    if err != nil {
        cpp.mu.Lock()
        cpp.nullCache[key] = time.Now().Add(cpp.nullCacheTTL)
        cpp.mu.Unlock()

        return nil, errors.New("key not found")
    }

    // 5. Populate the cache.
    _ = cpp.cache.Set(ctx, key, value, time.Hour)

    return value, nil
}

// Set writes a key with penetration protection.
func (cpp *CachePenetrationProtection) Set(ctx context.Context, key string, value interface{}) error {
    // 1. Update the database.
    if err := cpp.database.Set(ctx, key, value); err != nil {
        return err
    }

    // 2. Register the key with the Bloom filter.
    if err := cpp.bloomFilter.Add(key); err != nil {
        return err
    }

    // 3. Update the cache.
    _ = cpp.cache.Set(ctx, key, value, time.Hour)

    // 4. Remove any "not found" marker for the key.
    cpp.mu.Lock()
    delete(cpp.nullCache, key)
    cpp.mu.Unlock()

    return nil
}

// CleanupNullCache drops expired markers. Get already ignores expired
// markers; calling this periodically only bounds the size of the map.
func (cpp *CachePenetrationProtection) CleanupNullCache() {
    now := time.Now()

    cpp.mu.Lock()
    defer cpp.mu.Unlock()
    for key, expiresAt := range cpp.nullCache {
        if !now.Before(expiresAt) {
            delete(cpp.nullCache, key)
        }
    }
}

// SimpleBloomFilter is a minimal Bloom filter for illustration.
// It is not safe for concurrent use.
type SimpleBloomFilter struct {
    bitset    []bool
    size      int
    hashFuncs []func(string) int
}

// NewSimpleBloomFilter creates a filter with size slots and hashCount hash functions.
func NewSimpleBloomFilter(size int, hashCount int) *SimpleBloomFilter {
    hashFuncs := make([]func(string) int, hashCount)
    for i := 0; i < hashCount; i++ {
        seed := byte(i) // per-iteration copy; before Go 1.22 all closures would share i
        hashFuncs[i] = func(s string) int {
            h := md5.Sum(append([]byte(s), seed))
            // Use 8 bytes of the digest. A single byte could only
            // address the first 256 slots.
            return int(binary.BigEndian.Uint64(h[:8]) % uint64(size))
        }
    }

    return &SimpleBloomFilter{
        bitset:    make([]bool, size),
        size:      size,
        hashFuncs: hashFuncs,
    }
}

// Add inserts a key into the filter.
func (sbf *SimpleBloomFilter) Add(key string) error {
    for _, hashFunc := range sbf.hashFuncs {
        index := hashFunc(key)
        sbf.bitset[index] = true
    }
    return nil
}

// Contains reports whether the key is possibly present. A false result
// means the key was definitely never added.
func (sbf *SimpleBloomFilter) Contains(key string) (bool, error) {
    for _, hashFunc := range sbf.hashFuncs {
        index := hashFunc(key)
        if !sbf.bitset[index] {
            return false, nil
        }
    }
    return true, nil
}
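
The `size` and `hashCount` arguments of a Bloom filter are usually derived from the expected number of keys n and the target false positive rate p, using the standard formulas m = -n·ln(p) / (ln 2)² for the number of bits and k = (m/n)·ln 2 for the number of hash functions. The helper below is a sketch of that arithmetic; `bloomParams` is a hypothetical name and is not used by the listing above.

```go
package main

import "math"

// bloomParams returns the number of bits m and the number of hash
// functions k for n expected keys and a target false positive rate p
// (0 < p < 1).
func bloomParams(n int, p float64) (m, k int) {
	// m = -n * ln(p) / (ln 2)^2
	bits := -float64(n) * math.Log(p) / (math.Ln2 * math.Ln2)
	m = int(math.Ceil(bits))

	// k = (m / n) * ln 2, rounded to the nearest integer and at least 1.
	k = int(math.Round(bits / float64(n) * math.Ln2))
	if k < 1 {
		k = 1
	}
	return m, k
}
```

For 1,000 keys and a 1% false positive rate this works out to roughly 9.6 bits per key and 7 hash functions, which suggests that a filter of 1,000 slots with 3 hash functions is undersized for 1,000 keys.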

5. Cache Breakdown Protection

// cache/cache_breakdown.go
package cache

import (
    "context"
    "sync"
    "time"
)

// CacheBreakdownProtection makes sure that only one goroutine per key
// rebuilds an expired hot entry.
type CacheBreakdownProtection struct {
    cache    Cache
    database Database
    mutexes  map[string]*sync.Mutex
    mu       sync.RWMutex
}

// NewCacheBreakdownProtection creates the protection layer.
func NewCacheBreakdownProtection(cache Cache, database Database) *CacheBreakdownProtection {
    return &CacheBreakdownProtection{
        cache:    cache,
        database: database,
        mutexes:  make(map[string]*sync.Mutex),
    }
}

// Get reads a key with breakdown protection.
func (cbp *CacheBreakdownProtection) Get(ctx context.Context, key string) (interface{}, error) {
    // 1. Try the cache first.
    value, err := cbp.cache.Get(ctx, key)
    if err == nil {
        return value, nil
    }

    // 2. Take the per-key mutex.
    mutex := cbp.getMutex(key)
    mutex.Lock()
    defer mutex.Unlock()

    // 3. Check the cache again (double-checked locking): another
    //    goroutine may have rebuilt the entry while this one waited.
    value, err = cbp.cache.Get(ctx, key)
    if err == nil {
        return value, nil
    }

    // 4. Read the database.
    value, err = cbp.database.Get(ctx, key)
    if err != nil {
        return nil, err
    }

    // 5. Populate the cache.
    _ = cbp.cache.Set(ctx, key, value, time.Hour)

    return value, nil
}

// getMutex returns the mutex for key, creating it on first use.
func (cbp *CacheBreakdownProtection) getMutex(key string) *sync.Mutex {
    cbp.mu.RLock()
    mutex, exists := cbp.mutexes[key]
    cbp.mu.RUnlock()

    if exists {
        return mutex
    }

    cbp.mu.Lock()
    defer cbp.mu.Unlock()

    // Double check: another goroutine may have created it in the meantime.
    if mutex, exists := cbp.mutexes[key]; exists {
        return mutex
    }

    mutex = &sync.Mutex{}
    cbp.mutexes[key] = mutex
    return mutex
}

// CleanupMutexes is a placeholder for removing mutexes that have been idle
// for a long time. As written it removes nothing, so the map grows with
// the number of distinct keys that ever missed the cache.
func (cbp *CacheBreakdownProtection) CleanupMutexes() {
    cbp.mu.Lock()
    defer cbp.mu.Unlock()

    for key, mutex := range cbp.mutexes {
        // A real implementation would track last use per key here.
        _ = key
        _ = mutex
    }
}

6. Cache Avalanche Protection

// cache/cache_avalanche.go
package cache

import (
    "context"
    "math/rand"
    "sync"
    "time"
)

// CacheAvalancheProtection spreads out expirations so that many entries
// do not expire at the same moment.
type CacheAvalancheProtection struct {
    cache    Cache
    database Database
    mu       sync.RWMutex
}

// NewCacheAvalancheProtection creates the protection layer.
func NewCacheAvalancheProtection(cache Cache, database Database) *CacheAvalancheProtection {
    return &CacheAvalancheProtection{
        cache:    cache,
        database: database,
    }
}

// Get reads a key with avalanche protection.
func (ap *CacheAvalancheProtection) Get(ctx context.Context, key string) (interface{}, error) {
    // 1. Try the cache first.
    value, err := ap.cache.Get(ctx, key)
    if err == nil {
        return value, nil
    }

    // 2. Read the database.
    value, err = ap.database.Get(ctx, key)
    if err != nil {
        return nil, err
    }

    // 3. Populate the cache with a randomized expiration.
    expiration := ap.getRandomExpiration()
    _ = ap.cache.Set(ctx, key, value, expiration)

    return value, nil
}

// getRandomExpiration returns the base TTL with a random offset of up to ±10%.
func (ap *CacheAvalancheProtection) getRandomExpiration() time.Duration {
    // Base expiration.
    baseExpiration := time.Hour

    // rand.Intn(21) yields 0..20, so the offset covers -10%..+10%
    // in 1% steps. (rand.Intn(20) would stop at +9%.)
    randomOffset := time.Duration(rand.Intn(21)-10) * baseExpiration / 100

    return baseExpiration + randomOffset
}

// WarmupCache preloads the given keys into the cache.
func (ap *CacheAvalancheProtection) WarmupCache(ctx context.Context, keys []string) error {
    var wg sync.WaitGroup
    semaphore := make(chan struct{}, 10) // at most 10 loads in flight

    for _, key := range keys {
        // Acquire before spawning, so the number of goroutines is bounded too.
        semaphore <- struct{}{}
        wg.Add(1)
        go func(k string) {
            defer wg.Done()
            defer func() { <-semaphore }()

            // Read the database; keys that cannot be loaded are skipped.
            value, err := ap.database.Get(ctx, k)
            if err != nil {
                return
            }

            // Write to the cache with a randomized expiration.
            expiration := ap.getRandomExpiration()
            _ = ap.cache.Set(ctx, k, value, expiration)
        }(key)
    }

    wg.Wait()
    return nil
}

7. CDC Synchronization

// cache/cdc_sync.go
package cache

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// CDCEvent describes one row change captured from the database.
type CDCEvent struct {
    ID        string      `json:"id"`
    Table     string      `json:"table"`
    Operation string      `json:"operation"` // INSERT, UPDATE, DELETE
    Before    interface{} `json:"before"`
    After     interface{} `json:"after"`
    Timestamp time.Time   `json:"timestamp"`
}

// CDCSynchronizer applies CDC events to the cache.
type CDCSynchronizer struct {
    cache     Cache
    eventCh   chan CDCEvent
    stopCh    chan struct{}
    wg        sync.WaitGroup
    mu        sync.RWMutex
    isRunning bool
}

// NewCDCSynchronizer creates a synchronizer that reads events from eventCh.
func NewCDCSynchronizer(cache Cache, eventCh chan CDCEvent) *CDCSynchronizer {
    return &CDCSynchronizer{
        cache:   cache,
        eventCh: eventCh,
        stopCh:  make(chan struct{}),
    }
}

// Start launches the synchronization loop.
func (cdc *CDCSynchronizer) Start() error {
    cdc.mu.Lock()
    defer cdc.mu.Unlock()

    if cdc.isRunning {
        return fmt.Errorf("CDC synchronizer already running")
    }

    cdc.isRunning = true
    cdc.wg.Add(1)

    go cdc.syncLoop()

    return nil
}

// Stop ends the synchronization loop and waits for it to exit.
// A stopped synchronizer cannot be started again.
func (cdc *CDCSynchronizer) Stop() {
    cdc.mu.Lock()
    defer cdc.mu.Unlock()

    if !cdc.isRunning {
        return
    }

    close(cdc.stopCh)
    cdc.wg.Wait()
    cdc.isRunning = false
}

// syncLoop processes events until Stop is called.
func (cdc *CDCSynchronizer) syncLoop() {
    defer cdc.wg.Done()

    for {
        select {
        case event := <-cdc.eventCh:
            cdc.processEvent(event)
        case <-cdc.stopCh:
            return
        }
    }
}

// processEvent applies one CDC event to the cache.
func (cdc *CDCSynchronizer) processEvent(event CDCEvent) {
    ctx := context.Background()

    // Dispatch on the operation type.
    switch event.Operation {
    case "INSERT", "UPDATE":
        // Update the cache with the new row.
        key := cdc.getCacheKey(event.Table, event.After)
        value := event.After

        if err := cdc.cache.Set(ctx, key, value, time.Hour); err != nil {
            fmt.Printf("Failed to update cache: %v\n", err)
        }

    case "DELETE":
        // Delete the cache entry, keyed by the row as it was before the delete.
        key := cdc.getCacheKey(event.Table, event.Before)

        if err := cdc.cache.Delete(ctx, key); err != nil {
            fmt.Printf("Failed to delete from cache: %v\n", err)
        }
    }
}

// getCacheKey builds the cache key for a row.
func (cdc *CDCSynchronizer) getCacheKey(table string, data interface{}) string {
    // Simplified: this formats the whole row. A real implementation
    // should build the key from the primary key.
    return fmt.Sprintf("%s:%v", table, data)
}

// SimulateEvent generates a CDC event for testing.
func (cdc *CDCSynchronizer) SimulateEvent(table string, operation string, data interface{}) {
    event := CDCEvent{
        ID:        fmt.Sprintf("%d", time.Now().UnixNano()),
        Table:     table,
        Operation: operation,
        Timestamp: time.Now(),
    }

    // A DELETE carries the row as it was before the change; INSERT and
    // UPDATE carry the new row. processEvent reads the matching field.
    if operation == "DELETE" {
        event.Before = data
    } else {
        event.After = data
    }

    select {
    case cdc.eventCh <- event:
    default:
        fmt.Println("Event channel full, dropping event")
    }
}
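
The comment in `getCacheKey` notes that a real key should be derived from the primary key rather than from the whole row. One way to do that is sketched below, assuming that row images arrive as `map[string]interface{}` and that the caller knows the primary key column; `cacheKey` and its `pkColumn` parameter are hypothetical and not part of the listing above.

```go
package main

import "fmt"

// cacheKey builds "<table>:<primary key value>" from a row image.
// pkColumn names the primary key column, for example "id".
func cacheKey(table string, row map[string]interface{}, pkColumn string) (string, error) {
	id, ok := row[pkColumn]
	if !ok {
		return "", fmt.Errorf("row for table %q has no %q column", table, pkColumn)
	}
	return fmt.Sprintf("%s:%v", table, id), nil
}
```

With a key of this shape, an UPDATE and a later DELETE of the same row map to the same cache entry even though the other columns differ between the two events.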

8. Distributed Transaction Implementation

// cache/distributed_transaction.go
package cache

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// TransactionStatus is the state of a distributed transaction.
type TransactionStatus int

const (
    TRANSACTION_PREPARING TransactionStatus = iota
    TRANSACTION_PREPARED
    TRANSACTION_COMMITTING
    TRANSACTION_COMMITTED
    TRANSACTION_ABORTING
    TRANSACTION_ABORTED
)

// DistributedTransaction groups cache and database operations that
// should succeed or fail together.
type DistributedTransaction struct {
    ID         string
    Status     TransactionStatus
    Operations []TransactionOperation
    mu         sync.RWMutex
    startTime  time.Time
    timeout    time.Duration
}

// TransactionOperation is one step of a transaction.
type TransactionOperation struct {
    Type     string // "cache_set", "cache_delete", "db_set", "db_delete"
    Key      string
    Value    interface{}
    Rollback func() error
}

// DistributedTransactionManager creates transactions.
type DistributedTransactionManager struct {
    cache    Cache
    database Database
    mu       sync.RWMutex
    timeout  time.Duration
}

// NewDistributedTransactionManager creates a manager whose transactions
// must be prepared within timeout.
func NewDistributedTransactionManager(cache Cache, database Database, timeout time.Duration) *DistributedTransactionManager {
    return &DistributedTransactionManager{
        cache:    cache,
        database: database,
        timeout:  timeout,
    }
}

// BeginTransaction starts a new transaction in the preparing state.
func (dtm *DistributedTransactionManager) BeginTransaction() *DistributedTransaction {
    return &DistributedTransaction{
        ID:         fmt.Sprintf("tx_%d", time.Now().UnixNano()),
        Status:     TRANSACTION_PREPARING,
        Operations: make([]TransactionOperation, 0),
        startTime:  time.Now(),
        timeout:    dtm.timeout,
    }
}

// AddOperation appends an operation to the transaction.
func (tx *DistributedTransaction) AddOperation(opType, key string, value interface{}) {
    tx.mu.Lock()
    defer tx.mu.Unlock()

    operation := TransactionOperation{
        Type:  opType,
        Key:   key,
        Value: value,
    }

    tx.Operations = append(tx.Operations, operation)
}

// Prepare runs the prepare phase for every operation.
func (tx *DistributedTransaction) Prepare() error {
    tx.mu.Lock()
    defer tx.mu.Unlock()

    if tx.Status != TRANSACTION_PREPARING {
        return fmt.Errorf("transaction not in preparing state")
    }

    // Check the timeout.
    if time.Since(tx.startTime) > tx.timeout {
        tx.Status = TRANSACTION_ABORTED
        return fmt.Errorf("transaction timeout")
    }

    // Prepare every operation.
    for i, op := range tx.Operations {
        rollback, err := tx.prepareOperation(op)
        if err != nil {
            // Roll back the operations prepared so far.
            tx.rollbackOperations(i)
            tx.Status = TRANSACTION_ABORTED
            return err
        }

        tx.Operations[i].Rollback = rollback
    }

    tx.Status = TRANSACTION_PREPARED
    return nil
}

// prepareOperation prepares one operation and returns its rollback function.
func (tx *DistributedTransaction) prepareOperation(op TransactionOperation) (func() error, error) {
    // Simplified: a real implementation would call the matching prepare
    // method on the cache or the database. The rollbacks below are stubs.
    switch op.Type {
    case "cache_set":
        return func() error {
            // Roll back the cache set.
            return nil
        }, nil
    case "cache_delete":
        return func() error {
            // Roll back the cache delete.
            return nil
        }, nil
    case "db_set":
        return func() error {
            // Roll back the database set.
            return nil
        }, nil
    case "db_delete":
        return func() error {
            // Roll back the database delete.
            return nil
        }, nil
    default:
        return nil, fmt.Errorf("unknown operation type: %s", op.Type)
    }
}

// rollbackOperations rolls back the first count operations in reverse order.
func (tx *DistributedTransaction) rollbackOperations(count int) {
    for i := count - 1; i >= 0; i-- {
        if tx.Operations[i].Rollback != nil {
            tx.Operations[i].Rollback()
        }
    }
}

// Commit executes every operation of a prepared transaction.
func (tx *DistributedTransaction) Commit() error {
    tx.mu.Lock()
    defer tx.mu.Unlock()

    if tx.Status != TRANSACTION_PREPARED {
        return fmt.Errorf("transaction not in prepared state")
    }

    tx.Status = TRANSACTION_COMMITTING

    // Execute every operation.
    for i, op := range tx.Operations {
        if err := tx.executeOperation(op); err != nil {
            // Undo only the operations that already ran.
            tx.rollbackOperations(i)
            tx.Status = TRANSACTION_ABORTED
            return err
        }
    }

    tx.Status = TRANSACTION_COMMITTED
    return nil
}

// executeOperation executes one operation.
func (tx *DistributedTransaction) executeOperation(op TransactionOperation) error {
    // Simplified: a real implementation would call the matching method
    // on the cache or the database. Every case below is a stub.
    switch op.Type {
    case "cache_set":
        // Execute the cache set.
        return nil
    case "cache_delete":
        // Execute the cache delete.
        return nil
    case "db_set":
        // Execute the database set.
        return nil
    case "db_delete":
        // Execute the database delete.
        return nil
    default:
        return fmt.Errorf("unknown operation type: %s", op.Type)
    }
}

// Abort rolls back a transaction that has not been committed.
func (tx *DistributedTransaction) Abort() error {
    tx.mu.Lock()
    defer tx.mu.Unlock()

    if tx.Status == TRANSACTION_COMMITTED {
        return fmt.Errorf("transaction already committed")
    }

    tx.Status = TRANSACTION_ABORTING

    // Roll back every operation.
    tx.rollbackOperations(len(tx.Operations))

    tx.Status = TRANSACTION_ABORTED
    return nil
}
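
The rollback functions in `prepareOperation` are stubs. A working rollback typically has to remember the state it is about to overwrite. The sketch below shows that compensation idea for a set operation against a plain map; `prepareSet` is a hypothetical helper and the map is only a stand-in for the cache or the database.

```go
package main

// prepareSet captures the current value of key and returns two closures:
// apply performs the write, rollback restores the captured state.
func prepareSet(store map[string]string, key, value string) (apply func(), rollback func()) {
	old, existed := store[key]

	apply = func() {
		store[key] = value
	}
	rollback = func() {
		if existed {
			store[key] = old // restore the previous value
		} else {
			delete(store, key) // the key did not exist before
		}
	}
	return apply, rollback
}
```

The captured value is only correct if nothing else modifies the key between prepare and rollback, which is why real systems pair this with locking or version checks.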

Testing

1. Unit Tests

// cache/cache_consistency_test.go
package cache

import (
    "context"
    "fmt"
    "sync"
    "testing"
    "time"
)

// MockCache is an in-memory Cache for tests. It ignores the expiration argument.
type MockCache struct {
    data map[string]interface{}
    mu   sync.RWMutex
}

func NewMockCache() *MockCache {
    return &MockCache{
        data: make(map[string]interface{}),
    }
}

func (mc *MockCache) Get(ctx context.Context, key string) (interface{}, error) {
    mc.mu.RLock()
    defer mc.mu.RUnlock()
    
    value, exists := mc.data[key]
    if !exists {
        return nil, fmt.Errorf("key not found")
    }
    
    return value, nil
}

func (mc *MockCache) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
    mc.mu.Lock()
    defer mc.mu.Unlock()
    
    mc.data[key] = value
    return nil
}

func (mc *MockCache) Delete(ctx context.Context, key string) error {
    mc.mu.Lock()
    defer mc.mu.Unlock()
    
    delete(mc.data, key)
    return nil
}

// MockDatabase is an in-memory Database for tests.
type MockDatabase struct {
    data map[string]interface{}
    mu   sync.RWMutex
}

func NewMockDatabase() *MockDatabase {
    return &MockDatabase{
        data: make(map[string]interface{}),
    }
}

func (md *MockDatabase) Get(ctx context.Context, key string) (interface{}, error) {
    md.mu.RLock()
    defer md.mu.RUnlock()
    
    value, exists := md.data[key]
    if !exists {
        return nil, fmt.Errorf("key not found")
    }
    
    return value, nil
}

func (md *MockDatabase) Set(ctx context.Context, key string, value interface{}) error {
    md.mu.Lock()
    defer md.mu.Unlock()
    
    md.data[key] = value
    return nil
}

func (md *MockDatabase) Delete(ctx context.Context, key string) error {
    md.mu.Lock()
    defer md.mu.Unlock()
    
    delete(md.data, key)
    return nil
}

func TestCacheAside(t *testing.T) {
    cache := NewMockCache()
    database := NewMockDatabase()
    ca := NewCacheAside(cache, database)
    
    ctx := context.Background()
    
    // Test the write path
    if err := ca.Set(ctx, "key1", "value1"); err != nil {
        t.Fatalf("Failed to set: %v", err)
    }
    
    // Test the read path
    value, err := ca.Get(ctx, "key1")
    if err != nil {
        t.Fatalf("Failed to get: %v", err)
    }
    
    if value != "value1" {
        t.Errorf("Expected 'value1', got %v", value)
    }
}

func TestWriteThrough(t *testing.T) {
    cache := NewMockCache()
    database := NewMockDatabase()
    wt := NewWriteThrough(cache, database)
    
    ctx := context.Background()
    
    // Test the write path
    if err := wt.Set(ctx, "key1", "value1"); err != nil {
        t.Fatalf("Failed to set: %v", err)
    }
    
    // Test the read path
    value, err := wt.Get(ctx, "key1")
    if err != nil {
        t.Fatalf("Failed to get: %v", err)
    }
    
    if value != "value1" {
        t.Errorf("Expected 'value1', got %v", value)
    }
}

func TestWriteBehind(t *testing.T) {
    cache := NewMockCache()
    database := NewMockDatabase()
    wb := NewWriteBehind(cache, database, 100)
    defer wb.Close()
    
    ctx := context.Background()
    
    // Test the write path
    if err := wb.Set(ctx, "key1", "value1"); err != nil {
        t.Fatalf("Failed to set: %v", err)
    }
    
    // Test the read path
    value, err := wb.Get(ctx, "key1")
    if err != nil {
        t.Fatalf("Failed to get: %v", err)
    }
    
    if value != "value1" {
        t.Errorf("Expected 'value1', got %v", value)
    }
}

func TestCachePenetrationProtection(t *testing.T) {
    cache := NewMockCache()
    database := NewMockDatabase()
    bloomFilter := NewSimpleBloomFilter(1000, 3)
    cpp := NewCachePenetrationProtection(cache, database, bloomFilter)
    
    ctx := context.Background()
    
    // Read a key that was never added; the Bloom filter should reject it
    _, err := cpp.Get(ctx, "nonexistent_key")
    if err == nil {
        t.Error("Expected error for nonexistent key")
    }
}

func TestCacheBreakdownProtection(t *testing.T) {
    cache := NewMockCache()
    database := NewMockDatabase()
    cbp := NewCacheBreakdownProtection(cache, database)
    
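    ctx := context.Background()

    // Seed the database; the first Get is then a cache miss that falls
    // through to it. Without this the key does not exist anywhere.
    if err := database.Set(ctx, "key1", "value1"); err != nil {
        t.Fatalf("Failed to seed database: %v", err)
    }

    // Test the read path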
    value, err := cbp.Get(ctx, "key1")
    if err != nil {
        t.Fatalf("Failed to get: %v", err)
    }
    
    if value != "value1" {
        t.Errorf("Expected 'value1', got %v", value)
    }
}

func TestCacheAvalancheProtection(t *testing.T) {
    cache := NewMockCache()
    database := NewMockDatabase()
    cap := NewCacheAvalancheProtection(cache, database)
    
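    ctx := context.Background()

    // Seed the database; the first Get is then a cache miss that falls
    // through to it. Without this the key does not exist anywhere.
    if err := database.Set(ctx, "key1", "value1"); err != nil {
        t.Fatalf("Failed to seed database: %v", err)
    }

    // Test the read path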
    value, err := cap.Get(ctx, "key1")
    if err != nil {
        t.Fatalf("Failed to get: %v", err)
    }
    
    if value != "value1" {
        t.Errorf("Expected 'value1', got %v", value)
    }
}

func TestCDCSynchronizer(t *testing.T) {
    cache := NewMockCache()
    eventCh := make(chan CDCEvent, 100)
    cdc := NewCDCSynchronizer(cache, eventCh)
    
    // Start CDC synchronization
    if err := cdc.Start(); err != nil {
        t.Fatalf("Failed to start CDC: %v", err)
    }
    defer cdc.Stop()
    
    // Simulate a CDC event
    cdc.SimulateEvent("users", "INSERT", map[string]interface{}{
        "id":   1,
        "name": "John",
    })
    
    // Give the background loop time to process the event
    time.Sleep(100 * time.Millisecond)
}

func TestDistributedTransaction(t *testing.T) {
    cache := NewMockCache()
    database := NewMockDatabase()
    dtm := NewDistributedTransactionManager(cache, database, time.Minute)
    
    // Begin the transaction
    tx := dtm.BeginTransaction()
    
    // Add operations
    tx.AddOperation("cache_set", "key1", "value1")
    tx.AddOperation("db_set", "key1", "value1")
    
    // Prepare the transaction
    if err := tx.Prepare(); err != nil {
        t.Fatalf("Failed to prepare transaction: %v", err)
    }
    
    // Commit the transaction
    if err := tx.Commit(); err != nil {
        t.Fatalf("Failed to commit transaction: %v", err)
    }
}

2. Performance Benchmarks

// cache/benchmark_test.go
package cache

import (
    "context"
    "fmt"
    "testing"
)

func BenchmarkCacheAside(b *testing.B) {
    cache := NewMockCache()
    database := NewMockDatabase()
    ca := NewCacheAside(cache, database)
    
    ctx := context.Background()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", i%1000)
        ca.Get(ctx, key)
    }
}

func BenchmarkWriteThrough(b *testing.B) {
    cache := NewMockCache()
    database := NewMockDatabase()
    wt := NewWriteThrough(cache, database)
    
    ctx := context.Background()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", i%1000)
        value := fmt.Sprintf("value%d", i%1000)
        wt.Set(ctx, key, value)
    }
}

func BenchmarkWriteBehind(b *testing.B) {
    cache := NewMockCache()
    database := NewMockDatabase()
    wb := NewWriteBehind(cache, database, 1000)
    defer wb.Close()
    
    ctx := context.Background()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", i%1000)
        value := fmt.Sprintf("value%d", i%1000)
        wb.Set(ctx, key, value)
    }
}

func BenchmarkCachePenetrationProtection(b *testing.B) {
    cache := NewMockCache()
    database := NewMockDatabase()
    bloomFilter := NewSimpleBloomFilter(10000, 3)
    cpp := NewCachePenetrationProtection(cache, database, bloomFilter)
    
    ctx := context.Background()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", i%1000)
        cpp.Get(ctx, key)
    }
}

func BenchmarkCacheBreakdownProtection(b *testing.B) {
    cache := NewMockCache()
    database := NewMockDatabase()
    cbp := NewCacheBreakdownProtection(cache, database)
    
    ctx := context.Background()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", i%1000)
        cbp.Get(ctx, key)
    }
}

func BenchmarkCacheAvalancheProtection(b *testing.B) {
    cache := NewMockCache()
    database := NewMockDatabase()
    cap := NewCacheAvalancheProtection(cache, database)
    
    ctx := context.Background()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", i%1000)
        cap.Get(ctx, key)
    }
}

3. Concurrency Tests

// cache/concurrent_test.go
package cache

import (
    "context"
    "fmt"
    "sync"
    "testing"
    "time"
)

func TestCacheAsideConcurrent(t *testing.T) {
    cache := NewMockCache()
    database := NewMockDatabase()
    ca := NewCacheAside(cache, database)
    
    ctx := context.Background()
    const numGoroutines = 10
    const numOperations = 100
    
    var wg sync.WaitGroup
    
    // Concurrent writes
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(goroutineID int) {
            defer wg.Done()
            
            for j := 0; j < numOperations; j++ {
                key := fmt.Sprintf("key_%d_%d", goroutineID, j)
                value := fmt.Sprintf("value_%d_%d", goroutineID, j)
                
                if err := ca.Set(ctx, key, value); err != nil {
                    t.Errorf("Failed to set: %v", err)
                }
            }
        }(i)
    }
    
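    // Concurrent reads. These run alongside the writers above, so a key may
    // not have been written yet when it is read; whether Get reports that as
    // an error depends on the mock database.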
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(goroutineID int) {
            defer wg.Done()
            
            for j := 0; j < numOperations; j++ {
                key := fmt.Sprintf("key_%d_%d", goroutineID, j)
                
                if _, err := ca.Get(ctx, key); err != nil {
                    t.Errorf("Failed to get: %v", err)
                }
            }
        }(i)
    }
    
    wg.Wait()
}

func TestWriteThroughConcurrent(t *testing.T) {
    cache := NewMockCache()
    database := NewMockDatabase()
    wt := NewWriteThrough(cache, database)
    
    ctx := context.Background()
    const numGoroutines = 10
    const numOperations = 100
    
    var wg sync.WaitGroup
    
    // Concurrent writes
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(goroutineID int) {
            defer wg.Done()
            
            for j := 0; j < numOperations; j++ {
                key := fmt.Sprintf("key_%d_%d", goroutineID, j)
                value := fmt.Sprintf("value_%d_%d", goroutineID, j)
                
                if err := wt.Set(ctx, key, value); err != nil {
                    t.Errorf("Failed to set: %v", err)
                }
            }
        }(i)
    }
    
    // Concurrent reads (running alongside the writers above)
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(goroutineID int) {
            defer wg.Done()
            
            for j := 0; j < numOperations; j++ {
                key := fmt.Sprintf("key_%d_%d", goroutineID, j)
                
                if _, err := wt.Get(ctx, key); err != nil {
                    t.Errorf("Failed to get: %v", err)
                }
            }
        }(i)
    }
    
    wg.Wait()
}

func TestWriteBehindConcurrent(t *testing.T) {
    cache := NewMockCache()
    database := NewMockDatabase()
    wb := NewWriteBehind(cache, database, 1000)
    defer wb.Close()
    
    ctx := context.Background()
    const numGoroutines = 10
    const numOperations = 100
    
    var wg sync.WaitGroup
    
    // Concurrent writes
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(goroutineID int) {
            defer wg.Done()
            
            for j := 0; j < numOperations; j++ {
                key := fmt.Sprintf("key_%d_%d", goroutineID, j)
                value := fmt.Sprintf("value_%d_%d", goroutineID, j)
                
                if err := wb.Set(ctx, key, value); err != nil {
                    t.Errorf("Failed to set: %v", err)
                }
            }
        }(i)
    }
    
    // Concurrent reads (running alongside the writers above)
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(goroutineID int) {
            defer wg.Done()
            
            for j := 0; j < numOperations; j++ {
                key := fmt.Sprintf("key_%d_%d", goroutineID, j)
                
                if _, err := wb.Get(ctx, key); err != nil {
                    t.Errorf("Failed to get: %v", err)
                }
            }
        }(i)
    }
    
    wg.Wait()
}

Performance Comparison

1. Cache Pattern Comparison

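| Pattern | Read performance | Write performance | Consistency | Complexity | Best suited for |
| --- | --- | --- | --- | --- | --- |
| Cache Aside | High | Medium | Medium | Low | Most scenarios |
| Write Through | High | Low | High | Medium | Workloads that need strong consistency |
| Write Behind | High | High | Low | High | Workloads that need high write throughput |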

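2. Protection Strategy Comparison

| Strategy | Performance impact | Memory overhead | Implementation complexity | Effectiveness |
| --- | --- | --- | --- | --- |
| Cache penetration protection | Low | Medium | Medium | High |
| Cache breakdown protection | Medium | Low | Low | High |
| Cache avalanche protection | Low | Low | Low | Medium |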

3. Performance Benchmarks

// Example benchmark that runs the three patterns side by side
func BenchmarkComparison(b *testing.B) {
    // Cache Aside (read path)
    b.Run("CacheAside", func(b *testing.B) {
        cache := NewMockCache()
        database := NewMockDatabase()
        ca := NewCacheAside(cache, database)
        
        ctx := context.Background()
        for i := 0; i < b.N; i++ {
            key := fmt.Sprintf("key%d", i%1000)
            ca.Get(ctx, key)
        }
    })
    
    // Write Through (write path)
    b.Run("WriteThrough", func(b *testing.B) {
        cache := NewMockCache()
        database := NewMockDatabase()
        wt := NewWriteThrough(cache, database)
        
        ctx := context.Background()
        for i := 0; i < b.N; i++ {
            key := fmt.Sprintf("key%d", i%1000)
            value := fmt.Sprintf("value%d", i%1000)
            wt.Set(ctx, key, value)
        }
    })
    
    // Write Behind (write path)
    b.Run("WriteBehind", func(b *testing.B) {
        cache := NewMockCache()
        database := NewMockDatabase()
        wb := NewWriteBehind(cache, database, 1000)
        defer wb.Close()
        
        ctx := context.Background()
        for i := 0; i < b.N; i++ {
            key := fmt.Sprintf("key%d", i%1000)
            value := fmt.Sprintf("value%d", i%1000)
            wb.Set(ctx, key, value)
        }
    })
}

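Interview Key Points

1. Challenges of Cache Consistency

Key points:

  • Update order: whether to update the cache or the database first
  • Concurrent access: multiple threads reading and writing the same data at once
  • Network latency: delay between the cache and the database
  • Failure recovery: restoring data after a system failure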
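
To make the update-order question concrete, here is a minimal sketch of one common answer for Cache Aside: write the database first, then delete the cached entry so the next read reloads it. The `store` and `cacheAside` types are hypothetical in-memory stand-ins for Redis and a database, written for this illustration rather than taken from the chapter's `CacheAside` implementation.

```go
package main

import (
    "errors"
    "sync"
)

// errNotFound is returned when a key is in neither the cache nor the database.
var errNotFound = errors.New("key not found")

// store is a hypothetical in-memory stand-in for Redis or a database.
type store struct {
    mu   sync.RWMutex
    data map[string]string
}

func newStore() *store { return &store{data: make(map[string]string)} }

func (s *store) get(key string) (string, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    v, ok := s.data[key]
    return v, ok
}

func (s *store) set(key, value string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.data[key] = value
}

func (s *store) del(key string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.data, key)
}

// cacheAside places a cache in front of a database.
type cacheAside struct {
    cache, db *store
}

// Get reads the cache first and falls back to the database on a miss,
// repopulating the cache with the value the database returned.
func (c *cacheAside) Get(key string) (string, error) {
    if v, ok := c.cache.get(key); ok {
        return v, nil
    }
    v, ok := c.db.get(key)
    if !ok {
        return "", errNotFound
    }
    c.cache.set(key, v)
    return v, nil
}

// Set writes the database first and then deletes the cached entry, so the
// next read reloads the fresh value instead of trusting a stale one.
func (c *cacheAside) Set(key, value string) {
    c.db.set(key, value)
    c.cache.del(key)
}
```

A narrow race remains even with this ordering: a reader that loaded the old value just before the write can repopulate the cache after the delete. That is why cached entries are usually also given an expiration time as a backstop.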

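2. Choosing a Cache Pattern

Key points:

  • Cache Aside: fits most scenarios; simple and flexible
  • Write Through: fits scenarios that require strong consistency
  • Write Behind: fits scenarios that require high write performance
  • Hybrid: combine patterns according to business requirements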

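3. Solutions to Common Cache Problems

Key points:

  • Cache penetration: Bloom filter plus caching of empty results
  • Cache breakdown: mutex plus a double-checked lookup
  • Cache avalanche: randomized expiration times plus cache warm-up
  • Data consistency: CDC synchronization plus distributed transactions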

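4. Implementing Distributed Transactions

Key points:

  • Two-phase commit: a prepare phase followed by a commit phase
  • Compensating transactions: each forward operation is paired with a compensating operation
  • Eventual consistency: brief inconsistency is tolerated as long as the system converges
  • Idempotency: operations can be safely executed more than once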

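Summary

In this chapter we covered:

  1. The basic principles and challenges of cache consistency
  2. How the three cache patterns are implemented and how to choose between them
  3. Solutions and protection strategies for common cache problems
  4. CDC synchronization and distributed transactions
  5. Performance optimization techniques and best practices

Cache consistency strategies are what keep the cache and the database in agreement in a distributed system, within the guarantees each pattern offers. In the next chapter we will study master-replica replication and see how Redis achieves high availability.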
