第6章:熔断降级
为什么需要熔断降级
雪崩效应(Cascading Failure)
场景:微服务调用链故障传播
用户请求 → 订单服务 → 库存服务 → 数据库
↓
支付服务
↓
优惠券服务
库存服务故障(响应慢/超时):
1. 订单服务线程等待库存服务响应
2. 大量请求堆积,订单服务线程池耗尽
3. 订单服务不可用,影响用户请求
4. 用户请求失败,继续重试,加剧压力
5. 整个系统崩溃
雪崩效应示意图:
正常情况:
User → Order Service (50ms)
↓
Stock Service (20ms)
↓
Database (10ms)
总响应时间: 80ms
故障情况(Stock Service响应慢):
User → Order Service
↓ (等待3秒超时)
Stock Service (响应慢/不响应)
问题:
- Order Service线程被占用3秒
- 线程池(100个线程)
- QPS=1000,每个请求占用线程3秒 → 需要 1000 × 3 = 3000 个线程
- 线程池耗尽 → 订单服务不可用
- 影响面扩大 → 整个系统崩溃
容错机制的目标
目标:
快速失败(Fail Fast)
资源隔离(Isolation)
自动恢复(Auto Recovery)
降级可用(Degrade Gracefully)
熔断器模式
状态机
三种状态:
┌─────────────────────────────────────────┐
│ Closed (关闭状态) │
│ 正常请求,统计失败率 │
└─────────────────────────────────────────┘
↓ 失败率 ≥ 阈值
┌─────────────────────────────────────────┐
│ Open (打开状态) │
│ 拒绝所有请求,快速失败 │
└─────────────────────────────────────────┘
↓ 等待超时(如30秒)
┌─────────────────────────────────────────┐
│ Half-Open (半开状态) │
│ 允许少量请求通过,测试恢复 │
└─────────────────────────────────────────┘
↓ 成功 ↓ 失败
Closed Open
代码实现
基础熔断器:
package main
import (
	"errors"
	"log"
	"math/rand"
	"sync"
	"time"
)
// State is the position of a CircuitBreaker in its state machine.
type State int

const (
	// StateClosed admits every request while outcomes are counted.
	StateClosed State = iota
	// StateOpen rejects every request until the open timeout elapses.
	StateOpen
	// StateHalfOpen admits a limited number of probe requests.
	StateHalfOpen
)

// CircuitBreaker guards calls to an unreliable dependency. It counts request
// outcomes while closed, rejects requests while open, and lets a few probe
// requests through while half-open to decide whether to close again.
type CircuitBreaker struct {
	maxRequests uint32                   // max requests admitted while half-open
	interval    time.Duration            // length of one statistics window while closed
	timeout     time.Duration            // how long the breaker stays open
	readyToTrip func(counts Counts) bool // decides whether a closed breaker should open

	state      State
	counts     Counts
	expiry     time.Time // end of the closed window or of the open period; zero means none
	generation uint64    // bumped every time counts start over
	mu         sync.Mutex
}

// Counts holds the request statistics of the current generation.
type Counts struct {
	Requests             uint32
	TotalSuccesses       uint32
	TotalFailures        uint32
	ConsecutiveSuccesses uint32
	ConsecutiveFailures  uint32
}

// NewCircuitBreaker returns a closed breaker with a 10s statistics window, a
// 30s open timeout and at most 3 half-open probe requests. It trips once at
// least 10 requests were admitted in the window and either half of them
// failed or the last 5 outcomes were all failures.
func NewCircuitBreaker() *CircuitBreaker {
	cb := &CircuitBreaker{
		maxRequests: 3,
		interval:    10 * time.Second,
		timeout:     30 * time.Second,
		readyToTrip: func(counts Counts) bool {
			// Check the volume first so the ratio is never computed as 0/0.
			if counts.Requests < 10 {
				return false
			}
			failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
			return failureRatio >= 0.5 || counts.ConsecutiveFailures >= 5
		},
		state: StateClosed,
	}
	// Arm the first statistics window; without this the interval would only
	// take effect after the breaker had opened and recovered once.
	cb.expiry = cb.closedExpiry(time.Now())
	return cb
}

// Call runs req if the breaker admits it and records the outcome. It returns
// the breaker's own error without calling req when the request is rejected.
// A panic in req is recorded as a failure and then re-raised, so a panicking
// probe cannot leave a half-open breaker stuck at its request limit.
func (cb *CircuitBreaker) Call(req func() (interface{}, error)) (interface{}, error) {
	generation, err := cb.beforeRequest()
	if err != nil {
		return nil, err
	}

	defer func() {
		if r := recover(); r != nil {
			cb.afterRequest(generation, false)
			panic(r)
		}
	}()

	result, err := req()
	cb.afterRequest(generation, err == nil)
	return result, err
}

// beforeRequest decides whether a request may proceed. On admission it counts
// the request and returns the generation the outcome must be reported under.
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	now := time.Now()
	state, generation := cb.currentState(now)

	if state == StateOpen {
		return generation, errors.New("circuit breaker is open")
	} else if state == StateHalfOpen {
		// Only a limited number of probes may be in the half-open generation.
		if cb.counts.Requests >= cb.maxRequests {
			return generation, errors.New("too many requests in half-open state")
		}
	}

	cb.counts.Requests++
	return generation, nil
}

// afterRequest records the outcome of a request admitted under generation.
// Outcomes that belong to an older generation are dropped.
func (cb *CircuitBreaker) afterRequest(generation uint64, success bool) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	now := time.Now()
	state, currentGeneration := cb.currentState(now)
	if generation != currentGeneration {
		return
	}

	if success {
		cb.onSuccess(state, now)
	} else {
		cb.onFailure(state, now)
	}
}

// onSuccess counts a success and closes a half-open breaker once
// maxRequests consecutive probes have succeeded. The caller holds cb.mu.
func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
	cb.counts.TotalSuccesses++
	cb.counts.ConsecutiveSuccesses++
	cb.counts.ConsecutiveFailures = 0

	if state == StateHalfOpen {
		if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
			cb.setState(StateClosed, now)
		}
	}
}

// onFailure counts a failure and opens the breaker when a half-open probe
// fails or readyToTrip reports true. The caller holds cb.mu.
func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
	cb.counts.TotalFailures++
	cb.counts.ConsecutiveFailures++
	cb.counts.ConsecutiveSuccesses = 0

	if state == StateHalfOpen || cb.readyToTrip(cb.counts) {
		cb.setState(StateOpen, now)
	}
}

// currentState applies any time-based transition that is due at now and
// returns the resulting state and generation. The caller holds cb.mu.
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
	switch cb.state {
	case StateClosed:
		// The statistics window is over: start a new generation and re-arm
		// the window. Leaving the old expiry in place would make every later
		// call reset the counts again, so the breaker could never trip.
		if !cb.expiry.IsZero() && cb.expiry.Before(now) {
			cb.generation++
			cb.resetCounts(now)
			cb.expiry = cb.closedExpiry(now)
		}
	case StateOpen:
		// The open period is over: start probing.
		if cb.expiry.Before(now) {
			cb.setState(StateHalfOpen, now)
		}
	}
	return cb.state, cb.generation
}

// closedExpiry returns the end of a statistics window that starts at now, or
// the zero time when interval is not positive, which disables window resets.
func (cb *CircuitBreaker) closedExpiry(now time.Time) time.Time {
	if cb.interval <= 0 {
		return time.Time{}
	}
	return now.Add(cb.interval)
}

// setState switches to state, starts a new generation with fresh counts, sets
// the expiry that belongs to the new state and logs the transition. It is a
// no-op when the breaker is already in state. The caller holds cb.mu.
func (cb *CircuitBreaker) setState(state State, now time.Time) {
	if cb.state == state {
		return
	}

	prev := cb.state
	cb.state = state
	cb.generation++
	cb.resetCounts(now)

	switch state {
	case StateClosed:
		cb.expiry = cb.closedExpiry(now)
	case StateOpen:
		cb.expiry = now.Add(cb.timeout)
	case StateHalfOpen:
		// Half-open has no deadline; it ends on the probes' outcomes.
		cb.expiry = time.Time{}
	}

	log.Printf("Circuit breaker state changed: %v → %v", prev, state)
}

// resetCounts clears the statistics. now is accepted but not used.
func (cb *CircuitBreaker) resetCounts(now time.Time) {
	cb.counts = Counts{}
}
// 使用示例
// main drives the breaker with 100 simulated calls, one every 100ms, and logs
// the outcome of each.
func main() {
	breaker := NewCircuitBreaker()

	// flakyCall stands in for an external service that errors for random
	// draws below 0.3.
	flakyCall := func() (interface{}, error) {
		if rand.Float64() >= 0.3 {
			return "success", nil
		}
		return nil, errors.New("service error")
	}

	for attempt := 1; attempt <= 100; attempt++ {
		result, err := breaker.Call(flakyCall)
		if err == nil {
			log.Printf("Request %d success: %v", attempt, result)
		} else {
			log.Printf("Request %d failed: %v", attempt, err)
		}
		time.Sleep(100 * time.Millisecond)
	}
}
熔断器关键参数
| 参数 | 说明 | 建议值 |
|---|---|---|
| failureThreshold | 失败率阈值 | 50%~80% |
| requestVolumeThreshold | 最小请求数 | 10~20 |
| sleepWindow | Open状态持续时间 | 30~60秒 |
| successThreshold | Half-Open成功次数 | 3~5次 |
降级策略
1. 返回默认值
场景:推荐系统故障,返回默认推荐
// RecommendService fetches recommendations through a circuit breaker.
type RecommendService struct {
	client         *http.Client
	circuitBreaker *CircuitBreaker
}

// GetRecommendations returns the recommendations for userID. When the
// breaker-protected call fails it logs the error and returns a fixed default
// list with a nil error.
func (rs *RecommendService) GetRecommendations(userID string) ([]string, error) {
	fetch := func() (interface{}, error) {
		return rs.callRecommendAPI(userID)
	}

	result, err := rs.circuitBreaker.Call(fetch)
	if err == nil {
		return result.([]string), nil
	}

	// Degrade: serve the default list instead of failing the caller.
	log.Printf("Recommend service unavailable, using default: %v", err)
	return []string{"item1", "item2", "item3"}, nil
}
2. 返回缓存数据
场景:服务不可用,返回缓存
// ProductService loads products through a circuit breaker and keeps a cache
// to fall back on.
type ProductService struct {
	client         *http.Client
	cache          *redis.Client
	circuitBreaker *CircuitBreaker
}

// GetProduct returns the product for productID. A successful call refreshes
// the cache. When the call fails the cached copy is returned if one can be
// read, otherwise a "service unavailable and no cache" error.
func (ps *ProductService) GetProduct(productID string) (*Product, error) {
	result, err := ps.circuitBreaker.Call(func() (interface{}, error) {
		return ps.callProductAPI(productID)
	})
	if err == nil {
		fresh := result.(*Product)
		ps.updateCache(productID, fresh)
		return fresh, nil
	}

	// Degrade: try the cache before giving up.
	log.Printf("Product service unavailable, using cache: %v", err)
	if cached, cacheErr := ps.getFromCache(productID); cacheErr == nil {
		return cached, nil
	}
	return nil, errors.New("service unavailable and no cache")
}
3. 快速失败
场景:非核心功能,不再等待下游,直接返回空结果(对调用方不报错)
// CommentService loads comments through a circuit breaker.
type CommentService struct {
	client         *http.Client
	circuitBreaker *CircuitBreaker
}

// GetComments returns the comments for articleID. When the protected call
// fails it logs the error and returns an empty, non-nil slice with a nil
// error, so the caller is not failed by the comment feature.
func (cs *CommentService) GetComments(articleID string) ([]Comment, error) {
	load := func() (interface{}, error) {
		return cs.callCommentAPI(articleID)
	}

	result, err := cs.circuitBreaker.Call(load)
	if err == nil {
		return result.([]Comment), nil
	}

	log.Printf("Comment service unavailable: %v", err)
	return []Comment{}, nil
}
4. 异步处理
场景:同步改异步,提高响应速度
// OrderService creates orders synchronously behind a circuit breaker and
// hands them to a message queue when the synchronous path fails.
type OrderService struct {
	messageQueue   *kafka.Producer
	circuitBreaker *CircuitBreaker
}

// CreateOrder creates order synchronously. If the protected call fails, the
// order is serialized and published to the "order.create" topic instead and
// nil is returned. An error is returned only when the order cannot be
// serialized for the fallback.
func (os *OrderService) CreateOrder(order *Order) error {
	// Call returns (result, error); only the error matters here.
	_, err := os.circuitBreaker.Call(func() (interface{}, error) {
		return nil, os.createOrderSync(order)
	})
	if err == nil {
		return nil
	}

	// Degrade: hand the order over for asynchronous creation.
	log.Printf("Sync order creation failed, using async: %v", err)

	message, marshalErr := json.Marshal(order)
	if marshalErr != nil {
		// Nothing was enqueued, so reporting success would lose the order.
		return marshalErr
	}
	// NOTE(review): the result of Send is not inspected, as in the original;
	// if Send reports failures they should be returned here — confirm its
	// signature.
	os.messageQueue.Send("order.create", message)
	return nil
}
限流保护
令牌桶算法
原理:按固定速率生成令牌,请求消费令牌
// TokenBucketLimiter is a token-bucket rate limiter: tokens accrue at a fixed
// rate up to a capacity and each admitted request consumes one.
type TokenBucketLimiter struct {
	capacity int64     // maximum number of tokens the bucket can hold
	tokens   int64     // tokens currently available
	rate     int64     // tokens generated per second
	lastTime time.Time // instant up to which generated tokens have been credited
	mu       sync.Mutex
}

// NewTokenBucketLimiter returns a limiter that starts with a full bucket of
// capacity tokens and refills at rate tokens per second.
func NewTokenBucketLimiter(capacity, rate int64) *TokenBucketLimiter {
	return &TokenBucketLimiter{
		capacity: capacity,
		tokens:   capacity,
		rate:     rate,
		lastTime: time.Now(),
	}
}

// Allow credits the tokens generated since the last credit, then reports
// whether a token was available, consuming it if so.
func (tbl *TokenBucketLimiter) Allow() bool {
	tbl.mu.Lock()
	defer tbl.mu.Unlock()

	now := time.Now()
	elapsed := now.Sub(tbl.lastTime)

	// Only whole tokens are credited.
	newTokens := int64(elapsed.Seconds() * float64(tbl.rate))
	if newTokens > 0 {
		if newTokens >= tbl.capacity-tbl.tokens {
			// The bucket is full; surplus time is discarded.
			tbl.tokens = tbl.capacity
			tbl.lastTime = now
		} else {
			tbl.tokens += newTokens
			// Advance only by the time that paid for the credited tokens.
			// Jumping to now would throw away the fractional remainder, and
			// callers polling faster than one token interval would never
			// see a refill.
			tbl.lastTime = tbl.lastTime.Add(time.Duration(newTokens) * time.Second / time.Duration(tbl.rate))
		}
	}

	if tbl.tokens >= 1 {
		tbl.tokens--
		return true
	}
	return false
}

// min returns the smaller of a and b.
func min(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}
// ProtectedService combines a rate limiter with a circuit breaker.
type ProtectedService struct {
	circuitBreaker *CircuitBreaker
	limiter        *TokenBucketLimiter
}

// Call runs req through the circuit breaker if the limiter admits it, and
// returns a "rate limit exceeded" error otherwise. The limiter is consulted
// before the breaker.
func (ps *ProtectedService) Call(req func() (interface{}, error)) (interface{}, error) {
	if admitted := ps.limiter.Allow(); admitted {
		return ps.circuitBreaker.Call(req)
	}
	return nil, errors.New("rate limit exceeded")
}
漏桶算法
原理:请求匀速流出,平滑突发流量
// LeakyBucketLimiter is a leaky-bucket rate limiter: admitted requests fill a
// bounded queue that a background goroutine drains at a fixed rate.
type LeakyBucketLimiter struct {
	capacity int64     // maximum queue length
	queue    int64     // current queue length
	rate     int64     // entries drained per second
	lastTime time.Time // set at construction; not read by the limiter itself
	mu       sync.Mutex

	done     chan struct{} // closed by Stop to end the drain goroutine
	stopOnce sync.Once
}

// NewLeakyBucketLimiter returns a limiter with the given queue capacity and
// starts the goroutine that drains it at rate entries per second. Call Stop
// to end that goroutine once the limiter is no longer needed.
func NewLeakyBucketLimiter(capacity, rate int64) *LeakyBucketLimiter {
	lbl := &LeakyBucketLimiter{
		capacity: capacity,
		rate:     rate,
		lastTime: time.Now(),
		done:     make(chan struct{}),
	}
	go lbl.leak()
	return lbl
}

// Stop ends the background drain goroutine. It is safe to call repeatedly.
// After Stop the queue no longer drains.
func (lbl *LeakyBucketLimiter) Stop() {
	lbl.stopOnce.Do(func() {
		if lbl.done != nil {
			close(lbl.done)
		}
	})
}

// leak removes one queue entry per tick until Stop is called. It returns
// immediately when rate is not positive, because nothing would ever drain
// and the tick interval would be a division by zero.
func (lbl *LeakyBucketLimiter) leak() {
	if lbl.rate <= 0 {
		return
	}
	interval := time.Second / time.Duration(lbl.rate)
	if interval <= 0 {
		// Rates above 1e9/s round down to zero, which NewTicker rejects.
		interval = time.Nanosecond
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-lbl.done:
			return
		case <-ticker.C:
			lbl.mu.Lock()
			if lbl.queue > 0 {
				lbl.queue--
			}
			lbl.mu.Unlock()
		}
	}
}

// Allow reports whether the queue has room, taking one slot if it does.
func (lbl *LeakyBucketLimiter) Allow() bool {
	lbl.mu.Lock()
	defer lbl.mu.Unlock()

	if lbl.queue < lbl.capacity {
		lbl.queue++
		return true
	}
	return false
}
主流框架
1. Hystrix(Netflix)
特点:
- 熔断器模式
- 线程池隔离
- 信号量隔离
- 请求缓存
- 实时监控(Hystrix Dashboard)
注意:已进入维护模式(2018年起不再开发新功能),但思想仍广泛使用
2. Sentinel(阿里)
特点:
- 流量控制
- 熔断降级
- 系统负载保护
- 热点参数限流
- 实时监控控制台
Go SDK示例:
package main
import (
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/flow"
)
// initSentinel initializes Sentinel with its default configuration and loads
// the flow-control and circuit-breaking rules for "order_service". Any
// failure terminates the process through log.Fatal, because running with
// missing rules would leave the resource unprotected.
func initSentinel() {
	if err := sentinel.InitDefault(); err != nil {
		log.Fatal(err)
	}

	// Flow control: reject directly above 1000 requests per 1000ms window.
	if _, err := flow.LoadRules([]*flow.Rule{
		{
			Resource:               "order_service",
			TokenCalculateStrategy: flow.Direct,
			ControlBehavior:        flow.Reject,
			Threshold:              1000,
			StatIntervalInMs:       1000,
		},
	}); err != nil {
		log.Fatal(err)
	}

	// Circuit breaking: error ratio 0.5 over a 10s window with at least 10
	// requests; retry after 30s.
	if _, err := circuitbreaker.LoadRules([]*circuitbreaker.Rule{
		{
			Resource:         "order_service",
			Strategy:         circuitbreaker.ErrorRatio,
			RetryTimeoutMs:   30000,
			MinRequestAmount: 10,
			StatIntervalMs:   10000,
			Threshold:        0.5,
		},
	}); err != nil {
		log.Fatal(err)
	}
}
// callOrderService runs doOrderServiceCall under Sentinel protection for the
// "order_service" resource. It returns a "service unavailable" error when
// Sentinel blocks the entry, and the business error, after reporting it to
// Sentinel, when the call itself fails.
func callOrderService() error {
	// Entry reports blocking through its own block-error value, so it gets a
	// separate variable from the business error below.
	entry, blockErr := sentinel.Entry("order_service")
	if blockErr != nil {
		// Rejected by flow control or by the circuit breaker.
		return errors.New("service unavailable")
	}
	defer entry.Exit()

	// The call's result is not needed here; only its error is.
	if _, err := doOrderServiceCall(); err != nil {
		// Report the failure so it counts towards the circuit-breaking rule.
		sentinel.TraceError(entry, err)
		return err
	}
	return nil
}
3. Resilience4j(Spring Cloud推荐)
特点:
- 轻量级(无外部依赖)
- 模块化设计
- 函数式编程
- Spring Boot集成
实战案例
完整电商系统容错架构
场景:订单服务调用多个下游服务
package main
import (
"context"
"encoding/json"
"errors"
"log"
"net/http"
"time"
)
// OrderService holds the protected downstream services an order depends on:
// stock, payment and coupon.
type OrderService struct {
stockService *ProtectedStockService
paymentService *ProtectedPaymentService
couponService *ProtectedCouponService
}
// ProtectedStockService wraps the stock API with a rate limiter, a circuit
// breaker and a cache used as fallback.
type ProtectedStockService struct {
	circuitBreaker *CircuitBreaker
	limiter        *TokenBucketLimiter
	cache          *StockCache
}

// CheckStock reports whether quantity units of productID are available.
//
// A rate-limited request fails with "rate limit exceeded". Otherwise the
// stock API is called through the circuit breaker with a 3s timeout. If that
// call fails, the cached stock decides: true when a cached value is readable
// and at least quantity, otherwise false together with the call's error.
func (pss *ProtectedStockService) CheckStock(productID string, quantity int) (bool, error) {
	if !pss.limiter.Allow() {
		log.Println("Stock service rate limited")
		return false, errors.New("rate limit exceeded")
	}

	query := func() (interface{}, error) {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()
		return pss.callStockAPI(ctx, productID, quantity)
	}

	result, err := pss.circuitBreaker.Call(query)
	if err == nil {
		inStock := result.(bool)
		if inStock {
			// NOTE(review): this caches the requested quantity, not the
			// actual stock level — confirm that is what the cache should hold.
			pss.cache.SetStock(productID, quantity)
		}
		return inStock, nil
	}

	// Degrade: answer from the cache.
	log.Printf("Stock service unavailable, using cache: %v", err)
	cachedStock, cacheErr := pss.cache.GetStock(productID)
	if cacheErr != nil || cachedStock < quantity {
		return false, err
	}
	return true, nil
}
// ProtectedPaymentService wraps the payment API with a rate limiter and a
// circuit breaker, and falls back to a message queue.
type ProtectedPaymentService struct {
	circuitBreaker *CircuitBreaker
	limiter        *TokenBucketLimiter
	messageQueue   *MessageQueue
}

// CreatePayment creates a payment for orderID. It calls the payment API
// through the circuit breaker with a 5s timeout. When the request is rate
// limited or the protected call fails, the payment is queued for
// asynchronous processing and the result of queuing is returned.
func (pps *ProtectedPaymentService) CreatePayment(orderID string, amount float64) error {
	if !pps.limiter.Allow() {
		// Degrade: queue the payment instead of rejecting it.
		log.Println("Payment service rate limited, using async")
		return pps.asyncPayment(orderID, amount)
	}

	_, err := pps.circuitBreaker.Call(func() (interface{}, error) {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		return nil, pps.callPaymentAPI(ctx, orderID, amount)
	})
	if err != nil {
		// NOTE(review): after a timeout the synchronous payment may still
		// have gone through; presumably the queue consumer must be idempotent
		// per order — confirm.
		log.Printf("Payment service unavailable, using async: %v", err)
		return pps.asyncPayment(orderID, amount)
	}
	return nil
}

// asyncPayment publishes a payment request to the "payment.create" topic. It
// returns the serialization error if the message cannot be encoded (for
// example when amount is NaN or infinite), otherwise the result of Send.
func (pps *ProtectedPaymentService) asyncPayment(orderID string, amount float64) error {
	message := map[string]interface{}{
		"order_id": orderID,
		"amount":   amount,
		"time":     time.Now(),
	}
	data, err := json.Marshal(message)
	if err != nil {
		// Sending an empty payload would silently lose the payment.
		return err
	}
	return pps.messageQueue.Send("payment.create", data)
}
// ProtectedCouponService wraps the coupon API with a rate limiter and a
// circuit breaker.
type ProtectedCouponService struct {
	circuitBreaker *CircuitBreaker
	limiter        *TokenBucketLimiter
}

// CalculateDiscount returns the discount for userID on amount. It calls the
// coupon API through the circuit breaker with a 2s timeout. When the request
// is rate limited or the call fails it returns a zero discount with a nil
// error, so the returned error is always nil.
func (pcs *ProtectedCouponService) CalculateDiscount(userID string, amount float64) (float64, error) {
	if !pcs.limiter.Allow() {
		log.Println("Coupon service rate limited, no discount")
		return 0, nil
	}

	lookup := func() (interface{}, error) {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		return pcs.callCouponAPI(ctx, userID, amount)
	}

	result, err := pcs.circuitBreaker.Call(lookup)
	if err == nil {
		return result.(float64), nil
	}

	log.Printf("Coupon service unavailable, no discount: %v", err)
	return 0, nil
}
// CreateOrder orchestrates the downstream calls for one order.
//
// The stock check is mandatory: any error or a negative answer aborts with
// "insufficient stock or service unavailable". The discount is optional and
// its error is ignored. A payment error is only logged; the order is still
// returned with status "pending_payment".
func (os *OrderService) CreateOrder(req *OrderRequest) (*OrderResponse, error) {
	inStock, stockErr := os.stockService.CheckStock(req.ProductID, req.Quantity)
	if stockErr != nil || !inStock {
		return nil, errors.New("insufficient stock or service unavailable")
	}

	// The discount error is deliberately dropped: coupons are not essential.
	discount, _ := os.couponService.CalculateDiscount(req.UserID, req.Amount)
	finalAmount := req.Amount - discount

	if payErr := os.paymentService.CreatePayment(req.OrderID, finalAmount); payErr != nil {
		// The order is accepted even though the payment was not created.
		log.Printf("Payment creation failed, but order accepted: %v", payErr)
	}

	resp := &OrderResponse{
		OrderID:  req.OrderID,
		Amount:   finalAmount,
		Discount: discount,
		Status:   "pending_payment",
	}
	return resp, nil
}
// CircuitBreakerMetrics is a snapshot of a circuit breaker for monitoring.
type CircuitBreakerMetrics struct {
// Resource names the protected resource.
Resource string
// State is the breaker state as text.
State string
// TotalRequests is the number of requests counted.
TotalRequests uint32
// SuccessRate and FailureRate are fractions of TotalRequests.
SuccessRate float64
FailureRate float64
// LastStateChange is meant to hold the time of the last state transition.
LastStateChange time.Time
}
// GetMetrics returns a snapshot of the breaker taken under its lock: the
// state name, the request count of the current counts, and the success and
// failure rates (both zero when no request has been counted). Resource is
// left empty.
func (cb *CircuitBreaker) GetMetrics() *CircuitBreakerMetrics {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	// States outside this table map to the empty string.
	stateNames := map[State]string{
		StateClosed:   "CLOSED",
		StateOpen:     "OPEN",
		StateHalfOpen: "HALF_OPEN",
	}

	snapshot := &CircuitBreakerMetrics{
		State:         stateNames[cb.state],
		TotalRequests: cb.counts.Requests,
		// NOTE(review): expiry is the breaker's expiry timestamp, not the
		// time of the last transition — confirm what this field should report.
		LastStateChange: cb.expiry,
	}
	if total := float64(cb.counts.Requests); total > 0 {
		snapshot.SuccessRate = float64(cb.counts.TotalSuccesses) / total
		snapshot.FailureRate = float64(cb.counts.TotalFailures) / total
	}
	return snapshot
}
// metricsHandler returns an HTTP handler that responds with the metrics of
// every breaker in services as one JSON object keyed by service name. An
// encoding or write failure is logged; by then the response may already be
// partly written, so the status code is not changed.
func metricsHandler(services map[string]*CircuitBreaker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		metrics := make(map[string]*CircuitBreakerMetrics, len(services))
		for name, cb := range services {
			metrics[name] = cb.GetMetrics()
		}

		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(metrics); err != nil {
			log.Printf("Writing circuit breaker metrics failed: %v", err)
		}
	}
}
面试问答
熔断器的三种状态如何转换?
答案:
状态转换流程:
1. Closed → Open:
条件:统计周期内请求数达到最小请求量,且失败率 ≥ 阈值(如50%)
示例:10个请求中5个失败 → Open
2. Open → Half-Open:
条件:等待超时(如30秒)
说明:给服务恢复的时间
3. Half-Open → Closed:
条件:连续成功次数达到阈值(如3次)
说明:服务已恢复
4. Half-Open → Open:
条件:任意请求失败
说明:服务仍未恢复
关键参数:
- failureThreshold: 50%~80%
- requestVolumeThreshold: 10~20
- sleepWindow: 30~60秒
- successThreshold: 3~5次
熔断和降级有什么区别?
答案:
| 对比维度 | 熔断 | 降级 |
|---|---|---|
| 触发条件 | 下游服务故障(被动) | 系统负载高/主动决策 |
| 目的 | 保护系统不被拖垮 | 保证核心功能可用 |
| 作用对象 | 特定服务/接口 | 非核心功能 |
| 恢复机制 | 自动恢复(Half-Open) | 手动恢复 |
| 实现方式 | 熔断器模式 | 返回默认值/缓存/快速失败 |
示例:
// Circuit breaking: the downstream service is failing, so requests are
// rejected automatically.
// NOTE(review): these two statements are illustrative fragments, not a
// complete function.
if circuitBreakerOpen {
return errors.New("service unavailable")
}
// Degradation: system load is high, so a non-core feature is switched to a
// cheaper answer on purpose.
if systemLoad > 0.8 {
return defaultRecommendations // recommendation system degraded
}
如何选择线程池隔离还是信号量隔离?
答案:
线程池隔离:
优点:
完全隔离(不影响主线程池)
支持超时控制
支持异步执行
缺点:
线程上下文切换开销
资源消耗大
适用场景:
- 慢服务(响应时间不可控)
- 需要超时控制
- 非核心服务
信号量隔离:
优点:
轻量级(无线程切换)
性能高
缺点:
不支持超时控制
同步执行
适用场景:
- 快速服务(响应时间可控)
- 性能要求高
- 内部服务调用
代码示例:
// ThreadPoolIsolation bounds the number of concurrently running tasks with a
// buffered channel whose capacity is the pool size.
type ThreadPoolIsolation struct {
	workerPool chan struct{}
}

// Execute runs task if a slot can be taken without waiting and returns nil
// once it has finished; otherwise it returns a "thread pool full" error
// without running task.
// NOTE(review): task runs on the calling goroutine, so this behaves like the
// semaphore variant and offers no timeout of its own.
func (tpi *ThreadPoolIsolation) Execute(task func()) error {
	acquired := false
	select {
	case tpi.workerPool <- struct{}{}:
		acquired = true
	default:
	}
	if !acquired {
		return errors.New("thread pool full")
	}

	// Give the slot back even if task panics.
	defer func() { <-tpi.workerPool }()
	task()
	return nil
}
// SemaphoreIsolation bounds the number of concurrently running tasks with a
// buffered channel used as a counting semaphore.
type SemaphoreIsolation struct {
	semaphore chan struct{}
}

// Execute runs task on the calling goroutine if a permit can be taken
// without waiting and returns nil once it has finished; otherwise it returns
// a "semaphore full" error without running task.
func (si *SemaphoreIsolation) Execute(task func()) error {
	select {
	case si.semaphore <- struct{}{}:
		// Permit acquired.
	default:
		return errors.New("semaphore full")
	}

	// Release the permit even if task panics.
	defer func() { <-si.semaphore }()
	task()
	return nil
}
如何设计降级策略?
答案:
降级分级:
P0(核心功能):
- 订单创建、支付
- 不降级,必须保证可用
P1(重要功能):
- 用户登录、商品详情
- 降级策略:返回缓存
P2(一般功能):
- 推荐系统、评论
- 降级策略:返回默认值
P3(非核心功能):
- 积分、优惠券
- 降级策略:快速失败(直接不可用)
降级开关:
// FeatureSwitch is a set of named on/off switches guarded by a read-write
// lock.
type FeatureSwitch struct {
	switches map[string]bool
	mu       sync.RWMutex
}

// IsEnabled reports whether feature is switched on. A feature that has no
// entry counts as disabled.
func (fs *FeatureSwitch) IsEnabled(feature string) bool {
	fs.mu.RLock()
	defer fs.mu.RUnlock()

	// A missing key reads as false, which is exactly "not enabled".
	return fs.switches[feature]
}
// Usage example: CreateOrder checks stock unconditionally and consults the
// "coupon" feature switch before asking the coupon service for a discount.
// NOTE(review): the rest of the body is elided ("// ...") and there is no
// final return, so this snippet is illustrative rather than compilable.
func (os *OrderService) CreateOrder(req *OrderRequest) (*OrderResponse, error) {
// Core step: stock check (never degraded).
hasStock, err := os.stockService.CheckStock(req.ProductID, req.Quantity)
if err != nil || !hasStock {
return nil, errors.New("insufficient stock")
}
// Non-core step: coupons (may be degraded). The discount error is ignored.
var discount float64
if os.featureSwitch.IsEnabled("coupon") {
discount, _ = os.couponService.CalculateDiscount(req.UserID, req.Amount)
} else {
log.Println("Coupon feature is disabled (degraded)")
discount = 0
}
// ...
}
动态配置(Apollo/Nacos):
# 配置中心
degradation:
coupon:
enabled: false # 优惠券功能降级
reason: "系统负载过高"
recommend:
enabled: false
default_items: ["item1", "item2", "item3"]
如何监控熔断器状态?
答案:
1. 指标收集:
// CircuitBreakerMetrics lists the values worth collecting from a circuit
// breaker for monitoring.
type CircuitBreakerMetrics struct {
// Basic metrics
State string // current state
TotalRequests uint32 // total number of requests
SuccessCount uint32 // number of successes
FailureCount uint32 // number of failures
SuccessRate float64 // success rate
FailureRate float64 // failure rate
// Time metrics
LastStateChange time.Time // time of the last state change
OpenDuration time.Duration // how long the breaker has been open
// Half-open metrics
HalfOpenRequests uint32
HalfOpenSuccesses uint32
}
2. Prometheus指标:
import "github.com/prometheus/client_golang/prometheus"
// Prometheus collectors for circuit breakers.
// NOTE(review): no registration call is visible in this snippet — confirm
// the collectors are registered elsewhere before relying on them.
var (
// circuitBreakerState is a gauge labelled by resource; per its help text
// the value encodes the state as 0=closed, 1=open, 2=half-open.
circuitBreakerState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "circuit_breaker_state",
Help: "Circuit breaker state (0=closed, 1=open, 2=half-open)",
},
[]string{"resource"},
)
// circuitBreakerRequests is a counter labelled by resource and result.
circuitBreakerRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "circuit_breaker_requests_total",
Help: "Total number of requests",
},
[]string{"resource", "result"},
)
)
// recordMetrics publishes the breaker's state and its success and failure
// totals under the given resource label.
//
// NOTE(review): state and counts are read without taking cb.mu — confirm
// that callers already hold the lock.
// NOTE(review): the running totals are added to a monotonically increasing
// counter on every call, so repeated calls count the same requests again —
// confirm the intended call pattern or switch to adding deltas.
func (cb *CircuitBreaker) recordMetrics(resource string) {
	// States outside this table are reported as 0.
	gaugeValues := map[State]float64{
		StateClosed:   0,
		StateOpen:     1,
		StateHalfOpen: 2,
	}
	circuitBreakerState.WithLabelValues(resource).Set(gaugeValues[cb.state])

	successes := float64(cb.counts.TotalSuccesses)
	failures := float64(cb.counts.TotalFailures)
	circuitBreakerRequests.WithLabelValues(resource, "success").Add(successes)
	circuitBreakerRequests.WithLabelValues(resource, "failure").Add(failures)
}
3. 告警规则:
groups:
  - name: circuit_breaker
    rules:
      # Fires for any non-closed state: the gauge is 1 when open and 2 when
      # half-open.
      - alert: CircuitBreakerOpen
        expr: circuit_breaker_state > 0
        for: 1m
        annotations:
          summary: "Circuit breaker is open for {{ $labels.resource }}"
      # Both sides are aggregated per resource. Without the aggregation the
      # division matches series label by label, so the failure series is
      # divided by itself and the ratio is always 1.
      - alert: HighFailureRate
        expr: |
          sum by (resource) (rate(circuit_breaker_requests_total{result="failure"}[1m]))
          /
          sum by (resource) (rate(circuit_breaker_requests_total[1m]))
          > 0.5
        for: 5m
        annotations:
          summary: "High failure rate for {{ $labels.resource }}"