缓存一致性策略
学习目标
- 深入理解 Cache Aside/Write Through/Write Behind 模式
- 掌握缓存穿透/击穿/雪崩的解决方案
- 理解双写一致性问题并实现相应的解决策略
- 理解 CDC 同步方案和实现
- 掌握分布式事务和最终一致性
缓存一致性概述
1. 缓存一致性挑战
在分布式系统中,缓存和数据库之间的数据一致性是一个复杂的问题:
┌─────────────────────────────────────────────────────────────┐
│ 缓存一致性挑战 │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 应用层 │ │ 缓存层 │ │ 数据库 │ │
│ │ │ │ │ │ │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │ 读操作 │ │ │ │ 缓存 │ │ │ │ 主库 │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │ 写操作 │ │ │ │ 缓存 │ │ │ │ 从库 │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └───────────┬───────┴───────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 一致性挑战 │ │
│ │ • 数据更新顺序:先更新缓存还是先更新数据库? │ │
│ │ • 并发访问:多个线程同时读写同一数据 │ │
│ │ • 网络延迟:缓存和数据库之间的网络延迟 │ │
│ │ • 故障恢复:系统故障后的数据恢复 │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
2. 缓存一致性模式
模式 | 读操作 | 写操作 | 优点 | 缺点 | 适用场景 |
---|---|---|---|---|---|
Cache Aside | 先读缓存,未命中读数据库 | 先更新数据库,再删除缓存 | 简单、灵活 | 可能不一致 | 大多数场景 |
Write Through | 直接读缓存 | 同时更新缓存和数据库 | 数据一致性好 | 性能较差 | 对一致性要求高 |
Write Behind | 直接读缓存 | 先更新缓存,异步写数据库 | 性能最好 | 可能丢失数据 | 对性能要求高 |
Go 语言缓存一致性实现
1. Cache Aside 模式实现
// cache/cache_aside.go
package cache
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
)
// Cache is the cache-layer contract used by the strategies in this file.
// NOTE(review): callers in this file treat any non-nil error from Get as a
// cache miss — confirm that implementations report misses as errors.
type Cache interface {
// Get looks up key and returns its value or an error.
Get(ctx context.Context, key string) (interface{}, error)
// Set stores value under key with the given expiration.
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
// Delete removes key.
Delete(ctx context.Context, key string) error
}
// Database is the backing-store contract used by the strategies in this file.
// Unlike Cache, Set takes no expiration.
type Database interface {
// Get looks up key and returns its value or an error.
Get(ctx context.Context, key string) (interface{}, error)
// Set stores value under key.
Set(ctx context.Context, key string, value interface{}) error
// Delete removes key.
Delete(ctx context.Context, key string) error
}
// CacheAside pairs a cache with a backing database following the cache-aside
// pattern. mu is taken by the write paths (Set and Delete).
type CacheAside struct {
	cache    Cache
	database Database
	mu       sync.RWMutex
}

// NewCacheAside returns a CacheAside that reads and writes through the given
// cache and database.
func NewCacheAside(cache Cache, database Database) *CacheAside {
	ca := new(CacheAside)
	ca.cache = cache
	ca.database = database
	return ca
}
// Get returns the value for key, consulting the cache first and the database
// on a miss. A value loaded from the database is written back to the cache
// with a one-hour expiration.
func (ca *CacheAside) Get(ctx context.Context, key string) (interface{}, error) {
	// Any error from the cache is handled as a miss.
	if cached, cacheErr := ca.cache.Get(ctx, key); cacheErr == nil {
		return cached, nil
	}

	loaded, dbErr := ca.database.Get(ctx, key)
	if dbErr != nil {
		return nil, dbErr
	}

	// The result of populating the cache is not checked.
	ca.cache.Set(ctx, key, loaded, time.Hour)
	return loaded, nil
}
// Set writes value to the database and then invalidates the cached copy.
//
// The database is updated first; if that fails the cache is left untouched
// and the database error is returned. If the database write succeeds but the
// cache entry cannot be deleted, an error wrapping the cache failure is
// returned so the caller knows a stale entry may still be served until it
// expires (previously this failure was silently dropped).
func (ca *CacheAside) Set(ctx context.Context, key string, value interface{}) error {
	ca.mu.Lock()
	defer ca.mu.Unlock()

	if err := ca.database.Set(ctx, key, value); err != nil {
		return err
	}
	if err := ca.cache.Delete(ctx, key); err != nil {
		return fmt.Errorf("database updated but invalidating cache key %q failed: %w", key, err)
	}
	return nil
}
// Delete removes key from the database and then from the cache.
//
// A database failure is returned as-is and leaves the cache untouched. If the
// database delete succeeds but the cache delete fails, an error wrapping the
// cache failure is returned, because the cache may keep serving the deleted
// value until it expires (previously this failure was silently dropped).
func (ca *CacheAside) Delete(ctx context.Context, key string) error {
	ca.mu.Lock()
	defer ca.mu.Unlock()

	if err := ca.database.Delete(ctx, key); err != nil {
		return err
	}
	if err := ca.cache.Delete(ctx, key); err != nil {
		return fmt.Errorf("database row deleted but invalidating cache key %q failed: %w", key, err)
	}
	return nil
}
2. Write Through 模式实现
// cache/write_through.go
package cache
import (
"context"
"sync"
"time"
)
// WriteThrough keeps a cache and a database in step by writing to both
// synchronously. mu is taken by the write paths (Set and Delete).
type WriteThrough struct {
	cache    Cache
	database Database
	mu       sync.RWMutex
}

// NewWriteThrough returns a WriteThrough backed by the given cache and
// database.
func NewWriteThrough(cache Cache, database Database) *WriteThrough {
	wt := new(WriteThrough)
	wt.cache = cache
	wt.database = database
	return wt
}
// Get returns the value for key.
//
// The cache is consulted first without locking. On a miss (any cache error)
// the value is loaded from the database and written back to the cache with a
// one-hour expiration; previously a miss was returned as an error, which made
// every key unreadable once its cache entry expired.
//
// The fallback runs under the read lock so it cannot interleave with Set or
// Delete (which hold the write lock) and repopulate the cache with a value
// that is being replaced or removed.
func (wt *WriteThrough) Get(ctx context.Context, key string) (interface{}, error) {
	if value, err := wt.cache.Get(ctx, key); err == nil {
		return value, nil
	}

	wt.mu.RLock()
	defer wt.mu.RUnlock()

	// Re-check: a writer may have populated the entry while we waited.
	if value, err := wt.cache.Get(ctx, key); err == nil {
		return value, nil
	}

	value, err := wt.database.Get(ctx, key)
	if err != nil {
		return nil, err
	}

	// Best effort: the value is still correct if the cache refuses it.
	_ = wt.cache.Set(ctx, key, value, time.Hour)
	return value, nil
}
// Set writes value to the cache and then to the database, under the write
// lock.
//
// If the cache write fails nothing else is attempted. If the database write
// fails the cache is rolled back to what it held before the call: the
// previous value is restored when there was one, otherwise the key is
// removed. (Previously the rollback always deleted the key, discarding a
// still-valid cached value.) The rollback itself is best effort; the database
// error is what the caller receives.
func (wt *WriteThrough) Set(ctx context.Context, key string, value interface{}) error {
	wt.mu.Lock()
	defer wt.mu.Unlock()

	// Remember the current entry so a failed database write can be undone.
	previous, prevErr := wt.cache.Get(ctx, key)

	if err := wt.cache.Set(ctx, key, value, time.Hour); err != nil {
		return err
	}
	if err := wt.database.Set(ctx, key, value); err != nil {
		if prevErr == nil {
			_ = wt.cache.Set(ctx, key, previous, time.Hour)
		} else {
			_ = wt.cache.Delete(ctx, key)
		}
		return err
	}
	return nil
}
// Delete removes key from the cache and then from the database, under the
// write lock.
//
// If the database delete fails, the cache entry that existed before the call
// is restored (with a fresh one-hour expiration). Previously the rollback
// stored nil under the key, so later reads returned (nil, nil) instead of the
// real value. When the key was not cached beforehand there is nothing to
// restore. The rollback is best effort; the database error is returned.
func (wt *WriteThrough) Delete(ctx context.Context, key string) error {
	wt.mu.Lock()
	defer wt.mu.Unlock()

	// Capture the current entry so it can be put back on failure.
	previous, prevErr := wt.cache.Get(ctx, key)

	if err := wt.cache.Delete(ctx, key); err != nil {
		return err
	}
	if err := wt.database.Delete(ctx, key); err != nil {
		if prevErr == nil {
			_ = wt.cache.Set(ctx, key, previous, time.Hour)
		}
		return err
	}
	return nil
}
3. Write Behind 模式实现
// cache/write_behind.go
package cache
import (
"context"
"sync"
"time"
)
// WriteBehind updates the cache synchronously and hands database writes to a
// background goroutine through writeQueue. stopCh signals that goroutine to
// exit and wg lets Close wait for it.
type WriteBehind struct {
	cache      Cache
	database   Database
	writeQueue chan WriteTask
	mu         sync.RWMutex
	stopCh     chan struct{}
	wg         sync.WaitGroup
}

// WriteTask is one deferred database mutation.
type WriteTask struct {
	Key   string
	Value interface{}
	Op    string // "set" or "delete"
}

// NewWriteBehind builds a WriteBehind whose queue holds up to queueSize
// pending tasks and starts its background writer goroutine.
func NewWriteBehind(cache Cache, database Database, queueSize int) *WriteBehind {
	wb := new(WriteBehind)
	wb.cache = cache
	wb.database = database
	wb.writeQueue = make(chan WriteTask, queueSize)
	wb.stopCh = make(chan struct{})

	// Register the writer before launching it so Close can wait on it.
	wb.wg.Add(1)
	go wb.backgroundWriter()
	return wb
}
// Get returns whatever the cache holds for key; the database is not consulted.
func (wb *WriteBehind) Get(ctx context.Context, key string) (interface{}, error) {
	value, err := wb.cache.Get(ctx, key)
	return value, err
}
// Set stores value in the cache and schedules the database write.
//
// The cache is updated first; a cache failure is returned immediately. The
// database write is then enqueued for the background writer. When the queue
// is full the call now blocks until there is room instead of writing to the
// database directly: a direct write could be overwritten moments later by an
// older queued task for the same key, leaving the database behind the cache.
//
// If the writer has been stopped (stopCh closed) the value is written to the
// database synchronously, since nothing would drain the queue. If ctx is
// cancelled while waiting for queue space, ctx.Err() is returned; in that
// case the cache has already been updated but the database write was not
// scheduled.
func (wb *WriteBehind) Set(ctx context.Context, key string, value interface{}) error {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	if err := wb.cache.Set(ctx, key, value, time.Hour); err != nil {
		return err
	}

	// Prefer the synchronous path once the writer is gone, even if the
	// buffered queue still has room.
	select {
	case <-wb.stopCh:
		return wb.database.Set(ctx, key, value)
	default:
	}

	select {
	case wb.writeQueue <- WriteTask{Key: key, Value: value, Op: "set"}:
		return nil
	case <-wb.stopCh:
		return wb.database.Set(ctx, key, value)
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Delete removes key from the cache and schedules the database delete.
//
// The cache entry is removed first; a cache failure is returned immediately.
// The database delete is then enqueued for the background writer. When the
// queue is full the call blocks until there is room rather than deleting
// directly, because a direct delete could be undone by an older queued "set"
// for the same key.
//
// If the writer has been stopped (stopCh closed) the delete is applied to the
// database synchronously. If ctx is cancelled while waiting for queue space,
// ctx.Err() is returned; the cache entry is already gone but the database
// delete was not scheduled.
func (wb *WriteBehind) Delete(ctx context.Context, key string) error {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	if err := wb.cache.Delete(ctx, key); err != nil {
		return err
	}

	// Prefer the synchronous path once the writer is gone.
	select {
	case <-wb.stopCh:
		return wb.database.Delete(ctx, key)
	default:
	}

	select {
	case wb.writeQueue <- WriteTask{Key: key, Op: "delete"}:
		return nil
	case <-wb.stopCh:
		return wb.database.Delete(ctx, key)
	case <-ctx.Done():
		return ctx.Err()
	}
}
// backgroundWriter is the goroutine body started by NewWriteBehind. It applies
// queued tasks as they arrive, additionally drains the queue once per second,
// and on stop drains whatever is still queued before returning.
func (wb *WriteBehind) backgroundWriter() {
	defer wb.wg.Done()

	flushTick := time.NewTicker(time.Second)
	defer flushTick.Stop()

	for {
		select {
		case next := <-wb.writeQueue:
			wb.processWriteTask(next)
		case <-flushTick.C:
			wb.processWriteQueue()
		case <-wb.stopCh:
			// Flush the remainder so pending writes are not dropped on shutdown.
			wb.processWriteQueue()
			return
		}
	}
}
// processWriteTask applies one queued task to the database using a background
// context. Tasks with an unrecognised Op are ignored, and the database error,
// if any, is not inspected.
func (wb *WriteBehind) processWriteTask(task WriteTask) {
	ctx := context.Background()
	if task.Op == "set" {
		wb.database.Set(ctx, task.Key, task.Value)
	} else if task.Op == "delete" {
		wb.database.Delete(ctx, task.Key)
	}
}
// processWriteQueue applies every task currently in the queue and returns as
// soon as the queue is empty; it never blocks waiting for new tasks.
func (wb *WriteBehind) processWriteQueue() {
	for drained := false; !drained; {
		select {
		case pending := <-wb.writeQueue:
			wb.processWriteTask(pending)
		default:
			drained = true
		}
	}
}
// Close stops the background writer and waits for it to flush the queue and
// exit.
//
// Close is safe to call more than once: stopCh is closed only if it is still
// open (a second close of the channel would panic). The check-and-close runs
// under mu, which Set and Delete also hold, so it cannot interleave with a
// write that is deciding whether to enqueue.
func (wb *WriteBehind) Close() {
	wb.mu.Lock()
	select {
	case <-wb.stopCh:
		// Already closed by an earlier call.
	default:
		close(wb.stopCh)
	}
	wb.mu.Unlock()

	wb.wg.Wait()
}
4. 缓存穿透解决方案
// cache/cache_penetration.go
package cache
import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"sync"
"time"
)
// BloomFilter is the membership-filter contract used by
// CachePenetrationProtection.
type BloomFilter interface {
// Add records key in the filter.
Add(key string) error
// Contains reports whether key may have been added.
Contains(key string) (bool, error)
}
// CachePenetrationProtection guards the database against lookups for keys
// that do not exist, using a Bloom filter plus an in-memory set of keys known
// to be missing (nullCache, protected by mu). Entries in nullCache are removed
// after nullCacheTTL.
type CachePenetrationProtection struct {
	cache        Cache
	database     Database
	bloomFilter  BloomFilter
	nullCache    map[string]bool
	mu           sync.RWMutex
	nullCacheTTL time.Duration
}

// NewCachePenetrationProtection builds the guard with an empty missing-key set
// and a five-minute TTL for its entries.
func NewCachePenetrationProtection(cache Cache, database Database, bloomFilter BloomFilter) *CachePenetrationProtection {
	cpp := new(CachePenetrationProtection)
	cpp.cache = cache
	cpp.database = database
	cpp.bloomFilter = bloomFilter
	cpp.nullCache = map[string]bool{}
	cpp.nullCacheTTL = 5 * time.Minute
	return cpp
}
// Get returns the value for key, short-circuiting lookups for keys that cannot
// or are known not to exist.
//
// Order of checks: Bloom filter, missing-key set, cache, database. Any
// database error is treated as "missing": the key is added to the missing-key
// set, a goroutine is started to expire that entry, and "key not found" is
// returned. A value found in the database is cached for one hour.
func (cpp *CachePenetrationProtection) Get(ctx context.Context, key string) (interface{}, error) {
	mayExist, bloomErr := cpp.bloomFilter.Contains(key)
	switch {
	case bloomErr != nil:
		return nil, bloomErr
	case !mayExist:
		// The filter has never seen this key.
		return nil, fmt.Errorf("key not found")
	}

	cpp.mu.RLock()
	knownMissing := cpp.nullCache[key]
	cpp.mu.RUnlock()
	if knownMissing {
		return nil, fmt.Errorf("key not found")
	}

	if cached, cacheErr := cpp.cache.Get(ctx, key); cacheErr == nil {
		return cached, nil
	}

	loaded, dbErr := cpp.database.Get(ctx, key)
	if dbErr == nil {
		cpp.cache.Set(ctx, key, loaded, time.Hour)
		return loaded, nil
	}

	// Remember the miss so repeated lookups skip the database for a while.
	cpp.mu.Lock()
	cpp.nullCache[key] = true
	cpp.mu.Unlock()
	go cpp.cleanupNullCache(key)
	return nil, fmt.Errorf("key not found")
}
// Set stores value in the database, registers key with the Bloom filter,
// writes the value to the cache for one hour and clears any missing-key marker.
// A failure of the database write or of the filter update aborts the call; the
// cache write result is not checked.
func (cpp *CachePenetrationProtection) Set(ctx context.Context, key string, value interface{}) error {
	dbErr := cpp.database.Set(ctx, key, value)
	if dbErr != nil {
		return dbErr
	}

	bloomErr := cpp.bloomFilter.Add(key)
	if bloomErr != nil {
		return bloomErr
	}

	cpp.cache.Set(ctx, key, value, time.Hour)

	// The key exists now, so it must no longer be reported as missing.
	cpp.mu.Lock()
	defer cpp.mu.Unlock()
	delete(cpp.nullCache, key)
	return nil
}
// cleanupNullCache waits for nullCacheTTL and then removes key from the
// missing-key set. It blocks for the whole TTL, so Get runs it in its own
// goroutine.
func (cpp *CachePenetrationProtection) cleanupNullCache(key string) {
	<-time.After(cpp.nullCacheTTL)

	cpp.mu.Lock()
	defer cpp.mu.Unlock()
	delete(cpp.nullCache, key)
}
// SimpleBloomFilter is a small in-memory Bloom filter backed by a []bool.
// It never reports a false negative for a key that was added; false positives
// are possible. It is safe for concurrent use.
type SimpleBloomFilter struct {
	mu        sync.RWMutex
	bitset    []bool
	size      int
	hashFuncs []func(string) int
}

// NewSimpleBloomFilter returns a filter with size bit slots and hashCount
// independent hash functions. Values below 1 for either argument are raised
// to 1 so the filter is always usable (a zero size previously caused a
// division-by-zero panic on first use).
//
// Each hash function is MD5 over the key salted with the function's index;
// the first eight bytes of the digest are reduced modulo size. The original
// implementation used only the first digest byte, which confined every index
// to 0..255 regardless of size, and captured the loop variable in the
// closure, which made all hash functions identical on toolchains older than
// Go 1.22.
func NewSimpleBloomFilter(size int, hashCount int) *SimpleBloomFilter {
	if size < 1 {
		size = 1
	}
	if hashCount < 1 {
		hashCount = 1
	}

	hashFuncs := make([]func(string) int, hashCount)
	for i := 0; i < hashCount; i++ {
		seed := i // per-iteration copy; required before Go 1.22
		hashFuncs[i] = func(s string) int {
			sum := md5.Sum([]byte(fmt.Sprintf("%d:%s", seed, s)))
			var v uint64
			for _, b := range sum[:8] {
				v = v<<8 | uint64(b)
			}
			return int(v % uint64(size))
		}
	}

	return &SimpleBloomFilter{
		bitset:    make([]bool, size),
		size:      size,
		hashFuncs: hashFuncs,
	}
}

// Add marks key as present. It always returns nil.
func (sbf *SimpleBloomFilter) Add(key string) error {
	sbf.mu.Lock()
	defer sbf.mu.Unlock()

	for _, hashFunc := range sbf.hashFuncs {
		sbf.bitset[hashFunc(key)] = true
	}
	return nil
}

// Contains reports whether key may have been added. A false result is
// definitive; a true result may be a false positive. The error is always nil.
func (sbf *SimpleBloomFilter) Contains(key string) (bool, error) {
	sbf.mu.RLock()
	defer sbf.mu.RUnlock()

	for _, hashFunc := range sbf.hashFuncs {
		if !sbf.bitset[hashFunc(key)] {
			return false, nil
		}
	}
	return true, nil
}
5. 缓存击穿解决方案
// cache/cache_breakdown.go
package cache
import (
"context"
"sync"
"time"
)
// CacheBreakdownProtection serialises database loads per key so that a missing
// hot key is loaded by one caller at a time. mutexes holds one lock per key
// and is itself protected by mu.
type CacheBreakdownProtection struct {
	cache    Cache
	database Database
	mutexes  map[string]*sync.Mutex
	mu       sync.RWMutex
}

// NewCacheBreakdownProtection returns a guard with an empty per-key lock table.
func NewCacheBreakdownProtection(cache Cache, database Database) *CacheBreakdownProtection {
	cbp := new(CacheBreakdownProtection)
	cbp.cache = cache
	cbp.database = database
	cbp.mutexes = map[string]*sync.Mutex{}
	return cbp
}
// Get returns the value for key. On a cache miss it takes the key's mutex,
// checks the cache again (another caller may have loaded the value while this
// one waited), and only then reads the database and caches the result for one
// hour.
func (cbp *CacheBreakdownProtection) Get(ctx context.Context, key string) (interface{}, error) {
	if hit, err := cbp.cache.Get(ctx, key); err == nil {
		return hit, nil
	}

	keyLock := cbp.getMutex(key)
	keyLock.Lock()
	defer keyLock.Unlock()

	// Second look now that we hold the per-key lock.
	if hit, err := cbp.cache.Get(ctx, key); err == nil {
		return hit, nil
	}

	loaded, err := cbp.database.Get(ctx, key)
	if err != nil {
		return nil, err
	}
	cbp.cache.Set(ctx, key, loaded, time.Hour)
	return loaded, nil
}
// getMutex returns the mutex associated with key, creating and registering it
// on first use. The lookup is tried under the read lock first; creation
// happens under the write lock after re-checking the table.
func (cbp *CacheBreakdownProtection) getMutex(key string) *sync.Mutex {
	cbp.mu.RLock()
	existing, ok := cbp.mutexes[key]
	cbp.mu.RUnlock()
	if ok {
		return existing
	}

	cbp.mu.Lock()
	defer cbp.mu.Unlock()

	// Another goroutine may have registered the mutex between the two locks.
	if raced, found := cbp.mutexes[key]; found {
		return raced
	}
	created := new(sync.Mutex)
	cbp.mutexes[key] = created
	return created
}
// CleanupMutexes is a placeholder for evicting per-key mutexes that are no
// longer needed. It currently walks the table under the write lock without
// removing anything.
func (cbp *CacheBreakdownProtection) CleanupMutexes() {
	cbp.mu.Lock()
	defer cbp.mu.Unlock()

	for range cbp.mutexes {
		// Eviction logic can be added here.
	}
}
6. 缓存雪崩解决方案
// cache/cache_avalanche.go
package cache
import (
"context"
"math/rand"
"sync"
"time"
)
// CacheAvalancheProtection populates the cache with randomised expirations so
// that entries written together do not all expire together.
type CacheAvalancheProtection struct {
	cache    Cache
	database Database
	mu       sync.RWMutex
}

// NewCacheAvalancheProtection returns a guard over the given cache and
// database.
func NewCacheAvalancheProtection(cache Cache, database Database) *CacheAvalancheProtection {
	guard := new(CacheAvalancheProtection)
	guard.cache = cache
	guard.database = database
	return guard
}
// Get returns the value for key from the cache, or from the database on a
// miss. A value loaded from the database is cached with the jittered
// expiration from getRandomExpiration.
func (cap *CacheAvalancheProtection) Get(ctx context.Context, key string) (interface{}, error) {
	if hit, err := cap.cache.Get(ctx, key); err == nil {
		return hit, nil
	}

	loaded, err := cap.database.Get(ctx, key)
	if err != nil {
		return nil, err
	}

	ttl := cap.getRandomExpiration()
	cap.cache.Set(ctx, key, loaded, ttl)
	return loaded, nil
}
// getRandomExpiration returns one hour adjusted by a random offset of -10% to
// +10% inclusive, in whole-percent steps.
//
// rand.Intn(n) returns a value in [0, n), so 21 outcomes are needed to cover
// -10..+10; the previous Intn(20) could never produce +10%.
func (cap *CacheAvalancheProtection) getRandomExpiration() time.Duration {
	baseExpiration := time.Hour
	percent := rand.Intn(21) - 10
	return baseExpiration + time.Duration(percent)*baseExpiration/100
}
// WarmupCache loads each key from the database and writes it to the cache
// with a jittered expiration, running at most ten loads concurrently.
//
// Warm-up is best effort per key: a failure on one key does not stop the
// others. The first error encountered (database read, cache write, or context
// cancellation) is returned after all started work has finished; nil means
// every key was warmed. When ctx is cancelled no further keys are started.
//
// The concurrency slot is acquired before the goroutine is started, so the
// number of live goroutines is bounded as well (previously one goroutine per
// key was created up front and errors were discarded).
func (cap *CacheAvalancheProtection) WarmupCache(ctx context.Context, keys []string) error {
	var (
		wg       sync.WaitGroup
		errOnce  sync.Once
		firstErr error
	)
	record := func(err error) {
		errOnce.Do(func() { firstErr = err })
	}

	semaphore := make(chan struct{}, 10) // caps concurrent loads

dispatch:
	for _, key := range keys {
		select {
		case semaphore <- struct{}{}:
		case <-ctx.Done():
			record(ctx.Err())
			break dispatch
		}

		wg.Add(1)
		go func(k string) {
			defer wg.Done()
			defer func() { <-semaphore }()

			value, err := cap.database.Get(ctx, k)
			if err != nil {
				record(err)
				return
			}
			if err := cap.cache.Set(ctx, k, value, cap.getRandomExpiration()); err != nil {
				record(err)
			}
		}(key)
	}

	wg.Wait()
	return firstErr
}
7. CDC 同步方案
// cache/cdc_sync.go
package cache
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
)
// CDCEvent describes one change-data-capture record. processEvent in this file
// reads After for INSERT/UPDATE and Before for DELETE.
type CDCEvent struct {
ID string `json:"id"`
Table string `json:"table"`
Operation string `json:"operation"` // INSERT, UPDATE, DELETE
Before interface{} `json:"before"`
After interface{} `json:"after"`
Timestamp time.Time `json:"timestamp"`
}
// CDCSynchronizer applies CDC events received on eventCh to the cache.
// isRunning is guarded by mu; stopCh and wg coordinate shutdown of the
// processing goroutine.
type CDCSynchronizer struct {
	cache     Cache
	eventCh   chan CDCEvent
	stopCh    chan struct{}
	wg        sync.WaitGroup
	mu        sync.RWMutex
	isRunning bool
}

// NewCDCSynchronizer returns a stopped synchronizer that will consume events
// from eventCh once Start is called.
func NewCDCSynchronizer(cache Cache, eventCh chan CDCEvent) *CDCSynchronizer {
	cdc := new(CDCSynchronizer)
	cdc.cache = cache
	cdc.eventCh = eventCh
	cdc.stopCh = make(chan struct{})
	return cdc
}
// Start launches the event-processing goroutine. It returns an error if the
// synchronizer is already running.
//
// A fresh stop channel is created on every start. Stop closes the channel, so
// reusing it would make a restarted loop exit immediately and make the next
// Stop panic on a double close. Replacing the channel here is safe because no
// loop goroutine exists while isRunning is false (Stop waits for it to exit).
func (cdc *CDCSynchronizer) Start() error {
	cdc.mu.Lock()
	defer cdc.mu.Unlock()

	if cdc.isRunning {
		return fmt.Errorf("CDC synchronizer already running")
	}

	cdc.stopCh = make(chan struct{})
	cdc.isRunning = true
	cdc.wg.Add(1)
	go cdc.syncLoop()
	return nil
}
// Stop signals the processing goroutine to exit and waits for it. Calling Stop
// on a synchronizer that is not running does nothing.
func (cdc *CDCSynchronizer) Stop() {
	cdc.mu.Lock()
	defer cdc.mu.Unlock()

	if cdc.isRunning {
		close(cdc.stopCh)
		cdc.wg.Wait()
		cdc.isRunning = false
	}
}
// syncLoop is the goroutine body started by Start: it processes events from
// eventCh until stopCh is closed. Events still queued at that point are not
// drained.
func (cdc *CDCSynchronizer) syncLoop() {
	defer cdc.wg.Done()

	for {
		select {
		case <-cdc.stopCh:
			return
		case incoming := <-cdc.eventCh:
			cdc.processEvent(incoming)
		}
	}
}
// processEvent mirrors one CDC event into the cache using a background
// context: INSERT and UPDATE store the After image for one hour, DELETE
// removes the entry keyed by the Before image. Other operations are ignored.
// Cache failures are printed to stdout and otherwise dropped.
func (cdc *CDCSynchronizer) processEvent(event CDCEvent) {
	ctx := context.Background()
	op := event.Operation

	if op == "INSERT" || op == "UPDATE" {
		cacheKey := cdc.getCacheKey(event.Table, event.After)
		err := cdc.cache.Set(ctx, cacheKey, event.After, time.Hour)
		if err != nil {
			fmt.Printf("Failed to update cache: %v\n", err)
		}
		return
	}

	if op == "DELETE" {
		cacheKey := cdc.getCacheKey(event.Table, event.Before)
		err := cdc.cache.Delete(ctx, cacheKey)
		if err != nil {
			fmt.Printf("Failed to delete from cache: %v\n", err)
		}
	}
}
// getCacheKey builds the cache key as "<table>:<data>", with data rendered in
// fmt's default format. This is a simplification: a real implementation
// should derive the key from the row's primary key.
func (cdc *CDCSynchronizer) getCacheKey(table string, data interface{}) string {
	return table + ":" + fmt.Sprint(data)
}
// SimulateEvent builds a CDC event for table/operation and offers it to the
// event channel without blocking; if the channel is full the event is dropped
// and a message is printed.
//
// For "DELETE" the row is placed in Before, which is the image processEvent
// uses to derive the key to remove; for every other operation it is placed in
// After. (Previously data always went into After, so simulated deletes were
// keyed on a nil Before image and never removed the intended entry.)
func (cdc *CDCSynchronizer) SimulateEvent(table string, operation string, data interface{}) {
	now := time.Now()
	event := CDCEvent{
		ID:        fmt.Sprintf("%d", now.UnixNano()),
		Table:     table,
		Operation: operation,
		Timestamp: now,
	}
	if operation == "DELETE" {
		event.Before = data
	} else {
		event.After = data
	}

	select {
	case cdc.eventCh <- event:
	default:
		fmt.Println("Event channel full, dropping event")
	}
}
8. 分布式事务实现
// cache/distributed_transaction.go
package cache
import (
"context"
"fmt"
"sync"
"time"
)
// TransactionStatus is the lifecycle state of a DistributedTransaction.
type TransactionStatus int
// Lifecycle states, numbered from 0 via iota in declaration order.
const (
TRANSACTION_PREPARING TransactionStatus = iota
TRANSACTION_PREPARED
TRANSACTION_COMMITTING
TRANSACTION_COMMITTED
TRANSACTION_ABORTING
TRANSACTION_ABORTED
)
// DistributedTransaction is a list of operations that are prepared and then
// committed or aborted together. mu guards Status and Operations in the
// methods of this type; startTime and timeout drive the timeout check in
// Prepare.
type DistributedTransaction struct {
ID string
Status TransactionStatus
Operations []TransactionOperation
mu sync.RWMutex
startTime time.Time
timeout time.Duration
}
// TransactionOperation is one step of a transaction. Rollback is filled in by
// Prepare and is nil until then.
type TransactionOperation struct {
Type string // "cache_set", "cache_delete", "db_set", "db_delete"
Key string
Value interface{}
Rollback func() error
}
// DistributedTransactionManager creates transactions that share a common
// timeout. It also holds the cache and database handles.
type DistributedTransactionManager struct {
	cache    Cache
	database Database
	mu       sync.RWMutex
	timeout  time.Duration
}

// NewDistributedTransactionManager returns a manager whose transactions use
// the given timeout.
func NewDistributedTransactionManager(cache Cache, database Database, timeout time.Duration) *DistributedTransactionManager {
	dtm := new(DistributedTransactionManager)
	dtm.cache = cache
	dtm.database = database
	dtm.timeout = timeout
	return dtm
}
// BeginTransaction returns a new, empty transaction in the PREPARING state.
// Its ID is "tx_" followed by the current Unix time in nanoseconds, and it
// inherits the manager's timeout.
func (dtm *DistributedTransactionManager) BeginTransaction() *DistributedTransaction {
	tx := new(DistributedTransaction)
	tx.ID = fmt.Sprintf("tx_%d", time.Now().UnixNano())
	tx.Status = TRANSACTION_PREPARING
	tx.Operations = make([]TransactionOperation, 0)
	tx.startTime = time.Now()
	tx.timeout = dtm.timeout
	return tx
}
// AddOperation appends an operation of the given type, key and value to the
// transaction. The operation's Rollback stays nil until Prepare sets it.
func (tx *DistributedTransaction) AddOperation(opType, key string, value interface{}) {
	tx.mu.Lock()
	defer tx.mu.Unlock()

	tx.Operations = append(tx.Operations, TransactionOperation{
		Type:  opType,
		Key:   key,
		Value: value,
	})
}
// Prepare runs the prepare step for every operation and moves the transaction
// to PREPARED.
//
// It fails if the transaction is not in the PREPARING state or has exceeded
// its timeout (which also marks it ABORTED). If preparing an operation fails,
// the operations prepared so far are rolled back in reverse order, the
// transaction is marked ABORTED and the error is returned.
func (tx *DistributedTransaction) Prepare() error {
	tx.mu.Lock()
	defer tx.mu.Unlock()

	if tx.Status != TRANSACTION_PREPARING {
		return fmt.Errorf("transaction not in preparing state")
	}

	if elapsed := time.Since(tx.startTime); elapsed > tx.timeout {
		tx.Status = TRANSACTION_ABORTED
		return fmt.Errorf("transaction timeout")
	}

	for i := range tx.Operations {
		undo, err := tx.prepareOperation(tx.Operations[i])
		if err == nil {
			tx.Operations[i].Rollback = undo
			continue
		}
		// Undo only the operations that were prepared before this one.
		tx.rollbackOperations(i)
		tx.Status = TRANSACTION_ABORTED
		return err
	}

	tx.Status = TRANSACTION_PREPARED
	return nil
}
// prepareOperation returns the rollback function for op. This is a simplified
// stand-in: every known operation type gets a rollback that does nothing, and
// an unknown type is reported as an error.
func (tx *DistributedTransaction) prepareOperation(op TransactionOperation) (func() error, error) {
	switch op.Type {
	case "cache_set", "cache_delete", "db_set", "db_delete":
		// Placeholder rollback; a real implementation would undo the operation.
		return func() error { return nil }, nil
	}
	return nil, fmt.Errorf("unknown operation type: %s", op.Type)
}
// rollbackOperations invokes the Rollback function of the first count
// operations in reverse order, skipping operations that have none. Errors
// returned by rollbacks are not inspected.
func (tx *DistributedTransaction) rollbackOperations(count int) {
	for remaining := count; remaining > 0; remaining-- {
		if undo := tx.Operations[remaining-1].Rollback; undo != nil {
			undo()
		}
	}
}
// Commit executes every operation of a PREPARED transaction and marks it
// COMMITTED. If an operation fails, all operations are rolled back, the
// transaction is marked ABORTED and the error is returned. Committing a
// transaction that is not PREPARED is an error.
func (tx *DistributedTransaction) Commit() error {
	tx.mu.Lock()
	defer tx.mu.Unlock()

	if tx.Status != TRANSACTION_PREPARED {
		return fmt.Errorf("transaction not in prepared state")
	}
	tx.Status = TRANSACTION_COMMITTING

	for i := range tx.Operations {
		err := tx.executeOperation(tx.Operations[i])
		if err == nil {
			continue
		}
		tx.rollbackOperations(len(tx.Operations))
		tx.Status = TRANSACTION_ABORTED
		return err
	}

	tx.Status = TRANSACTION_COMMITTED
	return nil
}
// executeOperation applies op. This is a simplified stand-in: every known
// operation type succeeds without doing anything, and an unknown type is
// reported as an error.
func (tx *DistributedTransaction) executeOperation(op TransactionOperation) error {
	switch op.Type {
	case "cache_set", "cache_delete", "db_set", "db_delete":
		// Placeholder; a real implementation would perform the operation here.
		return nil
	}
	return fmt.Errorf("unknown operation type: %s", op.Type)
}
// Abort rolls back the transaction and marks it ABORTED.
//
// A committed transaction cannot be aborted and yields an error. Aborting a
// transaction that is already ABORTED is a no-op: Prepare, Commit and Abort
// all run the rollbacks before setting that state, so running them again
// would apply every rollback twice.
func (tx *DistributedTransaction) Abort() error {
	tx.mu.Lock()
	defer tx.mu.Unlock()

	switch tx.Status {
	case TRANSACTION_COMMITTED:
		return fmt.Errorf("transaction already committed")
	case TRANSACTION_ABORTED:
		return nil
	}

	tx.Status = TRANSACTION_ABORTING
	tx.rollbackOperations(len(tx.Operations))
	tx.Status = TRANSACTION_ABORTED
	return nil
}
测试验证
1. 单元测试
// cache/cache_consistency_test.go
package cache
import (
	"context"
	"fmt"
	"os"
	"sync"
	"testing"
	"time"
)
// MockCache is an in-memory Cache for tests: a map guarded by a RWMutex.
// The expiration passed to Set is ignored.
type MockCache struct {
	data map[string]interface{}
	mu   sync.RWMutex
}

// NewMockCache returns an empty MockCache.
func NewMockCache() *MockCache {
	return &MockCache{data: map[string]interface{}{}}
}

// Get returns the stored value, or a "key not found" error if key is absent.
func (mc *MockCache) Get(ctx context.Context, key string) (interface{}, error) {
	mc.mu.RLock()
	defer mc.mu.RUnlock()

	if stored, ok := mc.data[key]; ok {
		return stored, nil
	}
	return nil, fmt.Errorf("key not found")
}

// Set stores value under key. It always returns nil.
func (mc *MockCache) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
	mc.mu.Lock()
	mc.data[key] = value
	mc.mu.Unlock()
	return nil
}

// Delete removes key if present. It always returns nil.
func (mc *MockCache) Delete(ctx context.Context, key string) error {
	mc.mu.Lock()
	delete(mc.data, key)
	mc.mu.Unlock()
	return nil
}
// MockDatabase is an in-memory Database for tests: a map guarded by a RWMutex.
type MockDatabase struct {
	data map[string]interface{}
	mu   sync.RWMutex
}

// NewMockDatabase returns an empty MockDatabase.
func NewMockDatabase() *MockDatabase {
	return &MockDatabase{data: map[string]interface{}{}}
}

// Get returns the stored value, or a "key not found" error if key is absent.
func (md *MockDatabase) Get(ctx context.Context, key string) (interface{}, error) {
	md.mu.RLock()
	defer md.mu.RUnlock()

	if stored, ok := md.data[key]; ok {
		return stored, nil
	}
	return nil, fmt.Errorf("key not found")
}

// Set stores value under key. It always returns nil.
func (md *MockDatabase) Set(ctx context.Context, key string, value interface{}) error {
	md.mu.Lock()
	md.data[key] = value
	md.mu.Unlock()
	return nil
}

// Delete removes key if present. It always returns nil.
func (md *MockDatabase) Delete(ctx context.Context, key string) error {
	md.mu.Lock()
	delete(md.data, key)
	md.mu.Unlock()
	return nil
}
func TestCacheAside(t *testing.T) {
cache := NewMockCache()
database := NewMockDatabase()
ca := NewCacheAside(cache, database)
ctx := context.Background()
// 测试写操作
if err := ca.Set(ctx, "key1", "value1"); err != nil {
t.Fatalf("Failed to set: %v", err)
}
// 测试读操作
value, err := ca.Get(ctx, "key1")
if err != nil {
t.Fatalf("Failed to get: %v", err)
}
if value != "value1" {
t.Errorf("Expected 'value1', got %v", value)
}
}
func TestWriteThrough(t *testing.T) {
cache := NewMockCache()
database := NewMockDatabase()
wt := NewWriteThrough(cache, database)
ctx := context.Background()
// 测试写操作
if err := wt.Set(ctx, "key1", "value1"); err != nil {
t.Fatalf("Failed to set: %v", err)
}
// 测试读操作
value, err := wt.Get(ctx, "key1")
if err != nil {
t.Fatalf("Failed to get: %v", err)
}
if value != "value1" {
t.Errorf("Expected 'value1', got %v", value)
}
}
func TestWriteBehind(t *testing.T) {
cache := NewMockCache()
database := NewMockDatabase()
wb := NewWriteBehind(cache, database, 100)
defer wb.Close()
ctx := context.Background()
// 测试写操作
if err := wb.Set(ctx, "key1", "value1"); err != nil {
t.Fatalf("Failed to set: %v", err)
}
// 测试读操作
value, err := wb.Get(ctx, "key1")
if err != nil {
t.Fatalf("Failed to get: %v", err)
}
if value != "value1" {
t.Errorf("Expected 'value1', got %v", value)
}
}
func TestCachePenetrationProtection(t *testing.T) {
cache := NewMockCache()
database := NewMockDatabase()
bloomFilter := NewSimpleBloomFilter(1000, 3)
cpp := NewCachePenetrationProtection(cache, database, bloomFilter)
ctx := context.Background()
// 测试读操作(应该被布隆过滤器拦截)
_, err := cpp.Get(ctx, "nonexistent_key")
if err == nil {
t.Error("Expected error for nonexistent key")
}
}
// TestCacheBreakdownProtection checks the miss path: a value present only in
// the database is returned by Get and ends up in the cache.
func TestCacheBreakdownProtection(t *testing.T) {
	cache := NewMockCache()
	database := NewMockDatabase()
	cbp := NewCacheBreakdownProtection(cache, database)
	ctx := context.Background()

	// Seed the database; without this Get can only fail, because the type
	// under test exposes no write path.
	if err := database.Set(ctx, "key1", "value1"); err != nil {
		t.Fatalf("Failed to seed database: %v", err)
	}

	value, err := cbp.Get(ctx, "key1")
	if err != nil {
		t.Fatalf("Failed to get: %v", err)
	}
	if value != "value1" {
		t.Errorf("Expected 'value1', got %v", value)
	}

	// The miss must have populated the cache.
	if cached, err := cache.Get(ctx, "key1"); err != nil || cached != "value1" {
		t.Errorf("Expected cache to hold 'value1', got %v (err: %v)", cached, err)
	}
}
// TestCacheAvalancheProtection checks the miss path: a value present only in
// the database is returned by Get and ends up in the cache.
func TestCacheAvalancheProtection(t *testing.T) {
	cache := NewMockCache()
	database := NewMockDatabase()
	cap := NewCacheAvalancheProtection(cache, database)
	ctx := context.Background()

	// Seed the database; without this Get can only fail, because the type
	// under test exposes no write path.
	if err := database.Set(ctx, "key1", "value1"); err != nil {
		t.Fatalf("Failed to seed database: %v", err)
	}

	value, err := cap.Get(ctx, "key1")
	if err != nil {
		t.Fatalf("Failed to get: %v", err)
	}
	if value != "value1" {
		t.Errorf("Expected 'value1', got %v", value)
	}

	// The miss must have populated the cache.
	if cached, err := cache.Get(ctx, "key1"); err != nil || cached != "value1" {
		t.Errorf("Expected cache to hold 'value1', got %v (err: %v)", cached, err)
	}
}
// TestCDCSynchronizer starts the synchronizer, injects an INSERT event and
// verifies that the row shows up in the cache. It polls with a deadline
// instead of sleeping a fixed time, and fails if the event is never applied
// (the previous version asserted nothing).
func TestCDCSynchronizer(t *testing.T) {
	cache := NewMockCache()
	eventCh := make(chan CDCEvent, 100)
	cdc := NewCDCSynchronizer(cache, eventCh)

	if err := cdc.Start(); err != nil {
		t.Fatalf("Failed to start CDC: %v", err)
	}
	defer cdc.Stop()

	row := map[string]interface{}{
		"id":   1,
		"name": "John",
	}
	cdc.SimulateEvent("users", "INSERT", row)

	// Derive the key the same way processEvent does.
	key := cdc.getCacheKey("users", row)
	ctx := context.Background()
	deadline := time.Now().Add(time.Second)
	for {
		if _, err := cache.Get(ctx, key); err == nil {
			return
		}
		if time.Now().After(deadline) {
			t.Fatalf("INSERT event was not applied to the cache under key %q", key)
		}
		time.Sleep(5 * time.Millisecond)
	}
}
func TestDistributedTransaction(t *testing.T) {
cache := NewMockCache()
database := NewMockDatabase()
dtm := NewDistributedTransactionManager(cache, database, time.Minute)
// 开始事务
tx := dtm.BeginTransaction()
// 添加操作
tx.AddOperation("cache_set", "key1", "value1")
tx.AddOperation("db_set", "key1", "value1")
// 准备事务
if err := tx.Prepare(); err != nil {
t.Fatalf("Failed to prepare transaction: %v", err)
}
// 提交事务
if err := tx.Commit(); err != nil {
t.Fatalf("Failed to commit transaction: %v", err)
}
}
2. 性能基准测试
// cache/benchmark_test.go
package cache
import (
"context"
"testing"
"time"
)
// BenchmarkCacheAside measures CacheAside.Get over 1000 keys. The database is
// seeded first so that the first read of each key takes the miss path and
// later reads hit the cache; unseeded, every iteration only measured a failed
// lookup.
func BenchmarkCacheAside(b *testing.B) {
	cache := NewMockCache()
	database := NewMockDatabase()
	ca := NewCacheAside(cache, database)
	ctx := context.Background()

	for i := 0; i < 1000; i++ {
		if err := database.Set(ctx, fmt.Sprintf("key%d", i), i); err != nil {
			b.Fatalf("Failed to seed database: %v", err)
		}
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		key := fmt.Sprintf("key%d", i%1000)
		ca.Get(ctx, key)
	}
}
func BenchmarkWriteThrough(b *testing.B) {
cache := NewMockCache()
database := NewMockDatabase()
wt := NewWriteThrough(cache, database)
ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key%d", i%1000)
value := fmt.Sprintf("value%d", i%1000)
wt.Set(ctx, key, value)
}
}
func BenchmarkWriteBehind(b *testing.B) {
cache := NewMockCache()
database := NewMockDatabase()
wb := NewWriteBehind(cache, database, 1000)
defer wb.Close()
ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key%d", i%1000)
value := fmt.Sprintf("value%d", i%1000)
wb.Set(ctx, key, value)
}
}
func BenchmarkCachePenetrationProtection(b *testing.B) {
cache := NewMockCache()
database := NewMockDatabase()
bloomFilter := NewSimpleBloomFilter(10000, 3)
cpp := NewCachePenetrationProtection(cache, database, bloomFilter)
ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key%d", i%1000)
cpp.Get(ctx, key)
}
}
// BenchmarkCacheBreakdownProtection measures Get over 1000 keys. The database
// is seeded first so reads succeed (first read per key loads under the
// per-key lock, later reads hit the cache); unseeded, every iteration only
// measured a failed lookup.
func BenchmarkCacheBreakdownProtection(b *testing.B) {
	cache := NewMockCache()
	database := NewMockDatabase()
	cbp := NewCacheBreakdownProtection(cache, database)
	ctx := context.Background()

	for i := 0; i < 1000; i++ {
		if err := database.Set(ctx, fmt.Sprintf("key%d", i), i); err != nil {
			b.Fatalf("Failed to seed database: %v", err)
		}
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		key := fmt.Sprintf("key%d", i%1000)
		cbp.Get(ctx, key)
	}
}
// BenchmarkCacheAvalancheProtection measures Get over 1000 keys. The database
// is seeded first so reads succeed (first read per key loads and caches with a
// jittered expiration, later reads hit the cache); unseeded, every iteration
// only measured a failed lookup.
func BenchmarkCacheAvalancheProtection(b *testing.B) {
	cache := NewMockCache()
	database := NewMockDatabase()
	cap := NewCacheAvalancheProtection(cache, database)
	ctx := context.Background()

	for i := 0; i < 1000; i++ {
		if err := database.Set(ctx, fmt.Sprintf("key%d", i), i); err != nil {
			b.Fatalf("Failed to seed database: %v", err)
		}
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		key := fmt.Sprintf("key%d", i%1000)
		cap.Get(ctx, key)
	}
}
3. 并发测试
// cache/concurrent_test.go
package cache
import (
"context"
"sync"
"testing"
"time"
)
// TestCacheAsideConcurrent exercises CacheAside with concurrent writers and
// readers.
//
// The keys are written once before any reader starts, then writers and
// readers run together over the same keys. Previously readers were started
// alongside the first writers and failed whenever they reached a key that had
// not been written yet, which made the test flaky.
func TestCacheAsideConcurrent(t *testing.T) {
	cache := NewMockCache()
	database := NewMockDatabase()
	ca := NewCacheAside(cache, database)
	ctx := context.Background()

	const numGoroutines = 10
	const numOperations = 100

	writeAll := func(goroutineID int) {
		for j := 0; j < numOperations; j++ {
			key := fmt.Sprintf("key_%d_%d", goroutineID, j)
			value := fmt.Sprintf("value_%d_%d", goroutineID, j)
			if err := ca.Set(ctx, key, value); err != nil {
				t.Errorf("Failed to set: %v", err)
			}
		}
	}
	readAll := func(goroutineID int) {
		for j := 0; j < numOperations; j++ {
			key := fmt.Sprintf("key_%d_%d", goroutineID, j)
			if _, err := ca.Get(ctx, key); err != nil {
				t.Errorf("Failed to get: %v", err)
			}
		}
	}
	// runAll starts numGoroutines goroutines per worker and waits for all.
	runAll := func(workers ...func(int)) {
		var wg sync.WaitGroup
		for _, worker := range workers {
			for i := 0; i < numGoroutines; i++ {
				wg.Add(1)
				go func(work func(int), id int) {
					defer wg.Done()
					work(id)
				}(worker, i)
			}
		}
		wg.Wait()
	}

	runAll(writeAll)          // seed every key
	runAll(writeAll, readAll) // concurrent overwrite and read
}
// TestWriteThroughConcurrent exercises WriteThrough with concurrent writers
// and readers.
//
// The keys are written once before any reader starts, then writers and
// readers run together over the same keys. Previously readers were started
// alongside the first writers and failed whenever they reached a key that had
// not been written yet, which made the test flaky.
func TestWriteThroughConcurrent(t *testing.T) {
	cache := NewMockCache()
	database := NewMockDatabase()
	wt := NewWriteThrough(cache, database)
	ctx := context.Background()

	const numGoroutines = 10
	const numOperations = 100

	writeAll := func(goroutineID int) {
		for j := 0; j < numOperations; j++ {
			key := fmt.Sprintf("key_%d_%d", goroutineID, j)
			value := fmt.Sprintf("value_%d_%d", goroutineID, j)
			if err := wt.Set(ctx, key, value); err != nil {
				t.Errorf("Failed to set: %v", err)
			}
		}
	}
	readAll := func(goroutineID int) {
		for j := 0; j < numOperations; j++ {
			key := fmt.Sprintf("key_%d_%d", goroutineID, j)
			if _, err := wt.Get(ctx, key); err != nil {
				t.Errorf("Failed to get: %v", err)
			}
		}
	}
	// runAll starts numGoroutines goroutines per worker and waits for all.
	runAll := func(workers ...func(int)) {
		var wg sync.WaitGroup
		for _, worker := range workers {
			for i := 0; i < numGoroutines; i++ {
				wg.Add(1)
				go func(work func(int), id int) {
					defer wg.Done()
					work(id)
				}(worker, i)
			}
		}
		wg.Wait()
	}

	runAll(writeAll)          // seed every key
	runAll(writeAll, readAll) // concurrent overwrite and read
}
// TestWriteBehindConcurrent exercises WriteBehind with concurrent writers and
// readers.
//
// The keys are written once before any reader starts, then writers and
// readers run together over the same keys. Previously readers were started
// alongside the first writers and failed whenever they reached a key that had
// not been written yet, which made the test flaky.
func TestWriteBehindConcurrent(t *testing.T) {
	cache := NewMockCache()
	database := NewMockDatabase()
	wb := NewWriteBehind(cache, database, 1000)
	defer wb.Close()
	ctx := context.Background()

	const numGoroutines = 10
	const numOperations = 100

	writeAll := func(goroutineID int) {
		for j := 0; j < numOperations; j++ {
			key := fmt.Sprintf("key_%d_%d", goroutineID, j)
			value := fmt.Sprintf("value_%d_%d", goroutineID, j)
			if err := wb.Set(ctx, key, value); err != nil {
				t.Errorf("Failed to set: %v", err)
			}
		}
	}
	readAll := func(goroutineID int) {
		for j := 0; j < numOperations; j++ {
			key := fmt.Sprintf("key_%d_%d", goroutineID, j)
			if _, err := wb.Get(ctx, key); err != nil {
				t.Errorf("Failed to get: %v", err)
			}
		}
	}
	// runAll starts numGoroutines goroutines per worker and waits for all.
	runAll := func(workers ...func(int)) {
		var wg sync.WaitGroup
		for _, worker := range workers {
			for i := 0; i < numGoroutines; i++ {
				wg.Add(1)
				go func(work func(int), id int) {
					defer wg.Done()
					work(id)
				}(worker, i)
			}
		}
		wg.Wait()
	}

	runAll(writeAll)          // seed every key
	runAll(writeAll, readAll) // concurrent overwrite and read
}
性能对比分析
1. 缓存模式对比
模式 | 读性能 | 写性能 | 一致性 | 复杂度 | 适用场景 |
---|---|---|---|---|---|
Cache Aside | 高 | 中等 | 中等 | 低 | 大多数场景 |
Write Through | 高 | 低 | 高 | 中等 | 对一致性要求高 |
Write Behind | 高 | 高 | 低 | 高 | 对性能要求高 |
2. 防护策略对比
策略 | 性能影响 | 内存开销 | 实现复杂度 | 防护效果 |
---|---|---|---|---|
缓存穿透防护 | 低 | 中等 | 中等 | 高 |
缓存击穿防护 | 中等 | 低 | 低 | 高 |
缓存雪崩防护 | 低 | 低 | 低 | 中等 |
3. 性能测试结果
// BenchmarkComparison runs the three cache modes as sub-benchmarks so their
// results appear side by side.
//
// Each sub-benchmark resets the timer after its setup. The CacheAside case
// seeds the database first so reads succeed; unseeded, it only measured
// failed lookups and was not comparable with the write benchmarks.
func BenchmarkComparison(b *testing.B) {
	// Cache Aside: reads over 1000 seeded keys.
	b.Run("CacheAside", func(b *testing.B) {
		cache := NewMockCache()
		database := NewMockDatabase()
		ca := NewCacheAside(cache, database)
		ctx := context.Background()
		for i := 0; i < 1000; i++ {
			if err := database.Set(ctx, fmt.Sprintf("key%d", i), i); err != nil {
				b.Fatalf("Failed to seed database: %v", err)
			}
		}
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			key := fmt.Sprintf("key%d", i%1000)
			ca.Get(ctx, key)
		}
	})

	// Write Through: synchronous writes to cache and database.
	b.Run("WriteThrough", func(b *testing.B) {
		cache := NewMockCache()
		database := NewMockDatabase()
		wt := NewWriteThrough(cache, database)
		ctx := context.Background()
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			key := fmt.Sprintf("key%d", i%1000)
			value := fmt.Sprintf("value%d", i%1000)
			wt.Set(ctx, key, value)
		}
	})

	// Write Behind: cache writes with queued database writes.
	b.Run("WriteBehind", func(b *testing.B) {
		cache := NewMockCache()
		database := NewMockDatabase()
		wb := NewWriteBehind(cache, database, 1000)
		defer wb.Close()
		ctx := context.Background()
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			key := fmt.Sprintf("key%d", i%1000)
			value := fmt.Sprintf("value%d", i%1000)
			wb.Set(ctx, key, value)
		}
	})
}
面试要点
1. 缓存一致性的挑战
答案要点:
- 数据更新顺序:先更新缓存还是先更新数据库
- 并发访问:多个线程同时读写同一数据
- 网络延迟:缓存和数据库之间的网络延迟
- 故障恢复:系统故障后的数据恢复
2. 缓存模式的选择
答案要点:
- Cache Aside:适合大多数场景,简单灵活
- Write Through:适合对一致性要求高的场景
- Write Behind:适合对性能要求高的场景
- 混合模式:根据业务需求组合使用
3. 缓存问题的解决方案
答案要点:
- 缓存穿透:布隆过滤器 + 空值缓存
- 缓存击穿:互斥锁 + 双重检查
- 缓存雪崩:随机过期时间 + 缓存预热
- 数据一致性:CDC 同步 + 分布式事务
4. 分布式事务的实现
答案要点:
- 两阶段提交:准备阶段 + 提交阶段
- 补偿事务:正向操作 + 补偿操作
- 最终一致性:允许短暂不一致,最终达到一致
- 幂等性:确保操作可以重复执行
总结
通过本章学习,我们深入理解了:
- 缓存一致性的基本原理和挑战
- 三种缓存模式的实现和选择
- 缓存问题的解决方案和防护策略
- CDC 同步方案和分布式事务
- 性能优化技巧和最佳实践
缓存一致性策略为分布式系统提供了可靠的数据一致性保证。在下一章中,我们将学习主从复制机制,了解 Redis 如何实现高可用性。