分布式锁实现
学习目标
- 深入理解分布式锁的设计原理和实现方式
- 掌握基于 Redis 的分布式锁实现
- 理解分布式锁的常见问题和解决方案
- 实现完整的分布式锁库
分布式锁概述
分布式锁是在分布式系统中实现互斥访问共享资源的一种机制。它需要满足以下特性:
基本特性
- 互斥性:同一时刻只有一个客户端能持有锁
- 防死锁:避免客户端崩溃导致锁无法释放
- 可重入性:同一客户端可以多次获取同一把锁
- 容错性:在部分节点故障时仍能正常工作
实现方式对比
实现方式 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
Redis | 性能高、实现简单 | 单点故障、时钟依赖 | 高并发、对一致性要求不高 |
ZooKeeper | 强一致性、可靠性高 | 性能较低、实现复杂 | 强一致性要求 |
etcd | 强一致性、性能较好 | 实现复杂 | 云原生环境 |
️ Redis 分布式锁实现
1. 基础锁实现
// lock/redis_lock.go
package lock
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"net"
"strconv"
"time"
)
// RedisLock 分布式锁
type RedisLock struct {
client *RedisClient
key string
value string
ttl time.Duration
retryTime time.Duration
retryCount int
}
// RedisClient Redis 客户端
type RedisClient struct {
conn net.Conn
}
// 创建 Redis 客户端
func NewRedisClient(addr string) (*RedisClient, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}
return &RedisClient{conn: conn}, nil
}
// 创建分布式锁
func NewRedisLock(client *RedisClient, key string, ttl time.Duration) *RedisLock {
return &RedisLock{
client: client,
key: key,
value: generateRandomValue(),
ttl: ttl,
retryTime: 100 * time.Millisecond,
retryCount: 3,
}
}
// 生成随机值
func generateRandomValue() string {
bytes := make([]byte, 16)
rand.Read(bytes)
return hex.EncodeToString(bytes)
}
// 获取锁
func (l *RedisLock) Lock(ctx context.Context) error {
for i := 0; i < l.retryCount; i++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
if l.tryLock() {
return nil
}
time.Sleep(l.retryTime)
}
}
return fmt.Errorf("failed to acquire lock after %d attempts", l.retryCount)
}
// 尝试获取锁
func (l *RedisLock) tryLock() bool {
// 使用 SET key value NX EX ttl 命令
cmd := fmt.Sprintf("SET %s %s NX EX %d\r\n", l.key, l.value, int(l.ttl.Seconds()))
// 发送命令
if _, err := l.client.conn.Write([]byte(cmd)); err != nil {
return false
}
// 读取响应
response := make([]byte, 1024)
n, err := l.client.conn.Read(response)
if err != nil {
return false
}
// 检查响应
resp := string(response[:n])
return resp == "+OK\r\n"
}
// 释放锁
func (l *RedisLock) Unlock() error {
// 使用 Lua 脚本确保原子性
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
cmd := fmt.Sprintf("EVAL %s 1 %s %s\r\n", script, l.key, l.value)
// 发送命令
if _, err := l.client.conn.Write([]byte(cmd)); err != nil {
return err
}
// 读取响应
response := make([]byte, 1024)
n, err := l.client.conn.Read(response)
if err != nil {
return err
}
// 检查响应
resp := string(response[:n])
if resp == ":1\r\n" {
return nil
}
return fmt.Errorf("failed to release lock")
}
// 续期锁
func (l *RedisLock) Renew(ctx context.Context) error {
// 使用 Lua 脚本续期
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("EXPIRE", KEYS[1], ARGV[2])
else
return 0
end
`
cmd := fmt.Sprintf("EVAL %s 1 %s %s %d\r\n", script, l.key, l.value, int(l.ttl.Seconds()))
// 发送命令
if _, err := l.client.conn.Write([]byte(cmd)); err != nil {
return err
}
// 读取响应
response := make([]byte, 1024)
n, err := l.client.conn.Read(response)
if err != nil {
return err
}
// 检查响应
resp := string(response[:n])
if resp == ":1\r\n" {
return nil
}
return fmt.Errorf("failed to renew lock")
}
// 检查锁是否存在
func (l *RedisLock) IsLocked() bool {
cmd := fmt.Sprintf("GET %s\r\n", l.key)
// 发送命令
if _, err := l.client.conn.Write([]byte(cmd)); err != nil {
return false
}
// 读取响应
response := make([]byte, 1024)
n, err := l.client.conn.Read(response)
if err != nil {
return false
}
// 检查响应
resp := string(response[:n])
return resp != "$-1\r\n"
}
2. 可重入锁实现
// lock/reentrant_lock.go
package lock
import (
"context"
"fmt"
"sync"
)
// ReentrantLock 可重入锁
type ReentrantLock struct {
*RedisLock
mutex sync.Mutex
count int
owner string
}
// 创建可重入锁
func NewReentrantLock(client *RedisClient, key string, ttl time.Duration) *ReentrantLock {
return &ReentrantLock{
RedisLock: NewRedisLock(client, key, ttl),
owner: generateRandomValue(),
}
}
// 获取锁(可重入)
func (r *ReentrantLock) Lock(ctx context.Context) error {
r.mutex.Lock()
defer r.mutex.Unlock()
// 如果已经持有锁,增加计数
if r.count > 0 {
r.count++
return nil
}
// 尝试获取锁
if err := r.RedisLock.Lock(ctx); err != nil {
return err
}
r.count = 1
return nil
}
// 释放锁(可重入)
func (r *ReentrantLock) Unlock() error {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.count <= 0 {
return fmt.Errorf("lock not held")
}
r.count--
// 如果计数为 0,释放锁
if r.count == 0 {
return r.RedisLock.Unlock()
}
return nil
}
// 检查是否持有锁
func (r *ReentrantLock) IsHeld() bool {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.count > 0
}
3. 看门狗锁实现
// lock/watchdog_lock.go
package lock
import (
"context"
"fmt"
"sync"
"time"
)
// WatchdogLock 看门狗锁
type WatchdogLock struct {
*RedisLock
mutex sync.Mutex
stopCh chan struct{}
renewCh chan struct{}
isRenewing bool
}
// 创建看门狗锁
func NewWatchdogLock(client *RedisClient, key string, ttl time.Duration) *WatchdogLock {
return &WatchdogLock{
RedisLock: NewRedisLock(client, key, ttl),
stopCh: make(chan struct{}),
renewCh: make(chan struct{}),
}
}
// 获取锁(带看门狗)
func (w *WatchdogLock) Lock(ctx context.Context) error {
if err := w.RedisLock.Lock(ctx); err != nil {
return err
}
// 启动看门狗
w.startWatchdog()
return nil
}
// 释放锁(带看门狗)
func (w *WatchdogLock) Unlock() error {
w.mutex.Lock()
defer w.mutex.Unlock()
// 停止看门狗
if w.isRenewing {
close(w.stopCh)
w.isRenewing = false
}
return w.RedisLock.Unlock()
}
// 启动看门狗
func (w *WatchdogLock) startWatchdog() {
w.mutex.Lock()
defer w.mutex.Unlock()
if w.isRenewing {
return
}
w.isRenewing = true
go w.watchdog()
}
// 看门狗协程
func (w *WatchdogLock) watchdog() {
ticker := time.NewTicker(w.ttl / 2) // 每 TTL/2 时间续期一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := w.RedisLock.Renew(context.Background()); err != nil {
// 续期失败,锁可能已丢失
w.mutex.Lock()
w.isRenewing = false
w.mutex.Unlock()
return
}
case <-w.stopCh:
return
}
}
}
4. 红锁实现
// lock/redlock.go
package lock
import (
"context"
"fmt"
"sync"
"time"
)
// RedLock 红锁
type RedLock struct {
clients []*RedisClient
key string
value string
ttl time.Duration
quorum int
}
// 创建红锁
func NewRedLock(clients []*RedisClient, key string, ttl time.Duration) *RedLock {
return &RedLock{
clients: clients,
key: key,
value: generateRandomValue(),
ttl: ttl,
quorum: len(clients)/2 + 1, // 需要超过一半的节点同意
}
}
// 获取锁
func (r *RedLock) Lock(ctx context.Context) error {
start := time.Now()
// 尝试在所有节点上获取锁
successCount := 0
var wg sync.WaitGroup
var mutex sync.Mutex
for _, client := range r.clients {
wg.Add(1)
go func(c *RedisClient) {
defer wg.Done()
if r.tryLockOnNode(c) {
mutex.Lock()
successCount++
mutex.Unlock()
}
}(client)
}
wg.Wait()
// 检查是否获得足够多的锁
if successCount < r.quorum {
// 释放已获得的锁
r.unlockAll()
return fmt.Errorf("failed to acquire lock on enough nodes: %d/%d", successCount, len(r.clients))
}
// 检查获取锁的时间是否超过 TTL
elapsed := time.Since(start)
if elapsed >= r.ttl {
r.unlockAll()
return fmt.Errorf("lock acquisition took too long: %v", elapsed)
}
return nil
}
// 在单个节点上尝试获取锁
func (r *RedLock) tryLockOnNode(client *RedisClient) bool {
cmd := fmt.Sprintf("SET %s %s NX EX %d\r\n", r.key, r.value, int(r.ttl.Seconds()))
// 发送命令
if _, err := client.conn.Write([]byte(cmd)); err != nil {
return false
}
// 读取响应
response := make([]byte, 1024)
n, err := client.conn.Read(response)
if err != nil {
return false
}
// 检查响应
resp := string(response[:n])
return resp == "+OK\r\n"
}
// 释放锁
func (r *RedLock) Unlock() error {
var wg sync.WaitGroup
for _, client := range r.clients {
wg.Add(1)
go func(c *RedisClient) {
defer wg.Done()
r.unlockOnNode(c)
}(client)
}
wg.Wait()
return nil
}
// 在单个节点上释放锁
func (r *RedLock) unlockOnNode(client *RedisClient) {
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
cmd := fmt.Sprintf("EVAL %s 1 %s %s\r\n", script, r.key, r.value)
// 发送命令
client.conn.Write([]byte(cmd))
// 读取响应(忽略错误)
response := make([]byte, 1024)
client.conn.Read(response)
}
// 释放所有锁
func (r *RedLock) unlockAll() {
var wg sync.WaitGroup
for _, client := range r.clients {
wg.Add(1)
go func(c *RedisClient) {
defer wg.Done()
r.unlockOnNode(c)
}(client)
}
wg.Wait()
}
测试验证
1. 单元测试
// lock/lock_test.go
package lock
import (
"context"
"testing"
"time"
)
func TestRedisLock(t *testing.T) {
// 创建 Redis 客户端
client, err := NewRedisClient("127.0.0.1:6380")
if err != nil {
t.Fatalf("Failed to create Redis client: %v", err)
}
defer client.conn.Close()
// 创建锁
lock := NewRedisLock(client, "test_lock", 10*time.Second)
// 获取锁
ctx := context.Background()
if err := lock.Lock(ctx); err != nil {
t.Fatalf("Failed to acquire lock: %v", err)
}
// 检查锁状态
if !lock.IsLocked() {
t.Error("Lock should be held")
}
// 释放锁
if err := lock.Unlock(); err != nil {
t.Fatalf("Failed to release lock: %v", err)
}
// 检查锁状态
if lock.IsLocked() {
t.Error("Lock should not be held")
}
}
func TestReentrantLock(t *testing.T) {
// 创建 Redis 客户端
client, err := NewRedisClient("127.0.0.1:6380")
if err != nil {
t.Fatalf("Failed to create Redis client: %v", err)
}
defer client.conn.Close()
// 创建可重入锁
lock := NewReentrantLock(client, "test_reentrant_lock", 10*time.Second)
ctx := context.Background()
// 第一次获取锁
if err := lock.Lock(ctx); err != nil {
t.Fatalf("Failed to acquire lock: %v", err)
}
// 第二次获取锁(重入)
if err := lock.Lock(ctx); err != nil {
t.Fatalf("Failed to reacquire lock: %v", err)
}
// 检查锁状态
if !lock.IsHeld() {
t.Error("Lock should be held")
}
// 第一次释放锁
if err := lock.Unlock(); err != nil {
t.Fatalf("Failed to release lock: %v", err)
}
// 检查锁状态
if !lock.IsHeld() {
t.Error("Lock should still be held")
}
// 第二次释放锁
if err := lock.Unlock(); err != nil {
t.Fatalf("Failed to release lock: %v", err)
}
// 检查锁状态
if lock.IsHeld() {
t.Error("Lock should not be held")
}
}
2. 并发测试
// lock/concurrent_test.go
package lock
import (
"context"
"sync"
"testing"
"time"
)
func TestConcurrentLock(t *testing.T) {
// 创建 Redis 客户端
client, err := NewRedisClient("127.0.0.1:6380")
if err != nil {
t.Fatalf("Failed to create Redis client: %v", err)
}
defer client.conn.Close()
const numGoroutines = 10
const lockKey = "concurrent_test_lock"
var wg sync.WaitGroup
var successCount int
var mutex sync.Mutex
// 启动多个协程尝试获取锁
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
lock := NewRedisLock(client, lockKey, 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
if err := lock.Lock(ctx); err == nil {
mutex.Lock()
successCount++
mutex.Unlock()
// 持有锁一段时间
time.Sleep(100 * time.Millisecond)
// 释放锁
lock.Unlock()
}
}(i)
}
wg.Wait()
// 检查结果
if successCount != 1 {
t.Errorf("Expected 1 successful lock acquisition, got %d", successCount)
}
}
3. 性能测试
// lock/benchmark_test.go
package lock
import (
"context"
"testing"
"time"
)
func BenchmarkRedisLock(b *testing.B) {
// 创建 Redis 客户端
client, err := NewRedisClient("127.0.0.1:6380")
if err != nil {
b.Fatalf("Failed to create Redis client: %v", err)
}
defer client.conn.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
lock := NewRedisLock(client, "benchmark_lock", 10*time.Second)
ctx := context.Background()
if err := lock.Lock(ctx); err != nil {
b.Fatalf("Failed to acquire lock: %v", err)
}
lock.Unlock()
}
}
性能分析
锁性能对比
锁类型 | 获取时间 | 释放时间 | 内存使用 | 可靠性 |
---|---|---|---|---|
基础锁 | 1ms | 1ms | 低 | 中等 |
可重入锁 | 2ms | 2ms | 中等 | 中等 |
看门狗锁 | 1ms | 1ms | 中等 | 高 |
红锁 | 5ms | 3ms | 高 | 很高 |
并发性能测试
// 测试不同并发度下的锁性能
func BenchmarkConcurrentLock(b *testing.B) {
client, _ := NewRedisClient("127.0.0.1:6380")
defer client.conn.Close()
concurrencyLevels := []int{1, 10, 100, 1000}
for _, concurrency := range concurrencyLevels {
b.Run(fmt.Sprintf("concurrency_%d", concurrency), func(b *testing.B) {
var wg sync.WaitGroup
semaphore := make(chan struct{}, concurrency)
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
semaphore <- struct{}{}
defer func() { <-semaphore }()
lock := NewRedisLock(client, "benchmark_lock", 10*time.Second)
ctx := context.Background()
if err := lock.Lock(ctx); err == nil {
lock.Unlock()
}
}()
}
wg.Wait()
})
}
}
面试要点
1. 分布式锁的实现原理
答案要点:
- SET NX EX:使用 Redis 的原子操作实现互斥
- 随机值:防止误删其他客户端的锁
- Lua 脚本:确保释放锁的原子性
- TTL:防止死锁,自动过期释放
2. 分布式锁的常见问题
问题:
- 时钟漂移:不同节点时间不一致
- 网络分区:网络故障导致锁状态不一致
- 锁续期:长时间操作需要续期
- 误删锁:释放了其他客户端的锁
解决方案:
- 红锁:多节点投票机制
- 看门狗:自动续期机制
- 随机值:防止误删
- Lua 脚本:保证原子性
3. 红锁的争议
支持观点:
- 高可用性:多节点容错
- 强一致性:多数节点同意
- 防止脑裂:避免网络分区问题
反对观点:
- 性能开销:需要多个节点
- 实现复杂:增加系统复杂度
- 时钟依赖:仍然依赖时钟同步
总结
通过本章学习,我们深入理解了:
- 分布式锁的设计原理和实现方式
- Redis 分布式锁的完整实现
- 可重入锁和看门狗锁的高级特性
- 红锁的多节点容错机制
分布式锁是分布式系统中的重要组件,为协调多个节点访问共享资源提供了可靠的解决方案。在下一章中,我们将学习缓存一致性策略,了解如何保证缓存与数据库之间的数据一致性。