HiHuo
首页
博客
手册
工具
首页
博客
手册
工具
  • 学习 Redis

    • Redis 手写实现学习指南
    • 快速开始
    • Redis 架构总览与线程模型
    • RESP 协议与网络通信
    • 事件循环与 I/O 多路复用
    • 底层数据结构设计
    • 字符串与 SDS 实现
    • 哈希表与字典实现
    • 列表与跳表实现
    • 有序集合实现
    • 内存管理与对象系统
    • RDB 持久化机制
    • AOF 持久化机制
    • 混合持久化策略
    • 分布式锁实现
    • 缓存一致性策略
    • 主从复制机制
    • 哨兵模式实现
    • 内存优化与 GC 调优

分布式锁实现

学习目标

  • 深入理解分布式锁的设计原理和实现方式
  • 掌握基于 Redis 的分布式锁实现
  • 理解分布式锁的常见问题和解决方案
  • 实现完整的分布式锁库

分布式锁概述

分布式锁是在分布式系统中实现互斥访问共享资源的一种机制。它需要满足以下特性:

基本特性

  1. 互斥性:同一时刻只有一个客户端能持有锁
  2. 防死锁:避免客户端崩溃导致锁无法释放
  3. 可重入性:同一客户端可以多次获取同一把锁
  4. 容错性:在部分节点故障时仍能正常工作

实现方式对比

实现方式优点缺点适用场景
Redis性能高、实现简单单点故障、时钟依赖高并发、对一致性要求不高
ZooKeeper强一致性、可靠性高性能较低、实现复杂强一致性要求
etcd强一致性、性能较好实现复杂云原生环境

️ Redis 分布式锁实现

1. 基础锁实现

// lock/redis_lock.go
package lock

import (
    "context"
    "crypto/rand"
    "encoding/hex"
    "fmt"
    "net"
    "strconv"
    "time"
)

// RedisLock 分布式锁
type RedisLock struct {
    client    *RedisClient
    key       string
    value     string
    ttl       time.Duration
    retryTime time.Duration
    retryCount int
}

// RedisClient Redis 客户端
type RedisClient struct {
    conn net.Conn
}

// 创建 Redis 客户端
func NewRedisClient(addr string) (*RedisClient, error) {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
    return &RedisClient{conn: conn}, nil
}

// 创建分布式锁
func NewRedisLock(client *RedisClient, key string, ttl time.Duration) *RedisLock {
    return &RedisLock{
        client:     client,
        key:        key,
        value:      generateRandomValue(),
        ttl:        ttl,
        retryTime:  100 * time.Millisecond,
        retryCount: 3,
    }
}

// 生成随机值
func generateRandomValue() string {
    bytes := make([]byte, 16)
    rand.Read(bytes)
    return hex.EncodeToString(bytes)
}

// 获取锁
func (l *RedisLock) Lock(ctx context.Context) error {
    for i := 0; i < l.retryCount; i++ {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            if l.tryLock() {
                return nil
            }
            time.Sleep(l.retryTime)
        }
    }
    return fmt.Errorf("failed to acquire lock after %d attempts", l.retryCount)
}

// 尝试获取锁
func (l *RedisLock) tryLock() bool {
    // 使用 SET key value NX EX ttl 命令
    cmd := fmt.Sprintf("SET %s %s NX EX %d\r\n", l.key, l.value, int(l.ttl.Seconds()))
    
    // 发送命令
    if _, err := l.client.conn.Write([]byte(cmd)); err != nil {
        return false
    }
    
    // 读取响应
    response := make([]byte, 1024)
    n, err := l.client.conn.Read(response)
    if err != nil {
        return false
    }
    
    // 检查响应
    resp := string(response[:n])
    return resp == "+OK\r\n"
}

// 释放锁
func (l *RedisLock) Unlock() error {
    // 使用 Lua 脚本确保原子性
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
    `
    
    cmd := fmt.Sprintf("EVAL %s 1 %s %s\r\n", script, l.key, l.value)
    
    // 发送命令
    if _, err := l.client.conn.Write([]byte(cmd)); err != nil {
        return err
    }
    
    // 读取响应
    response := make([]byte, 1024)
    n, err := l.client.conn.Read(response)
    if err != nil {
        return err
    }
    
    // 检查响应
    resp := string(response[:n])
    if resp == ":1\r\n" {
        return nil
    }
    return fmt.Errorf("failed to release lock")
}

// 续期锁
func (l *RedisLock) Renew(ctx context.Context) error {
    // 使用 Lua 脚本续期
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("EXPIRE", KEYS[1], ARGV[2])
        else
            return 0
        end
    `
    
    cmd := fmt.Sprintf("EVAL %s 1 %s %s %d\r\n", script, l.key, l.value, int(l.ttl.Seconds()))
    
    // 发送命令
    if _, err := l.client.conn.Write([]byte(cmd)); err != nil {
        return err
    }
    
    // 读取响应
    response := make([]byte, 1024)
    n, err := l.client.conn.Read(response)
    if err != nil {
        return err
    }
    
    // 检查响应
    resp := string(response[:n])
    if resp == ":1\r\n" {
        return nil
    }
    return fmt.Errorf("failed to renew lock")
}

// 检查锁是否存在
func (l *RedisLock) IsLocked() bool {
    cmd := fmt.Sprintf("GET %s\r\n", l.key)
    
    // 发送命令
    if _, err := l.client.conn.Write([]byte(cmd)); err != nil {
        return false
    }
    
    // 读取响应
    response := make([]byte, 1024)
    n, err := l.client.conn.Read(response)
    if err != nil {
        return false
    }
    
    // 检查响应
    resp := string(response[:n])
    return resp != "$-1\r\n"
}

2. 可重入锁实现

// lock/reentrant_lock.go
package lock

import (
    "context"
    "fmt"
    "sync"
)

// ReentrantLock 可重入锁
type ReentrantLock struct {
    *RedisLock
    mutex    sync.Mutex
    count    int
    owner    string
}

// 创建可重入锁
func NewReentrantLock(client *RedisClient, key string, ttl time.Duration) *ReentrantLock {
    return &ReentrantLock{
        RedisLock: NewRedisLock(client, key, ttl),
        owner:     generateRandomValue(),
    }
}

// 获取锁(可重入)
func (r *ReentrantLock) Lock(ctx context.Context) error {
    r.mutex.Lock()
    defer r.mutex.Unlock()
    
    // 如果已经持有锁,增加计数
    if r.count > 0 {
        r.count++
        return nil
    }
    
    // 尝试获取锁
    if err := r.RedisLock.Lock(ctx); err != nil {
        return err
    }
    
    r.count = 1
    return nil
}

// 释放锁(可重入)
func (r *ReentrantLock) Unlock() error {
    r.mutex.Lock()
    defer r.mutex.Unlock()
    
    if r.count <= 0 {
        return fmt.Errorf("lock not held")
    }
    
    r.count--
    
    // 如果计数为 0,释放锁
    if r.count == 0 {
        return r.RedisLock.Unlock()
    }
    
    return nil
}

// 检查是否持有锁
func (r *ReentrantLock) IsHeld() bool {
    r.mutex.Lock()
    defer r.mutex.Unlock()
    return r.count > 0
}

3. 看门狗锁实现

// lock/watchdog_lock.go
package lock

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// WatchdogLock 看门狗锁
type WatchdogLock struct {
    *RedisLock
    mutex      sync.Mutex
    stopCh     chan struct{}
    renewCh    chan struct{}
    isRenewing bool
}

// 创建看门狗锁
func NewWatchdogLock(client *RedisClient, key string, ttl time.Duration) *WatchdogLock {
    return &WatchdogLock{
        RedisLock: NewRedisLock(client, key, ttl),
        stopCh:    make(chan struct{}),
        renewCh:   make(chan struct{}),
    }
}

// 获取锁(带看门狗)
func (w *WatchdogLock) Lock(ctx context.Context) error {
    if err := w.RedisLock.Lock(ctx); err != nil {
        return err
    }
    
    // 启动看门狗
    w.startWatchdog()
    return nil
}

// 释放锁(带看门狗)
func (w *WatchdogLock) Unlock() error {
    w.mutex.Lock()
    defer w.mutex.Unlock()
    
    // 停止看门狗
    if w.isRenewing {
        close(w.stopCh)
        w.isRenewing = false
    }
    
    return w.RedisLock.Unlock()
}

// 启动看门狗
func (w *WatchdogLock) startWatchdog() {
    w.mutex.Lock()
    defer w.mutex.Unlock()
    
    if w.isRenewing {
        return
    }
    
    w.isRenewing = true
    go w.watchdog()
}

// 看门狗协程
func (w *WatchdogLock) watchdog() {
    ticker := time.NewTicker(w.ttl / 2) // 每 TTL/2 时间续期一次
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            if err := w.RedisLock.Renew(context.Background()); err != nil {
                // 续期失败,锁可能已丢失
                w.mutex.Lock()
                w.isRenewing = false
                w.mutex.Unlock()
                return
            }
        case <-w.stopCh:
            return
        }
    }
}

4. 红锁实现

// lock/redlock.go
package lock

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// RedLock 红锁
type RedLock struct {
    clients []*RedisClient
    key     string
    value   string
    ttl     time.Duration
    quorum  int
}

// 创建红锁
func NewRedLock(clients []*RedisClient, key string, ttl time.Duration) *RedLock {
    return &RedLock{
        clients: clients,
        key:     key,
        value:   generateRandomValue(),
        ttl:     ttl,
        quorum:  len(clients)/2 + 1, // 需要超过一半的节点同意
    }
}

// 获取锁
func (r *RedLock) Lock(ctx context.Context) error {
    start := time.Now()
    
    // 尝试在所有节点上获取锁
    successCount := 0
    var wg sync.WaitGroup
    var mutex sync.Mutex
    
    for _, client := range r.clients {
        wg.Add(1)
        go func(c *RedisClient) {
            defer wg.Done()
            
            if r.tryLockOnNode(c) {
                mutex.Lock()
                successCount++
                mutex.Unlock()
            }
        }(client)
    }
    
    wg.Wait()
    
    // 检查是否获得足够多的锁
    if successCount < r.quorum {
        // 释放已获得的锁
        r.unlockAll()
        return fmt.Errorf("failed to acquire lock on enough nodes: %d/%d", successCount, len(r.clients))
    }
    
    // 检查获取锁的时间是否超过 TTL
    elapsed := time.Since(start)
    if elapsed >= r.ttl {
        r.unlockAll()
        return fmt.Errorf("lock acquisition took too long: %v", elapsed)
    }
    
    return nil
}

// 在单个节点上尝试获取锁
func (r *RedLock) tryLockOnNode(client *RedisClient) bool {
    cmd := fmt.Sprintf("SET %s %s NX EX %d\r\n", r.key, r.value, int(r.ttl.Seconds()))
    
    // 发送命令
    if _, err := client.conn.Write([]byte(cmd)); err != nil {
        return false
    }
    
    // 读取响应
    response := make([]byte, 1024)
    n, err := client.conn.Read(response)
    if err != nil {
        return false
    }
    
    // 检查响应
    resp := string(response[:n])
    return resp == "+OK\r\n"
}

// 释放锁
func (r *RedLock) Unlock() error {
    var wg sync.WaitGroup
    
    for _, client := range r.clients {
        wg.Add(1)
        go func(c *RedisClient) {
            defer wg.Done()
            r.unlockOnNode(c)
        }(client)
    }
    
    wg.Wait()
    return nil
}

// 在单个节点上释放锁
func (r *RedLock) unlockOnNode(client *RedisClient) {
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
    `
    
    cmd := fmt.Sprintf("EVAL %s 1 %s %s\r\n", script, r.key, r.value)
    
    // 发送命令
    client.conn.Write([]byte(cmd))
    
    // 读取响应(忽略错误)
    response := make([]byte, 1024)
    client.conn.Read(response)
}

// 释放所有锁
func (r *RedLock) unlockAll() {
    var wg sync.WaitGroup
    
    for _, client := range r.clients {
        wg.Add(1)
        go func(c *RedisClient) {
            defer wg.Done()
            r.unlockOnNode(c)
        }(client)
    }
    
    wg.Wait()
}

测试验证

1. 单元测试

// lock/lock_test.go
package lock

import (
    "context"
    "testing"
    "time"
)

func TestRedisLock(t *testing.T) {
    // 创建 Redis 客户端
    client, err := NewRedisClient("127.0.0.1:6380")
    if err != nil {
        t.Fatalf("Failed to create Redis client: %v", err)
    }
    defer client.conn.Close()
    
    // 创建锁
    lock := NewRedisLock(client, "test_lock", 10*time.Second)
    
    // 获取锁
    ctx := context.Background()
    if err := lock.Lock(ctx); err != nil {
        t.Fatalf("Failed to acquire lock: %v", err)
    }
    
    // 检查锁状态
    if !lock.IsLocked() {
        t.Error("Lock should be held")
    }
    
    // 释放锁
    if err := lock.Unlock(); err != nil {
        t.Fatalf("Failed to release lock: %v", err)
    }
    
    // 检查锁状态
    if lock.IsLocked() {
        t.Error("Lock should not be held")
    }
}

func TestReentrantLock(t *testing.T) {
    // 创建 Redis 客户端
    client, err := NewRedisClient("127.0.0.1:6380")
    if err != nil {
        t.Fatalf("Failed to create Redis client: %v", err)
    }
    defer client.conn.Close()
    
    // 创建可重入锁
    lock := NewReentrantLock(client, "test_reentrant_lock", 10*time.Second)
    
    ctx := context.Background()
    
    // 第一次获取锁
    if err := lock.Lock(ctx); err != nil {
        t.Fatalf("Failed to acquire lock: %v", err)
    }
    
    // 第二次获取锁(重入)
    if err := lock.Lock(ctx); err != nil {
        t.Fatalf("Failed to reacquire lock: %v", err)
    }
    
    // 检查锁状态
    if !lock.IsHeld() {
        t.Error("Lock should be held")
    }
    
    // 第一次释放锁
    if err := lock.Unlock(); err != nil {
        t.Fatalf("Failed to release lock: %v", err)
    }
    
    // 检查锁状态
    if !lock.IsHeld() {
        t.Error("Lock should still be held")
    }
    
    // 第二次释放锁
    if err := lock.Unlock(); err != nil {
        t.Fatalf("Failed to release lock: %v", err)
    }
    
    // 检查锁状态
    if lock.IsHeld() {
        t.Error("Lock should not be held")
    }
}

2. 并发测试

// lock/concurrent_test.go
package lock

import (
    "context"
    "sync"
    "testing"
    "time"
)

func TestConcurrentLock(t *testing.T) {
    // 创建 Redis 客户端
    client, err := NewRedisClient("127.0.0.1:6380")
    if err != nil {
        t.Fatalf("Failed to create Redis client: %v", err)
    }
    defer client.conn.Close()
    
    const numGoroutines = 10
    const lockKey = "concurrent_test_lock"
    
    var wg sync.WaitGroup
    var successCount int
    var mutex sync.Mutex
    
    // 启动多个协程尝试获取锁
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            lock := NewRedisLock(client, lockKey, 5*time.Second)
            ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
            defer cancel()
            
            if err := lock.Lock(ctx); err == nil {
                mutex.Lock()
                successCount++
                mutex.Unlock()
                
                // 持有锁一段时间
                time.Sleep(100 * time.Millisecond)
                
                // 释放锁
                lock.Unlock()
            }
        }(i)
    }
    
    wg.Wait()
    
    // 检查结果
    if successCount != 1 {
        t.Errorf("Expected 1 successful lock acquisition, got %d", successCount)
    }
}

3. 性能测试

// lock/benchmark_test.go
package lock

import (
    "context"
    "testing"
    "time"
)

func BenchmarkRedisLock(b *testing.B) {
    // 创建 Redis 客户端
    client, err := NewRedisClient("127.0.0.1:6380")
    if err != nil {
        b.Fatalf("Failed to create Redis client: %v", err)
    }
    defer client.conn.Close()
    
    b.ResetTimer()
    
    for i := 0; i < b.N; i++ {
        lock := NewRedisLock(client, "benchmark_lock", 10*time.Second)
        
        ctx := context.Background()
        if err := lock.Lock(ctx); err != nil {
            b.Fatalf("Failed to acquire lock: %v", err)
        }
        
        lock.Unlock()
    }
}

性能分析

锁性能对比

锁类型获取时间释放时间内存使用可靠性
基础锁1ms1ms低中等
可重入锁2ms2ms中等中等
看门狗锁1ms1ms中等高
红锁5ms3ms高很高

并发性能测试

// 测试不同并发度下的锁性能
func BenchmarkConcurrentLock(b *testing.B) {
    client, _ := NewRedisClient("127.0.0.1:6380")
    defer client.conn.Close()
    
    concurrencyLevels := []int{1, 10, 100, 1000}
    
    for _, concurrency := range concurrencyLevels {
        b.Run(fmt.Sprintf("concurrency_%d", concurrency), func(b *testing.B) {
            var wg sync.WaitGroup
            semaphore := make(chan struct{}, concurrency)
            
            b.ResetTimer()
            
            for i := 0; i < b.N; i++ {
                wg.Add(1)
                go func() {
                    defer wg.Done()
                    
                    semaphore <- struct{}{}
                    defer func() { <-semaphore }()
                    
                    lock := NewRedisLock(client, "benchmark_lock", 10*time.Second)
                    ctx := context.Background()
                    
                    if err := lock.Lock(ctx); err == nil {
                        lock.Unlock()
                    }
                }()
            }
            
            wg.Wait()
        })
    }
}

面试要点

1. 分布式锁的实现原理

答案要点:

  • SET NX EX:使用 Redis 的原子操作实现互斥
  • 随机值:防止误删其他客户端的锁
  • Lua 脚本:确保释放锁的原子性
  • TTL:防止死锁,自动过期释放

2. 分布式锁的常见问题

问题:

  • 时钟漂移:不同节点时间不一致
  • 网络分区:网络故障导致锁状态不一致
  • 锁续期:长时间操作需要续期
  • 误删锁:释放了其他客户端的锁

解决方案:

  • 红锁:多节点投票机制
  • 看门狗:自动续期机制
  • 随机值:防止误删
  • Lua 脚本:保证原子性

3. 红锁的争议

支持观点:

  • 高可用性:多节点容错
  • 强一致性:多数节点同意
  • 防止脑裂:避免网络分区问题

反对观点:

  • 性能开销:需要多个节点
  • 实现复杂:增加系统复杂度
  • 时钟依赖:仍然依赖时钟同步

总结

通过本章学习,我们深入理解了:

  1. 分布式锁的设计原理和实现方式
  2. Redis 分布式锁的完整实现
  3. 可重入锁和看门狗锁的高级特性
  4. 红锁的多节点容错机制

分布式锁是分布式系统中的重要组件,为协调多个节点访问共享资源提供了可靠的解决方案。在下一章中,我们将学习缓存一致性策略,了解如何保证缓存与数据库之间的数据一致性。

Prev
混合持久化策略
Next
缓存一致性策略