HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 分布式架构模式

    • 分布式架构模式手册
    • 第1章:分布式一致性
    • 第2章:分布式锁
    • 第3章:分布式协调
    • 第4章:服务发现与注册
    • 第5章:负载均衡
    • 第6章:熔断降级
    • 第7章:DDD领域驱动设计
    • 第8章:CQRS与Event Sourcing

第6章:熔断降级

为什么需要熔断降级

雪崩效应(Cascading Failure)

场景:微服务调用链故障传播

用户请求 → 订单服务 → 库存服务 → 数据库
                 ↓
             支付服务
                 ↓
             优惠券服务

库存服务故障(响应慢/超时):
1. 订单服务线程等待库存服务响应
2. 大量请求堆积,订单服务线程池耗尽
3. 订单服务不可用,影响用户请求
4. 用户请求失败,继续重试,加剧压力
5. 整个系统崩溃 

雪崩效应示意图:

正常情况:
User → Order Service (50ms)
         ↓
       Stock Service (20ms)
         ↓
       Database (10ms)

总响应时间: 80ms 

故障情况(Stock Service响应慢):
User → Order Service
         ↓ (等待3秒超时)
       Stock Service (响应慢/不响应)

问题:
- Order Service线程被占用3秒
- 线程池只有100个线程
- QPS=1000、每个请求占用线程3秒 → 需要 1000 × 3 = 3000 个并发线程,远超线程池容量
- 线程池耗尽 → 订单服务不可用
- 影响面扩大 → 整个系统崩溃

容错机制的目标

目标:
- 快速失败(Fail Fast)
- 资源隔离(Isolation)
- 自动恢复(Auto Recovery)
- 降级可用(Degrade Gracefully)

熔断器模式

状态机

三种状态:

┌─────────────────────────────────────────┐
│          Closed (关闭状态)              │
│      正常请求,统计失败率               │
└─────────────────────────────────────────┘
              ↓ 失败率 ≥ 阈值
┌─────────────────────────────────────────┐
│          Open (打开状态)                │
│      拒绝所有请求,快速失败             │
└─────────────────────────────────────────┘
              ↓ 等待超时(如30秒)
┌─────────────────────────────────────────┐
│        Half-Open (半开状态)             │
│      允许少量请求通过,测试恢复         │
└─────────────────────────────────────────┘
       ↓ 成功           ↓ 失败
    Closed            Open

代码实现

基础熔断器:

package main

import (
    "errors"
    "log"
    "math/rand"
    "sync"
    "time"
)

// State is the position of a CircuitBreaker in its Closed → Open → Half-Open
// state machine.
type State int

const (
    // StateClosed lets requests through while their outcomes are counted.
    StateClosed State = iota
    // StateOpen rejects every request until the open timeout elapses.
    StateOpen
    // StateHalfOpen admits a limited number of trial requests.
    StateHalfOpen
)

// String returns the upper-case state name, so that %v in log output prints
// "CLOSED"/"OPEN"/"HALF_OPEN" rather than a bare integer.
func (s State) String() string {
    switch s {
    case StateClosed:
        return "CLOSED"
    case StateOpen:
        return "OPEN"
    case StateHalfOpen:
        return "HALF_OPEN"
    default:
        return "UNKNOWN"
    }
}

// CircuitBreaker guards calls to an unreliable dependency. Its runtime fields
// are read and written only while mu is held.
type CircuitBreaker struct {
    mu sync.Mutex

    // Configuration, set by the constructor.
    maxRequests uint32                   // max trial requests in Half-Open (also the successes needed to close)
    interval    time.Duration            // length of the Closed-state statistics window
    timeout     time.Duration            // how long the breaker stays Open
    readyToTrip func(counts Counts) bool // decides whether a Closed breaker should open

    // Runtime state.
    state      State
    generation uint64    // incremented on every state change; stale outcomes are ignored
    counts     Counts
    expiry     time.Time // end of the current Closed window or Open period; zero in Half-Open
}

// Counts holds the request statistics of the breaker's current generation.
// Each Consecutive* counter is cleared when the opposite outcome is recorded.
type Counts struct {
    Requests uint32 // requests admitted so far

    TotalSuccesses uint32
    TotalFailures  uint32

    ConsecutiveSuccesses uint32
    ConsecutiveFailures  uint32
}

// NewCircuitBreaker returns a Closed breaker using this chapter's defaults:
// a 10s statistics window, a 30s Open period and up to 3 trial requests in
// Half-Open.
//
// The breaker trips when either
//   - at least 5 consecutive requests have failed, or
//   - at least 10 requests have been seen and ≥ 50% of them failed.
func NewCircuitBreaker() *CircuitBreaker {
    const (
        maxConsecutiveFailures = 5
        minRequests            = 10 // requestVolumeThreshold for the ratio rule
        failureRatioThreshold  = 0.5
    )
    return &CircuitBreaker{
        maxRequests: 3,
        interval:    10 * time.Second,
        timeout:     30 * time.Second,
        readyToTrip: func(counts Counts) bool {
            // The consecutive-failure rule stands on its own ("ratio OR
            // consecutive"); only the ratio rule needs a minimum volume.
            if counts.ConsecutiveFailures >= maxConsecutiveFailures {
                return true
            }
            // Computing the ratio only after the volume check also avoids 0/0.
            if counts.Requests < minRequests {
                return false
            }
            return float64(counts.TotalFailures)/float64(counts.Requests) >= failureRatioThreshold
        },
        state: StateClosed,
    }
}

// Call runs req under the breaker's protection.
//
// When the breaker rejects the request (Open, or Half-Open with all trial
// slots taken), req is not called and the breaker's error is returned.
// Otherwise req's result and error are returned, and err == nil is recorded
// as a success. If req panics, the attempt is recorded as a failure before the
// panic is re-raised. Without that, the request would stay counted in
// Half-Open with no outcome, and the breaker could remain saturated.
func (cb *CircuitBreaker) Call(req func() (interface{}, error)) (interface{}, error) {
    generation, err := cb.beforeRequest()
    if err != nil {
        return nil, err
    }

    defer func() {
        if r := recover(); r != nil {
            cb.afterRequest(generation, false)
            panic(r)
        }
    }()

    result, err := req()
    cb.afterRequest(generation, err == nil)
    return result, err
}

// beforeRequest decides, under the lock, whether a new request may proceed.
// It returns the generation the request belongs to, plus a non-nil error when
// the request is rejected. Admitted requests are counted in cb.counts.Requests.
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    state, gen := cb.currentState(time.Now())

    switch {
    case state == StateOpen:
        return gen, errors.New("circuit breaker is open")
    case state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests:
        // Only maxRequests trial requests may be in the Half-Open generation.
        return gen, errors.New("too many requests in half-open state")
    }

    cb.counts.Requests++
    return gen, nil
}

// afterRequest records the outcome of a request admitted in the given
// generation. If the generation has changed since then (the state moved on),
// the outcome is discarded.
func (cb *CircuitBreaker) afterRequest(generation uint64, success bool) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    now := time.Now()
    state, current := cb.currentState(now)
    if current != generation {
        return
    }

    record := cb.onFailure
    if success {
        record = cb.onSuccess
    }
    record(state, now)
}

// onSuccess counts a successful request. In Half-Open, reaching maxRequests
// consecutive successes closes the breaker. The caller holds cb.mu.
func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
    c := &cb.counts
    c.TotalSuccesses++
    c.ConsecutiveSuccesses++
    c.ConsecutiveFailures = 0

    if state != StateHalfOpen {
        return
    }
    if c.ConsecutiveSuccesses >= cb.maxRequests {
        cb.setState(StateClosed, now)
    }
}

// onFailure counts a failed request and opens the breaker when it was in
// Half-Open or when readyToTrip reports the threshold is reached. The caller
// holds cb.mu.
func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
    c := &cb.counts
    c.TotalFailures++
    c.ConsecutiveFailures++
    c.ConsecutiveSuccesses = 0

    // Any failure while probing re-opens the breaker. readyToTrip is only
    // consulted in the other states.
    trip := state == StateHalfOpen
    if !trip {
        trip = cb.readyToTrip(cb.counts)
    }
    if trip {
        cb.setState(StateOpen, now)
    }
}

// currentState applies time-based transitions and returns the state and
// generation as of now. The caller must hold cb.mu.
//
//   - Closed: when the statistics window has elapsed, the counts are cleared
//     and a new window (and generation) begins, so only recent requests count.
//     A non-positive interval disables the window.
//   - Open: once the open timeout has elapsed, the breaker moves to Half-Open.
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
    switch cb.state {
    case StateClosed:
        switch {
        case cb.interval <= 0:
            // No window: counts accumulate until the next state change.
        case cb.expiry.IsZero():
            // Freshly constructed breaker: start its first window.
            cb.expiry = now.Add(cb.interval)
        case cb.expiry.Before(now):
            // Roll the window forward. If expiry stayed in the past, every
            // later call would clear the counts again and the breaker could
            // never trip. Bumping the generation drops the outcomes of requests
            // admitted in the old window. Otherwise they would be added to
            // counts that no longer include them, e.g. failures > requests.
            cb.resetCounts(now)
            cb.generation++
            cb.expiry = now.Add(cb.interval)
        }

    case StateOpen:
        if cb.expiry.Before(now) {
            cb.setState(StateHalfOpen, now)
        }
    }

    return cb.state, cb.generation
}

// setState moves the breaker to state. It starts a new generation with
// cleared counts, sets the expiry for the new state and logs the transition.
// It does nothing if the breaker is already in that state. The caller must
// hold cb.mu.
func (cb *CircuitBreaker) setState(state State, now time.Time) {
    prev := cb.state
    if prev == state {
        return
    }

    cb.state = state
    cb.generation++
    cb.resetCounts(now)

    switch state {
    case StateOpen:
        cb.expiry = now.Add(cb.timeout) // when to start probing
    case StateClosed:
        cb.expiry = now.Add(cb.interval) // end of the first statistics window
    case StateHalfOpen:
        cb.expiry = time.Time{} // no deadline while probing
    }

    log.Printf("Circuit breaker state changed: %v → %v", prev, state)
}

// resetCounts clears the request statistics. The timestamp argument is
// accepted but not used.
func (cb *CircuitBreaker) resetCounts(_ time.Time) {
    var fresh Counts
    cb.counts = fresh
}

// 使用示例
// main sends 100 simulated calls, spaced 100ms apart, through a breaker.
// Roughly 30% of the calls fail, and each outcome is logged.
func main() {
    breaker := NewCircuitBreaker()

    // Stand-in for a remote call that fails about 30% of the time.
    flakyService := func() (interface{}, error) {
        if rand.Float64() < 0.3 {
            return nil, errors.New("service error")
        }
        return "success", nil
    }

    for n := 1; n <= 100; n++ {
        result, err := breaker.Call(flakyService)
        if err != nil {
            log.Printf("Request %d failed: %v", n, err)
        } else {
            log.Printf("Request %d success: %v", n, result)
        }

        time.Sleep(100 * time.Millisecond)
    }
}

熔断器关键参数

| 参数 | 说明 | 建议值 |
|------|------|--------|
| failureThreshold | 失败率阈值 | 50%~80% |
| requestVolumeThreshold | 最小请求数 | 10~20 |
| sleepWindow | Open状态持续时间 | 30~60秒 |
| successThreshold | Half-Open成功次数 | 3~5次 |

降级策略

1. 返回默认值

场景:推荐系统故障,返回默认推荐

// RecommendService fetches recommendations through a circuit breaker and
// degrades to a fixed list of default items (see GetRecommendations).
type RecommendService struct {
    client         *http.Client    // presumably used by callRecommendAPI (not shown)
    circuitBreaker *CircuitBreaker // protects the recommendation call
}

// GetRecommendations returns recommendations for userID. If the breaker
// rejects the call or the call fails, the error is logged and a default list
// of popular items is returned instead. The error result is always nil.
func (rs *RecommendService) GetRecommendations(userID string) ([]string, error) {
    fetch := func() (interface{}, error) {
        return rs.callRecommendAPI(userID)
    }

    result, err := rs.circuitBreaker.Call(fetch)
    if err == nil {
        return result.([]string), nil
    }

    // Degrade: fall back to the default (popular) items.
    log.Printf("Recommend service unavailable, using default: %v", err)
    return []string{"item1", "item2", "item3"}, nil
}

2. 返回缓存数据

场景:服务不可用,返回缓存

// ProductService loads products through a circuit breaker and falls back to a
// cached copy when the service is unavailable (see GetProduct).
type ProductService struct {
    client         *http.Client
    cache          *redis.Client // presumably backs getFromCache/updateCache (not shown)
    circuitBreaker *CircuitBreaker
}

// GetProduct loads productID through the circuit breaker and refreshes the
// cache on success. If the call is rejected or fails, it serves the cached
// copy when one is available, and otherwise returns an error.
func (ps *ProductService) GetProduct(productID string) (*Product, error) {
    result, err := ps.circuitBreaker.Call(func() (interface{}, error) {
        return ps.callProductAPI(productID)
    })

    if err != nil {
        // Degrade: serve the cached copy if there is one.
        log.Printf("Product service unavailable, using cache: %v", err)
        if cached, cacheErr := ps.getFromCache(productID); cacheErr == nil {
            return cached, nil
        }
        return nil, errors.New("service unavailable and no cache")
    }

    fresh := result.(*Product)
    ps.updateCache(productID, fresh) // keep the fallback copy current
    return fresh, nil
}

3. 快速失败

场景:非核心功能,直接失败

// CommentService loads article comments through a circuit breaker. Comments
// are non-core, so failures degrade to an empty list (see GetComments).
type CommentService struct {
    client         *http.Client
    circuitBreaker *CircuitBreaker
}

// GetComments returns the comments of articleID. If the breaker rejects the
// call or the call fails, the error is logged and an empty, non-nil slice is
// returned, so reading the article itself is unaffected.
func (cs *CommentService) GetComments(articleID string) ([]Comment, error) {
    load := func() (interface{}, error) { return cs.callCommentAPI(articleID) }

    result, err := cs.circuitBreaker.Call(load)
    if err == nil {
        return result.([]Comment), nil
    }

    // Degrade: comments are optional.
    log.Printf("Comment service unavailable: %v", err)
    return []Comment{}, nil
}

4. 异步处理

场景:同步改异步,提高响应速度

// OrderService creates orders synchronously behind a circuit breaker and
// falls back to publishing them to a message queue (see CreateOrder).
type OrderService struct {
    messageQueue   *kafka.Producer // asynchronous fallback path
    circuitBreaker *CircuitBreaker // guards the synchronous path
}

// CreateOrder creates order synchronously through the circuit breaker.
//
// If the breaker rejects the call or the synchronous creation fails, the order
// is encoded as JSON and published to "order.create" for asynchronous
// processing. In that case CreateOrder reports success. An error is returned
// only when neither path accepted the order.
func (s *OrderService) CreateOrder(order *Order) error {
    // Call returns (result, error). The result is unused here.
    _, err := s.circuitBreaker.Call(func() (interface{}, error) {
        return nil, s.createOrderSync(order)
    })
    if err == nil {
        return nil
    }

    // Degrade: create the order asynchronously via the message queue.
    log.Printf("Sync order creation failed, using async: %v", err)

    message, marshalErr := json.Marshal(order)
    if marshalErr != nil {
        // Nothing was enqueued, so the caller must not be told the order was accepted.
        return marshalErr
    }
    // NOTE(review): assumes Send reports enqueue failures through an error
    // return value; confirm against the producer API in use.
    if sendErr := s.messageQueue.Send("order.create", message); sendErr != nil {
        return sendErr
    }
    return nil // accepted for asynchronous processing
}

限流保护

令牌桶算法

原理:按固定速率生成令牌,请求消费令牌

// TokenBucketLimiter is a token-bucket rate limiter. Tokens are added at rate
// per second, up to capacity, and every allowed request consumes one. Bursts
// of up to capacity requests are therefore admitted.
type TokenBucketLimiter struct {
    capacity int64     // bucket capacity
    tokens   int64     // tokens currently available
    rate     int64     // refill rate, tokens per second
    lastTime time.Time // instant up to which refills have been credited
    mu       sync.Mutex
}

// NewTokenBucketLimiter returns a limiter whose bucket starts full.
func NewTokenBucketLimiter(capacity, rate int64) *TokenBucketLimiter {
    return &TokenBucketLimiter{
        capacity: capacity,
        tokens:   capacity,
        rate:     rate,
        lastTime: time.Now(),
    }
}

// Allow reports whether a request may proceed now, consuming one token if
// so. It is safe for concurrent use.
func (tbl *TokenBucketLimiter) Allow() bool {
    tbl.mu.Lock()
    defer tbl.mu.Unlock()

    tbl.refill(time.Now())

    if tbl.tokens >= 1 {
        tbl.tokens--
        return true
    }
    return false
}

// refill credits the whole tokens earned since lastTime. The caller holds tbl.mu.
//
// lastTime advances only by the time those whole tokens account for, so a
// partial token carries over to the next call. If lastTime were reset to now
// on every call, frequent callers would lose the fraction each time. For
// example, at 10 tokens/s with calls every 50ms, each call earns 0.5 token,
// which truncates to 0, and the bucket never refills.
func (tbl *TokenBucketLimiter) refill(now time.Time) {
    if tbl.tokens >= tbl.capacity {
        // A full bucket cannot hold more; restart the clock so idle time is not banked.
        tbl.lastTime = now
        return
    }
    if tbl.rate <= 0 {
        return
    }

    earned := now.Sub(tbl.lastTime).Seconds() * float64(tbl.rate)
    switch {
    case earned < 1:
        // Less than one token so far: keep lastTime so the fraction accumulates.
    case earned >= float64(tbl.capacity-tbl.tokens):
        tbl.tokens = tbl.capacity
        tbl.lastTime = now
    default:
        whole := int64(earned)
        tbl.tokens += whole
        tbl.lastTime = tbl.lastTime.Add(time.Duration(whole) * time.Second / time.Duration(tbl.rate))
    }
}

// min returns the smaller of a and b.
func min(a, b int64) int64 {
    if b < a {
        return b
    }
    return a
}

// 集成熔断器和限流器
// ProtectedService puts a rate limiter and a circuit breaker in front of a
// single dependency (see Call).
type ProtectedService struct {
    circuitBreaker *CircuitBreaker     // applied second
    limiter        *TokenBucketLimiter // applied first
}

// Call applies the rate limiter first. Only if a token is available does it
// run req through the circuit breaker, so a rate-limited request never reaches
// the breaker.
func (ps *ProtectedService) Call(req func() (interface{}, error)) (interface{}, error) {
    if ps.limiter.Allow() {
        return ps.circuitBreaker.Call(req)
    }
    return nil, errors.New("rate limit exceeded")
}

漏桶算法

原理:请求匀速流出,平滑突发流量

// LeakyBucketLimiter is a leaky bucket used as a meter. Each allowed request
// adds one unit, the bucket drains at rate units per second, and requests are
// rejected while the bucket holds capacity units.
//
// Draining is computed lazily from the time elapsed since lastTime, so the
// limiter owns no goroutine (nothing to leak or stop) and any rate is safe.
// A ticker with interval time.Second/rate would panic for rate <= 0 or
// rate > 1e9.
type LeakyBucketLimiter struct {
    capacity int64     // bucket capacity
    queue    int64     // current fill level
    rate     int64     // drain rate, units per second; <= 0 never drains
    lastTime time.Time // instant up to which draining has been accounted for
    mu       sync.Mutex
}

// NewLeakyBucketLimiter returns an empty bucket.
func NewLeakyBucketLimiter(capacity, rate int64) *LeakyBucketLimiter {
    return &LeakyBucketLimiter{
        capacity: capacity,
        rate:     rate,
        lastTime: time.Now(),
    }
}

// leak removes the whole units drained since lastTime. The caller holds lbl.mu.
// lastTime advances only by the time those whole units account for, so
// partial drains accumulate across calls.
func (lbl *LeakyBucketLimiter) leak(now time.Time) {
    if lbl.queue <= 0 || lbl.rate <= 0 {
        // Nothing to drain (or never draining): restart the clock so idle
        // time is not credited later.
        lbl.lastTime = now
        return
    }

    drained := now.Sub(lbl.lastTime).Seconds() * float64(lbl.rate)
    switch {
    case drained < 1:
        // Less than one unit so far: keep lastTime.
    case drained >= float64(lbl.queue):
        lbl.queue = 0
        lbl.lastTime = now
    default:
        whole := int64(drained)
        lbl.queue -= whole
        lbl.lastTime = lbl.lastTime.Add(time.Duration(whole) * time.Second / time.Duration(lbl.rate))
    }
}

// Allow reports whether a request fits into the bucket now, adding one unit
// if so. It is safe for concurrent use.
func (lbl *LeakyBucketLimiter) Allow() bool {
    lbl.mu.Lock()
    defer lbl.mu.Unlock()

    lbl.leak(time.Now())

    if lbl.queue < lbl.capacity {
        lbl.queue++
        return true
    }
    return false
}

主流框架

1. Hystrix(Netflix)

特点:

  • 熔断器模式
  • 线程池隔离
  • 信号量隔离
  • 请求缓存
  • 实时监控(Hystrix Dashboard)

注意:Hystrix自2018年起进入维护模式(不再开发新功能),但其设计思想仍被广泛使用

2. Sentinel(阿里)

特点:

  • 流量控制
  • 熔断降级
  • 系统负载保护
  • 热点参数限流
  • 实时监控控制台

Go SDK示例:

package main

import (
    sentinel "github.com/alibaba/sentinel-golang/api"
    "github.com/alibaba/sentinel-golang/core/circuitbreaker"
    "github.com/alibaba/sentinel-golang/core/flow"
)

// initSentinel initialises Sentinel with its default configuration. It then
// loads a flow rule and an error-ratio circuit-breaker rule for the
// "order_service" resource. Every failure is fatal, because running with
// rules silently missing would leave the service unprotected.
func initSentinel() {
    if err := sentinel.InitDefault(); err != nil {
        log.Fatal(err)
    }

    // Flow control: reject requests above the QPS threshold.
    flowRules := []*flow.Rule{
        {
            Resource:               "order_service",
            TokenCalculateStrategy: flow.Direct,
            ControlBehavior:        flow.Reject,
            Threshold:              1000, // QPS limit
            StatIntervalInMs:       1000,
        },
    }
    if _, err := flow.LoadRules(flowRules); err != nil {
        log.Fatalf("load flow rules: %v", err)
    }

    // Circuit breaking on error ratio.
    breakerRules := []*circuitbreaker.Rule{
        {
            Resource:         "order_service",
            Strategy:         circuitbreaker.ErrorRatio,
            RetryTimeoutMs:   30000, // retry after 30s
            MinRequestAmount: 10,
            StatIntervalMs:   10000,
            Threshold:        0.5, // 50% error ratio
        },
    }
    if _, err := circuitbreaker.LoadRules(breakerRules); err != nil {
        log.Fatalf("load circuit breaker rules: %v", err)
    }
}

// callOrderService invokes the order service under Sentinel protection. It
// returns an error when Sentinel blocks the entry (flow control or circuit
// breaking) or when the call itself fails. Call failures are reported via
// TraceError so they count towards the circuit-breaker statistics.
func callOrderService() error {
    entry, blockErr := sentinel.Entry("order_service")
    if blockErr != nil {
        // Blocked by flow control or circuit breaking.
        return errors.New("service unavailable")
    }
    defer entry.Exit()

    // Use a fresh err: Entry's second result is a block-error type, not
    // `error`. The call's result is not needed here.
    if _, err := doOrderServiceCall(); err != nil {
        sentinel.TraceError(entry, err) // counts towards the error-ratio rule
        return err
    }
    return nil
}

3. Resilience4j(Spring Cloud推荐)

特点:

  • 轻量级(无外部依赖)
  • 模块化设计
  • 函数式编程
  • Spring Boot集成

实战案例

完整电商系统容错架构

场景:订单服务调用多个下游服务

package main

import (
    "context"
    "encoding/json"
    "errors"
    "log"
    "net/http"
    "time"
)

// 订单服务
// OrderService orchestrates order creation across three protected downstream
// services (see CreateOrder).
type OrderService struct {
    stockService   *ProtectedStockService   // core: a failure aborts the order
    paymentService *ProtectedPaymentService // core, with an async fallback
    couponService  *ProtectedCouponService  // optional: failures mean no discount
}

// 库存服务(保护)
// ProtectedStockService wraps the stock API with a rate limiter, a circuit
// breaker and a cache used as a fallback (see CheckStock).
type ProtectedStockService struct {
    circuitBreaker *CircuitBreaker
    limiter        *TokenBucketLimiter
    cache          *StockCache // consulted when the breaker rejects or the call fails
}

// CheckStock reports whether quantity units of productID are available.
//
// A rate-limited request fails immediately, without a cache fallback. An
// admitted request goes through the circuit breaker with a 3s timeout. If the
// breaker rejects it or the call fails, a cached stock of at least quantity
// counts as available. A positive answer from the service is written back to
// the cache.
func (pss *ProtectedStockService) CheckStock(productID string, quantity int) (bool, error) {
    if !pss.limiter.Allow() {
        log.Println("Stock service rate limited")
        return false, errors.New("rate limit exceeded")
    }

    const callTimeout = 3 * time.Second
    query := func() (interface{}, error) {
        ctx, cancel := context.WithTimeout(context.Background(), callTimeout)
        defer cancel()
        return pss.callStockAPI(ctx, productID, quantity)
    }

    result, err := pss.circuitBreaker.Call(query)
    if err != nil {
        // Degrade: fall back to the cached stock level.
        log.Printf("Stock service unavailable, using cache: %v", err)
        cached, cacheErr := pss.cache.GetStock(productID)
        if cacheErr != nil || cached < quantity {
            return false, err
        }
        return true, nil
    }

    inStock := result.(bool)
    if inStock {
        // NOTE(review): this caches the requested quantity, not the real stock
        // level; it is only a lower bound learned from this check. A negative
        // answer leaves any older cached value in place, so the fallback above
        // may report stock that is gone. Confirm this is acceptable.
        pss.cache.SetStock(productID, quantity)
    }
    return inStock, nil
}

// 支付服务(保护)
// ProtectedPaymentService wraps the payment API with a rate limiter and a
// circuit breaker, and falls back to a message queue (see CreatePayment).
type ProtectedPaymentService struct {
    circuitBreaker *CircuitBreaker
    limiter        *TokenBucketLimiter
    messageQueue   *MessageQueue // asynchronous fallback path
}

// CreatePayment creates the payment for orderID synchronously when possible.
// It falls back to asyncPayment when the limiter has no token, when the
// breaker rejects the call, or when the call fails (5s timeout). In that case
// it returns whatever asyncPayment returns.
func (pps *ProtectedPaymentService) CreatePayment(orderID string, amount float64) error {
    if !pps.limiter.Allow() {
        log.Println("Payment service rate limited, using async")
        return pps.asyncPayment(orderID, amount)
    }

    const callTimeout = 5 * time.Second
    pay := func() (interface{}, error) {
        ctx, cancel := context.WithTimeout(context.Background(), callTimeout)
        defer cancel()
        return nil, pps.callPaymentAPI(ctx, orderID, amount)
    }

    _, err := pps.circuitBreaker.Call(pay)
    if err == nil {
        return nil
    }

    // NOTE(review): a call that timed out may still have been processed by the
    // payment service, so the async consumer presumably needs to be idempotent
    // per orderID to avoid charging twice; confirm.
    log.Printf("Payment service unavailable, using async: %v", err)
    return pps.asyncPayment(orderID, amount)
}

// asyncPayment publishes a "payment.create" message with the order ID, the
// amount and the enqueue time for asynchronous processing. It returns an error
// if the message cannot be encoded or sent.
func (pps *ProtectedPaymentService) asyncPayment(orderID string, amount float64) error {
    message := map[string]interface{}{
        "order_id": orderID,
        "amount":   amount,
        "time":     time.Now(),
    }

    data, err := json.Marshal(message)
    if err != nil {
        // json.Marshal fails for NaN or ±Inf amounts. Publishing the resulting
        // nil payload would silently lose the payment.
        return err
    }
    return pps.messageQueue.Send("payment.create", data)
}

// 优惠券服务(保护)
// ProtectedCouponService wraps the coupon API with a rate limiter and a
// circuit breaker. Coupons are non-core (see CalculateDiscount).
type ProtectedCouponService struct {
    circuitBreaker *CircuitBreaker
    limiter        *TokenBucketLimiter
}

// CalculateDiscount returns the coupon discount for userID on amount. Coupons
// are non-core, so the discount is 0 and the error is nil in three cases: the
// limiter has no token, the breaker rejects the call, or the call fails
// (2s timeout).
func (pcs *ProtectedCouponService) CalculateDiscount(userID string, amount float64) (float64, error) {
    if !pcs.limiter.Allow() {
        log.Println("Coupon service rate limited, no discount")
        return 0, nil
    }

    const callTimeout = 2 * time.Second
    quote := func() (interface{}, error) {
        ctx, cancel := context.WithTimeout(context.Background(), callTimeout)
        defer cancel()
        return pcs.callCouponAPI(ctx, userID, amount)
    }

    result, err := pcs.circuitBreaker.Call(quote)
    if err == nil {
        return result.(float64), nil
    }

    // Degrade: no discount.
    log.Printf("Coupon service unavailable, no discount: %v", err)
    return 0, nil
}

// 创建订单(编排调用)
// CreateOrder orchestrates order creation across the protected services:
//  1. Stock check (core): any failure aborts the order.
//  2. Discount (optional): CalculateDiscount degrades to 0 on failure.
//  3. Payment (core): CreatePayment already falls back to an async payment,
//     so an error here means the payment could not be scheduled at all.
//
// On success the order is returned with status "pending_payment".
func (s *OrderService) CreateOrder(req *OrderRequest) (*OrderResponse, error) {
    hasStock, err := s.stockService.CheckStock(req.ProductID, req.Quantity)
    if err != nil || !hasStock {
        return nil, errors.New("insufficient stock or service unavailable")
    }

    // The discount is best-effort; its error is deliberately ignored.
    discount, _ := s.couponService.CalculateDiscount(req.UserID, req.Amount)
    // Never let an external discount raise the price or make it negative.
    switch {
    case discount < 0:
        discount = 0
    case discount > req.Amount:
        discount = req.Amount
    }
    finalAmount := req.Amount - discount

    if err := s.paymentService.CreatePayment(req.OrderID, finalAmount); err != nil {
        // Both the synchronous call and the async fallback failed. Accepting
        // the order as "pending_payment" would leave it waiting for a payment
        // that was never scheduled.
        return nil, err
    }

    return &OrderResponse{
        OrderID:  req.OrderID,
        Amount:   finalAmount,
        Discount: discount,
        Status:   "pending_payment",
    }, nil
}

// 监控指标
// CircuitBreakerMetrics is a point-in-time snapshot of one circuit breaker.
type CircuitBreakerMetrics struct {
    Resource        string    // name of the protected resource
    State           string    // "CLOSED", "OPEN" or "HALF_OPEN"
    TotalRequests   uint32    // requests counted in the breaker's current generation
    SuccessRate     float64   // successes / requests, 0 when there are no requests
    FailureRate     float64   // failures / requests, 0 when there are no requests
    LastStateChange time.Time // NOTE(review): GetMetrics fills this from the breaker's expiry
}

// GetMetrics returns a snapshot of the breaker's state and the request
// statistics of its current generation. Resource is left empty because the
// breaker does not know its own name; callers fill it in.
func (cb *CircuitBreaker) GetMetrics() *CircuitBreakerMetrics {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    // Apply pending time-based transitions first, so the snapshot matches what
    // the next request would see. Reading cb.state directly can report OPEN
    // long after the open timeout has elapsed.
    state, _ := cb.currentState(time.Now())

    var stateStr string
    switch state {
    case StateClosed:
        stateStr = "CLOSED"
    case StateOpen:
        stateStr = "OPEN"
    case StateHalfOpen:
        stateStr = "HALF_OPEN"
    }

    var successRate, failureRate float64
    if n := float64(cb.counts.Requests); n > 0 {
        successRate = float64(cb.counts.TotalSuccesses) / n
        failureRate = float64(cb.counts.TotalFailures) / n
    }

    return &CircuitBreakerMetrics{
        State:         stateStr,
        TotalRequests: cb.counts.Requests,
        SuccessRate:   successRate,
        FailureRate:   failureRate,
        // NOTE(review): expiry is the end of the current Closed window or Open
        // period, not the time of the last transition. The breaker does not
        // record the latter.
        LastStateChange: cb.expiry,
    }
}

// HTTP监控端点
// metricsHandler serves a JSON object that maps each service name to a
// metrics snapshot of its circuit breaker.
func metricsHandler(services map[string]*CircuitBreaker) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        metrics := make(map[string]*CircuitBreakerMetrics, len(services))
        for name, cb := range services {
            m := cb.GetMetrics()
            m.Resource = name // GetMetrics leaves Resource empty; the map key is the name
            metrics[name] = m
        }

        w.Header().Set("Content-Type", "application/json")
        if err := json.NewEncoder(w).Encode(metrics); err != nil {
            // Usually the client has gone away; there is no one left to
            // report the error to, so just record it.
            log.Printf("encode circuit breaker metrics: %v", err)
        }
    }
}

面试问答

熔断器的三种状态如何转换?

答案:

状态转换流程:

1. Closed → Open:
   条件:失败率 ≥ 阈值(如50%)
   示例:10个请求中5个失败 → Open

2. Open → Half-Open:
   条件:等待超时(如30秒)
   说明:给服务恢复的时间

3. Half-Open → Closed:
   条件:连续成功次数达到阈值(如3次)
   说明:服务已恢复

4. Half-Open → Open:
   条件:任意请求失败
   说明:服务仍未恢复

关键参数:
- failureThreshold: 50%~80%
- requestVolumeThreshold: 10~20
- sleepWindow: 30~60秒
- successThreshold: 3~5次

熔断和降级有什么区别?

答案:

| 对比维度 | 熔断 | 降级 |
|----------|------|------|
| 触发条件 | 下游服务故障(被动) | 系统负载高/主动决策 |
| 目的 | 保护系统不被拖垮 | 保证核心功能可用 |
| 作用对象 | 特定服务/接口 | 非核心功能 |
| 恢复机制 | 自动恢复(Half-Open) | 手动恢复 |
| 实现方式 | 熔断器模式 | 返回默认值/缓存/快速失败 |

示例:

// Circuit breaking: the downstream service is failing, so requests are rejected automatically.
if circuitBreakerOpen {
    return errors.New("service unavailable")
}

// Degradation: the system is under heavy load, so non-core features are degraded on purpose.
if systemLoad > 0.8 {
    return defaultRecommendations // degrade the recommendation system
}

如何选择线程池隔离还是信号量隔离?

答案:

线程池隔离:

优点:
- 完全隔离(不影响主线程池)
- 支持超时控制
- 支持异步执行

缺点:
- 线程上下文切换开销
- 资源消耗大

适用场景:
- 慢服务(响应时间不可控)
- 需要超时控制
- 非核心服务

信号量隔离:

优点:
- 轻量级(无线程切换)
- 性能高

缺点:
- 不支持超时控制
- 同步执行

适用场景:
- 快速服务(响应时间可控)
- 性能要求高
- 内部服务调用

代码示例:

// 线程池隔离
// ThreadPoolIsolation runs each task on its own goroutine while holding one
// of a bounded number of worker slots (the capacity of workerPool). A slow
// dependency can therefore tie up at most that many workers.
//
// Unlike SemaphoreIsolation, the task does not run on the caller's goroutine,
// which is what allows a timeout. If timeout is positive, Execute stops
// waiting after timeout and returns an error. The task keeps its slot until it
// really finishes. A zero timeout waits for completion.
type ThreadPoolIsolation struct {
    workerPool chan struct{}
    timeout    time.Duration
}

// Execute runs task if a worker slot is free; otherwise it returns
// "thread pool full" without running it. A panic in task is re-raised on the
// caller's goroutine, as with a direct call, unless Execute already returned
// on timeout.
func (tpi *ThreadPoolIsolation) Execute(task func()) error {
    select {
    case tpi.workerPool <- struct{}{}:
    default:
        return errors.New("thread pool full")
    }

    done := make(chan interface{}, 1) // buffered: the worker never blocks if we stopped waiting
    go func() {
        // Deferred calls run last-in first-out: the slot is released first,
        // then the outcome (nil, or the recovered panic value) is reported.
        defer func() { done <- recover() }()
        defer func() { <-tpi.workerPool }()
        task()
    }()

    var expired <-chan time.Time // nil channel: never fires when timeout <= 0
    if tpi.timeout > 0 {
        timer := time.NewTimer(tpi.timeout)
        defer timer.Stop()
        expired = timer.C
    }

    select {
    case p := <-done:
        if p != nil {
            panic(p)
        }
        return nil
    case <-expired:
        return errors.New("task timed out")
    }
}

// 信号量隔离
// SemaphoreIsolation bounds concurrency with a counting semaphore whose size
// is the capacity of semaphore. Tasks run synchronously on the caller's
// goroutine, so there is no goroutine hand-off and no way to time a task out.
type SemaphoreIsolation struct {
    semaphore chan struct{}
}

// Execute runs task if a permit is free and releases the permit when task
// returns. Otherwise it immediately returns "semaphore full" without running task.
func (si *SemaphoreIsolation) Execute(task func()) error {
    acquired := false
    select {
    case si.semaphore <- struct{}{}:
        acquired = true
    default:
    }
    if !acquired {
        return errors.New("semaphore full")
    }

    defer func() { <-si.semaphore }()
    task()
    return nil
}

如何设计降级策略?

答案:

降级分级:

P0(核心功能):
- 订单创建、支付
- 不降级,必须保证可用

P1(重要功能):
- 用户登录、商品详情
- 降级策略:返回缓存

P2(一般功能):
- 推荐系统、评论
- 降级策略:返回默认值

P3(非核心功能):
- 积分、优惠券
- 降级策略:快速失败(直接不可用)

降级开关:

// FeatureSwitch holds named on/off degradation switches. Reads are guarded by
// an RWMutex. A feature without an entry counts as disabled.
type FeatureSwitch struct {
    switches map[string]bool
    mu       sync.RWMutex
}

// IsEnabled reports whether feature is switched on.
func (fs *FeatureSwitch) IsEnabled(feature string) bool {
    fs.mu.RLock()
    enabled := fs.switches[feature] // missing keys (and a nil map) yield false
    fs.mu.RUnlock()
    return enabled
}

// 使用示例
// CreateOrder shows a degradation switch gating a non-core feature. The stock
// check is never degraded, while the coupon discount is skipped when the
// "coupon" switch is off. Payment relies on CreatePayment's own async
// fallback, so an error from it means the payment could not be scheduled.
func (s *OrderService) CreateOrder(req *OrderRequest) (*OrderResponse, error) {
    // Core feature: stock check (never degraded).
    hasStock, err := s.stockService.CheckStock(req.ProductID, req.Quantity)
    if err != nil || !hasStock {
        return nil, errors.New("insufficient stock")
    }

    // Non-core feature: coupons (can be switched off).
    var discount float64
    if s.featureSwitch.IsEnabled("coupon") {
        discount, _ = s.couponService.CalculateDiscount(req.UserID, req.Amount)
    } else {
        log.Println("Coupon feature is disabled (degraded)")
    }

    // Core feature: payment.
    finalAmount := req.Amount - discount
    if err := s.paymentService.CreatePayment(req.OrderID, finalAmount); err != nil {
        return nil, err
    }

    return &OrderResponse{
        OrderID:  req.OrderID,
        Amount:   finalAmount,
        Discount: discount,
        Status:   "pending_payment",
    }, nil
}

动态配置(Apollo/Nacos):

# 配置中心
degradation:
  coupon:
    enabled: false  # 优惠券功能降级
    reason: "系统负载过高"
  recommend:
    enabled: false
    default_items: ["item1", "item2", "item3"]

如何监控熔断器状态?

答案:

1. 指标收集:

// CircuitBreakerMetrics is the fuller set of metrics worth collecting for a
// circuit breaker.
type CircuitBreakerMetrics struct {
    // Basic metrics.
    State         string  // current state
    TotalRequests uint32  // total requests
    SuccessCount  uint32  // successful requests
    FailureCount  uint32  // failed requests
    SuccessRate   float64 // success ratio
    FailureRate   float64 // failure ratio

    // Timing metrics.
    LastStateChange time.Time     // time of the last state change
    OpenDuration    time.Duration // duration of the Open state

    // Half-Open metrics.
    HalfOpenRequests  uint32
    HalfOpenSuccesses uint32
}

2. Prometheus指标:

import "github.com/prometheus/client_golang/prometheus"

// NOTE(review): neither collector below is registered in the visible code.
// client_golang exports collectors only after they are registered (e.g. via
// prometheus.MustRegister); confirm this happens elsewhere.

// circuitBreakerState exposes each breaker's state as a gauge labelled by resource.
var circuitBreakerState = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{
        Name: "circuit_breaker_state",
        Help: "Circuit breaker state (0=closed, 1=open, 2=half-open)",
    },
    []string{"resource"},
)

// circuitBreakerRequests counts requests per resource and result
// ("success" or "failure", as used by recordMetrics).
var circuitBreakerRequests = prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "circuit_breaker_requests_total",
        Help: "Total number of requests",
    },
    []string{"resource", "result"},
)

// recordMetrics publishes the breaker's state to circuit_breaker_state (0=closed,
// 1=open, 2=half-open). It also adds the breaker's current success and failure
// totals to circuit_breaker_requests_total for resource.
func (cb *CircuitBreaker) recordMetrics(resource string) {
    // Snapshot under the lock. Every request mutates state and counts under
    // cb.mu, so reading them without it is a data race.
    cb.mu.Lock()
    state := cb.state
    counts := cb.counts
    cb.mu.Unlock()

    var stateValue float64 // StateClosed (and any unknown state) maps to 0
    switch state {
    case StateOpen:
        stateValue = 1
    case StateHalfOpen:
        stateValue = 2
    }
    circuitBreakerState.WithLabelValues(resource).Set(stateValue)

    // NOTE(review): these are the totals of the current generation, not the
    // requests since the previous call, so calling recordMetrics repeatedly
    // over-counts. Consider incrementing the counters once per request instead.
    circuitBreakerRequests.WithLabelValues(resource, "success").Add(float64(counts.TotalSuccesses))
    circuitBreakerRequests.WithLabelValues(resource, "failure").Add(float64(counts.TotalFailures))
}

3. 告警规则:

groups:
  - name: circuit_breaker
    rules:
      - alert: CircuitBreakerOpen
        expr: circuit_breaker_state > 0
        for: 1m
        annotations:
          summary: "Circuit breaker is open for {{ $labels.resource }}"

      - alert: HighFailureRate
        expr: |
          sum by (resource) (rate(circuit_breaker_requests_total{result="failure"}[1m]))
          /
          sum by (resource) (rate(circuit_breaker_requests_total[1m]))
          > 0.5
        for: 5m
        annotations:
          summary: "High failure rate for {{ $labels.resource }}"

Prev
第5章:负载均衡
Next
第7章:DDD领域驱动设计