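Chapter 4: Service Discovery and Registration

Why Service Discovery Is Needed

Monolith vs. Microservices

Monolithic application:

Client → Load Balancer → Application Server (fixed IPs)
                         - 192.168.1.10
                         - 192.168.1.11
                         - 192.168.1.12

Configuration file: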
upstream backend {
    server 192.168.1.10:8080;
    server 192.168.1.11:8080;
    server 192.168.1.12:8080;
}

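Problems:
- IPs are configured by hand
- Scaling out requires editing the configuration
- Failed nodes are not removed automatically

Microservice architecture:

The order service needs to call:
- User service (3 instances)
- Inventory service (5 instances)
- Payment service (4 instances)
- Coupon service (2 instances)
...

Challenges:
- Service instances change dynamically (autoscaling)
- IP addresses are not fixed (containerized deployment)
- Service dependencies are complex
- Manual configuration is not feasible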

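Problems Service Discovery Solves

Core requirements:

1. Service registration
   Service starts → registers itself with the registry
   {
       "service": "order-service",
       "instance": "192.168.1.10:8080",
       "health": "UP"
   }

2. Service discovery
   Client → queries the registry → receives the list of available instances
   [
       "192.168.1.10:8080",
       "192.168.1.11:8080",
       "192.168.1.12:8080"
   ]

3. Health checking
   Registry → periodically checks each instance's health
   → removes failed instances automatically

4. Load balancing
   Client → picks one instance from the list
   (round-robin, random, weighted, and similar strategies)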

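Architecture comparison:

Traditional architecture (static configuration):
┌────────┐        ┌─────────────────┐
│ Client │───────→│ Nginx           │
└────────┘        │ (hard-coded IPs)│
                  └─────────────────┘
                          ↓
                  ┌───────┴───────┐
                  ↓               ↓
             Server1          Server2

Microservice architecture (service discovery):
┌────────┐          ┌──────────────┐
│ Client │─────────→│ Registry     │
└────────┘ ① query  │ (Consul/etcd)│
                    └──────────────┘
                          ↑ ② register
                  ┌───────┴───────┐
                  │               │
             Service A       Service B
             - Instance1     - Instance1
             - Instance2     - Instance2
                             - Instance3

Advantages:
- Service changes are detected dynamically
- Automatic failover
- Supports autoscaling
- No manual configuration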

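Service Registration Patterns

1. Self-Registration (client-side registration)

Definition: each service instance is responsible for registering itself with the registry.

Flow:

1. The service starts
   ↓
2. It calls the registry API to register itself
   PUT /v1/agent/service/register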
   {
       "ID": "order-service-1",
       "Name": "order-service",
       "Address": "192.168.1.10",
       "Port": 8080,
       "Check": {
           "HTTP": "http://192.168.1.10:8080/health",
           "Interval": "10s"
       }
   }
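   ↓
3. Stay registered: keep answering the registry's health check (the HTTP check above), or send periodic heartbeats when a TTL-style check is used
   ↓
4. On shutdown, deregister explicitly

Code example (Go + Consul):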

package main

import (
    "fmt"
    "github.com/hashicorp/consul/api"
    "log"
    "os"
    "os/signal"
    "syscall"
)

type ServiceRegistry struct {
    client     *api.Client
    serviceID  string
}

func NewServiceRegistry(consulAddr string) (*ServiceRegistry, error) {
    config := api.DefaultConfig()
    config.Address = consulAddr

    client, err := api.NewClient(config)
    if err != nil {
        return nil, err
    }

    return &ServiceRegistry{client: client}, nil
}

// Register registers this instance with the Consul agent.
func (sr *ServiceRegistry) Register(serviceName, addr string, port int) error {
    serviceID := fmt.Sprintf("%s-%s-%d", serviceName, addr, port)
    sr.serviceID = serviceID

    registration := &api.AgentServiceRegistration{
        ID:      serviceID,
        Name:    serviceName,
        Address: addr,
        Port:    port,
        Check: &api.AgentServiceCheck{
            HTTP:                           fmt.Sprintf("http://%s:%d/health", addr, port),
            Interval:                       "10s",
            Timeout:                        "3s",
            DeregisterCriticalServiceAfter: "1m",  // deregister after 1 minute in critical state (Consul enforces a 1m minimum)
        },
    }

    err := sr.client.Agent().ServiceRegister(registration)
    if err != nil {
        return err
    }

    log.Printf("Service registered: %s", serviceID)
    return nil
}

// Deregister removes this instance from Consul.
func (sr *ServiceRegistry) Deregister() error {
    err := sr.client.Agent().ServiceDeregister(sr.serviceID)
    if err != nil {
        return err
    }

    log.Printf("Service deregistered: %s", sr.serviceID)
    return nil
}

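// WaitForShutdown blocks until SIGINT or SIGTERM arrives, then deregisters.
func (sr *ServiceRegistry) WaitForShutdown() {
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

    <-signalChan
    log.Println("Shutdown signal received")

    // Deregister so that clients stop receiving this instance
    if err := sr.Deregister(); err != nil {
        log.Printf("Deregister failed: %v", err)
    }
}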

// Usage example
func main() {
    registry, err := NewServiceRegistry("localhost:8500")
    if err != nil {
        log.Fatal(err)
    }

    // Register the service
    err = registry.Register("order-service", "192.168.1.10", 8080)
    if err != nil {
        log.Fatal(err)
    }

    // Start the HTTP server (it must serve /health for the check registered above)
    // http.HandleFunc("/health", healthCheck)
    // go http.ListenAndServe(":8080", nil)

    // Wait for the shutdown signal
    registry.WaitForShutdown()
}

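Pros:

  • Simple and direct
  • The service instance has full control over registration logic
  • No extra components required

Cons:

  • Every service must implement registration logic
  • Services are coupled to the registry's client library
  • A polyglot environment needs one client per language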
2. Third-Party Registration

Definition: a separate component (the registrar) registers services on their behalf.

Flow:

┌──────────────┐
│   Service    │
└──────┬───────┘
       │ ① start
       ↓
┌──────────────┐
│  Registrar   │ ←──── watches for service start events
│  (Kubernetes │       (e.g. container creation)
│   Service)   │
└──────┬───────┘
       │ ② register
       ↓
┌──────────────┐
│   Registry   │
│  (etcd/DNS)  │
└──────────────┘

Typical implementations:

  1. Kubernetes Service
apiVersion: v1
kind: Service
metadata:
  name: order-service
spec:
  selector:
    app: order-service
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080

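# Kubernetes registers the Service in cluster DNS automatically
# order-service.default.svc.cluster.local → the Service's ClusterIP, which is balanced across the matching Pods
# (a headless Service resolves directly to the Pod IPs instead)

  2. Registrator (Docker)
# Watches Docker for container starts and registers the containers in Consul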
docker run -d \
  --name=registrator \
  --volume=/var/run/docker.sock:/tmp/docker.sock \
  gliderlabs/registrator:latest \
  consul://consul:8500

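Pros:

  • Service code does not need to know about registration
  • Services are decoupled from the registry
  • Registration is managed in one place

Cons:

  • Requires an extra component
  • Adds operational complexity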

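Service Discovery Patterns

1. Client-Side Discovery

Definition: the client queries the registry directly, receives the list of service instances, and chooses which instance to call.

Flow:

┌────────┐
│ Client │
└───┬────┘
    │ ① query service instances
    ↓
┌──────────┐
│ Registry │
│ (Consul) │
└────┬─────┘
     │ ② return instance list
     │ [ip1, ip2, ip3]
     ↓
┌────────┐
│ Client │ ③ load balance (in the client)
└───┬────┘
    │ ④ call directly
    ↓
┌─────────┐
│ Service │
│ Instance│
└─────────┘

Code example: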

package main

import (
    "errors"
    "fmt"
    "log"
    "math/rand"
    "sync"
    "time"

    "github.com/hashicorp/consul/api"
)

type ServiceDiscovery struct {
    client *api.Client
}

func NewServiceDiscovery(consulAddr string) (*ServiceDiscovery, error) {
    config := api.DefaultConfig()
    config.Address = consulAddr

    client, err := api.NewClient(config)
    if err != nil {
        return nil, err
    }

    return &ServiceDiscovery{client: client}, nil
}

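// DiscoverService returns the addresses of the healthy instances of a service.
func (sd *ServiceDiscovery) DiscoverService(serviceName string) ([]string, error) {
    services, _, err := sd.client.Health().Service(
        serviceName,
        "",
        true,  // passingOnly: return only instances whose health checks pass
        nil,
    )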
    if err != nil {
        return nil, err
    }

    instances := make([]string, 0, len(services))
    for _, service := range services {
        addr := fmt.Sprintf("%s:%d",
            service.Service.Address,
            service.Service.Port,
        )
        instances = append(instances, addr)
    }

    return instances, nil
}

// LoadBalancer is a client-side load balancer.
type LoadBalancer struct {
    discovery *ServiceDiscovery
    cache     map[string][]string  // cached instance lists, keyed by service name
    mu        sync.RWMutex
}

func NewLoadBalancer(discovery *ServiceDiscovery) *LoadBalancer {
    lb := &LoadBalancer{
        discovery: discovery,
        cache:     make(map[string][]string),
    }

    // Refresh the instance lists periodically in the background
    go lb.refreshLoop()

    return lb
}

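func (lb *LoadBalancer) refreshLoop() {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        // Snapshot the service names under the read lock: ranging over the map
        // while ChooseInstance writes to it would be a data race.
        lb.mu.RLock()
        names := make([]string, 0, len(lb.cache))
        for serviceName := range lb.cache {
            names = append(names, serviceName)
        }
        lb.mu.RUnlock()

        for _, serviceName := range names {
            instances, err := lb.discovery.DiscoverService(serviceName)
            if err != nil {
                continue // keep the previous list if the registry is unreachable
            }
            lb.mu.Lock()
            lb.cache[serviceName] = instances
            lb.mu.Unlock()
        }
    }
}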

// ChooseInstance picks an instance at random.
func (lb *LoadBalancer) ChooseInstance(serviceName string) (string, error) {
    lb.mu.RLock()
    instances, ok := lb.cache[serviceName]
    lb.mu.RUnlock()

    if !ok || len(instances) == 0 {
        // First access (or empty cache): fetch from the registry
        var err error
        instances, err = lb.discovery.DiscoverService(serviceName)
        if err != nil {
            return "", err
        }

        lb.mu.Lock()
        lb.cache[serviceName] = instances
        lb.mu.Unlock()
    }

    if len(instances) == 0 {
        return "", errors.New("no available instances")
    }

    // Pick at random
    index := rand.Intn(len(instances))
    return instances[index], nil
}

// Usage example
func main() {
    discovery, err := NewServiceDiscovery("localhost:8500")
    if err != nil {
        log.Fatal(err)
    }
    lb := NewLoadBalancer(discovery)

    // Call the service
    for i := 0; i < 10; i++ {
        instance, err := lb.ChooseInstance("order-service")
        if err != nil {
            log.Printf("Error: %v", err)
            continue
        }

        log.Printf("Calling instance: %s", instance)
        // Send the HTTP request to the chosen instance
        // resp, _ := http.Get(fmt.Sprintf("http://%s/api", instance))
    }
}

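Pros:

  • The client calls instances directly, with no extra hop (better performance)
  • Flexible load balancing (custom strategies are possible)
  • No load balancer acting as a single point of failure

Cons:

  • Client logic is more complex
  • Every client must implement load balancing
  • A polyglot environment needs one client per language

When to use:

  • High performance requirements
  • A single-language technology stack
  • Internal calls between microservices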
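
The example above selects instances at random. Because client-side discovery leaves the strategy to the client, it can be swapped out; the following is a minimal round-robin sketch. `RoundRobin` and `Choose` are hypothetical names, not part of any library, and the sketch assumes Go 1.19 or later for `atomic.Uint64`.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// RoundRobin cycles through instances in order. It is safe for concurrent use.
type RoundRobin struct {
	next atomic.Uint64
}

// Choose returns the next instance from the list, wrapping around at the end.
func (rr *RoundRobin) Choose(instances []string) (string, error) {
	if len(instances) == 0 {
		return "", errors.New("no available instances")
	}
	n := rr.next.Add(1) - 1 // 0, 1, 2, ... across all callers
	return instances[n%uint64(len(instances))], nil
}

func main() {
	rr := &RoundRobin{}
	instances := []string{"192.168.1.10:8080", "192.168.1.11:8080", "192.168.1.12:8080"}
	for i := 0; i < 4; i++ {
		addr, err := rr.Choose(instances)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(addr) // the fourth call wraps back to the first instance
	}
}
```

If the instance list changes between calls, the counter keeps advancing, so the rotation stays roughly even but does not restart from the first instance.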

2. Server-Side Discovery

Definition: the client reaches the service through a load balancer, and the load balancer queries the registry.

Flow:

┌────────┐
│ Client │
└───┬────┘
    │ ① request
    ↓
┌─────────────┐
│Load Balancer│ ② query the registry
│   (Nginx)   │ ③ load balance
└───┬─────────┘
    │ ④ forward request
    ↓
┌─────────┐
│ Service │
│ Instance│
└─────────┘

Typical implementations:

1. AWS ELB + ECS

Client → ELB (Elastic Load Balancer)
       → ECS Service (registers its tasks with the ELB automatically)
       → Container Instances

2. Kubernetes Service

apiVersion: v1
kind: Service
metadata:
  name: order-service
spec:
  type: LoadBalancer
  selector:
    app: order-service
  ports:
    - port: 80
      targetPort: 8080

# Clients inside the cluster call:
# http://order-service:80
# Kubernetes forwards the request to a Pod that is passing its readiness check

3. Nginx + Consul Template

# nginx.conf.tpl: consul-template renders the nginx configuration from this template
upstream order-service {
{{ range service "order-service" }}
    server {{ .Address }}:{{ .Port }};
{{ end }}
}

server {
    listen 80;
    location / {
        proxy_pass http://order-service;
    }
}

# Start consul-template; it watches Consul and reloads nginx whenever the service changes
consul-template \
  -consul-addr=localhost:8500 \
  -template="nginx.conf.tpl:nginx.conf:nginx -s reload"

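Pros:

  • Simple clients (they only need the load balancer's address)
  • Language-agnostic (standard HTTP)
  • A single entry point (easier to manage)

Cons:

  • The load balancer is a single point of failure (it needs HA)
  • An extra network hop (slightly lower performance)
  • The load balancer carries all the traffic

When to use:

  • Polyglot environments
  • Access from external clients
  • A unified gateway is required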
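Mainstream Implementations

1. Consul

Overview: HashiCorp's open-source tool for service discovery and configuration management.

Core features:

  • Service registration and discovery
  • Health checks (HTTP, TCP, script)
  • KV store (configuration management)
  • Multi-datacenter support
  • Service mesh (Consul Connect)

Architecture: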

┌────────────────────────────────────────┐
│             Consul Cluster             │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Server 1 │ │ Server 2 │ │ Server 3 │ │
│ │ (Leader) │ │(Follower)│ │(Follower)│ │
│ └──────────┘ └──────────┘ └──────────┘ │
└────────────────────────────────────────┘
        ↑                      ↑
        │                      │
 ┌──────┴───────┐       ┌──────┴───────┐
 │ Consul Agent │       │ Consul Agent │
 │   (Client)   │       │   (Client)   │
 └──────┬───────┘       └──────┬───────┘
        │                      │
 ┌──────┴───────┐       ┌──────┴───────┐
 │  Service A   │       │  Service B   │
 └──────────────┘       └──────────────┘

Complete code example:

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/hashicorp/consul/api"
)

// Service registration
type ConsulRegistry struct {
    client     *api.Client
    serviceID  string
    stopCh     chan struct{}
}

func NewConsulRegistry(consulAddr string) (*ConsulRegistry, error) {
    config := api.DefaultConfig()
    config.Address = consulAddr

    client, err := api.NewClient(config)
    if err != nil {
        return nil, err
    }

    return &ConsulRegistry{
        client: client,
        stopCh: make(chan struct{}),
    }, nil
}

func (cr *ConsulRegistry) Register(serviceName, addr string, port int, tags []string) error {
    serviceID := fmt.Sprintf("%s-%s-%d", serviceName, addr, port)
    cr.serviceID = serviceID

    registration := &api.AgentServiceRegistration{
        ID:      serviceID,
        Name:    serviceName,
        Tags:    tags,
        Address: addr,
        Port:    port,
        Check: &api.AgentServiceCheck{
            HTTP:                           fmt.Sprintf("http://%s:%d/health", addr, port),
            Interval:                       "10s",
            Timeout:                        "3s",
            DeregisterCriticalServiceAfter: "1m", // Consul enforces a 1m minimum for this setting
        },
    }

    err := cr.client.Agent().ServiceRegister(registration)
    if err != nil {
        return err
    }

    log.Printf("Service registered: %s", serviceID)

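    // No separate heartbeat is needed here: Consul polls the HTTP check itself.
    // (Client heartbeats are only required for TTL checks.)
    // go cr.sendHeartbeat()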

    return nil
}

func (cr *ConsulRegistry) Deregister() error {
    close(cr.stopCh)
    return cr.client.Agent().ServiceDeregister(cr.serviceID)
}

// Service discovery
type ConsulDiscovery struct {
    client *api.Client
}

func NewConsulDiscovery(consulAddr string) (*ConsulDiscovery, error) {
    config := api.DefaultConfig()
    config.Address = consulAddr

    client, err := api.NewClient(config)
    if err != nil {
        return nil, err
    }

    return &ConsulDiscovery{client: client}, nil
}

func (cd *ConsulDiscovery) DiscoverService(serviceName string) ([]*ServiceInstance, error) {
    services, _, err := cd.client.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return nil, err
    }

    instances := make([]*ServiceInstance, 0, len(services))
    for _, service := range services {
        instance := &ServiceInstance{
            ID:      service.Service.ID,
            Name:    service.Service.Service,
            Address: service.Service.Address,
            Port:    service.Service.Port,
            Tags:    service.Service.Tags,
        }
        instances = append(instances, instance)
    }

    return instances, nil
}

type ServiceInstance struct {
    ID      string
    Name    string
    Address string
    Port    int
    Tags    []string
}

// HTTP service
type OrderService struct {
    registry *ConsulRegistry
}

func (s *OrderService) healthCheck(w http.ResponseWriter, r *http.Request) {
    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{
        "status": "UP",
        "time":   time.Now().Format(time.RFC3339),
    })
}

func (s *OrderService) createOrder(w http.ResponseWriter, r *http.Request) {
    // Business logic
    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{
        "order_id": "12345",
        "status":   "created",
    })
}

func main() {
    // Register the service
    registry, err := NewConsulRegistry("localhost:8500")
    if err != nil {
        log.Fatal(err)
    }

    err = registry.Register(
        "order-service",
        "192.168.1.10",
        8080,
        []string{"v1", "production"},
    )
    if err != nil {
        log.Fatal(err)
    }

    // Start the HTTP server
    service := &OrderService{registry: registry}
    http.HandleFunc("/health", service.healthCheck)
    http.HandleFunc("/order", service.createOrder)

    go func() {
        log.Println("Starting HTTP server on :8080")
        if err := http.ListenAndServe(":8080", nil); err != nil {
            log.Fatal(err)
        }
    }()

    // Graceful shutdown
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
    <-signalChan

    log.Println("Shutting down...")
    registry.Deregister()
}

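2. Eureka

Overview: Netflix's open-source service registration and discovery component (a core part of Spring Cloud Netflix).

Characteristics:

  • AP system (availability first)
  • Self-preservation mode
  • Client-side caching
  • REST API

Architecture: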

┌─────────────────────────────────────┐
│      Eureka Server Cluster          │
│  ┌─────────────┐  ┌─────────────┐  │
│  │  Eureka 1   │  │  Eureka 2   │  │
│  │  (Peer)     │←→│  (Peer)     │  │
│  └─────────────┘  └─────────────┘  │
└─────────────────────────────────────┘
         ↑                   ↑
         │ Register          │ Fetch
         │ Heartbeat         │ Registry
         ↓                   ↓
  ┌─────────────┐     ┌─────────────┐
  │ Service A   │     │ Service B   │
  │ (Provider)  │     │ (Consumer)  │
  └─────────────┘     └─────────────┘

Spring Cloud integration:

// Eureka Server
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}

# application.yml
server:
  port: 8761

eureka:
  client:
    register-with-eureka: false
    fetch-registry: false

// Eureka Client (service registration)
@SpringBootApplication
@EnableDiscoveryClient
public class OrderServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}

# application.yml
spring:
  application:
    name: order-service

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/
  instance:
    prefer-ip-address: true
    lease-renewal-interval-in-seconds: 10  # heartbeat interval
    lease-expiration-duration-in-seconds: 30  # lease expires if no heartbeat arrives within this time

Go client example:

// Eureka Go client (uses the REST API)
import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

type EurekaClient struct {
    serverURL string
    client    *http.Client
}

func NewEurekaClient(serverURL string) *EurekaClient {
    return &EurekaClient{
        serverURL: serverURL,
        client:    &http.Client{Timeout: 10 * time.Second},
    }
}

type Instance struct {
    HostName string `json:"hostName"`
    App      string `json:"app"`
    IPAddr   string `json:"ipAddr"`
    Port     int    `json:"port"`
    Status   string `json:"status"`
}

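// Register posts the instance to Eureka, which replies 204 No Content on success.
// Note: Instance is a simplified sketch. A live Eureka server expects the port as an
// object ({"$": 8080, "@enabled": "true"}) and a dataCenterInfo field, so extend the
// struct accordingly before using this against a real server.
func (ec *EurekaClient) Register(instance *Instance) error {
    url := fmt.Sprintf("%s/eureka/apps/%s", ec.serverURL, instance.App)

    jsonData, err := json.Marshal(map[string]interface{}{"instance": instance})
    if err != nil {
        return err
    }

    req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonData))
    if err != nil {
        return err
    }
    req.Header.Set("Content-Type", "application/json")

    resp, err := ec.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusNoContent {
        return fmt.Errorf("register failed: %d", resp.StatusCode)
    }

    return nil
}

// SendHeartbeat renews the lease. Eureka replies 200 on success and 404 when the
// instance is unknown, in which case the caller should register again.
func (ec *EurekaClient) SendHeartbeat(appName, instanceID string) error {
    url := fmt.Sprintf("%s/eureka/apps/%s/%s", ec.serverURL, appName, instanceID)

    req, err := http.NewRequest(http.MethodPut, url, nil)
    if err != nil {
        return err
    }
    resp, err := ec.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("heartbeat failed: %d", resp.StatusCode)
    }
    return nil
}

// Discover fetches the registered instances of an application.
func (ec *EurekaClient) Discover(appName string) ([]*Instance, error) {
    url := fmt.Sprintf("%s/eureka/apps/%s", ec.serverURL, appName)

    req, err := http.NewRequest(http.MethodGet, url, nil)
    if err != nil {
        return nil, err
    }
    // Eureka answers in XML unless JSON is requested explicitly
    req.Header.Set("Accept", "application/json")

    resp, err := ec.client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("discover failed: %d", resp.StatusCode)
    }

    var result struct {
        Application struct {
            Instance []*Instance `json:"instance"`
        } `json:"application"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return nil, err
    }

    return result.Application.Instance, nil
}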

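3. Nacos

Overview: Alibaba's open-source platform for dynamic service discovery, configuration management, and service management.

Characteristics:

  • Service discovery (AP or CP, selectable)
  • Configuration management
  • Dynamic DNS
  • Service health checks
  • Multi-language support

Go SDK example: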

package main

import (
    "log"

    "github.com/nacos-group/nacos-sdk-go/clients"
    "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
    "github.com/nacos-group/nacos-sdk-go/common/constant"
    "github.com/nacos-group/nacos-sdk-go/model"
    "github.com/nacos-group/nacos-sdk-go/vo"
)

type NacosRegistry struct {
    client naming_client.INamingClient
}

func NewNacosRegistry(serverAddr string, port uint64) (*NacosRegistry, error) {
    sc := []constant.ServerConfig{
        *constant.NewServerConfig(serverAddr, port),
    }

    cc := constant.ClientConfig{
        NamespaceId:         "", // the empty string selects the default "public" namespace
        TimeoutMs:           5000,
        NotLoadCacheAtStart: true,
        LogDir:              "/tmp/nacos/log",
        CacheDir:            "/tmp/nacos/cache",
        LogLevel:            "info",
    }

    client, err := clients.CreateNamingClient(map[string]interface{}{
        "serverConfigs": sc,
        "clientConfig":  cc,
    })
    if err != nil {
        return nil, err
    }

    return &NacosRegistry{client: client}, nil
}

func (nr *NacosRegistry) Register(serviceName, ip string, port uint64) error {
    _, err := nr.client.RegisterInstance(vo.RegisterInstanceParam{
        Ip:          ip,
        Port:        port,
        ServiceName: serviceName,
        Weight:      10,
        Enable:      true,
        Healthy:     true,
        Ephemeral:   true,
        Metadata:    map[string]string{"version": "1.0"},
    })

    if err != nil {
        return err
    }

    log.Printf("Service registered: %s", serviceName)
    return nil
}

func (nr *NacosRegistry) Discover(serviceName string) ([]model.Instance, error) {
    // SelectInstances returns model.Instance values; the vo package only holds request parameters
    return nr.client.SelectInstances(vo.SelectInstancesParam{
        ServiceName: serviceName,
        HealthyOnly: true,
    })
}

func (nr *NacosRegistry) Subscribe(serviceName string, callback func([]model.Instance)) error {
    return nr.client.Subscribe(&vo.SubscribeParam{
        ServiceName: serviceName,
        SubscribeCallback: func(services []model.SubscribeService, err error) {
            if err != nil {
                log.Printf("Subscribe error: %v", err)
                return
            }

            instances := make([]model.Instance, 0, len(services))
            for _, service := range services {
                instances = append(instances, model.Instance{
                    Ip:          service.Ip,
                    Port:        service.Port,
                    ServiceName: service.ServiceName,
                })
            }

            callback(instances)
        },
    })
}

func main() {
    registry, err := NewNacosRegistry("127.0.0.1", 8848)
    if err != nil {
        log.Fatal(err)
    }

    // Register the service
    if err := registry.Register("order-service", "192.168.1.10", 8080); err != nil {
        log.Fatal(err)
    }

    // Discover instances
    instances, err := registry.Discover("order-service")
    if err != nil {
        log.Printf("Discover failed: %v", err)
    }
    for _, instance := range instances {
        log.Printf("Instance: %s:%d", instance.Ip, instance.Port)
    }

    // Subscribe to instance changes
    err = registry.Subscribe("order-service", func(instances []model.Instance) {
        log.Printf("Service instances changed: %d", len(instances))
    })
    if err != nil {
        log.Printf("Subscribe failed: %v", err)
    }

    select {} // block forever so the subscription keeps running
}

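Health Check Mechanisms

1. Heartbeat checks

How it works: the client periodically sends a heartbeat to the registry.

Client → Registry: heartbeat (every 10 seconds)
Registry: updates the last-heartbeat timestamp

No heartbeat for 30 seconds → the instance is marked Down

Implementation: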

type HeartbeatRegistry struct {
    lastHeartbeat map[string]time.Time
    mu            sync.RWMutex
}

func (hr *HeartbeatRegistry) Heartbeat(serviceID string) {
    hr.mu.Lock()
    defer hr.mu.Unlock()

    hr.lastHeartbeat[serviceID] = time.Now()
}

func (hr *HeartbeatRegistry) CheckHealth() {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        hr.mu.Lock()
        for serviceID, lastTime := range hr.lastHeartbeat {
            if time.Since(lastTime) > 30*time.Second {
                log.Printf("Service %s is down", serviceID)
                delete(hr.lastHeartbeat, serviceID)
            }
        }
        hr.mu.Unlock()
    }
}

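2. HTTP checks

How it works: the registry periodically sends an HTTP request to the service's health endpoint.

Registry → Service: GET /health (every 10 seconds)
Service → Registry: 200 OK

A non-200 response or a timeout → the instance is marked Down

Code example: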

// Service-side health endpoint (db and redis stand for the application's own clients)
func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
    // Check the database connection
    if err := db.Ping(); err != nil {
        w.WriteHeader(http.StatusServiceUnavailable)
        json.NewEncoder(w).Encode(map[string]string{
            "status": "DOWN",
            "reason": "database unreachable",
        })
        return
    }

    // Check the Redis connection
    if err := redis.Ping(); err != nil {
        w.WriteHeader(http.StatusServiceUnavailable)
        json.NewEncoder(w).Encode(map[string]string{
            "status": "DOWN",
            "reason": "redis unreachable",
        })
        return
    }

    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{
        "status": "UP",
        "timestamp": time.Now().Format(time.RFC3339),
    })
}

// Registry-side health check
func (registry *Registry) httpCheck(instance *Instance) bool {
    client := &http.Client{Timeout: 3 * time.Second}
    resp, err := client.Get(instance.HealthCheckURL)
    if err != nil {
        return false
    }
    defer resp.Body.Close()

    return resp.StatusCode == http.StatusOK
}

3. TCP checks

How it works: check whether the TCP port accepts connections.

func (registry *Registry) tcpCheck(addr string) bool {
    conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
    if err != nil {
        return false
    }
    conn.Close()
    return true
}

Case Study

A Complete Microservice Example

Architecture:

              ┌─────────────┐
              │   Consul    │
              │  (Registry) │
              └──────┬──────┘
                     │
     ┌──────────┬────┴─────┬──────────┐
     ↓          ↓          ↓          ↓
┌─────────┐┌─────────┐┌─────────┐┌─────────┐
│  User   ││  Order  ││  Stock  ││   Pay   │
│ Service ││ Service ││ Service ││ Service │
└─────────┘└─────────┘└─────────┘└─────────┘

Implementation:

// Generic service skeleton
type MicroService struct {
    name        string
    addr        string
    port        int
    registry    *ConsulRegistry
    discovery   *ConsulDiscovery
    httpServer  *http.Server
}

func NewMicroService(name, addr string, port int, consulAddr string) (*MicroService, error) {
    registry, err := NewConsulRegistry(consulAddr)
    if err != nil {
        return nil, err
    }

    discovery, err := NewConsulDiscovery(consulAddr)
    if err != nil {
        return nil, err
    }

    return &MicroService{
        name:      name,
        addr:      addr,
        port:      port,
        registry:  registry,
        discovery: discovery,
    }, nil
}

func (ms *MicroService) Start() error {
    // Register the service
    err := ms.registry.Register(ms.name, ms.addr, ms.port, nil)
    if err != nil {
        return err
    }

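    // Start the HTTP server
    mux := http.NewServeMux()
    mux.HandleFunc("/health", ms.healthCheck)
    // Delegate everything else to http.DefaultServeMux so that handlers registered
    // with http.HandleFunc (as main does for /order/create) are actually served
    mux.Handle("/", http.DefaultServeMux)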

    ms.httpServer = &http.Server{
        Addr:    fmt.Sprintf(":%d", ms.port),
        Handler: mux,
    }

    go func() {
        log.Printf("Starting %s on %s:%d", ms.name, ms.addr, ms.port)
        if err := ms.httpServer.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()

    return nil
}

func (ms *MicroService) Stop() error {
    // Deregister from the registry
    ms.registry.Deregister()

    // Shut down the HTTP server
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    return ms.httpServer.Shutdown(ctx)
}

func (ms *MicroService) healthCheck(w http.ResponseWriter, r *http.Request) {
    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{
        "status": "UP",
        "service": ms.name,
    })
}

// CallService calls another service through discovery.
func (ms *MicroService) CallService(serviceName, path string) ([]byte, error) {
    // Discover instances
    instances, err := ms.discovery.DiscoverService(serviceName)
    if err != nil {
        return nil, err
    }

    if len(instances) == 0 {
        return nil, errors.New("no available instances")
    }

    // Pick a random instance
    instance := instances[rand.Intn(len(instances))]

    // Send the HTTP request
    url := fmt.Sprintf("http://%s:%d%s", instance.Address, instance.Port, path)
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    return io.ReadAll(resp.Body) // io.ReadAll replaces the deprecated ioutil.ReadAll
}

// Order service
func main() {
    service, err := NewMicroService(
        "order-service",
        "192.168.1.10",
        8080,
        "localhost:8500",
    )
    if err != nil {
        log.Fatal(err)
    }

    // Register business handlers
    http.HandleFunc("/order/create", func(w http.ResponseWriter, r *http.Request) {
        // Call the inventory service
        stockData, err := service.CallService("stock-service", "/stock/check")
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        log.Printf("Stock response: %s", stockData)

        // Create the order
        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(map[string]string{
            "order_id": "12345",
            "status":   "created",
        })
    })

    if err := service.Start(); err != nil {
        log.Fatal(err)
    }

    // Graceful shutdown
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
    <-signalChan

    service.Stop()
}

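Interview Q&A

What is the difference between client-side and server-side discovery, and how do you choose?

Answer:

Dimension               | Client-side discovery                       | Server-side discovery
Implementation effort   | High (client must implement load balancing) | Low (simple client)
Performance             | High (direct connection)                    | Medium (one extra hop)
Single point of failure | None                                        | Yes (the load balancer)
Multi-language support  | Hard (one client per language)              | Easy (standard HTTP)
Typical use             | Internal microservices                      | External APIs, polyglot stacks

Recommendations:

Internal microservices (single language):
→ Client-side discovery (performance first)
Examples: Dubbo, Spring Cloud Ribbon

External APIs, polyglot environments:
→ Server-side discovery (simplicity first)
Examples: Kubernetes Service, AWS ELB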

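How do you make service discovery highly available?

Answer:

1. Deploy the registry as a cluster

Consul/etcd: a cluster of 3 to 5 server nodes
Tolerates ⌊(n-1)/2⌋ node failures, because a majority must stay reachable

Example (5 nodes):
- 2 nodes fail → still available
- 3 nodes fail → unavailable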
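
The arithmetic behind these numbers is small enough to write down. The sketch below assumes a majority-quorum system (as in Raft-based registries); `Quorum` and `FaultTolerance` are illustrative helper names.

```go
package main

import "fmt"

// Quorum returns the minimum number of servers that must agree: a strict majority.
func Quorum(n int) int { return n/2 + 1 }

// FaultTolerance returns how many servers can fail while a quorum still remains.
func FaultTolerance(n int) int { return (n - 1) / 2 }

func main() {
	for _, n := range []int{1, 2, 3, 4, 5, 7} {
		fmt.Printf("servers=%d quorum=%d tolerated failures=%d\n",
			n, Quorum(n), FaultTolerance(n))
	}
}
```

Note that 4 servers tolerate no more failures than 3, which is why odd cluster sizes are the usual recommendation.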

2. Client-side caching

type ResilientDiscovery struct {
    discovery *ServiceDiscovery
    cache     map[string][]*Instance
    mu        sync.RWMutex
}

func (rd *ResilientDiscovery) GetInstances(serviceName string) []*Instance {
    // 1. Try the registry first
    instances, err := rd.discovery.DiscoverService(serviceName)
    if err == nil {
        rd.updateCache(serviceName, instances)
        return instances
    }

    // 2. Registry unavailable: fall back to the cache
    rd.mu.RLock()
    defer rd.mu.RUnlock()

    if cached, ok := rd.cache[serviceName]; ok {
        log.Println("Using cached instances (registry unavailable)")
        return cached
    }

    return nil
}

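3. Health checks

Client-side local health checks:
- Fail fast (connection timeouts)
- Retry on another instance
- Refresh the instance list periodically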

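What are the differences between Consul, Eureka, and Nacos?

Answer:

Dimension        | Consul                 | Eureka         | Nacos
CAP              | CP                     | AP             | AP or CP (selectable)
Health checks    | Rich (HTTP/TCP/script) | HTTP heartbeat | HTTP/TCP/MySQL
KV store         | Yes                    | No             | Yes
Multi-datacenter | Yes                    | No             | Yes
Spring Cloud     | Supported              | Native support | Supported
K8s integration  | Good                   | Fair           | Good
UI               | Yes                    | Yes            | Yes (friendlier)
Performance      | High                   | Medium         | High

Recommendations:

Spring Cloud ecosystem → Eureka (Netflix native)
Polyglot stacks, Kubernetes → Consul
Deployments in China, or strong configuration-management needs → Nacos (open-sourced by Alibaba)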

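How do you avoid losing traffic when instances go online or offline?

Answer:

Problem scenario:

An instance goes offline:
1. The service receives SIGTERM
2. It immediately stops accepting new requests
3. The registry may not have noticed yet (the heartbeat interval is 10 seconds)
4. Clients keep calling it → requests fail

Solutions:

1. Graceful shutdown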

func (ms *MicroService) GracefulShutdown() {
    // 1. Deregister (tell the registry right away)
    log.Println("Deregistering from registry...")
    ms.registry.Deregister()

    // 2. Give clients time to refresh their instance lists
    log.Println("Waiting for clients to refresh...")
    time.Sleep(15 * time.Second)

    // 3. Stop accepting new requests
    log.Println("Stopping HTTP server...")
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // 4. Wait for in-flight requests to finish (up to the 30s timeout)
    ms.httpServer.Shutdown(ctx)

    log.Println("Shutdown complete")
}

2. Controlling the health check endpoint

type MicroService struct {
    // ...
    shuttingDown atomic.Bool
}

func (ms *MicroService) healthCheck(w http.ResponseWriter, r *http.Request) {
    if ms.shuttingDown.Load() {
        // Shutting down: return 503
        w.WriteHeader(http.StatusServiceUnavailable)
        return
    }

    w.WriteHeader(http.StatusOK)
}

func (ms *MicroService) Shutdown() {
    // 1. Mark as shutting down (the health check now returns 503)
    ms.shuttingDown.Store(true)

    // 2. Wait for the registry to notice (its health checks will fail)
    time.Sleep(15 * time.Second)

    // 3. Shut down for real
    ms.httpServer.Shutdown(context.Background())
}

3. Client-side retries

func (client *ServiceClient) CallWithRetry(serviceName, path string, maxRetries int) ([]byte, error) {
    var lastErr error

    for i := 0; i < maxRetries; i++ {
        instance, err := client.chooseInstance(serviceName)
        if err != nil {
            return nil, err
        }

        data, err := client.httpCall(instance, path)
        if err == nil {
            return data, nil
        }

        lastErr = err
        log.Printf("Retry %d/%d failed: %v", i+1, maxRetries, err)

        // Mark the instance as failed (skip it for a short while)
        client.markInstanceDown(instance)
    }

    return nil, lastErr
}

How do you implement a canary (gray) release?

Answer:

Option 1: Tag-based routing

// Tag the instance at registration time
registry.Register(
    "order-service",
    "192.168.1.10",
    8080,
    []string{"version=v2", "env=canary"},  // canary version
)

// Client-side routing
type VersionRouter struct {
    discovery *ServiceDiscovery
}

func (vr *VersionRouter) ChooseInstance(serviceName, version string) (*Instance, error) {
    instances, err := vr.discovery.DiscoverService(serviceName)
    if err != nil {
        return nil, err
    }

    // Filter by version tag
    filtered := make([]*Instance, 0)
    for _, instance := range instances {
        for _, tag := range instance.Tags {
            if tag == "version="+version {
                filtered = append(filtered, instance)
                break
            }
        }
    }

    if len(filtered) == 0 {
        if version == "v1" {
            // Nothing left to fall back to; without this guard the call would recurse forever
            return nil, errors.New("no available instances")
        }
        // Fall back to the stable version
        return vr.ChooseInstance(serviceName, "v1")
    }

    // Pick at random
    return filtered[rand.Intn(len(filtered))], nil
}

// Usage
func main() {
    router := &VersionRouter{discovery: discovery}

    // Send 5% of traffic to v2 (canary)
    version := "v1"
    if rand.Float64() < 0.05 {
        version = "v2"
    }

    instance, err := router.ChooseInstance("order-service", version)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Routing to instance: %+v", instance)
}
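
Drawing a random number per request means the same user can bounce between v1 and v2 from one call to the next. A common refinement, sketched below as an assumption rather than something the example above does, is to hash a stable key such as the user ID so that each user consistently lands on one version. `CanaryVersion` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// CanaryVersion deterministically maps a key (for example a user ID) to "v2"
// for roughly percent% of keys, and to "v1" for the rest.
func CanaryVersion(key string, percent uint32) string {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash's Write never returns an error
	if h.Sum32()%100 < percent {
		return "v2"
	}
	return "v1"
}

func main() {
	// Count how many of 10000 synthetic user IDs fall into a 5% canary.
	v2 := 0
	for i := 0; i < 10000; i++ {
		if CanaryVersion(fmt.Sprintf("user-%d", i), 5) == "v2" {
			v2++
		}
	}
	fmt.Printf("%d of 10000 users routed to v2\n", v2)

	// The same user always gets the same answer for a given percentage.
	fmt.Println(CanaryVersion("user-42", 5) == CanaryVersion("user-42", 5))
}
```

Because the bucket is `hash % 100`, raising the percentage only moves additional users to v2; users already on the canary stay there, which keeps a gradual rollout stable.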

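Option 2: Consul traffic routing (service-router)

# Consul service-router configuration
# Requests carrying the header X-Version: v2 go to the v2 subset; everything else goes to v1.
# The v1/v2 subsets must be defined in a service-resolver config entry;
# percentage-based splitting is configured with a service-splitter entry instead.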
Kind = "service-router"
Name = "order-service"

Routes = [
  {
    Match {
      HTTP {
        Header = [
          {
            Name  = "X-Version"
            Exact = "v2"
          }
        ]
      }
    }
    Destination {
      Service = "order-service"
      ServiceSubset = "v2"
    }
  },
  {
    Match {
      HTTP {
        PathPrefix = "/"
      }
    }
    Destination {
      Service = "order-service"
      ServiceSubset = "v1"
    }
  }
]
