第5章:服务治理
服务治理概述
什么是服务治理
定义:服务治理是微服务架构中对服务的生命周期、运行状态、配置、调用关系等进行统一管理和控制的机制。
核心组件
┌──────────────────────────────────────┐
│ Service Governance │
│ │
│ ┌────────────────────────────────┐ │
│ │ Service Registry │ │ 服务注册与发现
│ │ (Consul/Eureka/Nacos) │ │
│ └────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────┐ │
│ │ Configuration Center │ │ 配置中心
│ │ (Apollo/Nacos/Consul) │ │
│ └────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────┐ │
│ │ Circuit Breaker & Rate Limit │ │ 熔断限流
│ │ (Hystrix/Sentinel/Resilience4j)│ │
│ └────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────┐ │
│ │ Monitoring & Tracing │ │ 监控追踪
│ │ (Prometheus/Jaeger/Zipkin) │ │
│ └────────────────────────────────┘ │
└──────────────────────────────────────┘
服务注册与发现
原理
服务注册:
┌──────────────┐
│ Service A │
│ (启动) │
└──────────────┘
↓ 1. 注册(IP、端口、健康检查)
┌──────────────────┐
│ Service Registry │
│ - Service A: │
│ • 10.0.1.5:8080│
│ • 10.0.1.6:8080│
│ - Service B: │
│ • 10.0.2.5:8081│
└──────────────────┘
服务发现:
┌──────────────┐
│ Service B │
│(需要调用A) │
└──────────────┘
↓ 2. 查询Service A的地址
┌──────────────────┐
│ Service Registry │
│ 返回: │
│ [10.0.1.5:8080, │
│ 10.0.1.6:8080] │
└──────────────────┘
↓ 3. 负载均衡选择一个
┌──────────────┐
│ Service A │
│ 10.0.1.5 │
└──────────────┘
Consul实战
安装Consul:
# Docker方式
docker run -d --name=consul \
-p 8500:8500 \
-p 8600:8600/udp \
consul agent -server -ui -bootstrap-expect=1 \
-client='0.0.0.0'
# 访问UI
open http://localhost:8500
服务注册(Go SDK):
package main
import (
"fmt"
"log"
"net/http"
consulapi "github.com/hashicorp/consul/api"
)
// ServiceRegistry is a thin wrapper around a Consul API client that exposes
// the register / deregister / discover operations used in this chapter.
type ServiceRegistry struct {
	client *consulapi.Client
}

// NewServiceRegistry builds a ServiceRegistry whose Consul client talks to
// consulAddr. All other client settings come from consulapi.DefaultConfig.
// It returns the error reported by consulapi.NewClient, if any.
func NewServiceRegistry(consulAddr string) (*ServiceRegistry, error) {
	cfg := consulapi.DefaultConfig()
	cfg.Address = consulAddr

	c, err := consulapi.NewClient(cfg)
	if err != nil {
		return nil, err
	}

	sr := &ServiceRegistry{client: c}
	return sr, nil
}
// Register registers one service instance with the Consul agent.
//
// serviceName is the logical name, serviceID identifies this instance, and
// address/port are where the instance listens. An HTTP health check against
// http://address:port/health is attached, polled every 10s with a 3s timeout.
func (sr *ServiceRegistry) Register(serviceName, serviceID, address string, port int) error {
	healthCheck := &consulapi.AgentServiceCheck{
		HTTP:     fmt.Sprintf("http://%s:%d/health", address, port),
		Interval: "10s",
		Timeout:  "3s",
	}

	instance := &consulapi.AgentServiceRegistration{
		ID:      serviceID,
		Name:    serviceName,
		Address: address,
		Port:    port,
		Check:   healthCheck,
	}

	agent := sr.client.Agent()
	return agent.ServiceRegister(instance)
}
// Deregister removes the instance identified by serviceID from the Consul
// agent and returns the agent's error, if any.
func (sr *ServiceRegistry) Deregister(serviceID string) error {
	agent := sr.client.Agent()
	return agent.ServiceDeregister(serviceID)
}
// Discover queries Consul's health endpoint for instances of serviceName.
// No tag filter is applied, and only instances whose health checks are
// passing are requested. The query metadata is discarded.
func (sr *ServiceRegistry) Discover(serviceName string) ([]*consulapi.ServiceEntry, error) {
	const (
		anyTag      = ""
		passingOnly = true
	)

	entries, _, err := sr.client.Health().Service(serviceName, anyTag, passingOnly, nil)
	return entries, err
}
// main is the usage example: it registers one user-service instance with
// Consul, serves /health and /api/users on :8080, and deregisters the
// instance once the HTTP server stops.
func main() {
	const serviceID = "user-service-001"

	registry, err := NewServiceRegistry("localhost:8500")
	if err != nil {
		log.Fatal(err)
	}

	// Register the service instance.
	err = registry.Register(
		"user-service",  // service name
		serviceID,       // service ID
		"192.168.1.100", // IP
		8080,            // port
	)
	if err != nil {
		log.Fatal(err)
	}

	// HTTP handlers; /health is the endpoint the Consul check polls.
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})
	http.HandleFunc("/api/users", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("User API"))
	})

	log.Println("Service started on :8080")

	// ListenAndServe blocks until the server stops and always returns a
	// non-nil error, so cleanup must happen after it returns. A defer placed
	// after this call would only be registered once the server had already
	// stopped, and log.Fatal would skip it anyway.
	serveErr := http.ListenAndServe(":8080", nil)

	// Deregister on exit so Consul does not keep a dead instance around.
	// NOTE(review): a process killed by a signal still skips this; proper
	// graceful shutdown needs signal handling plus http.Server.Shutdown.
	if err := registry.Deregister(serviceID); err != nil {
		log.Printf("deregister %s: %v", serviceID, err)
	}
	log.Fatal(serveErr)
}
服务发现(客户端):
package main
import (
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"

	consulapi "github.com/hashicorp/consul/api"
)
// ServiceClient calls other services by name, resolving their addresses
// through a ServiceRegistry.
type ServiceClient struct {
	registry *ServiceRegistry
}

// NewServiceClient creates a ServiceClient backed by a registry connected to
// consulAddr. It returns the error from NewServiceRegistry unchanged.
func NewServiceClient(consulAddr string) (*ServiceClient, error) {
	reg, err := NewServiceRegistry(consulAddr)
	if err != nil {
		return nil, err
	}

	sc := &ServiceClient{registry: reg}
	return sc, nil
}
// Call performs an HTTP GET against one instance of serviceName and returns
// the response body.
//
// Steps: discover the instances through the registry, pick one at random
// (simple client-side load balancing), then request path on it.
//
// An error is returned when discovery fails, when no instance is available,
// when the request or the body read fails, or when the response status is
// not 2xx.
func (sc *ServiceClient) Call(serviceName, path string) (string, error) {
	// 1. Discover service instances.
	services, err := sc.registry.Discover(serviceName)
	if err != nil {
		return "", err
	}
	if len(services) == 0 {
		return "", fmt.Errorf("no available instances for %s", serviceName)
	}

	// 2. Load balancing: choose a random instance.
	service := services[rand.Intn(len(services))]

	// Consul leaves Service.Address empty when the service was registered
	// without an explicit address; the node address applies in that case.
	host := service.Service.Address
	if host == "" && service.Node != nil {
		host = service.Node.Address
	}
	addr := fmt.Sprintf("http://%s:%d%s", host, service.Service.Port, path)

	// 3. Issue the HTTP request.
	resp, err := http.Get(addr)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("reading response from %s: %w", addr, err)
	}
	// Surface server-side failures instead of returning an error page as data.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return "", fmt.Errorf("calling %s: unexpected status %s", addr, resp.Status)
	}
	return string(body), nil
}
// main is the usage example for ServiceClient: it resolves user-service
// through Consul, calls /api/users on it and prints the response body.
// Any failure terminates the program via log.Fatal.
func main() {
	const consulAddr = "localhost:8500"

	client, err := NewServiceClient(consulAddr)
	if err != nil {
		log.Fatal(err)
	}

	// Call user-service.
	body, callErr := client.Call("user-service", "/api/users")
	if callErr != nil {
		log.Fatal(callErr)
	}

	fmt.Println("Result:", body)
}
Nacos实战
Docker安装Nacos:
docker run -d --name nacos \
-e MODE=standalone \
-p 8848:8848 \
nacos/nacos-server:latest
# 访问UI
open http://localhost:8848/nacos
# 默认账号密码:nacos/nacos
服务注册(Go SDK):
package main
import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
)
// main demonstrates the Nacos naming client: it registers one user-service
// instance, lists the healthy instances of that service, and deregisters the
// instance before exiting. Any failure aborts the program with a panic.
func main() {
	// 1. Client configuration.
	sc := []constant.ServerConfig{
		{
			IpAddr: "localhost",
			Port:   8848,
		},
	}
	cc := constant.ClientConfig{
		NamespaceId: "public",
		TimeoutMs:   5000,
		LogDir:      "/tmp/nacos/log",
		CacheDir:    "/tmp/nacos/cache",
	}

	// 2. Create the naming client.
	client, err := clients.CreateNamingClient(map[string]interface{}{
		"serverConfigs": sc,
		"clientConfig":  cc,
	})
	if err != nil {
		panic(err)
	}

	// 3. Register the service instance.
	success, err := client.RegisterInstance(vo.RegisterInstanceParam{
		Ip:          "192.168.1.100",
		Port:        8080,
		ServiceName: "user-service",
		GroupName:   "DEFAULT_GROUP",
		Weight:      10,
		Enable:      true,
		Healthy:     true,
		Ephemeral:   true,
		Metadata:    map[string]string{"version": "v1"},
	})
	if err != nil {
		panic(err)
	}
	// A false result with a nil error must not reach panic(err): that would
	// panic with a nil value and hide the cause.
	if !success {
		panic("nacos: RegisterInstance reported failure for user-service")
	}

	// 5. Deregister on the way out. The defer is installed right after a
	// successful registration so it also runs if a later step panics.
	defer func() {
		_, err := client.DeregisterInstance(vo.DeregisterInstanceParam{
			Ip:          "192.168.1.100",
			Port:        8080,
			ServiceName: "user-service",
			GroupName:   "DEFAULT_GROUP",
		})
		if err != nil {
			fmt.Printf("Deregister failed: %v\n", err)
		}
	}()

	// 4. Service discovery: healthy instances only.
	instances, err := client.SelectInstances(vo.SelectInstancesParam{
		ServiceName: "user-service",
		GroupName:   "DEFAULT_GROUP",
		HealthyOnly: true,
	})
	if err != nil {
		panic(err)
	}
	for _, instance := range instances {
		fmt.Printf("Instance: %s:%d, Weight: %f\n",
			instance.Ip, instance.Port, instance.Weight)
	}
}
配置中心
为什么需要配置中心
传统配置方式:
问题:
配置分散在各个服务
配置修改需要重启服务
无法动态调整
配置管理混乱
敏感信息泄露风险
配置中心方案:
优势:
集中管理配置
动态更新(无需重启)
配置版本管理
灰度配置
权限控制
配置审计
Apollo实战
Docker安装Apollo:
# 使用docker-compose
version: '3'
services:
apollo-configservice:
image: apolloconfig/apollo-configservice
ports:
- "8080:8080"
environment:
- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/ApolloConfigDB
- SPRING_DATASOURCE_USERNAME=root
- SPRING_DATASOURCE_PASSWORD=root
apollo-adminservice:
image: apolloconfig/apollo-adminservice
ports:
- "8090:8090"
environment:
- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/ApolloConfigDB
- SPRING_DATASOURCE_USERNAME=root
- SPRING_DATASOURCE_PASSWORD=root
apollo-portal:
image: apolloconfig/apollo-portal
ports:
- "8070:8070"
environment:
- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/ApolloPortalDB
- SPRING_DATASOURCE_USERNAME=root
- SPRING_DATASOURCE_PASSWORD=root
配置读取(Go SDK):
package main
import (
"fmt"
"time"
"github.com/apolloconfig/agollo/v4"
"github.com/apolloconfig/agollo/v4/env/config"
)
// ConfigManager reads configuration values from Apollo through an agollo
// client.
type ConfigManager struct {
	client agollo.Client
}

// NewConfigManager starts an agollo client for the "user-service" app
// (cluster "default", namespace "application") against the config service at
// http://localhost:8080 and wraps it in a ConfigManager. It returns the
// error from agollo.StartWithConfig, if any.
func NewConfigManager() (*ConfigManager, error) {
	appCfg := &config.AppConfig{
		AppID:          "user-service",
		Cluster:        "default",
		IP:             "http://localhost:8080",
		NamespaceName:  "application",
		IsBackupConfig: true,
		Secret:         "",
	}

	// agollo asks for a loader function rather than the config itself.
	loadConfig := func() (*config.AppConfig, error) {
		return appCfg, nil
	}

	apolloClient, err := agollo.StartWithConfig(loadConfig)
	if err != nil {
		return nil, err
	}

	cm := &ConfigManager{client: apolloClient}
	return cm, nil
}
// GetString returns the string value stored under key, passing the empty
// string to the client as the default value.
func (cm *ConfigManager) GetString(key string) string {
	const fallback = ""
	return cm.client.GetStringValue(key, fallback)
}
// GetInt returns the integer value stored under key, passing 0 to the client
// as the default value.
func (cm *ConfigManager) GetInt(key string) int {
	return cm.client.GetIntValue(key, 0)
}

// GetBool returns the boolean value stored under key, passing false to the
// client as the default value. The feature-toggle example in this chapter
// calls ConfigManager.GetBool, so the accessor is provided alongside
// GetString and GetInt.
func (cm *ConfigManager) GetBool(key string) bool {
	return cm.client.GetBoolValue(key, false)
}
// Watch registers a change listener on the client that invokes callback
// with the old and new values whenever key appears in a change event.
func (cm *ConfigManager) Watch(key string, callback func(old, new string)) {
	listener := &ConfigChangeListener{
		key:      key,
		callback: callback,
	}
	cm.client.AddChangeListener(listener)
}
// ConfigChangeListener forwards changes of a single configuration key to a
// callback.
type ConfigChangeListener struct {
	key      string                // configuration key being watched
	callback func(old, new string) // invoked with the old and new values
}

// OnChange invokes the callback when the event contains a change for the
// watched key; events that do not mention the key are ignored.
//
// The old/new values are asserted with the comma-ok form: a value that is
// nil or not a string is passed to the callback as "". An unchecked
// assertion would panic on such a value.
// NOTE(review): presumably the old value is nil for a newly added key and
// the new value is nil for a deleted one — confirm against agollo.
func (c *ConfigChangeListener) OnChange(event *agollo.ChangeEvent) {
	change, ok := event.Changes[c.key]
	if !ok {
		return
	}

	// Failed assertions deliberately degrade to the empty string.
	oldValue, _ := change.OldValue.(string)
	newValue, _ := change.NewValue.(string)
	c.callback(oldValue, newValue)
}

// OnNewestChange is intentionally a no-op; full-snapshot events are not
// used by this listener.
func (c *ConfigChangeListener) OnNewestChange(event *agollo.FullChangeEvent) {}
// main is the usage example for ConfigManager: it prints the database
// settings, then watches "feature.new_ui" and switches the new UI on or off
// as the value changes. It blocks forever so the watcher keeps running.
func main() {
	cm, err := NewConfigManager()
	if err != nil {
		panic(err)
	}

	// Read the database configuration.
	fmt.Printf("Database: %s:%d\n", cm.GetString("db.host"), cm.GetInt("db.port"))

	// React to changes of the feature flag: only the exact string "true"
	// enables the new UI, any other value disables it.
	onToggle := func(previous, current string) {
		fmt.Printf("Config changed: %s -> %s\n", previous, current)
		switch current {
		case "true":
			enableNewUI()
		default:
			disableNewUI()
		}
	}
	cm.Watch("feature.new_ui", onToggle)

	// Keep the process alive.
	select {}
}
// enableNewUI reports on standard output that the new UI was switched on.
func enableNewUI() {
	const notice = "New UI enabled"
	fmt.Println(notice)
}
// disableNewUI reports on standard output that the new UI was switched off.
func disableNewUI() {
	const notice = "New UI disabled"
	fmt.Println(notice)
}
Nacos配置中心
package main
import (
"fmt"
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
)
// main demonstrates the Nacos config client: it fetches the "user-service"
// configuration, subscribes to changes of it, and then publishes a new
// version. Every step's error is checked; a failure aborts with a panic.
func main() {
	// 1. Create the config client.
	sc := []constant.ServerConfig{{
		IpAddr: "localhost",
		Port:   8848,
	}}
	cc := constant.ClientConfig{
		NamespaceId: "public",
		TimeoutMs:   5000,
	}
	client, err := clients.CreateConfigClient(map[string]interface{}{
		"serverConfigs": sc,
		"clientConfig":  cc,
	})
	if err != nil {
		panic(err)
	}

	// 2. Fetch the current configuration.
	content, err := client.GetConfig(vo.ConfigParam{
		DataId: "user-service",
		Group:  "DEFAULT_GROUP",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("Config:", content)

	// 3. Listen for configuration changes.
	// NOTE(review): main returns right after publishing, so the callback
	// may never get a chance to run; a real service keeps running.
	err = client.ListenConfig(vo.ConfigParam{
		DataId: "user-service",
		Group:  "DEFAULT_GROUP",
		OnChange: func(namespace, group, dataId, data string) {
			fmt.Printf("Config changed: %s\n", data)
		},
	})
	if err != nil {
		panic(err)
	}

	// 4. Publish a configuration.
	success, err := client.PublishConfig(vo.ConfigParam{
		DataId:  "user-service",
		Group:   "DEFAULT_GROUP",
		Content: "db.host=localhost\ndb.port=3306",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("Publish success:", success)
}
服务降级
降级策略
1. 返回默认值:
// GetUserInfo fetches a user from userService. When that call fails it
// degrades to a placeholder user carrying the requested ID and the name
// "Guest", so this function itself never returns a non-nil error.
func GetUserInfo(userID string) (*User, error) {
	if user, err := userService.GetUser(userID); err == nil {
		return user, nil
	}

	// Degraded path: default user.
	guest := &User{
		ID:   userID,
		Name: "Guest",
	}
	return guest, nil
}
2. 返回缓存数据:
// UserService looks users up remotely and keeps results in a cache that
// doubles as the degradation fallback.
type UserService struct {
	cache *Cache
}

// GetUserInfo returns the user for userID.
//
// Lookup order: the cache; then the remote service (a successful result is
// cached for five minutes); and, if the remote call fails, whatever
// GetStale still holds for the ID. The remote error is returned only when
// no stale entry exists either.
func (s *UserService) GetUserInfo(userID string) (*User, error) {
	if cached, hit := s.cache.Get(userID); hit {
		return cached.(*User), nil
	}

	fresh, err := s.callRemoteService(userID)
	if err == nil {
		s.cache.Set(userID, fresh, 5*time.Minute)
		return fresh, nil
	}

	// Degraded path: serve a stale cache entry if one is available.
	if stale, found := s.cache.GetStale(userID); found {
		return stale.(*User), nil
	}
	return nil, err
}
3. 快速失败:
// GetRecommendations returns the products recommended for userID.
// Recommendations are not a core feature, so a failure of the
// recommendation service degrades to an empty, non-nil list and a nil error.
func GetRecommendations(userID string) ([]Product, error) {
	products, err := recommendService.GetRecommendations(userID)
	if err == nil {
		return products, nil
	}

	// Degraded path: empty list.
	return []Product{}, nil
}
4. 降级开关:
// FeatureToggle answers whether a feature is switched on, reading the flag
// from a ConfigManager.
type FeatureToggle struct {
	config *ConfigManager
}

// IsEnabled returns the boolean the config manager reports for feature.
func (ft *FeatureToggle) IsEnabled(feature string) bool {
	enabled := ft.config.GetBool(feature)
	return enabled
}
// GetOrderDetail assembles the detail view for orderID. The order itself is
// always loaded; recommendations and comments are loaded only while their
// feature switches ("feature.recommendations", "feature.comments") are on.
// The returned error is always nil.
func GetOrderDetail(orderID string) (*OrderDetail, error) {
	detail := new(OrderDetail)
	detail.Order = getOrder(orderID)

	// Degradation switch for recommended products.
	if featureToggle.IsEnabled("feature.recommendations") {
		detail.Recommendations = getRecommendations(orderID)
	}

	// Degradation switch for comments.
	if featureToggle.IsEnabled("feature.comments") {
		detail.Comments = getComments(orderID)
	}

	return detail, nil
}
熔断限流
Sentinel实战
安装Sentinel Dashboard:
# 下载Sentinel Dashboard
wget https://github.com/alibaba/Sentinel/releases/download/1.8.6/sentinel-dashboard-1.8.6.jar
# 启动
java -Dserver.port=8080 -jar sentinel-dashboard-1.8.6.jar
# 访问
open http://localhost:8080
# 默认账号密码:sentinel/sentinel
Go集成Sentinel:
package main
import (
"fmt"
"log"
"time"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/flow"
)
// main demonstrates Sentinel flow control: it loads one rule for the
// "user-api" resource (threshold 100 per 1000 ms statistics window, direct
// token calculation, reject behaviour) and then issues 200 entries, printing
// for each whether it passed or was blocked.
func main() {
	// 1. Initialise Sentinel.
	if err := sentinel.InitDefault(); err != nil {
		log.Fatal(err)
	}

	// 2. Load the flow-control rule.
	rules := []*flow.Rule{
		{
			Resource:               "user-api",
			TokenCalculateStrategy: flow.Direct,
			ControlBehavior:        flow.Reject,
			Threshold:              100, // QPS limit: 100
			StatIntervalInMs:       1000,
		},
	}
	if _, err := flow.LoadRules(rules); err != nil {
		log.Fatal(err)
	}

	// 3. Exercise the limiter.
	for i := 0; i < 200; i++ {
		entry, blockErr := sentinel.Entry("user-api")
		if blockErr == nil {
			// Request admitted.
			fmt.Println("Request passed:", i)
			entry.Exit()
			continue
		}

		// Request rejected by the rule; pause briefly before the next one.
		fmt.Println("Request blocked:", i)
		time.Sleep(10 * time.Millisecond)
	}
}
熔断规则:
import (
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
)
// setupCircuitBreaker loads the circuit-breaking rule for the
// "order-service" resource: error-ratio strategy, tripping at a 50% error
// ratio measured over a 1000 ms window once at least 10 requests were seen,
// with a retry after 10 s.
//
// It panics when the rules cannot be loaded: the function has no error
// return, and silently continuing would leave the resource unprotected.
func setupCircuitBreaker() {
	rules := []*circuitbreaker.Rule{
		{
			Resource:         "order-service",
			Strategy:         circuitbreaker.ErrorRatio, // error-ratio strategy
			RetryTimeoutMs:   10000,                     // try to recover 10 s after tripping
			MinRequestAmount: 10,                        // minimum number of requests
			StatIntervalMs:   1000,                      // statistics window
			Threshold:        0.5,                       // error-ratio threshold: 50%
		},
	}

	if _, err := circuitbreaker.LoadRules(rules); err != nil {
		panic(err)
	}
}
// callOrderService creates an order under the protection of the
// "order-service" Sentinel resource. When Sentinel blocks the entry it
// returns a "circuit breaker open" error without calling the service; a
// business error is reported to Sentinel via TraceError and then returned.
func callOrderService() error {
	entry, blocked := sentinel.Entry("order-service")
	if blocked != nil {
		// The breaker rejected the call.
		return errors.New("circuit breaker open")
	}
	defer entry.Exit()

	if err := orderService.CreateOrder(); err != nil {
		// Record the failure on the entry.
		sentinel.TraceError(entry, err)
		return err
	}
	return nil
}
服务监控
Prometheus + Grafana
服务暴露指标(Go):
package main
import (
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// httpRequestsTotal counts HTTP requests, labelled by method, path and
// response status.
var httpRequestsTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "http_requests_total",
		Help: "Total number of HTTP requests",
	},
	[]string{"method", "path", "status"},
)

// httpRequestDuration observes request latency in seconds, labelled by
// method and path, using the library's default buckets.
var httpRequestDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "http_request_duration_seconds",
		Help:    "HTTP request duration in seconds",
		Buckets: prometheus.DefBuckets,
	},
	[]string{"method", "path"},
)

// httpRequestsInFlight tracks the number of requests currently being served.
var httpRequestsInFlight = prometheus.NewGauge(
	prometheus.GaugeOpts{
		Name: "http_requests_in_flight",
		Help: "Current number of HTTP requests being served",
	},
)
// init registers the three HTTP metrics with the default Prometheus
// registry; MustRegister panics if a registration fails.
func init() {
	prometheus.MustRegister(
		httpRequestsTotal,
		httpRequestDuration,
		httpRequestsInFlight,
	)
}
// prometheusMiddleware wraps next so that every request updates the HTTP
// metrics: the in-flight gauge for the duration of the request, then the
// request counter (method, path, status) and the latency histogram
// (method, path) once the handler has returned.
//
// NOTE(review): the raw URL path is used as a label value; with
// parameterised routes this can create a very large number of series.
func prometheusMiddleware(next http.Handler) http.Handler {
	instrumented := func(w http.ResponseWriter, r *http.Request) {
		began := time.Now()

		httpRequestsInFlight.Inc()
		defer httpRequestsInFlight.Dec()

		// Wrap the writer to capture the status code; 200 is the status
		// recorded when the handler never calls WriteHeader.
		recorder := &responseWriter{ResponseWriter: w, statusCode: 200}
		next.ServeHTTP(recorder, r)

		elapsed := time.Since(began).Seconds()
		method, route := r.Method, r.URL.Path
		status := fmt.Sprintf("%d", recorder.statusCode)

		// Record the metrics.
		httpRequestsTotal.WithLabelValues(method, route, status).Inc()
		httpRequestDuration.WithLabelValues(method, route).Observe(elapsed)
	}
	return http.HandlerFunc(instrumented)
}
// responseWriter decorates an http.ResponseWriter and remembers the status
// code passed to WriteHeader.
type responseWriter struct {
	http.ResponseWriter
	statusCode int // last status code passed to WriteHeader
}

// WriteHeader records code and then forwards it to the wrapped writer.
func (rw *responseWriter) WriteHeader(code int) {
	rw.statusCode = code
	rw.ResponseWriter.WriteHeader(rw.statusCode)
}
// main serves a sample /api/users endpoint and the Prometheus /metrics
// endpoint on :8080, with every request passing through
// prometheusMiddleware. It panics with the server's error when the server
// cannot start or stops, instead of exiting silently.
func main() {
	mux := http.NewServeMux()

	// Business API.
	mux.HandleFunc("/api/users", func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(100 * time.Millisecond) // simulate processing time
		w.Write([]byte("User API"))
	})

	// Metrics endpoint.
	mux.Handle("/metrics", promhttp.Handler())

	// Apply the monitoring middleware.
	handler := prometheusMiddleware(mux)

	// ListenAndServe only returns on failure (for example, the port is
	// already in use); dropping that error hides the cause.
	if err := http.ListenAndServe(":8080", handler); err != nil {
		panic(err)
	}
}
Prometheus配置:
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'user-service'
static_configs:
- targets: ['localhost:8080']
labels:
service: 'user-service'
- job_name: 'order-service'
static_configs:
- targets: ['localhost:8081']
labels:
service: 'order-service'
Grafana Dashboard配置:
{
"dashboard": {
"title": "Service Metrics",
"panels": [
{
"title": "Request Rate",
"targets": [
{
"expr": "rate(http_requests_total[1m])"
}
]
},
{
"title": "P95 Latency",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[1m]))"
}
]
},
{
"title": "Error Rate",
"targets": [
{
"expr": "sum(rate(http_requests_total{status=~\"5..\"}[1m])) / sum(rate(http_requests_total[1m]))"
}
]
}
]
}
}
实战案例
完整服务治理架构
┌──────────────────────────────────────┐
│ Service Governance │
└──────────────────────────────────────┘
↓
┌────────────────────────┐
│ Service Registry │
│ (Consul/Nacos) │
└────────────────────────┘
↓
┌────────────────────────┐
│ Configuration Center │
│ (Apollo/Nacos) │
└────────────────────────┘
↓
┌────────────────────────┐
│ Circuit Breaker │
│ (Sentinel) │
└────────────────────────┘
↓
┌────────────────────────┐
│ Monitoring │
│ (Prometheus/Grafana) │
└────────────────────────┘
完整示例代码:
// service.go - 完整服务实现
package main
import (
"fmt"
"log"
"net/http"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// UserService bundles the governance components used by the full example:
// service registry, configuration manager and circuit breaker.
type UserService struct {
	registry       *ServiceRegistry
	config         *ConfigManager
	circuitBreaker *CircuitBreaker
}

// NewUserService wires up a UserService: a registry connected to the Consul
// agent at localhost:8500, a ConfigManager, and a circuit breaker created
// with NewCircuitBreaker(5, 10*time.Second). It returns the first
// initialisation error encountered.
// NOTE(review): presumably 5 is a failure threshold and 10s a reset timeout;
// NewCircuitBreaker is not shown here — confirm.
func NewUserService() (*UserService, error) {
	svc := &UserService{}
	var err error

	// 1. Service registry.
	if svc.registry, err = NewServiceRegistry("localhost:8500"); err != nil {
		return nil, err
	}

	// 2. Configuration centre.
	if svc.config, err = NewConfigManager(); err != nil {
		return nil, err
	}

	// 3. Circuit breaker.
	svc.circuitBreaker = NewCircuitBreaker(5, 10*time.Second)

	return svc, nil
}
// Start registers the service, logs the database settings, subscribes to
// changes of "feature.new_api", installs the HTTP routes on the default mux
// and serves on :8080. It returns the registration error, or otherwise
// whatever http.ListenAndServe returns.
func (s *UserService) Start() error {
	// 1. Register with the service registry.
	if err := s.registry.Register(
		"user-service",
		"user-service-001",
		"192.168.1.100",
		8080,
	); err != nil {
		return err
	}

	// 2. Load configuration.
	log.Printf("DB Config: %s:%d", s.config.GetString("db.host"), s.config.GetInt("db.port"))

	// 3. Watch for configuration changes.
	logChange := func(previous, current string) {
		log.Printf("Config changed: %s -> %s", previous, current)
	}
	s.config.Watch("feature.new_api", logChange)

	// 4. HTTP routes.
	http.HandleFunc("/api/users", s.handleUsers)
	http.HandleFunc("/health", s.handleHealth)
	http.Handle("/metrics", promhttp.Handler())

	// 5. Serve.
	log.Println("User service started on :8080")
	return http.ListenAndServe(":8080", nil)
}
// handleUsers serves /api/users. The business call (with retries) runs
// through the circuit breaker; if that reports an error the handler writes
// the degraded response, otherwise the normal one.
func (s *UserService) handleUsers(w http.ResponseWriter, r *http.Request) {
	payload := "User data"

	// Circuit-breaker protection around the business logic.
	if err := s.circuitBreaker.Call(s.getUsersWithRetry); err != nil {
		// Degraded path.
		payload = "Degraded response"
	}

	w.Write([]byte(payload))
}
// getUsersWithRetry calls getUsers up to three times, waiting 100 ms after
// the first failure and 200 ms after the second. It returns nil as soon as
// an attempt succeeds; otherwise it returns an error that wraps the last
// failure so callers can inspect the cause with errors.Is / errors.As.
func (s *UserService) getUsersWithRetry() error {
	const maxAttempts = 3

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		lastErr = s.getUsers()
		if lastErr == nil {
			return nil
		}
		// Back off only when another attempt follows; sleeping after the
		// final failure would merely delay the error.
		if attempt < maxAttempts {
			time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
		}
	}
	return fmt.Errorf("max retries exceeded: %w", lastErr)
}
// handleHealth is the health-check endpoint: it always answers 200 with the
// body "OK".
func (s *UserService) handleHealth(w http.ResponseWriter, r *http.Request) {
	const body = "OK"

	w.WriteHeader(http.StatusOK)
	w.Write([]byte(body))
}
// main builds the user service and runs it; a construction or start-up
// error terminates the program via log.Fatal.
func main() {
	svc, err := NewUserService()
	if err == nil {
		err = svc.Start()
	}
	if err != nil {
		log.Fatal(err)
	}
}
面试问答
服务注册与发现有哪些实现方式?
答案:
客户端发现模式:
┌──────────┐
│ Service A│
└──────────┘
↓ 1. 查询Service B
┌──────────────┐
│ Registry │
└──────────────┘
↓ 2. 返回实例列表
┌──────────┐
│ Service A│ ─3. 直接调用─> Service B
└──────────┘
优点:
客户端负载均衡
无单点故障
性能好
缺点:
客户端逻辑复杂
语言绑定
服务端发现模式:
┌──────────┐
│ Service A│ ─1. 请求─> Load Balancer ─2. 查询─> Registry
└──────────┘ ↓
3. 转发
↓
Service B
优点:
客户端简单
语言无关
集中管理
缺点:
Load Balancer是单点
多一跳网络延迟
主流方案对比:
| 方案 | 类型 | CAP | 特点 |
|---|---|---|---|
| Consul | 客户端 | CP | 健康检查、KV存储 |
| Eureka | 客户端 | AP | Spring生态、自我保护 |
| Nacos | 客户端 | AP/CP | 注册+配置、双模式 |
| ZooKeeper | 客户端 | CP | 强一致、重量级 |
如何实现配置的动态更新?
答案:
实现机制:
1. 长轮询(Long Polling)
客户端 ─1. 请求配置─> 配置中心
↓ 配置未变化,hold住请求
等待30秒
↓ 配置变化 或 超时
客户端 <─2. 返回配置─ 配置中心
↓
3. 立即发起新请求
2. 服务端推送(WebSocket)
客户端 <─1. 建立WebSocket─> 配置中心
<─2. 推送配置更新─
<─3. 推送配置更新─
3. 消息队列
配置中心 ─1. 配置变化─> MQ ─2. 订阅─> 客户端
代码示例(长轮询):
// ConfigClient is a minimal long-polling configuration client.
type ConfigClient struct {
	configURL string            // endpoint of the configuration service
	version   int64             // version of the configuration currently held
	cache     map[string]string // local copy of the configuration
}

// Watch long-polls the configuration service forever. Each request carries
// the current version and a 30-second timeout; when the response reports a
// newer version the configuration is updated and listeners are notified.
// Watch never returns, so callers normally run it in its own goroutine.
//
// The response is decoded into a typed struct. Decoding into
// map[string]interface{} yields float64 numbers and map[string]interface{}
// objects, so asserting them to int64 / map[string]string would panic.
func (c *ConfigClient) Watch() {
	for {
		// Long-poll request (30-second timeout).
		url := fmt.Sprintf("%s?version=%d&timeout=30000", c.configURL, c.version)
		resp, err := http.Get(url)
		if err != nil {
			time.Sleep(1 * time.Second)
			continue
		}

		// Parse the response.
		var payload struct {
			Version int64             `json:"version"`
			Data    map[string]string `json:"data"`
		}
		err = json.NewDecoder(resp.Body).Decode(&payload)
		resp.Body.Close()

		// Back off on error responses and undecodable bodies so a
		// misbehaving server does not turn this loop into a busy spin.
		if err != nil || resp.StatusCode != http.StatusOK {
			time.Sleep(1 * time.Second)
			continue
		}

		// Apply the configuration only when it is newer than ours.
		if payload.Version > c.version {
			c.updateConfig(payload.Data)
			c.version = payload.Version
			// Notify listeners.
			c.notifyChange()
		}
	}
}