HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 分布式架构模式

    • 分布式架构模式手册
    • 第1章:分布式一致性
    • 第2章:分布式锁
    • 第3章:分布式协调
    • 第4章:服务发现与注册
    • 第5章:负载均衡
    • 第6章:熔断降级
    • 第7章:DDD领域驱动设计
    • 第8章:CQRS与Event Sourcing

第2章:分布式锁

为什么需要分布式锁

单机锁的局限

单机环境:

// 单机锁:sync.Mutex
var mu sync.Mutex
var counter int

func Increment() {
    mu.Lock()
    defer mu.Unlock()
    counter++  // 安全:同一时刻只有一个goroutine执行
}

特点:

  • 简单可靠
  • 性能高(内存操作)
  • 只能在单个进程内有效
  • 无法跨机器协调

分布式环境的挑战

问题场景:库存扣减

电商系统(3个服务实例):

T1: User1请求 → Instance1 → 读库存=10
T2: User2请求 → Instance2 → 读库存=10
T3: Instance1 → 扣减库存 → 写库存=9
T4: Instance2 → 扣减库存 → 写库存=9(应该是8!)

结果:超卖!库存实际应该是8,却变成了9

根本原因:

  • 多个进程/机器同时访问共享资源
  • 单机锁无法跨进程生效
  • 需要分布式锁协调

分布式锁的要求

核心特性:

  1. 互斥性(Mutual Exclusion)
同一时刻,只有一个客户端能持有锁
  1. 防死锁(Deadlock Free)
即使持有锁的客户端崩溃,锁最终也能被释放
  1. 容错性(Fault Tolerance)
只要大部分节点正常,就能获取和释放锁
  1. 自解铃人(Unlock by Owner)
只有持有锁的客户端才能释放锁

分布式锁的实现方案

方案对比

方案实现难度性能可靠性适用场景
数据库小规模系统
Redis高性能场景
ZooKeeper强一致性要求
etcd现代化云原生

方案1:数据库锁

实现:利用数据库的唯一约束

-- 创建锁表
CREATE TABLE distributed_lock (
    lock_key VARCHAR(255) PRIMARY KEY,
    owner VARCHAR(255) NOT NULL,
    expire_time TIMESTAMP NOT NULL
);

-- 获取锁(插入记录)
INSERT INTO distributed_lock (lock_key, owner, expire_time)
VALUES ('order_123', 'server_1', NOW() + INTERVAL 30 SECOND)
ON DUPLICATE KEY UPDATE lock_key=lock_key;  -- 失败则锁已被占用

-- 释放锁(删除记录)
DELETE FROM distributed_lock
WHERE lock_key = 'order_123' AND owner = 'server_1';

Go实现:

type DBLock struct {
    db *sql.DB
}

func (l *DBLock) TryLock(key, owner string, ttl time.Duration) (bool, error) {
    expireTime := time.Now().Add(ttl)

    result, err := l.db.Exec(
        "INSERT INTO distributed_lock (lock_key, owner, expire_time) VALUES (?, ?, ?)",
        key, owner, expireTime,
    )

    if err != nil {
        // 插入失败,锁已被占用
        return false, nil
    }

    affected, _ := result.RowsAffected()
    return affected > 0, nil
}

func (l *DBLock) Unlock(key, owner string) error {
    _, err := l.db.Exec(
        "DELETE FROM distributed_lock WHERE lock_key = ? AND owner = ?",
        key, owner,
    )
    return err
}

优缺点:

  • 实现简单
  • 强一致性(ACID)
  • 性能较低(IO操作)
  • 依赖数据库(单点风险)
  • 过期锁需要定时清理

Redis分布式锁

基础实现:SETNX + EXPIRE

命令:

# 获取锁(SET with NX and EX)
SET lock_key unique_value NX EX 30

# NX:仅在key不存在时设置
# EX 30:30秒后自动过期
# unique_value:唯一标识(防止误删)

Go实现 V1:

type RedisLock struct {
    client *redis.Client
    key    string
    value  string  // 唯一标识(UUID)
    ttl    time.Duration
}

func NewRedisLock(client *redis.Client, key string, ttl time.Duration) *RedisLock {
    return &RedisLock{
        client: client,
        key:    key,
        value:  uuid.New().String(),  // 生成唯一标识
        ttl:    ttl,
    }
}

// 获取锁
func (l *RedisLock) Lock(ctx context.Context) (bool, error) {
    // SET key value NX EX ttl
    result, err := l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
    if err != nil {
        return false, err
    }

    return result, nil  // true表示获取成功
}

// 释放锁(使用Lua脚本保证原子性)
func (l *RedisLock) Unlock(ctx context.Context) error {
    // Lua脚本:先判断是否是自己的锁,再删除
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
    `

    _, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
    return err
}

// 使用示例
func ExampleRedisLock() {
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    lock := NewRedisLock(rdb, "order:1001", 30*time.Second)

    ctx := context.Background()

    // 尝试获取锁
    acquired, err := lock.Lock(ctx)
    if err != nil {
        panic(err)
    }

    if !acquired {
        log.Println("Failed to acquire lock")
        return
    }

    // 执行业务逻辑
    defer lock.Unlock(ctx)
    ProcessOrder("1001")
}

进阶:锁续期(看门狗)

问题:业务执行时间超过锁的TTL怎么办?

解决方案:Watch Dog(看门狗机制)

type RedisLockWithRenewal struct {
    RedisLock
    renewalInterval time.Duration
    stopRenewal     chan struct{}
}

func NewRedisLockWithRenewal(client *redis.Client, key string, ttl time.Duration) *RedisLockWithRenewal {
    return &RedisLockWithRenewal{
        RedisLock:       *NewRedisLock(client, key, ttl),
        renewalInterval: ttl / 3,  // 每1/3 TTL续期一次
        stopRenewal:     make(chan struct{}),
    }
}

// 获取锁并启动自动续期
func (l *RedisLockWithRenewal) LockWithRenewal(ctx context.Context) (bool, error) {
    acquired, err := l.Lock(ctx)
    if err != nil || !acquired {
        return acquired, err
    }

    // 启动续期协程
    go l.renewalLoop(ctx)

    return true, nil
}

// 续期循环
func (l *RedisLockWithRenewal) renewalLoop(ctx context.Context) {
    ticker := time.NewTicker(l.renewalInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // 续期:重新设置过期时间
            script := `
                if redis.call("GET", KEYS[1]) == ARGV[1] then
                    return redis.call("EXPIRE", KEYS[1], ARGV[2])
                else
                    return 0
                end
            `
            l.client.Eval(ctx, script, []string{l.key}, l.value, int(l.ttl.Seconds()))

        case <-l.stopRenewal:
            // 停止续期
            return

        case <-ctx.Done():
            // 上下文取消
            return
        }
    }
}

// 释放锁并停止续期
func (l *RedisLockWithRenewal) UnlockWithRenewal(ctx context.Context) error {
    close(l.stopRenewal)  // 停止续期
    return l.Unlock(ctx)
}

// 使用示例
func ExampleLockWithRenewal() {
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    lock := NewRedisLockWithRenewal(rdb, "order:1001", 10*time.Second)

    ctx := context.Background()
    acquired, _ := lock.LockWithRenewal(ctx)

    if acquired {
        defer lock.UnlockWithRenewal(ctx)

        // 长时间业务逻辑(可能超过10秒)
        time.Sleep(30 * time.Second)  // 锁会自动续期
        ProcessOrder("1001")
    }
}

Redlock算法

问题:Redis主从切换时可能丢失锁

场景:
T1: Client1从Master获取锁
T2: Master宕机(锁数据未同步到Slave)
T3: Slave提升为新Master
T4: Client2从新Master获取同一把锁  两个客户端都持有锁!

Redlock解决方案:使用多个独立的Redis实例

算法步骤:

  1. 获取当前时间戳(毫秒)
  2. 依次向N个Redis实例请求锁
  3. 如果超过半数(N/2+1)实例获取成功,且总耗时<锁的有效期,则认为获取锁成功
  4. 锁的实际有效期 = TTL - 获取锁的耗时
  5. 如果获取失败,向所有实例发送释放锁命令

实现:

type Redlock struct {
    clients []*redis.Client
    quorum  int  // 需要成功的实例数
}

func NewRedlock(addrs []string) *Redlock {
    clients := make([]*redis.Client, len(addrs))
    for i, addr := range addrs {
        clients[i] = redis.NewClient(&redis.Options{Addr: addr})
    }

    return &Redlock{
        clients: clients,
        quorum:  len(addrs)/2 + 1,  // 多数派
    }
}

func (r *Redlock) Lock(ctx context.Context, key, value string, ttl time.Duration) (bool, error) {
    start := time.Now()
    successCount := 0

    // 向所有实例请求锁
    for _, client := range r.clients {
        ok, err := client.SetNX(ctx, key, value, ttl).Result()
        if err == nil && ok {
            successCount++
        }
    }

    // 计算耗时
    elapsed := time.Since(start)

    // 判断是否成功
    if successCount >= r.quorum && elapsed < ttl {
        return true, nil  // 获取锁成功
    }

    // 获取失败,释放已获取的锁
    r.Unlock(ctx, key, value)
    return false, nil
}

func (r *Redlock) Unlock(ctx context.Context, key, value string) error {
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
    `

    // 向所有实例发送释放命令
    for _, client := range r.clients {
        client.Eval(ctx, script, []string{key}, value)
    }

    return nil
}

Redlock特点:

  • 容错性强(多数节点故障仍可用)
  • 无主从复制延迟问题
  • 实现复杂
  • 需要部署多个独立Redis实例
  • 仍有争议(Martin Kleppmann批评)

ZooKeeper分布式锁

实现原理

核心机制:临时顺序节点

ZooKeeper目录结构:
/locks
  ├── lock-0000000001  (临时顺序节点)
  ├── lock-0000000002
  ├── lock-0000000003
  └── lock-0000000004

规则:
1. 客户端创建临时顺序节点
2. 序号最小的节点获得锁
3. 其他节点Watch前一个节点
4. 前一个节点删除时,下一个节点获得锁

流程图:

Client1 → Create /locks/lock-0000000001 → 序号最小 → 获得锁 
Client2 → Create /locks/lock-0000000002 → Watch lock-0000000001 → 等待
Client3 → Create /locks/lock-0000000003 → Watch lock-0000000002 → 等待

Client1释放锁 → Delete lock-0000000001
                         ↓
Client2收到通知 → 序号最小 → 获得锁 

Go实现:

type ZKLock struct {
    conn *zk.Conn
    path string  // 锁的根路径,如 "/locks/my-lock"
    node string  // 创建的节点路径
}

func NewZKLock(conn *zk.Conn, path string) *ZKLock {
    return &ZKLock{
        conn: conn,
        path: path,
    }
}

// 获取锁
func (l *ZKLock) Lock() error {
    // 1. 创建临时顺序节点
    node, err := l.conn.Create(
        l.path+"/lock-",
        []byte{},
        zk.FlagEphemeral|zk.FlagSequence,  // 临时+顺序
        zk.WorldACL(zk.PermAll),
    )
    if err != nil {
        return err
    }

    l.node = node

    for {
        // 2. 获取所有子节点
        children, _, err := l.conn.Children(l.path)
        if err != nil {
            return err
        }

        // 3. 排序
        sort.Strings(children)

        // 4. 判断自己是否是最小的
        mySeq := strings.TrimPrefix(node, l.path+"/")
        if children[0] == mySeq {
            // 获得锁
            return nil
        }

        // 5. 找到前一个节点
        var prevNode string
        for i, child := range children {
            if child == mySeq && i > 0 {
                prevNode = children[i-1]
                break
            }
        }

        // 6. Watch前一个节点
        exists, _, event, err := l.conn.ExistsW(l.path + "/" + prevNode)
        if err != nil {
            return err
        }

        if !exists {
            // 前一个节点已删除,重新尝试
            continue
        }

        // 7. 等待前一个节点删除
        <-event
    }
}

// 释放锁
func (l *ZKLock) Unlock() error {
    return l.conn.Delete(l.node, -1)
}

// 使用示例
func ExampleZKLock() {
    conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    lock := NewZKLock(conn, "/locks/order")

    // 获取锁
    if err := lock.Lock(); err != nil {
        panic(err)
    }

    // 执行业务逻辑
    defer lock.Unlock()
    ProcessOrder()
}

ZooKeeper锁特点:

  • 强一致性(CP系统)
  • 自动释放(临时节点,会话断开自动删除)
  • 公平锁(按顺序获取)
  • 性能较低(网络往返)
  • 依赖ZooKeeper集群

常见问题与解决方案

问题1:死锁

场景:

Client1获取锁后崩溃,锁未释放
→ 其他客户端永远无法获取锁
→ 系统死锁

解决方案:

  1. 设置过期时间(TTL)
// Redis:自动过期
redis.Set(key, value, 30*time.Second)

// ZooKeeper:临时节点(会话结束自动删除)
conn.Create(path, data, zk.FlagEphemeral, acl)
  1. 心跳续期
// 业务执行超过TTL时,定期续期
go func() {
    ticker := time.NewTicker(10 * time.Second)
    for range ticker.C {
        redis.Expire(key, 30*time.Second)
    }
}()

问题2:误删锁

场景:

T1: Client1获取锁(TTL=30s)
T2: Client1业务执行超时(35s)
T3: 锁自动过期,Client2获取锁
T4: Client1业务完成,删除锁 → 误删了Client2的锁!

解决方案:唯一标识 + Lua脚本

// 获取锁时生成唯一标识
value := uuid.New().String()
redis.SetNX(key, value, ttl)

// 释放锁时验证身份
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end
`
redis.Eval(script, []string{key}, value)

问题3:锁超时

场景:

业务执行时间:40秒
锁的TTL:30秒
→ 锁提前释放,可能被其他客户端获取

解决方案:

  1. 合理设置TTL
TTL = 业务执行时间 * 2
  1. 看门狗机制(自动续期)
go func() {
    ticker := time.NewTicker(ttl / 3)
    for range ticker.C {
        redis.Expire(key, ttl)
    }
}()
  1. Redisson的实现(Java)
RLock lock = redisson.getLock("myLock");
lock.lock();  // 默认30秒,自动续期

try {
    // 业务逻辑(可以执行任意时间)
} finally {
    lock.unlock();
}

问题4:Redis主从切换

场景:

T1: Client1从Master获取锁
T2: Master宕机(数据未同步到Slave)
T3: Slave提升为新Master
T4: Client2从新Master获取同一把锁
→ 两个客户端同时持有锁!

解决方案:

  1. Redlock算法(多个独立Redis)
  2. 使用ZooKeeper/etcd(强一致性)
  3. 业务层幂等性(即使锁失效也不影响正确性)

实战案例

案例:库存扣减

场景:电商秒杀,防止超卖

type InventoryService struct {
    redis *redis.Client
    db    *sql.DB
}

// 扣减库存(带分布式锁)
func (s *InventoryService) DeductStock(productID string, quantity int) error {
    lockKey := "lock:product:" + productID
    lock := NewRedisLockWithRenewal(s.redis, lockKey, 10*time.Second)

    ctx := context.Background()

    // 获取锁
    acquired, err := lock.LockWithRenewal(ctx)
    if err != nil {
        return err
    }

    if !acquired {
        return errors.New("too many requests, please retry")
    }

    defer lock.UnlockWithRenewal(ctx)

    // 查询库存
    var stock int
    err = s.db.QueryRow("SELECT stock FROM products WHERE id = ?", productID).Scan(&stock)
    if err != nil {
        return err
    }

    // 判断库存
    if stock < quantity {
        return errors.New("insufficient stock")
    }

    // 扣减库存
    _, err = s.db.Exec("UPDATE products SET stock = stock - ? WHERE id = ?", quantity, productID)
    if err != nil {
        return err
    }

    return nil
}

优化版:基于Redis的库存扣减

// 使用Lua脚本保证原子性
func (s *InventoryService) DeductStockRedis(productID string, quantity int) error {
    script := `
local stock = redis.call('GET', KEYS[1])
if not stock then
    return -1  -- 商品不存在
end

stock = tonumber(stock)
local requested = tonumber(ARGV[1])

if stock >= requested then
    redis.call('DECRBY', KEYS[1], requested)
    return 1  -- 扣减成功
else
    return 0  -- 库存不足
end
`

    stockKey := "stock:" + productID
    result, err := s.redis.Eval(context.Background(), script, []string{stockKey}, quantity).Int()

    if err != nil {
        return err
    }

    switch result {
    case -1:
        return errors.New("product not found")
    case 0:
        return errors.New("insufficient stock")
    default:
        // 异步更新数据库
        go s.syncStockToDB(productID)
        return nil
    }
}

面试问答

Redis分布式锁为什么要用Lua脚本释放锁?

答案:

问题:分两步操作不是原子的

//  错误做法
value := redis.Get(key)
if value == myValue {
    redis.Del(key)  // 可能删除了别人的锁!
}

// 问题:
// T1: 读取value,验证通过
// T2: 锁过期,被Client2获取
// T3: 执行Del,删除了Client2的锁!

解决:Lua脚本保证原子性

--  正确做法
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end

-- Lua脚本在Redis中单线程执行,不会被打断

Redlock算法可靠吗?有什么争议?

答案:

Martin Kleppmann的批评:

  1. 时钟依赖问题
假设:
- Client1从3个节点获取锁(t1时刻)
- Client1的时钟突然快进(时钟漂移)
- 锁过期时间到了(从Client1视角还没到)
- Client2获取锁成功
→ 两个客户端同时持有锁!
  1. 性能问题
需要向N个节点请求(N通常=5)
延迟 = max(各节点延迟) + 网络往返

Antirez(Redis作者)的反驳:

  1. Redlock不依赖时钟同步,只要时钟漂移<锁的TTL即可
  2. 时钟问题是所有分布式系统都要面对的

实践建议:

场景推荐方案
强一致性ZooKeeper/etcd(CP系统)
高性能Redis单实例+业务幂等
高可用Redlock(多个独立Redis)

ZooKeeper锁和Redis锁如何选择?

答案:

对比维度Redis锁ZooKeeper锁
性能高(内存操作)中等(网络+磁盘)
一致性弱(主从复制延迟)强(Paxos/ZAB)
可靠性需Redlock保证天然可靠
公平性非公平公平(顺序节点)
实现难度低中
维护成本低高(需ZK集群)

选择建议:

Redis锁:
 秒杀、抢购(高并发)
 缓存更新(允许偶尔失效)
 临时限流

ZooKeeper锁:
 配置更新(需要强一致)
 Leader选举
 分布式任务调度

如何实现可重入锁?

答案:

概念:同一个线程可以多次获取同一把锁

实现思路:

  1. 记录锁的持有者
  2. 记录重入次数
  3. 释放锁时递减计数,为0时真正释放

Redis实现:

type ReentrantLock struct {
    client *redis.Client
    key    string
    owner  string  // 线程ID
}

// 获取锁
func (l *ReentrantLock) Lock(ctx context.Context) error {
    script := `
local owner = redis.call('HGET', KEYS[1], 'owner')
local count = redis.call('HGET', KEYS[1], 'count')

if owner == false then
    -- 锁不存在,直接获取
    redis.call('HSET', KEYS[1], 'owner', ARGV[1])
    redis.call('HSET', KEYS[1], 'count', 1)
    redis.call('EXPIRE', KEYS[1], ARGV[2])
    return 1
elseif owner == ARGV[1] then
    -- 同一个持有者,重入
    redis.call('HINCRBY', KEYS[1], 'count', 1)
    redis.call('EXPIRE', KEYS[1], ARGV[2])
    return 1
else
    -- 被其他线程持有
    return 0
end
`

    result, err := l.client.Eval(ctx, script, []string{l.key}, l.owner, 30).Int()
    if err != nil {
        return err
    }

    if result == 0 {
        return errors.New("lock held by another thread")
    }

    return nil
}

// 释放锁
func (l *ReentrantLock) Unlock(ctx context.Context) error {
    script := `
local owner = redis.call('HGET', KEYS[1], 'owner')

if owner ~= ARGV[1] then
    return 0  -- 不是自己的锁
end

local count = tonumber(redis.call('HGET', KEYS[1], 'count'))
if count > 1 then
    -- 重入次数-1
    redis.call('HINCRBY', KEYS[1], 'count', -1)
    return 1
else
    -- 真正释放锁
    redis.call('DEL', KEYS[1])
    return 1
end
`

    _, err := l.client.Eval(ctx, script, []string{l.key}, l.owner).Result()
    return err
}

如何实现读写锁?

答案:

概念:

  • 读锁(共享锁):多个线程可同时持有
  • 写锁(排他锁):只有一个线程可持有,且不能有读锁

实现思路:

Redis数据结构:
Hash {
    "write": "thread_id" 或 ""
    "read":  "thread1:1,thread2:1,thread3:1"  (线程ID:重入次数)
}

规则:
1. 获取读锁:write为空,且自己不在read中
2. 获取写锁:write为空,且read为空
3. 释放读锁:从read中移除自己
4. 释放写锁:清空write

Prev
第1章:分布式一致性
Next
第3章:分布式协调