第2章:分布式锁
为什么需要分布式锁
单机锁的局限
单机环境:
// 单机锁:sync.Mutex
var mu sync.Mutex
var counter int
func Increment() {
mu.Lock()
defer mu.Unlock()
counter++ // 安全:同一时刻只有一个goroutine执行
}
特点:
- 简单可靠
- 性能高(内存操作)
- 只能在单个进程内有效
- 无法跨机器协调
分布式环境的挑战
问题场景:库存扣减
电商系统(3个服务实例):
T1: User1请求 → Instance1 → 读库存=10
T2: User2请求 → Instance2 → 读库存=10
T3: Instance1 → 扣减库存 → 写库存=9
T4: Instance2 → 扣减库存 → 写库存=9(应该是8!)
结果:超卖!库存实际应该是8,却变成了9
根本原因:
- 多个进程/机器同时访问共享资源
- 单机锁无法跨进程生效
- 需要分布式锁协调
分布式锁的要求
核心特性:
- 互斥性(Mutual Exclusion)
同一时刻,只有一个客户端能持有锁
- 防死锁(Deadlock Free)
即使持有锁的客户端崩溃,锁最终也能被释放
- 容错性(Fault Tolerance)
只要大部分节点正常,就能获取和释放锁
- 自解铃人(Unlock by Owner)
只有持有锁的客户端才能释放锁
分布式锁的实现方案
方案对比
| 方案 | 实现难度 | 性能 | 可靠性 | 适用场景 |
|---|---|---|---|---|
| 数据库 | 小规模系统 | |||
| Redis | 高性能场景 | |||
| ZooKeeper | 强一致性要求 | |||
| etcd | 现代化云原生 |
方案1:数据库锁
实现:利用数据库的唯一约束
-- 创建锁表
CREATE TABLE distributed_lock (
lock_key VARCHAR(255) PRIMARY KEY,
owner VARCHAR(255) NOT NULL,
expire_time TIMESTAMP NOT NULL
);
-- 获取锁(插入记录)
INSERT INTO distributed_lock (lock_key, owner, expire_time)
VALUES ('order_123', 'server_1', NOW() + INTERVAL 30 SECOND)
ON DUPLICATE KEY UPDATE lock_key=lock_key; -- 失败则锁已被占用
-- 释放锁(删除记录)
DELETE FROM distributed_lock
WHERE lock_key = 'order_123' AND owner = 'server_1';
Go实现:
type DBLock struct {
db *sql.DB
}
func (l *DBLock) TryLock(key, owner string, ttl time.Duration) (bool, error) {
expireTime := time.Now().Add(ttl)
result, err := l.db.Exec(
"INSERT INTO distributed_lock (lock_key, owner, expire_time) VALUES (?, ?, ?)",
key, owner, expireTime,
)
if err != nil {
// 插入失败,锁已被占用
return false, nil
}
affected, _ := result.RowsAffected()
return affected > 0, nil
}
func (l *DBLock) Unlock(key, owner string) error {
_, err := l.db.Exec(
"DELETE FROM distributed_lock WHERE lock_key = ? AND owner = ?",
key, owner,
)
return err
}
优缺点:
- 实现简单
- 强一致性(ACID)
- 性能较低(IO操作)
- 依赖数据库(单点风险)
- 过期锁需要定时清理
Redis分布式锁
基础实现:SETNX + EXPIRE
命令:
# 获取锁(SET with NX and EX)
SET lock_key unique_value NX EX 30
# NX:仅在key不存在时设置
# EX 30:30秒后自动过期
# unique_value:唯一标识(防止误删)
Go实现 V1:
type RedisLock struct {
client *redis.Client
key string
value string // 唯一标识(UUID)
ttl time.Duration
}
func NewRedisLock(client *redis.Client, key string, ttl time.Duration) *RedisLock {
return &RedisLock{
client: client,
key: key,
value: uuid.New().String(), // 生成唯一标识
ttl: ttl,
}
}
// 获取锁
func (l *RedisLock) Lock(ctx context.Context) (bool, error) {
// SET key value NX EX ttl
result, err := l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
if err != nil {
return false, err
}
return result, nil // true表示获取成功
}
// 释放锁(使用Lua脚本保证原子性)
func (l *RedisLock) Unlock(ctx context.Context) error {
// Lua脚本:先判断是否是自己的锁,再删除
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
_, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
return err
}
// 使用示例
func ExampleRedisLock() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
lock := NewRedisLock(rdb, "order:1001", 30*time.Second)
ctx := context.Background()
// 尝试获取锁
acquired, err := lock.Lock(ctx)
if err != nil {
panic(err)
}
if !acquired {
log.Println("Failed to acquire lock")
return
}
// 执行业务逻辑
defer lock.Unlock(ctx)
ProcessOrder("1001")
}
进阶:锁续期(看门狗)
问题:业务执行时间超过锁的TTL怎么办?
解决方案:Watch Dog(看门狗机制)
type RedisLockWithRenewal struct {
RedisLock
renewalInterval time.Duration
stopRenewal chan struct{}
}
func NewRedisLockWithRenewal(client *redis.Client, key string, ttl time.Duration) *RedisLockWithRenewal {
return &RedisLockWithRenewal{
RedisLock: *NewRedisLock(client, key, ttl),
renewalInterval: ttl / 3, // 每1/3 TTL续期一次
stopRenewal: make(chan struct{}),
}
}
// 获取锁并启动自动续期
func (l *RedisLockWithRenewal) LockWithRenewal(ctx context.Context) (bool, error) {
acquired, err := l.Lock(ctx)
if err != nil || !acquired {
return acquired, err
}
// 启动续期协程
go l.renewalLoop(ctx)
return true, nil
}
// 续期循环
func (l *RedisLockWithRenewal) renewalLoop(ctx context.Context) {
ticker := time.NewTicker(l.renewalInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 续期:重新设置过期时间
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("EXPIRE", KEYS[1], ARGV[2])
else
return 0
end
`
l.client.Eval(ctx, script, []string{l.key}, l.value, int(l.ttl.Seconds()))
case <-l.stopRenewal:
// 停止续期
return
case <-ctx.Done():
// 上下文取消
return
}
}
}
// 释放锁并停止续期
func (l *RedisLockWithRenewal) UnlockWithRenewal(ctx context.Context) error {
close(l.stopRenewal) // 停止续期
return l.Unlock(ctx)
}
// 使用示例
func ExampleLockWithRenewal() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
lock := NewRedisLockWithRenewal(rdb, "order:1001", 10*time.Second)
ctx := context.Background()
acquired, _ := lock.LockWithRenewal(ctx)
if acquired {
defer lock.UnlockWithRenewal(ctx)
// 长时间业务逻辑(可能超过10秒)
time.Sleep(30 * time.Second) // 锁会自动续期
ProcessOrder("1001")
}
}
Redlock算法
问题:Redis主从切换时可能丢失锁
场景:
T1: Client1从Master获取锁
T2: Master宕机(锁数据未同步到Slave)
T3: Slave提升为新Master
T4: Client2从新Master获取同一把锁 两个客户端都持有锁!
Redlock解决方案:使用多个独立的Redis实例
算法步骤:
- 获取当前时间戳(毫秒)
- 依次向N个Redis实例请求锁
- 如果超过半数(N/2+1)实例获取成功,且总耗时<锁的有效期,则认为获取锁成功
- 锁的实际有效期 = TTL - 获取锁的耗时
- 如果获取失败,向所有实例发送释放锁命令
实现:
type Redlock struct {
clients []*redis.Client
quorum int // 需要成功的实例数
}
func NewRedlock(addrs []string) *Redlock {
clients := make([]*redis.Client, len(addrs))
for i, addr := range addrs {
clients[i] = redis.NewClient(&redis.Options{Addr: addr})
}
return &Redlock{
clients: clients,
quorum: len(addrs)/2 + 1, // 多数派
}
}
func (r *Redlock) Lock(ctx context.Context, key, value string, ttl time.Duration) (bool, error) {
start := time.Now()
successCount := 0
// 向所有实例请求锁
for _, client := range r.clients {
ok, err := client.SetNX(ctx, key, value, ttl).Result()
if err == nil && ok {
successCount++
}
}
// 计算耗时
elapsed := time.Since(start)
// 判断是否成功
if successCount >= r.quorum && elapsed < ttl {
return true, nil // 获取锁成功
}
// 获取失败,释放已获取的锁
r.Unlock(ctx, key, value)
return false, nil
}
func (r *Redlock) Unlock(ctx context.Context, key, value string) error {
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
// 向所有实例发送释放命令
for _, client := range r.clients {
client.Eval(ctx, script, []string{key}, value)
}
return nil
}
Redlock特点:
- 容错性强(多数节点故障仍可用)
- 无主从复制延迟问题
- 实现复杂
- 需要部署多个独立Redis实例
- 仍有争议(Martin Kleppmann批评)
ZooKeeper分布式锁
实现原理
核心机制:临时顺序节点
ZooKeeper目录结构:
/locks
├── lock-0000000001 (临时顺序节点)
├── lock-0000000002
├── lock-0000000003
└── lock-0000000004
规则:
1. 客户端创建临时顺序节点
2. 序号最小的节点获得锁
3. 其他节点Watch前一个节点
4. 前一个节点删除时,下一个节点获得锁
流程图:
Client1 → Create /locks/lock-0000000001 → 序号最小 → 获得锁
Client2 → Create /locks/lock-0000000002 → Watch lock-0000000001 → 等待
Client3 → Create /locks/lock-0000000003 → Watch lock-0000000002 → 等待
Client1释放锁 → Delete lock-0000000001
↓
Client2收到通知 → 序号最小 → 获得锁
Go实现:
type ZKLock struct {
conn *zk.Conn
path string // 锁的根路径,如 "/locks/my-lock"
node string // 创建的节点路径
}
func NewZKLock(conn *zk.Conn, path string) *ZKLock {
return &ZKLock{
conn: conn,
path: path,
}
}
// 获取锁
func (l *ZKLock) Lock() error {
// 1. 创建临时顺序节点
node, err := l.conn.Create(
l.path+"/lock-",
[]byte{},
zk.FlagEphemeral|zk.FlagSequence, // 临时+顺序
zk.WorldACL(zk.PermAll),
)
if err != nil {
return err
}
l.node = node
for {
// 2. 获取所有子节点
children, _, err := l.conn.Children(l.path)
if err != nil {
return err
}
// 3. 排序
sort.Strings(children)
// 4. 判断自己是否是最小的
mySeq := strings.TrimPrefix(node, l.path+"/")
if children[0] == mySeq {
// 获得锁
return nil
}
// 5. 找到前一个节点
var prevNode string
for i, child := range children {
if child == mySeq && i > 0 {
prevNode = children[i-1]
break
}
}
// 6. Watch前一个节点
exists, _, event, err := l.conn.ExistsW(l.path + "/" + prevNode)
if err != nil {
return err
}
if !exists {
// 前一个节点已删除,重新尝试
continue
}
// 7. 等待前一个节点删除
<-event
}
}
// 释放锁
func (l *ZKLock) Unlock() error {
return l.conn.Delete(l.node, -1)
}
// 使用示例
func ExampleZKLock() {
conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second)
if err != nil {
panic(err)
}
defer conn.Close()
lock := NewZKLock(conn, "/locks/order")
// 获取锁
if err := lock.Lock(); err != nil {
panic(err)
}
// 执行业务逻辑
defer lock.Unlock()
ProcessOrder()
}
ZooKeeper锁特点:
- 强一致性(CP系统)
- 自动释放(临时节点,会话断开自动删除)
- 公平锁(按顺序获取)
- 性能较低(网络往返)
- 依赖ZooKeeper集群
常见问题与解决方案
问题1:死锁
场景:
Client1获取锁后崩溃,锁未释放
→ 其他客户端永远无法获取锁
→ 系统死锁
解决方案:
- 设置过期时间(TTL)
// Redis:自动过期
redis.Set(key, value, 30*time.Second)
// ZooKeeper:临时节点(会话结束自动删除)
conn.Create(path, data, zk.FlagEphemeral, acl)
- 心跳续期
// 业务执行超过TTL时,定期续期
go func() {
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
redis.Expire(key, 30*time.Second)
}
}()
问题2:误删锁
场景:
T1: Client1获取锁(TTL=30s)
T2: Client1业务执行超时(35s)
T3: 锁自动过期,Client2获取锁
T4: Client1业务完成,删除锁 → 误删了Client2的锁!
解决方案:唯一标识 + Lua脚本
// 获取锁时生成唯一标识
value := uuid.New().String()
redis.SetNX(key, value, ttl)
// 释放锁时验证身份
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
redis.Eval(script, []string{key}, value)
问题3:锁超时
场景:
业务执行时间:40秒
锁的TTL:30秒
→ 锁提前释放,可能被其他客户端获取
解决方案:
- 合理设置TTL
TTL = 业务执行时间 * 2
- 看门狗机制(自动续期)
go func() {
ticker := time.NewTicker(ttl / 3)
for range ticker.C {
redis.Expire(key, ttl)
}
}()
- Redisson的实现(Java)
RLock lock = redisson.getLock("myLock");
lock.lock(); // 默认30秒,自动续期
try {
// 业务逻辑(可以执行任意时间)
} finally {
lock.unlock();
}
问题4:Redis主从切换
场景:
T1: Client1从Master获取锁
T2: Master宕机(数据未同步到Slave)
T3: Slave提升为新Master
T4: Client2从新Master获取同一把锁
→ 两个客户端同时持有锁!
解决方案:
- Redlock算法(多个独立Redis)
- 使用ZooKeeper/etcd(强一致性)
- 业务层幂等性(即使锁失效也不影响正确性)
实战案例
案例:库存扣减
场景:电商秒杀,防止超卖
type InventoryService struct {
redis *redis.Client
db *sql.DB
}
// 扣减库存(带分布式锁)
func (s *InventoryService) DeductStock(productID string, quantity int) error {
lockKey := "lock:product:" + productID
lock := NewRedisLockWithRenewal(s.redis, lockKey, 10*time.Second)
ctx := context.Background()
// 获取锁
acquired, err := lock.LockWithRenewal(ctx)
if err != nil {
return err
}
if !acquired {
return errors.New("too many requests, please retry")
}
defer lock.UnlockWithRenewal(ctx)
// 查询库存
var stock int
err = s.db.QueryRow("SELECT stock FROM products WHERE id = ?", productID).Scan(&stock)
if err != nil {
return err
}
// 判断库存
if stock < quantity {
return errors.New("insufficient stock")
}
// 扣减库存
_, err = s.db.Exec("UPDATE products SET stock = stock - ? WHERE id = ?", quantity, productID)
if err != nil {
return err
}
return nil
}
优化版:基于Redis的库存扣减
// 使用Lua脚本保证原子性
func (s *InventoryService) DeductStockRedis(productID string, quantity int) error {
script := `
local stock = redis.call('GET', KEYS[1])
if not stock then
return -1 -- 商品不存在
end
stock = tonumber(stock)
local requested = tonumber(ARGV[1])
if stock >= requested then
redis.call('DECRBY', KEYS[1], requested)
return 1 -- 扣减成功
else
return 0 -- 库存不足
end
`
stockKey := "stock:" + productID
result, err := s.redis.Eval(context.Background(), script, []string{stockKey}, quantity).Int()
if err != nil {
return err
}
switch result {
case -1:
return errors.New("product not found")
case 0:
return errors.New("insufficient stock")
default:
// 异步更新数据库
go s.syncStockToDB(productID)
return nil
}
}
面试问答
Redis分布式锁为什么要用Lua脚本释放锁?
答案:
问题:分两步操作不是原子的
// 错误做法
value := redis.Get(key)
if value == myValue {
redis.Del(key) // 可能删除了别人的锁!
}
// 问题:
// T1: 读取value,验证通过
// T2: 锁过期,被Client2获取
// T3: 执行Del,删除了Client2的锁!
解决:Lua脚本保证原子性
-- 正确做法
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
-- Lua脚本在Redis中单线程执行,不会被打断
Redlock算法可靠吗?有什么争议?
答案:
Martin Kleppmann的批评:
- 时钟依赖问题
假设:
- Client1从3个节点获取锁(t1时刻)
- Client1的时钟突然快进(时钟漂移)
- 锁过期时间到了(从Client1视角还没到)
- Client2获取锁成功
→ 两个客户端同时持有锁!
- 性能问题
需要向N个节点请求(N通常=5)
延迟 = max(各节点延迟) + 网络往返
Antirez(Redis作者)的反驳:
- Redlock不依赖时钟同步,只要时钟漂移<锁的TTL即可
- 时钟问题是所有分布式系统都要面对的
实践建议:
| 场景 | 推荐方案 |
|---|---|
| 强一致性 | ZooKeeper/etcd(CP系统) |
| 高性能 | Redis单实例+业务幂等 |
| 高可用 | Redlock(多个独立Redis) |
ZooKeeper锁和Redis锁如何选择?
答案:
| 对比维度 | Redis锁 | ZooKeeper锁 |
|---|---|---|
| 性能 | 高(内存操作) | 中等(网络+磁盘) |
| 一致性 | 弱(主从复制延迟) | 强(Paxos/ZAB) |
| 可靠性 | 需Redlock保证 | 天然可靠 |
| 公平性 | 非公平 | 公平(顺序节点) |
| 实现难度 | 低 | 中 |
| 维护成本 | 低 | 高(需ZK集群) |
选择建议:
Redis锁:
秒杀、抢购(高并发)
缓存更新(允许偶尔失效)
临时限流
ZooKeeper锁:
配置更新(需要强一致)
Leader选举
分布式任务调度
如何实现可重入锁?
答案:
概念:同一个线程可以多次获取同一把锁
实现思路:
- 记录锁的持有者
- 记录重入次数
- 释放锁时递减计数,为0时真正释放
Redis实现:
type ReentrantLock struct {
client *redis.Client
key string
owner string // 线程ID
}
// 获取锁
func (l *ReentrantLock) Lock(ctx context.Context) error {
script := `
local owner = redis.call('HGET', KEYS[1], 'owner')
local count = redis.call('HGET', KEYS[1], 'count')
if owner == false then
-- 锁不存在,直接获取
redis.call('HSET', KEYS[1], 'owner', ARGV[1])
redis.call('HSET', KEYS[1], 'count', 1)
redis.call('EXPIRE', KEYS[1], ARGV[2])
return 1
elseif owner == ARGV[1] then
-- 同一个持有者,重入
redis.call('HINCRBY', KEYS[1], 'count', 1)
redis.call('EXPIRE', KEYS[1], ARGV[2])
return 1
else
-- 被其他线程持有
return 0
end
`
result, err := l.client.Eval(ctx, script, []string{l.key}, l.owner, 30).Int()
if err != nil {
return err
}
if result == 0 {
return errors.New("lock held by another thread")
}
return nil
}
// 释放锁
func (l *ReentrantLock) Unlock(ctx context.Context) error {
script := `
local owner = redis.call('HGET', KEYS[1], 'owner')
if owner ~= ARGV[1] then
return 0 -- 不是自己的锁
end
local count = tonumber(redis.call('HGET', KEYS[1], 'count'))
if count > 1 then
-- 重入次数-1
redis.call('HINCRBY', KEYS[1], 'count', -1)
return 1
else
-- 真正释放锁
redis.call('DEL', KEYS[1])
return 1
end
`
_, err := l.client.Eval(ctx, script, []string{l.key}, l.owner).Result()
return err
}
如何实现读写锁?
答案:
概念:
- 读锁(共享锁):多个线程可同时持有
- 写锁(排他锁):只有一个线程可持有,且不能有读锁
实现思路:
Redis数据结构:
Hash {
"write": "thread_id" 或 ""
"read": "thread1:1,thread2:1,thread3:1" (线程ID:重入次数)
}
规则:
1. 获取读锁:write为空,且自己不在read中
2. 获取写锁:write为空,且read为空
3. 释放读锁:从read中移除自己
4. 释放写锁:清空write