HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 系统设计实战

    • 系统设计面试教程
    • 系统设计方法论
    • 01-短链系统设计
    • 02 - 秒杀系统设计
    • 03 - IM 即时通讯系统设计
    • 04 - Feed 流系统设计
    • 05 - 分布式 ID 生成器设计
    • 06 - 限流系统设计
    • 第7章:搜索引擎设计
    • 08 - 推荐系统设计
    • 09 - 支付系统设计
    • 10 - 电商系统设计
    • 11 - 直播系统设计
    • 第12章:缓存系统设计
    • 第13章:消息队列设计
    • 第14章:分布式事务
    • 15 - 监控系统设计

06 - 限流系统设计

面试频率: | 难度等级: | 推荐时长: 45-60 分钟

目录

  • 需求分析与澄清
  • 容量估算
  • 限流算法对比
  • API设计
  • 数据模型设计
  • 架构设计
  • 核心算法与实现
  • 优化方案
  • 监控告警
  • 面试问答

需求分析与澄清

业务场景

限流系统是分布式系统的核心保护机制,用于控制流量速率,防止系统过载。典型应用场景:

  • API网关限流:保护后端服务,防止流量洪峰
  • 防刷限流:短信验证码、优惠券领取
  • 资源保护:数据库连接池、线程池
  • 业务限流:秒杀活动、抢购场景
  • 第三方接口限流:调用外部API的频率控制

核心挑战

限流系统的六大挑战:
┌─────────────────────────────────────────────────────────┐
│ 1. 精准控制                                              │
│    - 精确限制请求速率                                    │
│    - 避免误杀正常请求                                    │
│                                                          │
│ 2. 分布式协调                                            │
│    - 多节点统一限流                                      │
│    - 保证全局计数准确                                    │
│                                                          │
│ 3. 高性能                                                │
│    - 延迟 < 1ms                                          │
│    - 不能成为性能瓶颈                                    │
│                                                          │
│ 4. 灵活配置                                              │
│    - 支持动态调整限流规则                                │
│    - 不重启服务即可生效                                  │
│                                                          │
│ 5. 降级熔断                                              │
│    - 限流后的降级策略                                    │
│    - 自动熔断异常服务                                    │
│                                                          │
│ 6. 多维度限流                                            │
│    - 用户维度、IP维度、接口维度                          │
│    - 组合条件限流                                        │
└─────────────────────────────────────────────────────────┘

功能性需求

面试官视角的关键问题

面试官: "设计一个限流系统,保护我们的API不被打垮。"

你需要主动澄清以下需求:

Q1: 限流的维度是什么?

单机限流: 限制单台服务器的请求速率(简单)
分布式限流: 限制整个集群的请求速率(复杂)

维度选择:
- 用户维度: 限制单个用户的请求速率
- IP维度: 限制单个IP的请求速率
- 接口维度: 限制单个API的总请求速率
- 租户维度: 多租户SaaS系统

【本次设计】
- 支持多维度限流
- 重点实现分布式限流

Q2: 限流精度要求?

秒级限流: 每秒100次(最常用)
分钟级限流: 每分钟1000次
小时级限流: 每小时10000次

【本次设计】
- 主要支持秒级限流
- 支持自定义时间窗口

Q3: 限流后如何处理?

拒绝服务: 返回429错误(简单直接)
排队等待: 请求进入队列,延迟处理
降级服务: 返回缓存或默认值

【本次设计】
- 默认拒绝服务
- 支持自定义降级策略

Q4: 性能要求?

QPS: 100万+(不能成为瓶颈)
延迟: < 1ms (P99)
可用性: 99.99%

【本次设计】
- 本地计数器优先(高性能)
- Redis集中限流(精准度高)

Q5: 限流规则如何配置?

硬编码: 写在代码里(不灵活)
配置文件: 写在配置中(需重启)
动态配置: 配置中心动态加载(推荐)

【本次设计】
- 支持配置中心动态下发
- 支持热更新,无需重启

非功能性需求

需求类型具体要求优先级
高性能延迟 < 1ms,不影响业务P0
高可用99.99% 可用性P0
精准性误差 < 5%P0
可扩展支持水平扩展P1
易用性简单易接入P1
可观测完善的监控告警P1

容量估算

QPS 估算

【业务规模】
服务总数: 100 个微服务
单服务实例: 10 个
API接口数: 500 个

单实例QPS: 1000(业务处理能力)
集群总QPS: 100 * 10 * 1000 = 100 万

【限流器QPS】
每个请求需要调用限流器: 1次
限流器QPS: 100 万

【Redis QPS】
采用分布式限流(Redis)
单个Redis实例QPS: 10 万(SET/GET操作)
所需Redis实例: 100万 / 10万 = 10 台
实际部署: 20 台(考虑冗余和热点)

内存估算

【单机限流】
限流规则: 500 个接口 * 1KB = 500 KB
计数器: 500 个 * 100 Bytes = 50 KB
总内存: < 1 MB(可忽略)

【分布式限流(Redis)】
单个限流key:
  - key: "ratelimit:user:12345:api:/order" = 50 Bytes
  - value: 计数 = 8 Bytes
  - TTL: 过期时间 = 8 Bytes
  单条记录: 66 Bytes

活跃用户: 100 万
每用户限流key: 10 个(不同API)
总key数: 100万 * 10 = 1000 万

总内存: 1000万 * 66 Bytes = 660 MB
实际部署: 2 GB(考虑Redis内存碎片)

延迟估算

【单机限流】
本地内存操作: < 0.01ms(极快)

【分布式限流】
Redis GET操作: 0.5ms(局域网)
Redis Lua脚本: 1ms(原子操作)

【总延迟】
限流检查 + 业务逻辑: 1ms + 50ms = 51ms
限流占比: 1/51 ≈ 2%(可接受)

限流算法对比

算法1: 固定窗口计数器

原理

固定窗口计数器(Fixed Window Counter):
  - 将时间划分为固定窗口(如1秒)
  - 统计每个窗口内的请求数
  - 超过阈值则拒绝请求

时间轴:
|------窗口1------|------窗口2------|------窗口3------|
0s              1s              2s              3s
请求数: 80         请求数: 120     请求数: 90

阈值: 100/秒
窗口2超过阈值,拒绝新请求

优缺点

 优点:
1. 实现简单,易于理解
2. 内存占用少(只需一个计数器)
3. 性能高(O(1)时间复杂度)

 缺点:
1. 临界问题(边界突刺)
   - 0.5s-1s: 100次请求 
   - 1s-1.5s: 100次请求 
   - 0.5s-1.5s: 200次请求  (实际超限)

2. 窗口切换时计数器归零,可能瞬间放过大量请求

代码实现

package ratelimit

import (
    "sync"
    "time"
)

// FixedWindowLimiter 固定窗口限流器
type FixedWindowLimiter struct {
    limit    int64         // 限流阈值
    window   time.Duration // 时间窗口
    counter  int64         // 当前计数
    lastTime time.Time     // 窗口开始时间
    mu       sync.Mutex    // 互斥锁
}

// NewFixedWindowLimiter 创建固定窗口限流器
func NewFixedWindowLimiter(limit int64, window time.Duration) *FixedWindowLimiter {
    return &FixedWindowLimiter{
        limit:    limit,
        window:   window,
        counter:  0,
        lastTime: time.Now(),
    }
}

// Allow 判断是否允许请求
func (l *FixedWindowLimiter) Allow() bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    now := time.Now()
    // 窗口是否过期
    if now.Sub(l.lastTime) >= l.window {
        // 重置窗口
        l.counter = 0
        l.lastTime = now
    }

    // 判断是否超过限流阈值
    if l.counter < l.limit {
        l.counter++
        return true
    }

    return false
}

// 使用示例
func ExampleFixedWindow() {
    // 限制每秒100次请求
    limiter := NewFixedWindowLimiter(100, time.Second)

    // 模拟请求
    for i := 0; i < 150; i++ {
        if limiter.Allow() {
            // 处理请求
            println("Request", i, "allowed")
        } else {
            // 拒绝请求
            println("Request", i, "rejected")
        }
    }
}

算法2: 滑动窗口计数器

原理

滑动窗口计数器(Sliding Window Counter):
  - 将时间窗口细分为多个小格子
  - 统计当前时间往前推一个窗口内的请求总数
  - 解决固定窗口的临界问题

示例:窗口1秒,分为10个格子(每格100ms)

时间轴:
|---|---|---|---|---|---|---|---|---|---|
 0  100 200 300 400 500 600 700 800 900 1000ms
 10  15  20  25  10   8   5   3   2   2  (请求数)

当前时间: 650ms
滑动窗口: [0-650ms]的前10个格子
总请求数: 10+15+20+25+10+8 = 88

优缺点

 优点:
1. 解决固定窗口的临界问题
2. 限流更加平滑
3. 精度可调(格子越多越精确)

 缺点:
1. 内存占用较高(需存储多个格子)
2. 实现相对复杂
3. 需要定期清理过期格子

代码实现

package ratelimit

import (
    "sync"
    "time"
)

// SlidingWindowLimiter 滑动窗口限流器
type SlidingWindowLimiter struct {
    limit     int64              // 限流阈值
    window    time.Duration      // 时间窗口
    slots     int                // 窗口分片数
    counters  map[int64]int64    // 时间槽 -> 计数
    mu        sync.Mutex         // 互斥锁
}

// NewSlidingWindowLimiter 创建滑动窗口限流器
func NewSlidingWindowLimiter(limit int64, window time.Duration, slots int) *SlidingWindowLimiter {
    return &SlidingWindowLimiter{
        limit:    limit,
        window:   window,
        slots:    slots,
        counters: make(map[int64]int64),
    }
}

// Allow 判断是否允许请求
func (l *SlidingWindowLimiter) Allow() bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    now := time.Now().UnixNano()
    slotDuration := int64(l.window) / int64(l.slots)

    // 计算当前时间槽
    currentSlot := now / slotDuration

    // 计算滑动窗口的起始时间槽
    windowStart := currentSlot - int64(l.slots) + 1

    // 清理过期的时间槽
    for slot := range l.counters {
        if slot < windowStart {
            delete(l.counters, slot)
        }
    }

    // 统计当前窗口内的请求总数
    var total int64
    for slot := windowStart; slot <= currentSlot; slot++ {
        total += l.counters[slot]
    }

    // 判断是否超过限流阈值
    if total < l.limit {
        l.counters[currentSlot]++
        return true
    }

    return false
}

// 使用示例
func ExampleSlidingWindow() {
    // 限制每秒100次请求,窗口分为10个槽
    limiter := NewSlidingWindowLimiter(100, time.Second, 10)

    // 模拟请求
    for i := 0; i < 150; i++ {
        if limiter.Allow() {
            println("Request", i, "allowed")
        } else {
            println("Request", i, "rejected")
        }
        time.Sleep(5 * time.Millisecond)
    }
}

算法3: 令牌桶 (Token Bucket)

原理

令牌桶算法(Token Bucket):
  - 系统以恒定速率往桶里放令牌
  - 请求需要消耗令牌才能通过
  - 桶满时令牌溢出(丢弃)
  - 桶空时请求被拒绝

示例:
                  +------------------+
                  |   令牌桶          |
  令牌生成器 --->  |   🪙🪙🪙🪙🪙        |  容量: 100
  (10个/秒)      |                  |
                  +------------------+
                           |
                      请求消耗令牌
                       (1个/次)

特点:
1. 允许一定程度的突发流量(桶内有余量)
2. 长期平均速率受令牌生成速率限制

优缺点

 优点:
1. 支持突发流量(桶内令牌可一次性消耗)
2. 平滑限流(令牌匀速生成)
3. 实现简单,性能高

 缺点:
1. 需要额外存储令牌数
2. 需要定时器生成令牌(或懒加载计算)
3. 桶容量需要合理设置

代码实现

package ratelimit

import (
    "sync"
    "time"
)

// TokenBucketLimiter 令牌桶限流器
type TokenBucketLimiter struct {
    capacity   int64         // 桶容量
    tokens     int64         // 当前令牌数
    rate       int64         // 令牌生成速率(个/秒)
    lastTime   time.Time     // 上次生成令牌时间
    mu         sync.Mutex    // 互斥锁
}

// NewTokenBucketLimiter 创建令牌桶限流器
func NewTokenBucketLimiter(capacity int64, rate int64) *TokenBucketLimiter {
    return &TokenBucketLimiter{
        capacity: capacity,
        tokens:   capacity,  // 初始令牌数等于容量
        rate:     rate,
        lastTime: time.Now(),
    }
}

// Allow 判断是否允许请求(消耗1个令牌)
func (l *TokenBucketLimiter) Allow() bool {
    return l.AllowN(1)
}

// AllowN 判断是否允许请求(消耗n个令牌)
func (l *TokenBucketLimiter) AllowN(n int64) bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    // 计算当前应该有多少令牌
    now := time.Now()
    elapsed := now.Sub(l.lastTime)

    // 计算新生成的令牌数
    newTokens := int64(elapsed.Seconds() * float64(l.rate))

    // 更新令牌数(不超过容量)
    l.tokens = min(l.capacity, l.tokens+newTokens)
    l.lastTime = now

    // 判断令牌是否足够
    if l.tokens >= n {
        l.tokens -= n
        return true
    }

    return false
}

func min(a, b int64) int64 {
    if a < b {
        return a
    }
    return b
}

// 使用示例
func ExampleTokenBucket() {
    // 容量100,每秒生成50个令牌
    limiter := NewTokenBucketLimiter(100, 50)

    // 模拟突发请求(前100个直接通过)
    for i := 0; i < 150; i++ {
        if limiter.Allow() {
            println("Request", i, "allowed")
        } else {
            println("Request", i, "rejected")
        }
    }

    // 等待1秒,令牌恢复
    time.Sleep(time.Second)

    // 继续请求(恢复50个令牌)
    for i := 150; i < 200; i++ {
        if limiter.Allow() {
            println("Request", i, "allowed")
        } else {
            println("Request", i, "rejected")
        }
    }
}

算法4: 漏桶 (Leaky Bucket)

原理

漏桶算法(Leaky Bucket):
  - 请求进入漏桶
  - 漏桶以恒定速率流出请求
  - 桶满时新请求溢出(拒绝)

示例:
       请求流入(不定速)
              ↓
        +----------+
        | 🌊🌊🌊🌊🌊  |  桶容量: 100
        | 🌊🌊🌊     |
        +----------+
              ↓
        匀速流出(10个/秒)

特点:
1. 强制限制流出速率
2. 平滑处理突发流量
3. 类似消息队列

优缺点

 优点:
1. 严格控制流出速率(绝对平滑)
2. 适合处理突发流量
3. 实现简单

 缺点:
1. 无法处理突发流量(流出速率固定)
2. 请求需要排队(增加延迟)
3. 队列满时丢弃请求

代码实现

package ratelimit

import (
    "context"
    "sync"
    "time"
)

// LeakyBucketLimiter 漏桶限流器
type LeakyBucketLimiter struct {
    capacity int           // 桶容量
    rate     time.Duration // 流出速率(每个请求的间隔时间)
    queue    chan struct{} // 请求队列
    mu       sync.Mutex    // 互斥锁
}

// NewLeakyBucketLimiter 创建漏桶限流器
func NewLeakyBucketLimiter(capacity int, rate time.Duration) *LeakyBucketLimiter {
    limiter := &LeakyBucketLimiter{
        capacity: capacity,
        rate:     rate,
        queue:    make(chan struct{}, capacity),
    }

    // 启动漏水协程
    go limiter.leak()

    return limiter
}

// leak 漏水(以恒定速率流出)
func (l *LeakyBucketLimiter) leak() {
    ticker := time.NewTicker(l.rate)
    defer ticker.Stop()

    for range ticker.C {
        // 从队列中取出一个请求(流出)
        select {
        case <-l.queue:
            // 请求流出
        default:
            // 队列为空,无需处理
        }
    }
}

// Allow 判断是否允许请求
func (l *LeakyBucketLimiter) Allow() bool {
    select {
    case l.queue <- struct{}{}:
        // 成功加入队列
        return true
    default:
        // 队列已满,拒绝请求
        return false
    }
}

// Wait 等待直到允许请求(带超时)
func (l *LeakyBucketLimiter) Wait(ctx context.Context) error {
    select {
    case l.queue <- struct{}{}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// 使用示例
func ExampleLeakyBucket() {
    // 容量100,每10ms流出一个请求(100个/秒)
    limiter := NewLeakyBucketLimiter(100, 10*time.Millisecond)

    // 模拟突发请求
    for i := 0; i < 150; i++ {
        if limiter.Allow() {
            println("Request", i, "allowed (queued)")
        } else {
            println("Request", i, "rejected (bucket full)")
        }
    }

    // 等待队列处理
    time.Sleep(2 * time.Second)
}

算法对比总结

算法突发流量平滑度复杂度适用场景
固定窗口简单场景,对精度要求不高
滑动窗口需要精确限流,内存充足
令牌桶推荐,兼顾性能和灵活性
漏桶需要绝对平滑,可接受排队

推荐选择:

  • API网关限流:令牌桶(允许突发,体验好)
  • 数据库保护:漏桶(严格控流,保护后端)
  • 单机限流:固定窗口(简单高效)
  • 精确限流:滑动窗口(精度高)

API设计

核心接口

// RateLimiter 限流器接口
type RateLimiter interface {
    // Allow 判断是否允许请求通过
    // 返回值: true-允许, false-拒绝
    Allow(ctx context.Context, resource string, identifier string) (bool, error)

    // AllowN 判断是否允许n个请求通过
    AllowN(ctx context.Context, resource string, identifier string, n int64) (bool, error)

    // Wait 等待直到允许请求通过(阻塞)
    // 返回值: 等待时间, error
    Wait(ctx context.Context, resource string, identifier string) (time.Duration, error)

    // Reserve 预约令牌(不阻塞,返回等待时间)
    Reserve(ctx context.Context, resource string, identifier string) (*Reservation, error)
}

// Reservation 预约信息
type Reservation struct {
    OK        bool          // 是否成功
    Delay     time.Duration // 需要等待的时间
    CancelFunc func()       // 取消预约
}

// RuleManager 规则管理器接口
type RuleManager interface {
    // AddRule 添加限流规则
    AddRule(rule *RateLimitRule) error

    // UpdateRule 更新限流规则
    UpdateRule(rule *RateLimitRule) error

    // DeleteRule 删除限流规则
    DeleteRule(resource string) error

    // GetRule 获取限流规则
    GetRule(resource string) (*RateLimitRule, error)

    // ListRules 列出所有规则
    ListRules() ([]*RateLimitRule, error)
}

RESTful API

# 限流检查API
POST /api/v1/ratelimit/check
Request:
  resource: "/api/order/create"    # 资源标识
  identifier: "user:12345"          # 用户标识
  tokens: 1                         # 消耗令牌数
Response:
  allowed: true                     # 是否允许
  remaining: 99                     # 剩余配额
  reset_time: "2025-01-01T00:00:01Z"  # 重置时间
  retry_after: 0                    # 重试等待时间(秒)

# 规则管理API
POST /api/v1/ratelimit/rules
Request:
  resource: "/api/order/create"
  limit: 100                        # 限流阈值
  window: "1s"                      # 时间窗口
  algorithm: "token_bucket"         # 限流算法
  burst: 150                        # 突发容量
Response:
  rule_id: "rule_123"
  created_at: "2025-01-01T00:00:00Z"

GET /api/v1/ratelimit/rules/{resource}
Response:
  resource: "/api/order/create"
  limit: 100
  window: "1s"
  algorithm: "token_bucket"
  burst: 150
  enabled: true

PUT /api/v1/ratelimit/rules/{resource}
DELETE /api/v1/ratelimit/rules/{resource}

数据模型设计

限流规则表

-- 限流规则配置表
CREATE TABLE rate_limit_rules (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    resource VARCHAR(255) NOT NULL COMMENT '资源标识(API路径/业务标识)',
    algorithm VARCHAR(50) NOT NULL COMMENT '限流算法(fixed_window/sliding_window/token_bucket/leaky_bucket)',
    limit_count BIGINT NOT NULL COMMENT '限流阈值',
    window_size INT NOT NULL COMMENT '时间窗口(秒)',
    burst_size BIGINT DEFAULT 0 COMMENT '突发容量(令牌桶)',

    -- 限流维度
    dimension VARCHAR(50) NOT NULL COMMENT '限流维度(user/ip/api/tenant)',

    -- 生效配置
    enabled TINYINT(1) DEFAULT 1 COMMENT '是否启用',
    priority INT DEFAULT 0 COMMENT '优先级(数字越大优先级越高)',

    -- 降级配置
    fallback_action VARCHAR(50) DEFAULT 'reject' COMMENT '降级动作(reject/queue/degrade)',
    fallback_message TEXT COMMENT '降级提示信息',

    -- 时间戳
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    created_by VARCHAR(100),

    UNIQUE KEY uk_resource (resource),
    KEY idx_enabled (enabled),
    KEY idx_priority (priority)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='限流规则配置表';

-- 示例数据
INSERT INTO rate_limit_rules
(resource, algorithm, limit_count, window_size, burst_size, dimension)
VALUES
('/api/order/create', 'token_bucket', 100, 1, 150, 'user'),
('/api/sms/send', 'fixed_window', 10, 60, 0, 'user'),
('/api/login', 'sliding_window', 5, 60, 0, 'ip');

Redis数据结构

# 1. 固定窗口计数器
# Key格式: ratelimit:fixed:{resource}:{identifier}:{window_start}
# Value: 计数
# TTL: 窗口大小
redis> SET ratelimit:fixed:/api/order:user:12345:1704067200 85 EX 1
redis> INCR ratelimit:fixed:/api/order:user:12345:1704067200
(integer) 86

# 2. 滑动窗口计数器 (使用Sorted Set)
# Key格式: ratelimit:sliding:{resource}:{identifier}
# Member: 请求ID或时间戳
# Score: 请求时间戳(毫秒)
redis> ZADD ratelimit:sliding:/api/order:user:12345 1704067200123 "req_001"
redis> ZADD ratelimit:sliding:/api/order:user:12345 1704067200456 "req_002"

# 清理过期数据
redis> ZREMRANGEBYSCORE ratelimit:sliding:/api/order:user:12345 0 1704067199999

# 统计当前窗口请求数
redis> ZCOUNT ratelimit:sliding:/api/order:user:12345 1704067200000 1704067201000

# 3. 令牌桶 (使用Hash)
# Key格式: ratelimit:token:{resource}:{identifier}
# Fields: tokens(当前令牌数), last_time(上次更新时间)
redis> HSET ratelimit:token:/api/order:user:12345 tokens 95 last_time 1704067200
redis> HGETALL ratelimit:token:/api/order:user:12345
1) "tokens"
2) "95"
3) "last_time"
4) "1704067200"

# 4. 规则缓存 (使用Hash)
# Key格式: ratelimit:rule:{resource}
redis> HSET ratelimit:rule:/api/order algorithm token_bucket limit 100 window 1 burst 150

架构设计

V1: 单机限流(本地内存)

架构图

┌─────────────────────────────────────────────┐
│              用户请求                        │
└──────────────────┬──────────────────────────┘
                   ↓
        ┌──────────────────────┐
        │   API Gateway        │
        │  ┌────────────────┐  │
        │  │ Rate Limiter   │  │  (内存计数器)
        │  │ - Token Bucket │  │
        │  │ - Local Cache  │  │
        │  └────────────────┘  │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  Backend Services    │
        └──────────────────────┘

优点:
 实现简单,性能极高(< 0.01ms)
 无外部依赖

缺点:
 单机限流,无法全局协调
 水平扩展时限流不准确
   (10台服务器,每台限100 = 总限1000,实际需要100)

代码实现

package ratelimit

import (
    "context"
    "sync"
    "time"
)

// LocalRateLimiter 本地限流器(单机版)
type LocalRateLimiter struct {
    limiters map[string]*TokenBucketLimiter
    rules    map[string]*RateLimitRule
    mu       sync.RWMutex
}

// RateLimitRule 限流规则
type RateLimitRule struct {
    Resource  string        // 资源标识
    Algorithm string        // 算法类型
    Limit     int64         // 限流阈值
    Window    time.Duration // 时间窗口
    Burst     int64         // 突发容量
}

func NewLocalRateLimiter() *LocalRateLimiter {
    return &LocalRateLimiter{
        limiters: make(map[string]*TokenBucketLimiter),
        rules:    make(map[string]*RateLimitRule),
    }
}

// AddRule 添加限流规则
func (l *LocalRateLimiter) AddRule(rule *RateLimitRule) {
    l.mu.Lock()
    defer l.mu.Unlock()

    // 创建限流器
    limiter := NewTokenBucketLimiter(rule.Burst, rule.Limit)

    l.rules[rule.Resource] = rule
    l.limiters[rule.Resource] = limiter
}

// Allow 判断是否允许请求
func (l *LocalRateLimiter) Allow(ctx context.Context, resource string, identifier string) (bool, error) {
    l.mu.RLock()
    limiter, exists := l.limiters[resource]
    l.mu.RUnlock()

    if !exists {
        // 没有配置规则,默认允许
        return true, nil
    }

    // 组合key: resource + identifier
    // (简化实现,生产环境需要分别存储)
    return limiter.Allow(), nil
}

// 使用示例
func ExampleLocalRateLimiter() {
    limiter := NewLocalRateLimiter()

    // 添加规则:订单API每秒100次
    limiter.AddRule(&RateLimitRule{
        Resource:  "/api/order/create",
        Algorithm: "token_bucket",
        Limit:     100,
        Window:    time.Second,
        Burst:     150,
    })

    ctx := context.Background()

    // 模拟请求
    for i := 0; i < 200; i++ {
        allowed, _ := limiter.Allow(ctx, "/api/order/create", "user:12345")
        if allowed {
            println("Request", i, "allowed")
        } else {
            println("Request", i, "rejected")
        }
    }
}

V2: 分布式限流(Redis)

架构图

┌─────────────────────────────────────────────────────────┐
│                     用户请求                             │
└────────────┬──────────────────┬─────────────────────────┘
             ↓                  ↓
  ┌──────────────────┐  ┌──────────────────┐
  │  API Gateway 1   │  │  API Gateway 2   │
  │  ┌────────────┐  │  │  ┌────────────┐  │
  │  │Rate Limiter│  │  │  │Rate Limiter│  │
  │  └──────┬─────┘  │  │  └──────┬─────┘  │
  └─────────┼────────┘  └─────────┼────────┘
            │                     │
            └──────────┬──────────┘
                       ↓
            ┌─────────────────────┐
            │   Redis Cluster     │
            │  ┌───────────────┐  │
            │  │ Lua Scripts   │  │  (原子操作)
            │  │ - Token Bucket│  │
            │  │ - Sliding Win │  │
            │  └───────────────┘  │
            └─────────────────────┘
                       ↓
            ┌─────────────────────┐
            │  Backend Services   │
            └─────────────────────┘

优点:
 全局限流,多实例共享配额
 限流准确
 支持动态配置

缺点:
 依赖Redis(单点风险)
 网络开销(~1ms延迟)
 Redis故障影响限流功能

核心实现:Redis + Lua脚本

package ratelimit

import (
    "context"
    "errors"
    "time"

    "github.com/go-redis/redis/v8"
)

// RedisRateLimiter Redis分布式限流器
type RedisRateLimiter struct {
    client *redis.Client
    script *redis.Script  // Lua脚本
}

// Lua脚本:令牌桶算法
const tokenBucketScript = `
-- KEYS[1]: 限流key
-- ARGV[1]: 令牌生成速率(个/秒)
-- ARGV[2]: 桶容量
-- ARGV[3]: 当前时间戳(秒)
-- ARGV[4]: 消耗令牌数

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

-- 获取上次更新时间和令牌数
local last_time = tonumber(redis.call('HGET', KEYS[1], 'last_time') or now)
local tokens = tonumber(redis.call('HGET', KEYS[1], 'tokens') or capacity)

-- 计算新生成的令牌数
local elapsed = math.max(0, now - last_time)
local new_tokens = math.min(capacity, tokens + elapsed * rate)

-- 判断令牌是否足够
if new_tokens >= requested then
    -- 扣减令牌
    new_tokens = new_tokens - requested

    -- 更新Redis
    redis.call('HSET', KEYS[1], 'tokens', new_tokens)
    redis.call('HSET', KEYS[1], 'last_time', now)
    redis.call('EXPIRE', KEYS[1], 3600)  -- 1小时过期

    return {1, new_tokens}  -- 允许,返回剩余令牌
else
    return {0, new_tokens}  -- 拒绝,返回当前令牌
end
`

func NewRedisRateLimiter(client *redis.Client) *RedisRateLimiter {
    return &RedisRateLimiter{
        client: client,
        script: redis.NewScript(tokenBucketScript),
    }
}

// Allow 判断是否允许请求
func (r *RedisRateLimiter) Allow(ctx context.Context, resource string, identifier string, rate int64, capacity int64) (bool, error) {
    // 构造限流key
    key := "ratelimit:token:" + resource + ":" + identifier

    now := time.Now().Unix()

    // 执行Lua脚本(原子操作)
    result, err := r.script.Run(ctx, r.client, []string{key}, rate, capacity, now, 1).Result()
    if err != nil {
        return false, err
    }

    // 解析结果
    values, ok := result.([]interface{})
    if !ok || len(values) != 2 {
        return false, errors.New("invalid script result")
    }

    allowed := values[0].(int64) == 1
    return allowed, nil
}

// AllowN 判断是否允许n个请求
func (r *RedisRateLimiter) AllowN(ctx context.Context, resource string, identifier string, rate int64, capacity int64, n int64) (bool, error) {
    key := "ratelimit:token:" + resource + ":" + identifier
    now := time.Now().Unix()

    result, err := r.script.Run(ctx, r.client, []string{key}, rate, capacity, now, n).Result()
    if err != nil {
        return false, err
    }

    values, ok := result.([]interface{})
    if !ok || len(values) != 2 {
        return false, errors.New("invalid script result")
    }

    allowed := values[0].(int64) == 1
    return allowed, nil
}

// 使用示例
func ExampleRedisRateLimiter() {
    // 连接Redis
    client := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    limiter := NewRedisRateLimiter(client)
    ctx := context.Background()

    // 限流参数:每秒100个令牌,桶容量150
    rate := int64(100)
    capacity := int64(150)

    // 模拟请求
    for i := 0; i < 200; i++ {
        allowed, err := limiter.Allow(ctx, "/api/order/create", "user:12345", rate, capacity)
        if err != nil {
            println("Error:", err.Error())
            continue
        }

        if allowed {
            println("Request", i, "allowed")
        } else {
            println("Request", i, "rejected")
        }
    }
}

滑动窗口 Lua 脚本

-- 滑动窗口限流(基于Sorted Set)
-- KEYS[1]: 限流key
-- ARGV[1]: 时间窗口(秒)
-- ARGV[2]: 限流阈值
-- ARGV[3]: 当前时间戳(毫秒)

local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

local window_start = now - window * 1000

-- 清理过期数据
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, window_start)

-- 统计当前窗口请求数
local count = redis.call('ZCARD', KEYS[1])

if count < limit then
    -- 添加当前请求
    redis.call('ZADD', KEYS[1], now, now)
    redis.call('EXPIRE', KEYS[1], window)
    return {1, limit - count - 1}  -- 允许,返回剩余配额
else
    return {0, 0}  -- 拒绝
end

V3: 多级限流(本地+Redis)

架构图

┌─────────────────────────────────────────────────────────┐
│                     用户请求                             │
└────────────┬──────────────────┬─────────────────────────┘
             ↓                  ↓
  ┌─────────────────────┐  ┌─────────────────────┐
  │  API Gateway 1      │  │  API Gateway 2      │
  │                     │  │                     │
  │  ┌───────────────┐  │  │  ┌───────────────┐  │
  │  │ L1: 本地限流  │  │  │  │ L1: 本地限流  │  │  ← 快速拦截
  │  │  (粗粒度)     │  │  │  │  (粗粒度)     │  │    (< 0.01ms)
  │  └───────┬───────┘  │  │  └───────┬───────┘  │
  │          ↓          │  │          ↓          │
  │  ┌───────────────┐  │  │  ┌───────────────┐  │
  │  │ L2: Redis限流 │  │  │  │ L2: Redis限流 │  │  ← 精准限流
  │  │  (细粒度)     │  │  │  │  (细粒度)     │  │    (~1ms)
  │  └───────┬───────┘  │  │  └───────┬───────┘  │
  └──────────┼──────────┘  └──────────┼──────────┘
             │                        │
             └───────────┬────────────┘
                         ↓
              ┌─────────────────────┐
              │   Redis Cluster     │
              └─────────────────────┘
                         ↓
              ┌─────────────────────┐
              │  Backend Services   │
              └─────────────────────┘

限流策略:
1. L1本地限流: 限制单机QPS(如总限100,10台机器每台限15)
2. L2 Redis限流: 全局精准限流(总限100)

优点:
 性能高(本地快速拦截)
 精度高(Redis全局限流)
 降低Redis压力(本地拦截大部分请求)
 Redis故障时仍有本地限流保护

缺点:
⚠️  配置复杂(需要合理分配本地和全局配额)
⚠️  一致性略弱(本地计数有延迟)

代码实现

package ratelimit

import (
    "context"
    "time"
)

// MultiLevelRateLimiter 多级限流器
type MultiLevelRateLimiter struct {
    local  *LocalRateLimiter
    redis  *RedisRateLimiter
    config *MultiLevelConfig
}

// MultiLevelConfig 多级限流配置
type MultiLevelConfig struct {
    // 本地限流配置
    LocalLimit    int64   // 本地限流阈值
    LocalCapacity int64   // 本地突发容量

    // Redis限流配置
    RedisLimit    int64   // 全局限流阈值
    RedisCapacity int64   // 全局突发容量

    // 策略
    LocalFirst    bool    // 优先本地限流
    RedisRequired bool    // Redis必须检查
}

func NewMultiLevelRateLimiter(local *LocalRateLimiter, redis *RedisRateLimiter, config *MultiLevelConfig) *MultiLevelRateLimiter {
    return &MultiLevelRateLimiter{
        local:  local,
        redis:  redis,
        config: config,
    }
}

// Allow 判断是否允许请求
func (m *MultiLevelRateLimiter) Allow(ctx context.Context, resource string, identifier string) (bool, error) {
    // 第一级:本地限流(快速拦截)
    localAllowed, err := m.local.Allow(ctx, resource, identifier)
    if err != nil {
        return false, err
    }

    if !localAllowed {
        // 本地限流拦截
        return false, nil
    }

    // 第二级:Redis限流(精准控制)
    if m.config.RedisRequired {
        redisAllowed, err := m.redis.Allow(ctx, resource, identifier, m.config.RedisLimit, m.config.RedisCapacity)
        if err != nil {
            // Redis故障,降级到仅本地限流
            return localAllowed, nil
        }
        return redisAllowed, nil
    }

    return true, nil
}

// 使用示例
func ExampleMultiLevelRateLimiter() {
    // 初始化本地限流器
    local := NewLocalRateLimiter()
    local.AddRule(&RateLimitRule{
        Resource:  "/api/order/create",
        Algorithm: "token_bucket",
        Limit:     15,   // 单机限15(假设10台机器,总限100,每台分配10+冗余5)
        Window:    time.Second,
        Burst:     20,
    })

    // 初始化Redis限流器
    redis := NewRedisRateLimiter(nil) // 假设已连接

    // 配置多级限流
    config := &MultiLevelConfig{
        LocalLimit:    15,
        LocalCapacity: 20,
        RedisLimit:    100,
        RedisCapacity: 150,
        LocalFirst:    true,
        RedisRequired: true,
    }

    limiter := NewMultiLevelRateLimiter(local, redis, config)

    ctx := context.Background()

    // 模拟请求
    for i := 0; i < 200; i++ {
        allowed, err := limiter.Allow(ctx, "/api/order/create", "user:12345")
        if err != nil {
            println("Error:", err.Error())
            continue
        }

        if allowed {
            println("Request", i, "allowed")
        } else {
            println("Request", i, "rejected")
        }
    }
}

核心算法与实现

完整限流系统实现

package ratelimit

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/go-redis/redis/v8"
)

// 完整的限流系统实现
type RateLimitSystem struct {
    ruleManager  *RuleManager
    localLimiter *LocalRateLimiter
    redisLimiter *RedisRateLimiter
    config       *SystemConfig
    mu           sync.RWMutex
}

type SystemConfig struct {
    Mode          string  // "local", "redis", "hybrid"
    EnableMetrics bool    // 是否启用指标
    EnableLog     bool    // 是否启用日志
}

// Check 检查限流
func (s *RateLimitSystem) Check(ctx context.Context, resource string, identifier string) (*CheckResult, error) {
    // 获取规则
    rule, err := s.ruleManager.GetRule(resource)
    if err != nil || rule == nil {
        // 没有规则,默认允许
        return &CheckResult{Allowed: true}, nil
    }

    if !rule.Enabled {
        return &CheckResult{Allowed: true, Reason: "rule disabled"}, nil
    }

    start := time.Now()

    var allowed bool
    var remaining int64

    // 根据模式选择限流器
    switch s.config.Mode {
    case "local":
        allowed, err = s.localLimiter.Allow(ctx, resource, identifier)
    case "redis":
        allowed, err = s.redisLimiter.Allow(ctx, resource, identifier, rule.Limit, rule.Burst)
    case "hybrid":
        // 多级限流
        localAllowed, _ := s.local Limiter.Allow(ctx, resource, identifier)
        if !localAllowed {
            allowed = false
        } else {
            allowed, err = s.redisLimiter.Allow(ctx, resource, identifier, rule.Limit, rule.Burst)
        }
    }

    latency := time.Since(start)

    // 记录指标
    if s.config.EnableMetrics {
        s.recordMetrics(resource, allowed, latency)
    }

    result := &CheckResult{
        Allowed:   allowed,
        Remaining: remaining,
        ResetTime: time.Now().Add(rule.Window),
        Latency:   latency,
    }

    if !allowed {
        result.Reason = "rate limit exceeded"
        result.RetryAfter = rule.Window
    }

    return result, err
}

type CheckResult struct {
    Allowed    bool          // 是否允许
    Remaining  int64         // 剩余配额
    ResetTime  time.Time     // 重置时间
    RetryAfter time.Duration // 重试等待时间
    Reason     string        // 原因
    Latency    time.Duration // 延迟
}

// 指标记录
func (s *RateLimitSystem) recordMetrics(resource string, allowed bool, latency time.Duration) {
    // 使用Prometheus记录指标
    // rateLimitRequestsTotal.WithLabelValues(resource, fmt.Sprint(allowed)).Inc()
    // rateLimitLatency.WithLabelValues(resource).Observe(latency.Seconds())
}

中间件集成

HTTP中间件

package middleware

import (
    "net/http"
    "ratelimit"
)

// RateLimitMiddleware HTTP限流中间件
func RateLimitMiddleware(limiter *ratelimit.RateLimitSystem) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            ctx := r.Context()

            // 获取资源标识(API路径)
            resource := r.URL.Path

            // 获取用户标识(从请求中提取)
            identifier := extractIdentifier(r) // 如: user ID 或 IP

            // 检查限流
            result, err := limiter.Check(ctx, resource, identifier)
            if err != nil {
                http.Error(w, "Internal Server Error", http.StatusInternalServerError)
                return
            }

            if !result.Allowed {
                // 限流触发,返回429
                w.Header().Set("X-RateLimit-Limit", "100")
                w.Header().Set("X-RateLimit-Remaining", "0")
                w.Header().Set("X-RateLimit-Reset", result.ResetTime.Format(time.RFC3339))
                w.Header().Set("Retry-After", fmt.Sprintf("%d", int(result.RetryAfter.Seconds())))

                http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
                return
            }

            // 添加响应头
            w.Header().Set("X-RateLimit-Limit", "100")
            w.Header().Set("X-RateLimit-Remaining", fmt.Sprintf("%d", result.Remaining))

            // 继续处理请求
            next.ServeHTTP(w, r)
        })
    }
}

// extractIdentifier 提取用户标识
func extractIdentifier(r *http.Request) string {
    // 优先使用用户ID
    if userID := r.Header.Get("X-User-ID"); userID != "" {
        return "user:" + userID
    }

    // 其次使用IP
    ip := r.Header.Get("X-Real-IP")
    if ip == "" {
        ip = r.Header.Get("X-Forwarded-For")
    }
    if ip == "" {
        ip = r.RemoteAddr
    }

    return "ip:" + ip
}

// 使用示例
func main() {
    // 初始化限流系统
    limiter := ratelimit.NewRateLimitSystem(config)

    // 创建HTTP服务
    mux := http.NewServeMux()
    mux.HandleFunc("/api/order/create", handleOrderCreate)

    // 应用限流中间件
    handler := RateLimitMiddleware(limiter)(mux)

    http.ListenAndServe(":8080", handler)
}

gRPC拦截器

package interceptor

import (
    "context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
    "ratelimit"
)

// RateLimitInterceptor gRPC限流拦截器
func RateLimitInterceptor(limiter *ratelimit.RateLimitSystem) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        // 获取方法名作为资源标识
        resource := info.FullMethod

        // 从metadata获取用户标识
        identifier := extractUserFromMetadata(ctx)

        // 检查限流
        result, err := limiter.Check(ctx, resource, identifier)
        if err != nil {
            return nil, status.Error(codes.Internal, "rate limit check failed")
        }

        if !result.Allowed {
            // 设置响应元数据
            md := metadata.Pairs(
                "x-ratelimit-limit", "100",
                "x-ratelimit-remaining", "0",
                "x-ratelimit-reset", result.ResetTime.Format(time.RFC3339),
            )
            grpc.SetTrailer(ctx, md)

            return nil, status.Error(codes.ResourceExhausted, "rate limit exceeded")
        }

        // 继续处理请求
        return handler(ctx, req)
    }
}

func extractUserFromMetadata(ctx context.Context) string {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return "unknown"
    }

    if userIDs := md.Get("user-id"); len(userIDs) > 0 {
        return "user:" + userIDs[0]
    }

    return "unknown"
}

优化方案

优化1: 热点数据优化

问题:部分热点用户/API导致Redis压力大

解决方案:

// HotKeyOptimizer 热点key优化器
type HotKeyOptimizer struct {
    localCache  *sync.Map  // 本地缓存热点数据
    hotKeyStats map[string]int64
    threshold   int64      // 热点阈值
    mu          sync.RWMutex
}

// 检测热点key
func (h *HotKeyOptimizer) isHotKey(key string) bool {
    h.mu.RLock()
    count := h.hotKeyStats[key]
    h.mu.RUnlock()

    return count > h.threshold
}

// 优化后的限流检查
func (s *RateLimitSystem) CheckWithHotKeyOptimization(ctx context.Context, resource string, identifier string) (*CheckResult, error) {
    key := resource + ":" + identifier

    // 检测是否为热点key
    if s.hotKeyOptimizer.isHotKey(key) {
        // 使用本地缓存
        if cachedResult, ok := s.hotKeyOptimizer.localCache.Load(key); ok {
            return cachedResult.(*CheckResult), nil
        }
    }

    // 正常限流检查
    result, err := s.Check(ctx, resource, identifier)

    // 缓存热点key结果
    if s.hotKeyOptimizer.isHotKey(key) {
        s.hotKeyOptimizer.localCache.Store(key, result)
    }

    return result, err
}

效果对比:

方案延迟Redis QPS准确度
纯Redis~1ms100万100%
热点优化~0.1ms20万95%

优化2: 批量检查

问题:单个请求需要检查多个限流规则,延迟高

解决方案:

// BatchCheck 批量检查限流
func (s *RateLimitSystem) BatchCheck(ctx context.Context, checks []CheckRequest) ([]CheckResult, error) {
    results := make([]CheckResult, len(checks))

    // 使用Pipeline批量执行Redis命令
    pipe := s.redisLimiter.client.Pipeline()

    for i, check := range checks {
        key := "ratelimit:token:" + check.Resource + ":" + check.Identifier

        // 添加到Pipeline
        pipe.HGetAll(ctx, key)
    }

    // 执行Pipeline
    cmds, err := pipe.Exec(ctx)
    if err != nil {
        return nil, err
    }

    // 解析结果
    for i, cmd := range cmds {
        // 处理每个命令的结果
        // ...
        results[i] = CheckResult{Allowed: true}
    }

    return results, nil
}

效果对比:

方案总延迟吞吐量
逐个检查 (10个规则)~10ms1000 QPS
批量检查 (10个规则)~2ms5000 QPS

优化3: 预热机制

问题:冷启动时大量请求同时通过,压垮后端

解决方案:

// WarmupConfig 预热配置
type WarmupConfig struct {
    Duration     time.Duration // 预热时长
    InitialLimit int64         // 初始限流值
    TargetLimit  int64         // 目标限流值
}

// CalculateLimit 计算预热期间的限流值
func (w *WarmupConfig) CalculateLimit(elapsedTime time.Duration) int64 {
    if elapsedTime >= w.Duration {
        return w.TargetLimit
    }

    // 线性增长
    ratio := float64(elapsedTime) / float64(w.Duration)
    currentLimit := float64(w.InitialLimit) + ratio*float64(w.TargetLimit-w.InitialLimit)

    return int64(currentLimit)
}

// 使用示例
func ExampleWarmup() {
    config := &WarmupConfig{
        Duration:     5 * time.Minute, // 5分钟预热
        InitialLimit: 10,              // 初始10 QPS
        TargetLimit:  100,             // 目标100 QPS
    }

    startTime := time.Now()

    // 每秒计算当前限流值
    for i := 0; i < 300; i++ {
        elapsed := time.Duration(i) * time.Second
        currentLimit := config.CalculateLimit(elapsed)

        println("Time:", elapsed, "Limit:", currentLimit)

        time.Sleep(time.Second)
    }
}

预热曲线:

Limit
  100 │                          ┌─────────
      │                      ┌───┘
   50 │                  ┌───┘
      │              ┌───┘
   10 │──────────────┘
      └─────────────────────────────────── Time
      0s            5min                  10min

优化4: 降级熔断

// CircuitBreaker 熔断器
type CircuitBreaker struct {
    maxFailures  int           // 最大失败次数
    resetTimeout time.Duration // 重置超时
    failures     int           // 当前失败次数
    lastFailTime time.Time     // 最后失败时间
    state        string        // 状态: "closed", "open", "half-open"
    mu           sync.Mutex
}

func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures:  maxFailures,
        resetTimeout: resetTimeout,
        state:        "closed",
    }
}

// Call 执行调用(带熔断)
func (cb *CircuitBreaker) Call(fn func() error) error {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    // 检查状态
    if cb.state == "open" {
        // 检查是否可以进入半开状态
        if time.Since(cb.lastFailTime) > cb.resetTimeout {
            cb.state = "half-open"
        } else {
            return fmt.Errorf("circuit breaker is open")
        }
    }

    // 执行调用
    err := fn()

    if err != nil {
        // 记录失败
        cb.failures++
        cb.lastFailTime = time.Now()

        if cb.failures >= cb.maxFailures {
            cb.state = "open"
        }

        return err
    }

    // 成功,重置计数器
    cb.failures = 0
    cb.state = "closed"

    return nil
}

// 集成到限流系统
func (s *RateLimitSystem) CheckWithCircuitBreaker(ctx context.Context, resource string, identifier string) (*CheckResult, error) {
    var result *CheckResult
    var err error

    // 使用熔断器保护Redis调用
    cbErr := s.circuitBreaker.Call(func() error {
        result, err = s.redisLimiter.Allow(ctx, resource, identifier, 100, 150)
        return err
    })

    if cbErr != nil {
        // 熔断器打开,降级到本地限流
        result, err = s.localLimiter.Allow(ctx, resource, identifier)
    }

    return result, err
}

监控告警

核心指标

业务指标:
- rate_limit_requests_total: 限流检查总数(按资源、结果分类)
- rate_limit_rejected_total: 限流拒绝总数
- rate_limit_pass_rate: 限流通过率

性能指标:
- rate_limit_latency: 限流检查延迟(P50/P95/P99)
- redis_operations_total: Redis操作总数
- local_cache_hit_rate: 本地缓存命中率

资源指标:
- rate_limit_rules_count: 限流规则数量
- rate_limit_hot_keys_count: 热点key数量
- circuit_breaker_state: 熔断器状态

Prometheus指标定义

package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    // 限流请求总数
    RateLimitRequestsTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "rate_limit_requests_total",
            Help: "Total number of rate limit checks",
        },
        []string{"resource", "allowed"},
    )

    // 限流延迟
    RateLimitLatency = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "rate_limit_latency_seconds",
            Help:    "Latency of rate limit checks",
            Buckets: prometheus.DefBuckets,
        },
        []string{"resource"},
    )

    // Redis操作总数
    RedisOperationsTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "redis_operations_total",
            Help: "Total number of Redis operations",
        },
        []string{"operation", "status"},
    )

    // 本地缓存命中率
    LocalCacheHits = promauto.NewCounter(
        prometheus.CounterOpts{
            Name: "local_cache_hits_total",
            Help: "Total number of local cache hits",
        },
    )

    LocalCacheMisses = promauto.NewCounter(
        prometheus.CounterOpts{
            Name: "local_cache_misses_total",
            Help: "Total number of local cache misses",
        },
    )

    // 熔断器状态
    CircuitBreakerState = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "circuit_breaker_state",
            Help: "Circuit breaker state (0=closed, 1=open, 2=half-open)",
        },
        []string{"resource"},
    )
)

// 记录限流检查
func RecordRateLimitCheck(resource string, allowed bool, latency time.Duration) {
    RateLimitRequestsTotal.WithLabelValues(resource, fmt.Sprint(allowed)).Inc()
    RateLimitLatency.WithLabelValues(resource).Observe(latency.Seconds())
}

Grafana Dashboard

{
  "dashboard": {
    "title": "Rate Limit Monitoring",
    "panels": [
      {
        "title": "Requests Per Second",
        "targets": [
          {
            "expr": "rate(rate_limit_requests_total[1m])"
          }
        ]
      },
      {
        "title": "Pass Rate",
        "targets": [
          {
            "expr": "rate(rate_limit_requests_total{allowed=\"true\"}[1m]) / rate(rate_limit_requests_total[1m])"
          }
        ]
      },
      {
        "title": "P99 Latency",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, rate(rate_limit_latency_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "Top 10 Limited APIs",
        "targets": [
          {
            "expr": "topk(10, rate(rate_limit_requests_total{allowed=\"false\"}[5m]))"
          }
        ]
      }
    ]
  }
}

告警规则

groups:
  - name: rate_limit_alerts
    rules:
      # 限流拒绝率过高
      - alert: HighRejectionRate
        expr: |
          (
            rate(rate_limit_requests_total{allowed="false"}[5m])
            /
            rate(rate_limit_requests_total[5m])
          ) > 0.50
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Rate limit rejection rate is high"
          description: "{{ $labels.resource }} rejection rate is {{ $value | humanizePercentage }}"

      # 限流延迟过高
      - alert: HighLatency
        expr: |
          histogram_quantile(0.99, rate(rate_limit_latency_seconds_bucket[5m])) > 0.010
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Rate limit latency is high"
          description: "P99 latency is {{ $value }}s"

      # Redis连接失败
      - alert: RedisConnectionFailed
        expr: |
          rate(redis_operations_total{status="error"}[5m]) > 0.01
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Redis connection is failing"
          description: "Redis error rate is {{ $value }}/s"

      # 熔断器打开
      - alert: CircuitBreakerOpen
        expr: circuit_breaker_state == 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Circuit breaker is open"
          description: "{{ $labels.resource }} circuit breaker is open"

      # 热点key数量过多
      - alert: TooManyHotKeys
        expr: rate_limit_hot_keys_count > 1000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Too many hot keys"
          description: "Hot keys count is {{ $value }}"

面试问答

令牌桶和漏桶的区别是什么?应该选择哪个?

答案:

对比维度令牌桶漏桶
流出速率可变(有令牌就放行)固定(匀速流出)
突发流量支持(桶内令牌可一次性用完)不支持(严格限速)
实现复杂度简单(懒加载计算令牌)中等(需要队列)
适用场景API网关、用户限流保护后端、流量整形

选择建议:

  • API网关、面向用户的服务:令牌桶(允许突发,用户体验好)
  • 保护数据库、消息队列等后端资源:漏桶(严格控流,保护后端)

代码示例:

// 令牌桶:允许突发
limiter := NewTokenBucketLimiter(100, 50)  // 容量100,每秒生成50个
// 前100个请求瞬间通过(消耗桶内令牌)

// 漏桶:严格限速
limiter := NewLeakyBucketLimiter(100, 10*time.Millisecond)  // 每10ms流出1个
// 请求匀速流出,绝对平滑

如何解决分布式限流的一致性问题?

答案:

问题场景:

  • 10台服务器,总限流100 QPS
  • 每台服务器单独限流10 QPS
  • 问题:流量不均,可能导致总QPS超过100

解决方案:

方案1: Redis集中限流(推荐)

// 所有服务器共享Redis计数器
func (r *RedisRateLimiter) Allow(ctx context.Context, key string) bool {
    // Lua脚本保证原子性
    return redis.Eval(`
        local count = redis.call('INCR', KEYS[1])
        if count == 1 then
            redis.call('EXPIRE', KEYS[1], 1)
        end
        return count <= 100
    `, key)
}

优点:

  • 全局精准限流
  • 实现简单

缺点:

  • 依赖Redis
  • 网络延迟(~1ms)

方案2: 多级限流(本地+Redis)

L1: 本地限流(每台15 QPS,冗余50%)
L2: Redis限流(全局100 QPS)

流程:
1. 本地限流快速拦截(< 0.01ms)
2. 通过本地限流后,再检查Redis
3. Redis故障时降级到本地限流

效果对比:

方案延迟准确度可用性
纯本地< 0.01ms60%99.99%
纯Redis~1ms100%99.9%
多级限流~0.5ms95%99.99%

固定窗口的临界问题如何解决?

答案:

临界问题:

窗口1: 00:00:00 - 00:00:59  (100个请求) 
窗口2: 00:01:00 - 00:01:59  (100个请求) 

问题:00:00:30 - 00:01:30 实际200个请求 

解决方案1: 滑动窗口

// 使用Redis Sorted Set
func (l *SlidingWindowLimiter) Allow() bool {
    now := time.Now().UnixMilli()
    windowStart := now - 1000  // 1秒窗口

    // 清理过期数据
    redis.ZRemRangeByScore(key, 0, windowStart)

    // 统计当前窗口请求数
    count := redis.ZCard(key)

    if count < limit {
        redis.ZAdd(key, now, now)
        return true
    }
    return false
}

解决方案2: 滑动日志

// 记录每个请求的时间戳
type SlidingLogLimiter struct {
    requests []time.Time  // 请求时间戳列表
}

func (l *SlidingLogLimiter) Allow() bool {
    now := time.Now()

    // 清理1秒前的请求
    cutoff := now.Add(-time.Second)
    l.requests = filterAfter(l.requests, cutoff)

    if len(l.requests) < limit {
        l.requests = append(l.requests, now)
        return true
    }
    return false
}

权衡:

  • 滑动窗口:精度高,但内存占用大
  • 固定窗口+冗余:简单,限流值设为80(冗余20%)

Redis限流的Lua脚本为什么必须用?不用会怎样?

答案:

不用Lua的问题:

//  错误示例:非原子操作
func (r *RedisRateLimiter) Allow() bool {
    count, _ := redis.Get(key)                    // 1. 读
    if count < limit {
        redis.Incr(key)                            // 2. 写
        return true
    }
    return false
}

// 问题:并发时出现竞态条件
// 时刻1: 线程A读到count=99
// 时刻2: 线程B读到count=99
// 时刻3: 线程A写count=100 
// 时刻4: 线程B写count=100  (超限!)

使用Lua的好处:

--  正确示例:原子操作
local count = redis.call('GET', KEYS[1]) or 0
if tonumber(count) < tonumber(ARGV[1]) then
    redis.call('INCR', KEYS[1])
    return 1
else
    return 0
end

原子性保证:

  1. Lua脚本在Redis中单线程执行
  2. 执行期间不会被其他命令打断
  3. 读-判断-写三步合一,绝对原子

性能对比:

方案网络往返竞态条件性能
不用Lua2次(GET + INCR)有慢
用Lua1次(EVAL)无快

热点key怎么处理?比如秒杀场景

答案:

问题:

秒杀场景:
- 商品ID: item_12345
- 100万用户同时抢购
- Redis单key压力:100万 QPS 

瓶颈:
1. Redis单key吞吐有限(~10万QPS)
2. 网络带宽打满
3. CPU单核打满(Redis单线程)

解决方案:

方案1: 本地缓存热点数据

type HotKeyCache struct {
    local  *sync.Map          // 本地缓存
    redis  *redis.Client
    stats  map[string]int64   // 访问统计
}

func (h *HotKeyCache) Get(key string) (int64, error) {
    // 检测热点
    if h.isHot(key) {
        // 使用本地缓存
        if val, ok := h.local.Load(key); ok {
            return val.(int64), nil
        }
    }

    // 从Redis获取
    val, err := h.redis.Get(ctx, key).Int64()
    if err != nil {
        return 0, err
    }

    // 缓存热点key
    if h.isHot(key) {
        h.local.Store(key, val)
    }

    return val, nil
}

方案2: key分片

// 将热点key分散到多个key
func (r *RedisRateLimiter) AllowWithSharding(key string, shards int) bool {
    // 随机选择一个分片
    shard := rand.Intn(shards)
    shardKey := fmt.Sprintf("%s:shard:%d", key, shard)

    // 限流值也要分片
    limitPerShard := limit / shards

    return r.Allow(shardKey, limitPerShard)
}

// 示例:
// 原始:item_12345 (100万QPS) 
// 分片:item_12345:shard:0 (10万QPS) 
//      item_12345:shard:1 (10万QPS) 
//      ...
//      item_12345:shard:9 (10万QPS) 

效果对比:

方案Redis压力延迟准确度
无优化100万QPS5ms100%
本地缓存10万QPS0.1ms90%
key分片10万QPS1ms95%

如何防止限流被绕过?

答案:

攻击场景:

  1. 修改User-Agent绕过识别
  2. 使用代理IP池
  3. 分布式爬虫
  4. 恶意API调用

防护方案:

1. 多维度限流

// 同时限制多个维度
type MultiDimensionLimiter struct {
    userLimiter *RateLimiter  // 用户维度: 100/s
    ipLimiter   *RateLimiter  // IP维度: 1000/s
    apiLimiter  *RateLimiter  // API维度: 10000/s
}

func (m *MultiDimensionLimiter) Allow(user string, ip string, api string) bool {
    // 任一维度触发限流则拒绝
    return m.userLimiter.Allow(user) &&
           m.ipLimiter.Allow(ip) &&
           m.apiLimiter.Allow(api)
}

2. 设备指纹

// 生成设备唯一标识
func generateDeviceFingerprint(r *http.Request) string {
    // 组合多个因子
    factors := []string{
        r.Header.Get("User-Agent"),
        r.Header.Get("Accept-Language"),
        r.Header.Get("Accept-Encoding"),
        getIP(r),
        // Canvas指纹、WebGL指纹等
    }

    hash := sha256.Sum256([]byte(strings.Join(factors, "|")))
    return hex.EncodeToString(hash[:])
}

// 基于设备指纹限流
limiter.Allow("device:" + fingerprint)

限流规则如何动态调整?不重启服务

答案:

方案: 配置中心(推荐)

// 使用etcd/Consul/Nacos
type DynamicRuleManager struct {
    client     *etcd.Client
    rules      sync.Map         // 本地规则缓存
    watchChan  chan *RateLimitRule
}

// 启动监听
func (d *DynamicRuleManager) Watch() {
    watchChan := d.client.Watch(ctx, "/ratelimit/rules/", clientv3.WithPrefix())

    for resp := range watchChan {
        for _, ev := range resp.Events {
            switch ev.Type {
            case mvccpb.PUT:
                // 规则更新
                rule := parseRule(ev.Kv.Value)
                d.rules.Store(rule.Resource, rule)
                log.Printf("Rule updated: %s", rule.Resource)

            case mvccpb.DELETE:
                // 规则删除
                d.rules.Delete(string(ev.Kv.Key))
                log.Printf("Rule deleted: %s", ev.Kv.Key)
            }
        }
    }
}

网关限流和服务限流有什么区别?都需要吗?

答案:

对比分析

维度网关限流服务限流
位置入口处(统一网关)服务内部
目的保护整个系统保护单个服务
粒度粗粒度(API级别)细粒度(方法级别)
识别基于IP/Token基于用户/租户
降级返回429服务降级/熔断

推荐策略:

  1. 都需要:多层防护,层层拦截
  2. 网关限流:粗粒度保护,防止恶意流量
  3. 服务限流:细粒度控制,业务逻辑保护

限流失败后应该返回什么错误码?如何引导用户?

答案:

HTTP状态码选择

** 正确做法**:

  • 返回429 Too Many Requests(RFC 6585)

完整响应示例

func (m *RateLimitMiddleware) handleRateLimitExceeded(w http.ResponseWriter, result *CheckResult) {
    // 设置状态码
    w.WriteHeader(http.StatusTooManyRequests)  // 429

    // 设置响应头
    w.Header().Set("Content-Type", "application/json")
    w.Header().Set("X-RateLimit-Limit", "100")
    w.Header().Set("X-RateLimit-Remaining", "0")
    w.Header().Set("X-RateLimit-Reset", result.ResetTime.Format(time.RFC3339))
    w.Header().Set("Retry-After", strconv.Itoa(int(result.RetryAfter.Seconds())))

    // 返回友好的错误信息
    response := map[string]interface{}{
        "error": "rate_limit_exceeded",
        "message": "请求过于频繁,请稍后再试",
        "details": map[string]interface{}{
            "limit":       100,
            "remaining":   0,
            "reset_time":  result.ResetTime.Unix(),
            "retry_after": int(result.RetryAfter.Seconds()),
        },
        "help_url": "https://docs.example.com/rate-limits",
    }

    json.NewEncoder(w).Encode(response)
}

如何测试限流系统的正确性?

答案:

测试策略

1. 单元测试(算法正确性)

func TestTokenBucket(t *testing.T) {
    limiter := NewTokenBucketLimiter(10, 10)  // 容量10,速率10/s

    // 测试1: 正常情况
    for i := 0; i < 10; i++ {
        assert.True(t, limiter.Allow(), "Should allow first 10 requests")
    }

    // 测试2: 超过容量
    assert.False(t, limiter.Allow(), "Should reject 11th request")

    // 测试3: 等待恢复
    time.Sleep(1 * time.Second)
    for i := 0; i < 10; i++ {
        assert.True(t, limiter.Allow(), "Should allow after 1 second")
    }
}

2. 压力测试(性能验证)

func BenchmarkRateLimiter(b *testing.B) {
    limiter := NewRedisRateLimiter(redisClient)
    ctx := context.Background()

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            limiter.Allow(ctx, "bench", "user:1", 10000, 10000)
        }
    })

    // 输出: BenchmarkRateLimiter-8  1000000  1234 ns/op
}

Prev
05 - 分布式 ID 生成器设计
Next
第7章:搜索引擎设计