06 - 限流系统设计
面试频率: | 难度等级: | 推荐时长: 45-60 分钟
目录
需求分析与澄清
业务场景
限流系统是分布式系统的核心保护机制,用于控制流量速率,防止系统过载。典型应用场景:
- API网关限流:保护后端服务,防止流量洪峰
- 防刷限流:短信验证码、优惠券领取
- 资源保护:数据库连接池、线程池
- 业务限流:秒杀活动、抢购场景
- 第三方接口限流:调用外部API的频率控制
核心挑战
限流系统的六大挑战:
┌─────────────────────────────────────────────────────────┐
│ 1. 精准控制 │
│ - 精确限制请求速率 │
│ - 避免误杀正常请求 │
│ │
│ 2. 分布式协调 │
│ - 多节点统一限流 │
│ - 保证全局计数准确 │
│ │
│ 3. 高性能 │
│ - 延迟 < 1ms │
│ - 不能成为性能瓶颈 │
│ │
│ 4. 灵活配置 │
│ - 支持动态调整限流规则 │
│ - 不重启服务即可生效 │
│ │
│ 5. 降级熔断 │
│ - 限流后的降级策略 │
│ - 自动熔断异常服务 │
│ │
│ 6. 多维度限流 │
│ - 用户维度、IP维度、接口维度 │
│ - 组合条件限流 │
└─────────────────────────────────────────────────────────┘
功能性需求
面试官视角的关键问题
面试官: "设计一个限流系统,保护我们的API不被打垮。"
你需要主动澄清以下需求:
Q1: 限流的维度是什么?
单机限流: 限制单台服务器的请求速率(简单)
分布式限流: 限制整个集群的请求速率(复杂)
维度选择:
- 用户维度: 限制单个用户的请求速率
- IP维度: 限制单个IP的请求速率
- 接口维度: 限制单个API的总请求速率
- 租户维度: 多租户SaaS系统
【本次设计】
- 支持多维度限流
- 重点实现分布式限流
Q2: 限流精度要求?
秒级限流: 每秒100次(最常用)
分钟级限流: 每分钟1000次
小时级限流: 每小时10000次
【本次设计】
- 主要支持秒级限流
- 支持自定义时间窗口
Q3: 限流后如何处理?
拒绝服务: 返回429错误(简单直接)
排队等待: 请求进入队列,延迟处理
降级服务: 返回缓存或默认值
【本次设计】
- 默认拒绝服务
- 支持自定义降级策略
Q4: 性能要求?
QPS: 100万+(不能成为瓶颈)
延迟: < 1ms (P99)
可用性: 99.99%
【本次设计】
- 本地计数器优先(高性能)
- Redis集中限流(精准度高)
Q5: 限流规则如何配置?
硬编码: 写在代码里(不灵活)
配置文件: 写在配置中(需重启)
动态配置: 配置中心动态加载(推荐)
【本次设计】
- 支持配置中心动态下发
- 支持热更新,无需重启
非功能性需求
| 需求类型 | 具体要求 | 优先级 |
|---|---|---|
| 高性能 | 延迟 < 1ms,不影响业务 | P0 |
| 高可用 | 99.99% 可用性 | P0 |
| 精准性 | 误差 < 5% | P0 |
| 可扩展 | 支持水平扩展 | P1 |
| 易用性 | 简单易接入 | P1 |
| 可观测 | 完善的监控告警 | P1 |
容量估算
QPS 估算
【业务规模】
服务总数: 100 个微服务
单服务实例: 10 个
API接口数: 500 个
单实例QPS: 1000(业务处理能力)
集群总QPS: 100 * 10 * 1000 = 100 万
【限流器QPS】
每个请求需要调用限流器: 1次
限流器QPS: 100 万
【Redis QPS】
采用分布式限流(Redis)
单个Redis实例QPS: 10 万(SET/GET操作)
所需Redis实例: 100万 / 10万 = 10 台
实际部署: 20 台(考虑冗余和热点)
内存估算
【单机限流】
限流规则: 500 个接口 * 1KB = 500 KB
计数器: 500 个 * 100 Bytes = 50 KB
总内存: < 1 MB(可忽略)
【分布式限流(Redis)】
单个限流key:
- key: "ratelimit:user:12345:api:/order" = 50 Bytes
- value: 计数 = 8 Bytes
- TTL: 过期时间 = 8 Bytes
单条记录: 66 Bytes
活跃用户: 100 万
每用户限流key: 10 个(不同API)
总key数: 100万 * 10 = 1000 万
总内存: 1000万 * 66 Bytes = 660 MB
实际部署: 2 GB(考虑Redis内存碎片)
延迟估算
【单机限流】
本地内存操作: < 0.01ms(极快)
【分布式限流】
Redis GET操作: 0.5ms(局域网)
Redis Lua脚本: 1ms(原子操作)
【总延迟】
限流检查 + 业务逻辑: 1ms + 50ms = 51ms
限流占比: 1/51 ≈ 2%(可接受)
限流算法对比
算法1: 固定窗口计数器
原理
固定窗口计数器(Fixed Window Counter):
- 将时间划分为固定窗口(如1秒)
- 统计每个窗口内的请求数
- 超过阈值则拒绝请求
时间轴:
|------窗口1------|------窗口2------|------窗口3------|
0s 1s 2s 3s
请求数: 80 请求数: 120 请求数: 90
阈值: 100/秒
窗口2超过阈值,拒绝新请求
优缺点
优点:
1. 实现简单,易于理解
2. 内存占用少(只需一个计数器)
3. 性能高(O(1)时间复杂度)
缺点:
1. 临界问题(边界突刺)
- 0.5s-1s: 100次请求
- 1s-1.5s: 100次请求
- 0.5s-1.5s: 200次请求 (实际超限)
2. 窗口切换时计数器归零,可能瞬间放过大量请求
代码实现
package ratelimit
import (
"sync"
"time"
)
// FixedWindowLimiter 固定窗口限流器
type FixedWindowLimiter struct {
limit int64 // 限流阈值
window time.Duration // 时间窗口
counter int64 // 当前计数
lastTime time.Time // 窗口开始时间
mu sync.Mutex // 互斥锁
}
// NewFixedWindowLimiter 创建固定窗口限流器
func NewFixedWindowLimiter(limit int64, window time.Duration) *FixedWindowLimiter {
return &FixedWindowLimiter{
limit: limit,
window: window,
counter: 0,
lastTime: time.Now(),
}
}
// Allow 判断是否允许请求
func (l *FixedWindowLimiter) Allow() bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now()
// 窗口是否过期
if now.Sub(l.lastTime) >= l.window {
// 重置窗口
l.counter = 0
l.lastTime = now
}
// 判断是否超过限流阈值
if l.counter < l.limit {
l.counter++
return true
}
return false
}
// 使用示例
func ExampleFixedWindow() {
// 限制每秒100次请求
limiter := NewFixedWindowLimiter(100, time.Second)
// 模拟请求
for i := 0; i < 150; i++ {
if limiter.Allow() {
// 处理请求
println("Request", i, "allowed")
} else {
// 拒绝请求
println("Request", i, "rejected")
}
}
}
算法2: 滑动窗口计数器
原理
滑动窗口计数器(Sliding Window Counter):
- 将时间窗口细分为多个小格子
- 统计当前时间往前推一个窗口内的请求总数
- 解决固定窗口的临界问题
示例:窗口1秒,分为10个格子(每格100ms)
时间轴:
|---|---|---|---|---|---|---|---|---|---|
0 100 200 300 400 500 600 700 800 900 1000ms
10 15 20 25 10 8 5 3 2 2 (请求数)
当前时间: 650ms
滑动窗口: [0-650ms]的前10个格子
总请求数: 10+15+20+25+10+8 = 88
优缺点
优点:
1. 解决固定窗口的临界问题
2. 限流更加平滑
3. 精度可调(格子越多越精确)
缺点:
1. 内存占用较高(需存储多个格子)
2. 实现相对复杂
3. 需要定期清理过期格子
代码实现
package ratelimit
import (
"sync"
"time"
)
// SlidingWindowLimiter 滑动窗口限流器
type SlidingWindowLimiter struct {
limit int64 // 限流阈值
window time.Duration // 时间窗口
slots int // 窗口分片数
counters map[int64]int64 // 时间槽 -> 计数
mu sync.Mutex // 互斥锁
}
// NewSlidingWindowLimiter 创建滑动窗口限流器
func NewSlidingWindowLimiter(limit int64, window time.Duration, slots int) *SlidingWindowLimiter {
return &SlidingWindowLimiter{
limit: limit,
window: window,
slots: slots,
counters: make(map[int64]int64),
}
}
// Allow 判断是否允许请求
func (l *SlidingWindowLimiter) Allow() bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now().UnixNano()
slotDuration := int64(l.window) / int64(l.slots)
// 计算当前时间槽
currentSlot := now / slotDuration
// 计算滑动窗口的起始时间槽
windowStart := currentSlot - int64(l.slots) + 1
// 清理过期的时间槽
for slot := range l.counters {
if slot < windowStart {
delete(l.counters, slot)
}
}
// 统计当前窗口内的请求总数
var total int64
for slot := windowStart; slot <= currentSlot; slot++ {
total += l.counters[slot]
}
// 判断是否超过限流阈值
if total < l.limit {
l.counters[currentSlot]++
return true
}
return false
}
// 使用示例
func ExampleSlidingWindow() {
// 限制每秒100次请求,窗口分为10个槽
limiter := NewSlidingWindowLimiter(100, time.Second, 10)
// 模拟请求
for i := 0; i < 150; i++ {
if limiter.Allow() {
println("Request", i, "allowed")
} else {
println("Request", i, "rejected")
}
time.Sleep(5 * time.Millisecond)
}
}
算法3: 令牌桶 (Token Bucket)
原理
令牌桶算法(Token Bucket):
- 系统以恒定速率往桶里放令牌
- 请求需要消耗令牌才能通过
- 桶满时令牌溢出(丢弃)
- 桶空时请求被拒绝
示例:
+------------------+
| 令牌桶 |
令牌生成器 ---> | 🪙🪙🪙🪙🪙 | 容量: 100
(10个/秒) | |
+------------------+
|
请求消耗令牌
(1个/次)
特点:
1. 允许一定程度的突发流量(桶内有余量)
2. 长期平均速率受令牌生成速率限制
优缺点
优点:
1. 支持突发流量(桶内令牌可一次性消耗)
2. 平滑限流(令牌匀速生成)
3. 实现简单,性能高
缺点:
1. 需要额外存储令牌数
2. 需要定时器生成令牌(或懒加载计算)
3. 桶容量需要合理设置
代码实现
package ratelimit
import (
"sync"
"time"
)
// TokenBucketLimiter 令牌桶限流器
type TokenBucketLimiter struct {
capacity int64 // 桶容量
tokens int64 // 当前令牌数
rate int64 // 令牌生成速率(个/秒)
lastTime time.Time // 上次生成令牌时间
mu sync.Mutex // 互斥锁
}
// NewTokenBucketLimiter 创建令牌桶限流器
func NewTokenBucketLimiter(capacity int64, rate int64) *TokenBucketLimiter {
return &TokenBucketLimiter{
capacity: capacity,
tokens: capacity, // 初始令牌数等于容量
rate: rate,
lastTime: time.Now(),
}
}
// Allow 判断是否允许请求(消耗1个令牌)
func (l *TokenBucketLimiter) Allow() bool {
return l.AllowN(1)
}
// AllowN 判断是否允许请求(消耗n个令牌)
func (l *TokenBucketLimiter) AllowN(n int64) bool {
l.mu.Lock()
defer l.mu.Unlock()
// 计算当前应该有多少令牌
now := time.Now()
elapsed := now.Sub(l.lastTime)
// 计算新生成的令牌数
newTokens := int64(elapsed.Seconds() * float64(l.rate))
// 更新令牌数(不超过容量)
l.tokens = min(l.capacity, l.tokens+newTokens)
l.lastTime = now
// 判断令牌是否足够
if l.tokens >= n {
l.tokens -= n
return true
}
return false
}
func min(a, b int64) int64 {
if a < b {
return a
}
return b
}
// 使用示例
func ExampleTokenBucket() {
// 容量100,每秒生成50个令牌
limiter := NewTokenBucketLimiter(100, 50)
// 模拟突发请求(前100个直接通过)
for i := 0; i < 150; i++ {
if limiter.Allow() {
println("Request", i, "allowed")
} else {
println("Request", i, "rejected")
}
}
// 等待1秒,令牌恢复
time.Sleep(time.Second)
// 继续请求(恢复50个令牌)
for i := 150; i < 200; i++ {
if limiter.Allow() {
println("Request", i, "allowed")
} else {
println("Request", i, "rejected")
}
}
}
算法4: 漏桶 (Leaky Bucket)
原理
漏桶算法(Leaky Bucket):
- 请求进入漏桶
- 漏桶以恒定速率流出请求
- 桶满时新请求溢出(拒绝)
示例:
请求流入(不定速)
↓
+----------+
| 🌊🌊🌊🌊🌊 | 桶容量: 100
| 🌊🌊🌊 |
+----------+
↓
匀速流出(10个/秒)
特点:
1. 强制限制流出速率
2. 平滑处理突发流量
3. 类似消息队列
优缺点
优点:
1. 严格控制流出速率(绝对平滑)
2. 适合处理突发流量
3. 实现简单
缺点:
1. 无法处理突发流量(流出速率固定)
2. 请求需要排队(增加延迟)
3. 队列满时丢弃请求
代码实现
package ratelimit
import (
"context"
"sync"
"time"
)
// LeakyBucketLimiter 漏桶限流器
type LeakyBucketLimiter struct {
capacity int // 桶容量
rate time.Duration // 流出速率(每个请求的间隔时间)
queue chan struct{} // 请求队列
mu sync.Mutex // 互斥锁
}
// NewLeakyBucketLimiter 创建漏桶限流器
func NewLeakyBucketLimiter(capacity int, rate time.Duration) *LeakyBucketLimiter {
limiter := &LeakyBucketLimiter{
capacity: capacity,
rate: rate,
queue: make(chan struct{}, capacity),
}
// 启动漏水协程
go limiter.leak()
return limiter
}
// leak 漏水(以恒定速率流出)
func (l *LeakyBucketLimiter) leak() {
ticker := time.NewTicker(l.rate)
defer ticker.Stop()
for range ticker.C {
// 从队列中取出一个请求(流出)
select {
case <-l.queue:
// 请求流出
default:
// 队列为空,无需处理
}
}
}
// Allow 判断是否允许请求
func (l *LeakyBucketLimiter) Allow() bool {
select {
case l.queue <- struct{}{}:
// 成功加入队列
return true
default:
// 队列已满,拒绝请求
return false
}
}
// Wait 等待直到允许请求(带超时)
func (l *LeakyBucketLimiter) Wait(ctx context.Context) error {
select {
case l.queue <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// 使用示例
func ExampleLeakyBucket() {
// 容量100,每10ms流出一个请求(100个/秒)
limiter := NewLeakyBucketLimiter(100, 10*time.Millisecond)
// 模拟突发请求
for i := 0; i < 150; i++ {
if limiter.Allow() {
println("Request", i, "allowed (queued)")
} else {
println("Request", i, "rejected (bucket full)")
}
}
// 等待队列处理
time.Sleep(2 * time.Second)
}
算法对比总结
| 算法 | 突发流量 | 平滑度 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 固定窗口 | 简单场景,对精度要求不高 | |||
| 滑动窗口 | 需要精确限流,内存充足 | |||
| 令牌桶 | 推荐,兼顾性能和灵活性 | |||
| 漏桶 | 需要绝对平滑,可接受排队 |
推荐选择:
- API网关限流:令牌桶(允许突发,体验好)
- 数据库保护:漏桶(严格控流,保护后端)
- 单机限流:固定窗口(简单高效)
- 精确限流:滑动窗口(精度高)
API设计
核心接口
// RateLimiter 限流器接口
type RateLimiter interface {
// Allow 判断是否允许请求通过
// 返回值: true-允许, false-拒绝
Allow(ctx context.Context, resource string, identifier string) (bool, error)
// AllowN 判断是否允许n个请求通过
AllowN(ctx context.Context, resource string, identifier string, n int64) (bool, error)
// Wait 等待直到允许请求通过(阻塞)
// 返回值: 等待时间, error
Wait(ctx context.Context, resource string, identifier string) (time.Duration, error)
// Reserve 预约令牌(不阻塞,返回等待时间)
Reserve(ctx context.Context, resource string, identifier string) (*Reservation, error)
}
// Reservation 预约信息
type Reservation struct {
OK bool // 是否成功
Delay time.Duration // 需要等待的时间
CancelFunc func() // 取消预约
}
// RuleManager 规则管理器接口
type RuleManager interface {
// AddRule 添加限流规则
AddRule(rule *RateLimitRule) error
// UpdateRule 更新限流规则
UpdateRule(rule *RateLimitRule) error
// DeleteRule 删除限流规则
DeleteRule(resource string) error
// GetRule 获取限流规则
GetRule(resource string) (*RateLimitRule, error)
// ListRules 列出所有规则
ListRules() ([]*RateLimitRule, error)
}
RESTful API
# 限流检查API
POST /api/v1/ratelimit/check
Request:
resource: "/api/order/create" # 资源标识
identifier: "user:12345" # 用户标识
tokens: 1 # 消耗令牌数
Response:
allowed: true # 是否允许
remaining: 99 # 剩余配额
reset_time: "2025-01-01T00:00:01Z" # 重置时间
retry_after: 0 # 重试等待时间(秒)
# 规则管理API
POST /api/v1/ratelimit/rules
Request:
resource: "/api/order/create"
limit: 100 # 限流阈值
window: "1s" # 时间窗口
algorithm: "token_bucket" # 限流算法
burst: 150 # 突发容量
Response:
rule_id: "rule_123"
created_at: "2025-01-01T00:00:00Z"
GET /api/v1/ratelimit/rules/{resource}
Response:
resource: "/api/order/create"
limit: 100
window: "1s"
algorithm: "token_bucket"
burst: 150
enabled: true
PUT /api/v1/ratelimit/rules/{resource}
DELETE /api/v1/ratelimit/rules/{resource}
数据模型设计
限流规则表
-- 限流规则配置表
CREATE TABLE rate_limit_rules (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
resource VARCHAR(255) NOT NULL COMMENT '资源标识(API路径/业务标识)',
algorithm VARCHAR(50) NOT NULL COMMENT '限流算法(fixed_window/sliding_window/token_bucket/leaky_bucket)',
limit_count BIGINT NOT NULL COMMENT '限流阈值',
window_size INT NOT NULL COMMENT '时间窗口(秒)',
burst_size BIGINT DEFAULT 0 COMMENT '突发容量(令牌桶)',
-- 限流维度
dimension VARCHAR(50) NOT NULL COMMENT '限流维度(user/ip/api/tenant)',
-- 生效配置
enabled TINYINT(1) DEFAULT 1 COMMENT '是否启用',
priority INT DEFAULT 0 COMMENT '优先级(数字越大优先级越高)',
-- 降级配置
fallback_action VARCHAR(50) DEFAULT 'reject' COMMENT '降级动作(reject/queue/degrade)',
fallback_message TEXT COMMENT '降级提示信息',
-- 时间戳
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
created_by VARCHAR(100),
UNIQUE KEY uk_resource (resource),
KEY idx_enabled (enabled),
KEY idx_priority (priority)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='限流规则配置表';
-- 示例数据
INSERT INTO rate_limit_rules
(resource, algorithm, limit_count, window_size, burst_size, dimension)
VALUES
('/api/order/create', 'token_bucket', 100, 1, 150, 'user'),
('/api/sms/send', 'fixed_window', 10, 60, 0, 'user'),
('/api/login', 'sliding_window', 5, 60, 0, 'ip');
Redis数据结构
# 1. 固定窗口计数器
# Key格式: ratelimit:fixed:{resource}:{identifier}:{window_start}
# Value: 计数
# TTL: 窗口大小
redis> SET ratelimit:fixed:/api/order:user:12345:1704067200 85 EX 1
redis> INCR ratelimit:fixed:/api/order:user:12345:1704067200
(integer) 86
# 2. 滑动窗口计数器 (使用Sorted Set)
# Key格式: ratelimit:sliding:{resource}:{identifier}
# Member: 请求ID或时间戳
# Score: 请求时间戳(毫秒)
redis> ZADD ratelimit:sliding:/api/order:user:12345 1704067200123 "req_001"
redis> ZADD ratelimit:sliding:/api/order:user:12345 1704067200456 "req_002"
# 清理过期数据
redis> ZREMRANGEBYSCORE ratelimit:sliding:/api/order:user:12345 0 1704067199999
# 统计当前窗口请求数
redis> ZCOUNT ratelimit:sliding:/api/order:user:12345 1704067200000 1704067201000
# 3. 令牌桶 (使用Hash)
# Key格式: ratelimit:token:{resource}:{identifier}
# Fields: tokens(当前令牌数), last_time(上次更新时间)
redis> HSET ratelimit:token:/api/order:user:12345 tokens 95 last_time 1704067200
redis> HGETALL ratelimit:token:/api/order:user:12345
1) "tokens"
2) "95"
3) "last_time"
4) "1704067200"
# 4. 规则缓存 (使用Hash)
# Key格式: ratelimit:rule:{resource}
redis> HSET ratelimit:rule:/api/order algorithm token_bucket limit 100 window 1 burst 150
架构设计
V1: 单机限流(本地内存)
架构图
┌─────────────────────────────────────────────┐
│ 用户请求 │
└──────────────────┬──────────────────────────┘
↓
┌──────────────────────┐
│ API Gateway │
│ ┌────────────────┐ │
│ │ Rate Limiter │ │ (内存计数器)
│ │ - Token Bucket │ │
│ │ - Local Cache │ │
│ └────────────────┘ │
└──────────────────────┘
↓
┌──────────────────────┐
│ Backend Services │
└──────────────────────┘
优点:
实现简单,性能极高(< 0.01ms)
无外部依赖
缺点:
单机限流,无法全局协调
水平扩展时限流不准确
(10台服务器,每台限100 = 总限1000,实际需要100)
代码实现
package ratelimit
import (
"context"
"sync"
"time"
)
// LocalRateLimiter 本地限流器(单机版)
type LocalRateLimiter struct {
limiters map[string]*TokenBucketLimiter
rules map[string]*RateLimitRule
mu sync.RWMutex
}
// RateLimitRule 限流规则
type RateLimitRule struct {
Resource string // 资源标识
Algorithm string // 算法类型
Limit int64 // 限流阈值
Window time.Duration // 时间窗口
Burst int64 // 突发容量
}
func NewLocalRateLimiter() *LocalRateLimiter {
return &LocalRateLimiter{
limiters: make(map[string]*TokenBucketLimiter),
rules: make(map[string]*RateLimitRule),
}
}
// AddRule 添加限流规则
func (l *LocalRateLimiter) AddRule(rule *RateLimitRule) {
l.mu.Lock()
defer l.mu.Unlock()
// 创建限流器
limiter := NewTokenBucketLimiter(rule.Burst, rule.Limit)
l.rules[rule.Resource] = rule
l.limiters[rule.Resource] = limiter
}
// Allow 判断是否允许请求
func (l *LocalRateLimiter) Allow(ctx context.Context, resource string, identifier string) (bool, error) {
l.mu.RLock()
limiter, exists := l.limiters[resource]
l.mu.RUnlock()
if !exists {
// 没有配置规则,默认允许
return true, nil
}
// 组合key: resource + identifier
// (简化实现,生产环境需要分别存储)
return limiter.Allow(), nil
}
// 使用示例
func ExampleLocalRateLimiter() {
limiter := NewLocalRateLimiter()
// 添加规则:订单API每秒100次
limiter.AddRule(&RateLimitRule{
Resource: "/api/order/create",
Algorithm: "token_bucket",
Limit: 100,
Window: time.Second,
Burst: 150,
})
ctx := context.Background()
// 模拟请求
for i := 0; i < 200; i++ {
allowed, _ := limiter.Allow(ctx, "/api/order/create", "user:12345")
if allowed {
println("Request", i, "allowed")
} else {
println("Request", i, "rejected")
}
}
}
V2: 分布式限流(Redis)
架构图
┌─────────────────────────────────────────────────────────┐
│ 用户请求 │
└────────────┬──────────────────┬─────────────────────────┘
↓ ↓
┌──────────────────┐ ┌──────────────────┐
│ API Gateway 1 │ │ API Gateway 2 │
│ ┌────────────┐ │ │ ┌────────────┐ │
│ │Rate Limiter│ │ │ │Rate Limiter│ │
│ └──────┬─────┘ │ │ └──────┬─────┘ │
└─────────┼────────┘ └─────────┼────────┘
│ │
└──────────┬──────────┘
↓
┌─────────────────────┐
│ Redis Cluster │
│ ┌───────────────┐ │
│ │ Lua Scripts │ │ (原子操作)
│ │ - Token Bucket│ │
│ │ - Sliding Win │ │
│ └───────────────┘ │
└─────────────────────┘
↓
┌─────────────────────┐
│ Backend Services │
└─────────────────────┘
优点:
全局限流,多实例共享配额
限流准确
支持动态配置
缺点:
依赖Redis(单点风险)
网络开销(~1ms延迟)
Redis故障影响限流功能
核心实现:Redis + Lua脚本
package ratelimit
import (
"context"
"errors"
"time"
"github.com/go-redis/redis/v8"
)
// RedisRateLimiter Redis分布式限流器
type RedisRateLimiter struct {
client *redis.Client
script *redis.Script // Lua脚本
}
// Lua脚本:令牌桶算法
const tokenBucketScript = `
-- KEYS[1]: 限流key
-- ARGV[1]: 令牌生成速率(个/秒)
-- ARGV[2]: 桶容量
-- ARGV[3]: 当前时间戳(秒)
-- ARGV[4]: 消耗令牌数
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
-- 获取上次更新时间和令牌数
local last_time = tonumber(redis.call('HGET', KEYS[1], 'last_time') or now)
local tokens = tonumber(redis.call('HGET', KEYS[1], 'tokens') or capacity)
-- 计算新生成的令牌数
local elapsed = math.max(0, now - last_time)
local new_tokens = math.min(capacity, tokens + elapsed * rate)
-- 判断令牌是否足够
if new_tokens >= requested then
-- 扣减令牌
new_tokens = new_tokens - requested
-- 更新Redis
redis.call('HSET', KEYS[1], 'tokens', new_tokens)
redis.call('HSET', KEYS[1], 'last_time', now)
redis.call('EXPIRE', KEYS[1], 3600) -- 1小时过期
return {1, new_tokens} -- 允许,返回剩余令牌
else
return {0, new_tokens} -- 拒绝,返回当前令牌
end
`
func NewRedisRateLimiter(client *redis.Client) *RedisRateLimiter {
return &RedisRateLimiter{
client: client,
script: redis.NewScript(tokenBucketScript),
}
}
// Allow 判断是否允许请求
func (r *RedisRateLimiter) Allow(ctx context.Context, resource string, identifier string, rate int64, capacity int64) (bool, error) {
// 构造限流key
key := "ratelimit:token:" + resource + ":" + identifier
now := time.Now().Unix()
// 执行Lua脚本(原子操作)
result, err := r.script.Run(ctx, r.client, []string{key}, rate, capacity, now, 1).Result()
if err != nil {
return false, err
}
// 解析结果
values, ok := result.([]interface{})
if !ok || len(values) != 2 {
return false, errors.New("invalid script result")
}
allowed := values[0].(int64) == 1
return allowed, nil
}
// AllowN 判断是否允许n个请求
func (r *RedisRateLimiter) AllowN(ctx context.Context, resource string, identifier string, rate int64, capacity int64, n int64) (bool, error) {
key := "ratelimit:token:" + resource + ":" + identifier
now := time.Now().Unix()
result, err := r.script.Run(ctx, r.client, []string{key}, rate, capacity, now, n).Result()
if err != nil {
return false, err
}
values, ok := result.([]interface{})
if !ok || len(values) != 2 {
return false, errors.New("invalid script result")
}
allowed := values[0].(int64) == 1
return allowed, nil
}
// 使用示例
func ExampleRedisRateLimiter() {
// 连接Redis
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
limiter := NewRedisRateLimiter(client)
ctx := context.Background()
// 限流参数:每秒100个令牌,桶容量150
rate := int64(100)
capacity := int64(150)
// 模拟请求
for i := 0; i < 200; i++ {
allowed, err := limiter.Allow(ctx, "/api/order/create", "user:12345", rate, capacity)
if err != nil {
println("Error:", err.Error())
continue
}
if allowed {
println("Request", i, "allowed")
} else {
println("Request", i, "rejected")
}
}
}
滑动窗口 Lua 脚本
-- 滑动窗口限流(基于Sorted Set)
-- KEYS[1]: 限流key
-- ARGV[1]: 时间窗口(秒)
-- ARGV[2]: 限流阈值
-- ARGV[3]: 当前时间戳(毫秒)
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local window_start = now - window * 1000
-- 清理过期数据
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, window_start)
-- 统计当前窗口请求数
local count = redis.call('ZCARD', KEYS[1])
if count < limit then
-- 添加当前请求
redis.call('ZADD', KEYS[1], now, now)
redis.call('EXPIRE', KEYS[1], window)
return {1, limit - count - 1} -- 允许,返回剩余配额
else
return {0, 0} -- 拒绝
end
V3: 多级限流(本地+Redis)
架构图
┌─────────────────────────────────────────────────────────┐
│ 用户请求 │
└────────────┬──────────────────┬─────────────────────────┘
↓ ↓
┌─────────────────────┐ ┌─────────────────────┐
│ API Gateway 1 │ │ API Gateway 2 │
│ │ │ │
│ ┌───────────────┐ │ │ ┌───────────────┐ │
│ │ L1: 本地限流 │ │ │ │ L1: 本地限流 │ │ ← 快速拦截
│ │ (粗粒度) │ │ │ │ (粗粒度) │ │ (< 0.01ms)
│ └───────┬───────┘ │ │ └───────┬───────┘ │
│ ↓ │ │ ↓ │
│ ┌───────────────┐ │ │ ┌───────────────┐ │
│ │ L2: Redis限流 │ │ │ │ L2: Redis限流 │ │ ← 精准限流
│ │ (细粒度) │ │ │ │ (细粒度) │ │ (~1ms)
│ └───────┬───────┘ │ │ └───────┬───────┘ │
└──────────┼──────────┘ └──────────┼──────────┘
│ │
└───────────┬────────────┘
↓
┌─────────────────────┐
│ Redis Cluster │
└─────────────────────┘
↓
┌─────────────────────┐
│ Backend Services │
└─────────────────────┘
限流策略:
1. L1本地限流: 限制单机QPS(如总限100,10台机器每台限15)
2. L2 Redis限流: 全局精准限流(总限100)
优点:
性能高(本地快速拦截)
精度高(Redis全局限流)
降低Redis压力(本地拦截大部分请求)
Redis故障时仍有本地限流保护
缺点:
⚠️ 配置复杂(需要合理分配本地和全局配额)
⚠️ 一致性略弱(本地计数有延迟)
代码实现
package ratelimit
import (
"context"
"time"
)
// MultiLevelRateLimiter 多级限流器
type MultiLevelRateLimiter struct {
local *LocalRateLimiter
redis *RedisRateLimiter
config *MultiLevelConfig
}
// MultiLevelConfig 多级限流配置
type MultiLevelConfig struct {
// 本地限流配置
LocalLimit int64 // 本地限流阈值
LocalCapacity int64 // 本地突发容量
// Redis限流配置
RedisLimit int64 // 全局限流阈值
RedisCapacity int64 // 全局突发容量
// 策略
LocalFirst bool // 优先本地限流
RedisRequired bool // Redis必须检查
}
func NewMultiLevelRateLimiter(local *LocalRateLimiter, redis *RedisRateLimiter, config *MultiLevelConfig) *MultiLevelRateLimiter {
return &MultiLevelRateLimiter{
local: local,
redis: redis,
config: config,
}
}
// Allow 判断是否允许请求
func (m *MultiLevelRateLimiter) Allow(ctx context.Context, resource string, identifier string) (bool, error) {
// 第一级:本地限流(快速拦截)
localAllowed, err := m.local.Allow(ctx, resource, identifier)
if err != nil {
return false, err
}
if !localAllowed {
// 本地限流拦截
return false, nil
}
// 第二级:Redis限流(精准控制)
if m.config.RedisRequired {
redisAllowed, err := m.redis.Allow(ctx, resource, identifier, m.config.RedisLimit, m.config.RedisCapacity)
if err != nil {
// Redis故障,降级到仅本地限流
return localAllowed, nil
}
return redisAllowed, nil
}
return true, nil
}
// 使用示例
func ExampleMultiLevelRateLimiter() {
// 初始化本地限流器
local := NewLocalRateLimiter()
local.AddRule(&RateLimitRule{
Resource: "/api/order/create",
Algorithm: "token_bucket",
Limit: 15, // 单机限15(假设10台机器,总限100,每台分配10+冗余5)
Window: time.Second,
Burst: 20,
})
// 初始化Redis限流器
redis := NewRedisRateLimiter(nil) // 假设已连接
// 配置多级限流
config := &MultiLevelConfig{
LocalLimit: 15,
LocalCapacity: 20,
RedisLimit: 100,
RedisCapacity: 150,
LocalFirst: true,
RedisRequired: true,
}
limiter := NewMultiLevelRateLimiter(local, redis, config)
ctx := context.Background()
// 模拟请求
for i := 0; i < 200; i++ {
allowed, err := limiter.Allow(ctx, "/api/order/create", "user:12345")
if err != nil {
println("Error:", err.Error())
continue
}
if allowed {
println("Request", i, "allowed")
} else {
println("Request", i, "rejected")
}
}
}
核心算法与实现
完整限流系统实现
package ratelimit
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// 完整的限流系统实现
type RateLimitSystem struct {
ruleManager *RuleManager
localLimiter *LocalRateLimiter
redisLimiter *RedisRateLimiter
config *SystemConfig
mu sync.RWMutex
}
type SystemConfig struct {
Mode string // "local", "redis", "hybrid"
EnableMetrics bool // 是否启用指标
EnableLog bool // 是否启用日志
}
// Check 检查限流
func (s *RateLimitSystem) Check(ctx context.Context, resource string, identifier string) (*CheckResult, error) {
// 获取规则
rule, err := s.ruleManager.GetRule(resource)
if err != nil || rule == nil {
// 没有规则,默认允许
return &CheckResult{Allowed: true}, nil
}
if !rule.Enabled {
return &CheckResult{Allowed: true, Reason: "rule disabled"}, nil
}
start := time.Now()
var allowed bool
var remaining int64
// 根据模式选择限流器
switch s.config.Mode {
case "local":
allowed, err = s.localLimiter.Allow(ctx, resource, identifier)
case "redis":
allowed, err = s.redisLimiter.Allow(ctx, resource, identifier, rule.Limit, rule.Burst)
case "hybrid":
// 多级限流
localAllowed, _ := s.local Limiter.Allow(ctx, resource, identifier)
if !localAllowed {
allowed = false
} else {
allowed, err = s.redisLimiter.Allow(ctx, resource, identifier, rule.Limit, rule.Burst)
}
}
latency := time.Since(start)
// 记录指标
if s.config.EnableMetrics {
s.recordMetrics(resource, allowed, latency)
}
result := &CheckResult{
Allowed: allowed,
Remaining: remaining,
ResetTime: time.Now().Add(rule.Window),
Latency: latency,
}
if !allowed {
result.Reason = "rate limit exceeded"
result.RetryAfter = rule.Window
}
return result, err
}
type CheckResult struct {
Allowed bool // 是否允许
Remaining int64 // 剩余配额
ResetTime time.Time // 重置时间
RetryAfter time.Duration // 重试等待时间
Reason string // 原因
Latency time.Duration // 延迟
}
// 指标记录
func (s *RateLimitSystem) recordMetrics(resource string, allowed bool, latency time.Duration) {
// 使用Prometheus记录指标
// rateLimitRequestsTotal.WithLabelValues(resource, fmt.Sprint(allowed)).Inc()
// rateLimitLatency.WithLabelValues(resource).Observe(latency.Seconds())
}
中间件集成
HTTP中间件
package middleware
import (
"net/http"
"ratelimit"
)
// RateLimitMiddleware HTTP限流中间件
func RateLimitMiddleware(limiter *ratelimit.RateLimitSystem) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// 获取资源标识(API路径)
resource := r.URL.Path
// 获取用户标识(从请求中提取)
identifier := extractIdentifier(r) // 如: user ID 或 IP
// 检查限流
result, err := limiter.Check(ctx, resource, identifier)
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
if !result.Allowed {
// 限流触发,返回429
w.Header().Set("X-RateLimit-Limit", "100")
w.Header().Set("X-RateLimit-Remaining", "0")
w.Header().Set("X-RateLimit-Reset", result.ResetTime.Format(time.RFC3339))
w.Header().Set("Retry-After", fmt.Sprintf("%d", int(result.RetryAfter.Seconds())))
http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
return
}
// 添加响应头
w.Header().Set("X-RateLimit-Limit", "100")
w.Header().Set("X-RateLimit-Remaining", fmt.Sprintf("%d", result.Remaining))
// 继续处理请求
next.ServeHTTP(w, r)
})
}
}
// extractIdentifier 提取用户标识
func extractIdentifier(r *http.Request) string {
// 优先使用用户ID
if userID := r.Header.Get("X-User-ID"); userID != "" {
return "user:" + userID
}
// 其次使用IP
ip := r.Header.Get("X-Real-IP")
if ip == "" {
ip = r.Header.Get("X-Forwarded-For")
}
if ip == "" {
ip = r.RemoteAddr
}
return "ip:" + ip
}
// 使用示例
func main() {
// 初始化限流系统
limiter := ratelimit.NewRateLimitSystem(config)
// 创建HTTP服务
mux := http.NewServeMux()
mux.HandleFunc("/api/order/create", handleOrderCreate)
// 应用限流中间件
handler := RateLimitMiddleware(limiter)(mux)
http.ListenAndServe(":8080", handler)
}
gRPC拦截器
package interceptor
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"ratelimit"
)
// RateLimitInterceptor gRPC限流拦截器
func RateLimitInterceptor(limiter *ratelimit.RateLimitSystem) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 获取方法名作为资源标识
resource := info.FullMethod
// 从metadata获取用户标识
identifier := extractUserFromMetadata(ctx)
// 检查限流
result, err := limiter.Check(ctx, resource, identifier)
if err != nil {
return nil, status.Error(codes.Internal, "rate limit check failed")
}
if !result.Allowed {
// 设置响应元数据
md := metadata.Pairs(
"x-ratelimit-limit", "100",
"x-ratelimit-remaining", "0",
"x-ratelimit-reset", result.ResetTime.Format(time.RFC3339),
)
grpc.SetTrailer(ctx, md)
return nil, status.Error(codes.ResourceExhausted, "rate limit exceeded")
}
// 继续处理请求
return handler(ctx, req)
}
}
func extractUserFromMetadata(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "unknown"
}
if userIDs := md.Get("user-id"); len(userIDs) > 0 {
return "user:" + userIDs[0]
}
return "unknown"
}
优化方案
优化1: 热点数据优化
问题:部分热点用户/API导致Redis压力大
解决方案:
// HotKeyOptimizer 热点key优化器
type HotKeyOptimizer struct {
localCache *sync.Map // 本地缓存热点数据
hotKeyStats map[string]int64
threshold int64 // 热点阈值
mu sync.RWMutex
}
// 检测热点key
func (h *HotKeyOptimizer) isHotKey(key string) bool {
h.mu.RLock()
count := h.hotKeyStats[key]
h.mu.RUnlock()
return count > h.threshold
}
// 优化后的限流检查
func (s *RateLimitSystem) CheckWithHotKeyOptimization(ctx context.Context, resource string, identifier string) (*CheckResult, error) {
key := resource + ":" + identifier
// 检测是否为热点key
if s.hotKeyOptimizer.isHotKey(key) {
// 使用本地缓存
if cachedResult, ok := s.hotKeyOptimizer.localCache.Load(key); ok {
return cachedResult.(*CheckResult), nil
}
}
// 正常限流检查
result, err := s.Check(ctx, resource, identifier)
// 缓存热点key结果
if s.hotKeyOptimizer.isHotKey(key) {
s.hotKeyOptimizer.localCache.Store(key, result)
}
return result, err
}
效果对比:
| 方案 | 延迟 | Redis QPS | 准确度 |
|---|---|---|---|
| 纯Redis | ~1ms | 100万 | 100% |
| 热点优化 | ~0.1ms | 20万 | 95% |
优化2: 批量检查
问题:单个请求需要检查多个限流规则,延迟高
解决方案:
// BatchCheck 批量检查限流
func (s *RateLimitSystem) BatchCheck(ctx context.Context, checks []CheckRequest) ([]CheckResult, error) {
results := make([]CheckResult, len(checks))
// 使用Pipeline批量执行Redis命令
pipe := s.redisLimiter.client.Pipeline()
for i, check := range checks {
key := "ratelimit:token:" + check.Resource + ":" + check.Identifier
// 添加到Pipeline
pipe.HGetAll(ctx, key)
}
// 执行Pipeline
cmds, err := pipe.Exec(ctx)
if err != nil {
return nil, err
}
// 解析结果
for i, cmd := range cmds {
// 处理每个命令的结果
// ...
results[i] = CheckResult{Allowed: true}
}
return results, nil
}
效果对比:
| 方案 | 总延迟 | 吞吐量 |
|---|---|---|
| 逐个检查 (10个规则) | ~10ms | 1000 QPS |
| 批量检查 (10个规则) | ~2ms | 5000 QPS |
优化3: 预热机制
问题:冷启动时大量请求同时通过,压垮后端
解决方案:
// WarmupConfig 预热配置
type WarmupConfig struct {
Duration time.Duration // 预热时长
InitialLimit int64 // 初始限流值
TargetLimit int64 // 目标限流值
}
// CalculateLimit 计算预热期间的限流值
func (w *WarmupConfig) CalculateLimit(elapsedTime time.Duration) int64 {
if elapsedTime >= w.Duration {
return w.TargetLimit
}
// 线性增长
ratio := float64(elapsedTime) / float64(w.Duration)
currentLimit := float64(w.InitialLimit) + ratio*float64(w.TargetLimit-w.InitialLimit)
return int64(currentLimit)
}
// 使用示例
func ExampleWarmup() {
config := &WarmupConfig{
Duration: 5 * time.Minute, // 5分钟预热
InitialLimit: 10, // 初始10 QPS
TargetLimit: 100, // 目标100 QPS
}
startTime := time.Now()
// 每秒计算当前限流值
for i := 0; i < 300; i++ {
elapsed := time.Duration(i) * time.Second
currentLimit := config.CalculateLimit(elapsed)
println("Time:", elapsed, "Limit:", currentLimit)
time.Sleep(time.Second)
}
}
预热曲线:
Limit
100 │ ┌─────────
│ ┌───┘
50 │ ┌───┘
│ ┌───┘
10 │──────────────┘
└─────────────────────────────────── Time
0s 5min 10min
优化4: 降级熔断
// CircuitBreaker 熔断器
type CircuitBreaker struct {
maxFailures int // 最大失败次数
resetTimeout time.Duration // 重置超时
failures int // 当前失败次数
lastFailTime time.Time // 最后失败时间
state string // 状态: "closed", "open", "half-open"
mu sync.Mutex
}
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
resetTimeout: resetTimeout,
state: "closed",
}
}
// Call 执行调用(带熔断)
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mu.Lock()
defer cb.mu.Unlock()
// 检查状态
if cb.state == "open" {
// 检查是否可以进入半开状态
if time.Since(cb.lastFailTime) > cb.resetTimeout {
cb.state = "half-open"
} else {
return fmt.Errorf("circuit breaker is open")
}
}
// 执行调用
err := fn()
if err != nil {
// 记录失败
cb.failures++
cb.lastFailTime = time.Now()
if cb.failures >= cb.maxFailures {
cb.state = "open"
}
return err
}
// 成功,重置计数器
cb.failures = 0
cb.state = "closed"
return nil
}
// 集成到限流系统
func (s *RateLimitSystem) CheckWithCircuitBreaker(ctx context.Context, resource string, identifier string) (*CheckResult, error) {
var result *CheckResult
var err error
// 使用熔断器保护Redis调用
cbErr := s.circuitBreaker.Call(func() error {
result, err = s.redisLimiter.Allow(ctx, resource, identifier, 100, 150)
return err
})
if cbErr != nil {
// 熔断器打开,降级到本地限流
result, err = s.localLimiter.Allow(ctx, resource, identifier)
}
return result, err
}
监控告警
核心指标
业务指标:
- rate_limit_requests_total: 限流检查总数(按资源、结果分类)
- rate_limit_rejected_total: 限流拒绝总数
- rate_limit_pass_rate: 限流通过率
性能指标:
- rate_limit_latency: 限流检查延迟(P50/P95/P99)
- redis_operations_total: Redis操作总数
- local_cache_hit_rate: 本地缓存命中率
资源指标:
- rate_limit_rules_count: 限流规则数量
- rate_limit_hot_keys_count: 热点key数量
- circuit_breaker_state: 熔断器状态
Prometheus指标定义
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// 限流请求总数
RateLimitRequestsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "rate_limit_requests_total",
Help: "Total number of rate limit checks",
},
[]string{"resource", "allowed"},
)
// 限流延迟
RateLimitLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "rate_limit_latency_seconds",
Help: "Latency of rate limit checks",
Buckets: prometheus.DefBuckets,
},
[]string{"resource"},
)
// Redis操作总数
RedisOperationsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "redis_operations_total",
Help: "Total number of Redis operations",
},
[]string{"operation", "status"},
)
// 本地缓存命中率
LocalCacheHits = promauto.NewCounter(
prometheus.CounterOpts{
Name: "local_cache_hits_total",
Help: "Total number of local cache hits",
},
)
LocalCacheMisses = promauto.NewCounter(
prometheus.CounterOpts{
Name: "local_cache_misses_total",
Help: "Total number of local cache misses",
},
)
// 熔断器状态
CircuitBreakerState = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "circuit_breaker_state",
Help: "Circuit breaker state (0=closed, 1=open, 2=half-open)",
},
[]string{"resource"},
)
)
// 记录限流检查
func RecordRateLimitCheck(resource string, allowed bool, latency time.Duration) {
RateLimitRequestsTotal.WithLabelValues(resource, fmt.Sprint(allowed)).Inc()
RateLimitLatency.WithLabelValues(resource).Observe(latency.Seconds())
}
Grafana Dashboard
{
"dashboard": {
"title": "Rate Limit Monitoring",
"panels": [
{
"title": "Requests Per Second",
"targets": [
{
"expr": "rate(rate_limit_requests_total[1m])"
}
]
},
{
"title": "Pass Rate",
"targets": [
{
"expr": "rate(rate_limit_requests_total{allowed=\"true\"}[1m]) / rate(rate_limit_requests_total[1m])"
}
]
},
{
"title": "P99 Latency",
"targets": [
{
"expr": "histogram_quantile(0.99, rate(rate_limit_latency_seconds_bucket[5m]))"
}
]
},
{
"title": "Top 10 Limited APIs",
"targets": [
{
"expr": "topk(10, rate(rate_limit_requests_total{allowed=\"false\"}[5m]))"
}
]
}
]
}
}
告警规则
groups:
- name: rate_limit_alerts
rules:
# 限流拒绝率过高
- alert: HighRejectionRate
expr: |
(
rate(rate_limit_requests_total{allowed="false"}[5m])
/
rate(rate_limit_requests_total[5m])
) > 0.50
for: 5m
labels:
severity: warning
annotations:
summary: "Rate limit rejection rate is high"
description: "{{ $labels.resource }} rejection rate is {{ $value | humanizePercentage }}"
# 限流延迟过高
- alert: HighLatency
expr: |
histogram_quantile(0.99, rate(rate_limit_latency_seconds_bucket[5m])) > 0.010
for: 5m
labels:
severity: warning
annotations:
summary: "Rate limit latency is high"
description: "P99 latency is {{ $value }}s"
# Redis连接失败
- alert: RedisConnectionFailed
expr: |
rate(redis_operations_total{status="error"}[5m]) > 0.01
for: 2m
labels:
severity: critical
annotations:
summary: "Redis connection is failing"
description: "Redis error rate is {{ $value }}/s"
# 熔断器打开
- alert: CircuitBreakerOpen
expr: circuit_breaker_state == 1
for: 1m
labels:
severity: critical
annotations:
summary: "Circuit breaker is open"
description: "{{ $labels.resource }} circuit breaker is open"
# 热点key数量过多
- alert: TooManyHotKeys
expr: rate_limit_hot_keys_count > 1000
for: 10m
labels:
severity: warning
annotations:
summary: "Too many hot keys"
description: "Hot keys count is {{ $value }}"
面试问答
令牌桶和漏桶的区别是什么?应该选择哪个?
答案:
| 对比维度 | 令牌桶 | 漏桶 |
|---|---|---|
| 流出速率 | 可变(有令牌就放行) | 固定(匀速流出) |
| 突发流量 | 支持(桶内令牌可一次性用完) | 不支持(严格限速) |
| 实现复杂度 | 简单(懒加载计算令牌) | 中等(需要队列) |
| 适用场景 | API网关、用户限流 | 保护后端、流量整形 |
选择建议:
- API网关、面向用户的服务:令牌桶(允许突发,用户体验好)
- 保护数据库、消息队列等后端资源:漏桶(严格控流,保护后端)
代码示例:
// 令牌桶:允许突发
limiter := NewTokenBucketLimiter(100, 50) // 容量100,每秒生成50个
// 前100个请求瞬间通过(消耗桶内令牌)
// 漏桶:严格限速
limiter := NewLeakyBucketLimiter(100, 10*time.Millisecond) // 每10ms流出1个
// 请求匀速流出,绝对平滑
如何解决分布式限流的一致性问题?
答案:
问题场景:
- 10台服务器,总限流100 QPS
- 每台服务器单独限流10 QPS
- 问题:流量不均,可能导致总QPS超过100
解决方案:
方案1: Redis集中限流(推荐)
// 所有服务器共享Redis计数器
func (r *RedisRateLimiter) Allow(ctx context.Context, key string) bool {
// Lua脚本保证原子性
return redis.Eval(`
local count = redis.call('INCR', KEYS[1])
if count == 1 then
redis.call('EXPIRE', KEYS[1], 1)
end
return count <= 100
`, key)
}
优点:
- 全局精准限流
- 实现简单
缺点:
- 依赖Redis
- 网络延迟(~1ms)
方案2: 多级限流(本地+Redis)
L1: 本地限流(每台15 QPS,冗余50%)
L2: Redis限流(全局100 QPS)
流程:
1. 本地限流快速拦截(< 0.01ms)
2. 通过本地限流后,再检查Redis
3. Redis故障时降级到本地限流
效果对比:
| 方案 | 延迟 | 准确度 | 可用性 |
|---|---|---|---|
| 纯本地 | < 0.01ms | 60% | 99.99% |
| 纯Redis | ~1ms | 100% | 99.9% |
| 多级限流 | ~0.5ms | 95% | 99.99% |
固定窗口的临界问题如何解决?
答案:
临界问题:
窗口1: 00:00:00 - 00:00:59 (100个请求)
窗口2: 00:01:00 - 00:01:59 (100个请求)
问题:00:00:30 - 00:01:30 实际200个请求
解决方案1: 滑动窗口
// 使用Redis Sorted Set
func (l *SlidingWindowLimiter) Allow() bool {
now := time.Now().UnixMilli()
windowStart := now - 1000 // 1秒窗口
// 清理过期数据
redis.ZRemRangeByScore(key, 0, windowStart)
// 统计当前窗口请求数
count := redis.ZCard(key)
if count < limit {
redis.ZAdd(key, now, now)
return true
}
return false
}
解决方案2: 滑动日志
// 记录每个请求的时间戳
type SlidingLogLimiter struct {
requests []time.Time // 请求时间戳列表
}
func (l *SlidingLogLimiter) Allow() bool {
now := time.Now()
// 清理1秒前的请求
cutoff := now.Add(-time.Second)
l.requests = filterAfter(l.requests, cutoff)
if len(l.requests) < limit {
l.requests = append(l.requests, now)
return true
}
return false
}
权衡:
- 滑动窗口:精度高,但内存占用大
- 固定窗口+冗余:简单,限流值设为80(冗余20%)
Redis限流的Lua脚本为什么必须用?不用会怎样?
答案:
不用Lua的问题:
// 错误示例:非原子操作
func (r *RedisRateLimiter) Allow() bool {
count, _ := redis.Get(key) // 1. 读
if count < limit {
redis.Incr(key) // 2. 写
return true
}
return false
}
// 问题:并发时出现竞态条件
// 时刻1: 线程A读到count=99
// 时刻2: 线程B读到count=99
// 时刻3: 线程A写count=100
// 时刻4: 线程B写count=100 (超限!)
使用Lua的好处:
-- 正确示例:原子操作
local count = redis.call('GET', KEYS[1]) or 0
if tonumber(count) < tonumber(ARGV[1]) then
redis.call('INCR', KEYS[1])
return 1
else
return 0
end
原子性保证:
- Lua脚本在Redis中单线程执行
- 执行期间不会被其他命令打断
- 读-判断-写三步合一,绝对原子
性能对比:
| 方案 | 网络往返 | 竞态条件 | 性能 |
|---|---|---|---|
| 不用Lua | 2次(GET + INCR) | 有 | 慢 |
| 用Lua | 1次(EVAL) | 无 | 快 |
热点key怎么处理?比如秒杀场景
答案:
问题:
秒杀场景:
- 商品ID: item_12345
- 100万用户同时抢购
- Redis单key压力:100万 QPS
瓶颈:
1. Redis单key吞吐有限(~10万QPS)
2. 网络带宽打满
3. CPU单核打满(Redis单线程)
解决方案:
方案1: 本地缓存热点数据
type HotKeyCache struct {
local *sync.Map // 本地缓存
redis *redis.Client
stats map[string]int64 // 访问统计
}
func (h *HotKeyCache) Get(key string) (int64, error) {
// 检测热点
if h.isHot(key) {
// 使用本地缓存
if val, ok := h.local.Load(key); ok {
return val.(int64), nil
}
}
// 从Redis获取
val, err := h.redis.Get(ctx, key).Int64()
if err != nil {
return 0, err
}
// 缓存热点key
if h.isHot(key) {
h.local.Store(key, val)
}
return val, nil
}
方案2: key分片
// 将热点key分散到多个key
func (r *RedisRateLimiter) AllowWithSharding(key string, shards int) bool {
// 随机选择一个分片
shard := rand.Intn(shards)
shardKey := fmt.Sprintf("%s:shard:%d", key, shard)
// 限流值也要分片
limitPerShard := limit / shards
return r.Allow(shardKey, limitPerShard)
}
// 示例:
// 原始:item_12345 (100万QPS)
// 分片:item_12345:shard:0 (10万QPS)
// item_12345:shard:1 (10万QPS)
// ...
// item_12345:shard:9 (10万QPS)
效果对比:
| 方案 | Redis压力 | 延迟 | 准确度 |
|---|---|---|---|
| 无优化 | 100万QPS | 5ms | 100% |
| 本地缓存 | 10万QPS | 0.1ms | 90% |
| key分片 | 10万QPS | 1ms | 95% |
如何防止限流被绕过?
答案:
攻击场景:
- 修改User-Agent绕过识别
- 使用代理IP池
- 分布式爬虫
- 恶意API调用
防护方案:
1. 多维度限流
// 同时限制多个维度
type MultiDimensionLimiter struct {
userLimiter *RateLimiter // 用户维度: 100/s
ipLimiter *RateLimiter // IP维度: 1000/s
apiLimiter *RateLimiter // API维度: 10000/s
}
func (m *MultiDimensionLimiter) Allow(user string, ip string, api string) bool {
// 任一维度触发限流则拒绝
return m.userLimiter.Allow(user) &&
m.ipLimiter.Allow(ip) &&
m.apiLimiter.Allow(api)
}
2. 设备指纹
// 生成设备唯一标识
func generateDeviceFingerprint(r *http.Request) string {
// 组合多个因子
factors := []string{
r.Header.Get("User-Agent"),
r.Header.Get("Accept-Language"),
r.Header.Get("Accept-Encoding"),
getIP(r),
// Canvas指纹、WebGL指纹等
}
hash := sha256.Sum256([]byte(strings.Join(factors, "|")))
return hex.EncodeToString(hash[:])
}
// 基于设备指纹限流
limiter.Allow("device:" + fingerprint)
限流规则如何动态调整?不重启服务
答案:
方案: 配置中心(推荐)
// 使用etcd/Consul/Nacos
type DynamicRuleManager struct {
client *etcd.Client
rules sync.Map // 本地规则缓存
watchChan chan *RateLimitRule
}
// 启动监听
func (d *DynamicRuleManager) Watch() {
watchChan := d.client.Watch(ctx, "/ratelimit/rules/", clientv3.WithPrefix())
for resp := range watchChan {
for _, ev := range resp.Events {
switch ev.Type {
case mvccpb.PUT:
// 规则更新
rule := parseRule(ev.Kv.Value)
d.rules.Store(rule.Resource, rule)
log.Printf("Rule updated: %s", rule.Resource)
case mvccpb.DELETE:
// 规则删除
d.rules.Delete(string(ev.Kv.Key))
log.Printf("Rule deleted: %s", ev.Kv.Key)
}
}
}
}
网关限流和服务限流有什么区别?都需要吗?
答案:
对比分析
| 维度 | 网关限流 | 服务限流 |
|---|---|---|
| 位置 | 入口处(统一网关) | 服务内部 |
| 目的 | 保护整个系统 | 保护单个服务 |
| 粒度 | 粗粒度(API级别) | 细粒度(方法级别) |
| 识别 | 基于IP/Token | 基于用户/租户 |
| 降级 | 返回429 | 服务降级/熔断 |
推荐策略:
- 都需要:多层防护,层层拦截
- 网关限流:粗粒度保护,防止恶意流量
- 服务限流:细粒度控制,业务逻辑保护
限流失败后应该返回什么错误码?如何引导用户?
答案:
HTTP状态码选择
** 正确做法**:
- 返回429 Too Many Requests(RFC 6585)
完整响应示例
func (m *RateLimitMiddleware) handleRateLimitExceeded(w http.ResponseWriter, result *CheckResult) {
// 设置状态码
w.WriteHeader(http.StatusTooManyRequests) // 429
// 设置响应头
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-RateLimit-Limit", "100")
w.Header().Set("X-RateLimit-Remaining", "0")
w.Header().Set("X-RateLimit-Reset", result.ResetTime.Format(time.RFC3339))
w.Header().Set("Retry-After", strconv.Itoa(int(result.RetryAfter.Seconds())))
// 返回友好的错误信息
response := map[string]interface{}{
"error": "rate_limit_exceeded",
"message": "请求过于频繁,请稍后再试",
"details": map[string]interface{}{
"limit": 100,
"remaining": 0,
"reset_time": result.ResetTime.Unix(),
"retry_after": int(result.RetryAfter.Seconds()),
},
"help_url": "https://docs.example.com/rate-limits",
}
json.NewEncoder(w).Encode(response)
}
如何测试限流系统的正确性?
答案:
测试策略
1. 单元测试(算法正确性)
func TestTokenBucket(t *testing.T) {
limiter := NewTokenBucketLimiter(10, 10) // 容量10,速率10/s
// 测试1: 正常情况
for i := 0; i < 10; i++ {
assert.True(t, limiter.Allow(), "Should allow first 10 requests")
}
// 测试2: 超过容量
assert.False(t, limiter.Allow(), "Should reject 11th request")
// 测试3: 等待恢复
time.Sleep(1 * time.Second)
for i := 0; i < 10; i++ {
assert.True(t, limiter.Allow(), "Should allow after 1 second")
}
}
2. 压力测试(性能验证)
func BenchmarkRateLimiter(b *testing.B) {
limiter := NewRedisRateLimiter(redisClient)
ctx := context.Background()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
limiter.Allow(ctx, "bench", "user:1", 10000, 10000)
}
})
// 输出: BenchmarkRateLimiter-8 1000000 1234 ns/op
}