第4章:服务发现与注册
为什么需要服务发现
单体应用 vs 微服务
单体应用:
Client → Load Balancer → Application Server (固定IP)
- 192.168.1.10
- 192.168.1.11
- 192.168.1.12
配置文件:
upstream backend {
server 192.168.1.10:8080;
server 192.168.1.11:8080;
server 192.168.1.12:8080;
}
问题:
手动配置IP
扩容需要修改配置
故障节点无法自动剔除
微服务架构:
订单服务需要调用:
- 用户服务(3个实例)
- 库存服务(5个实例)
- 支付服务(4个实例)
- 优惠券服务(2个实例)
...
挑战:
服务实例动态变化(自动扩缩容)
IP地址不固定(容器化部署)
服务依赖关系复杂
手动配置不可行
服务发现解决的问题
核心需求:
1. 服务注册
服务启动 → 注册到注册中心
{
"service": "order-service",
"instance": "192.168.1.10:8080",
"health": "UP"
}
2. 服务发现
Client → 查询注册中心 → 获取可用实例列表
[
"192.168.1.10:8080",
"192.168.1.11:8080",
"192.168.1.12:8080"
]
3. 健康检查
注册中心 → 定期检查实例健康状态
→ 自动剔除故障实例
4. 负载均衡
Client → 从实例列表中选择一个
(轮询、随机、加权等策略)
架构对比:
传统架构(静态配置):
┌────────┐ ┌─────────────┐
│ Client │───────→│ Nginx │
└────────┘ │ (硬编码IP) │
└─────────────┘
↓
┌───────┴───────┐
↓ ↓
Server1 Server2
微服务架构(服务发现):
┌────────┐ ┌──────────────┐
│ Client │───────→│ Registry │
└────────┘ ①查询│ (Consul/etcd)│
└──────────────┘
↑ ②注册
┌───────┴───────┐
↓ ↓
Service A Service B
- Instance1 - Instance1
- Instance2 - Instance2
- Instance3
优势:
动态感知服务变化
自动故障转移
支持自动扩缩容
无需手动配置
服务注册模式
1. 客户端注册(Self-Registration)
定义:服务实例自己负责注册到注册中心
流程:
1. 服务启动
↓
2. 调用注册中心API,注册自己
POST /v1/agent/service/register
{
"ID": "order-service-1",
"Name": "order-service",
"Address": "192.168.1.10",
"Port": 8080,
"Check": {
"HTTP": "http://192.168.1.10:8080/health",
"Interval": "10s"
}
}
↓
3. 保持注册状态:使用TTL检查时由服务定期上报心跳;使用HTTP/TCP检查(如上例)时由Consul Agent主动探测,服务无需发送心跳
↓
4. 服务关闭时,主动注销
代码示例(Go + Consul):
package main
import (
"fmt"
"github.com/hashicorp/consul/api"
"log"
"os"
"os/signal"
"syscall"
)
// ServiceRegistry bundles a Consul API client with the ID of the service
// instance registered through it, so the same instance can be removed later.
type ServiceRegistry struct {
	client    *api.Client
	serviceID string // not set by the constructor
}

// NewServiceRegistry creates a ServiceRegistry whose Consul client targets the
// agent at consulAddr. The error from api.NewClient is returned unchanged.
func NewServiceRegistry(consulAddr string) (*ServiceRegistry, error) {
	cfg := api.DefaultConfig()
	cfg.Address = consulAddr

	consulClient, err := api.NewClient(cfg)
	if err != nil {
		return nil, err
	}

	registry := &ServiceRegistry{client: consulClient}
	return registry, nil
}
// Register registers this instance with the Consul agent under serviceName,
// advertised at addr:port.
//
// The instance ID is "<serviceName>-<addr>-<port>". An HTTP health check on
// http://addr:port/health is attached (10s interval, 3s timeout). The ID is
// stored on the receiver only after the agent accepts the registration, so a
// failed Register does not leave a stale ID behind for Deregister.
//
// It returns the agent's error, wrapped with the service ID, on failure.
func (sr *ServiceRegistry) Register(serviceName, addr string, port int) error {
	serviceID := fmt.Sprintf("%s-%s-%d", serviceName, addr, port)

	registration := &api.AgentServiceRegistration{
		ID:      serviceID,
		Name:    serviceName,
		Address: addr,
		Port:    port,
		Check: &api.AgentServiceCheck{
			HTTP:     fmt.Sprintf("http://%s:%d/health", addr, port),
			Interval: "10s",
			Timeout:  "3s",
			// Consul enforces a minimum of one minute for this setting (and
			// reaps critical services roughly every 30 seconds), so a
			// shorter value such as "30s" is not honored as written.
			DeregisterCriticalServiceAfter: "1m",
		},
	}

	if err := sr.client.Agent().ServiceRegister(registration); err != nil {
		return fmt.Errorf("registering service %q: %w", serviceID, err)
	}

	sr.serviceID = serviceID
	log.Printf("Service registered: %s", serviceID)
	return nil
}
// Deregister removes the instance identified by sr.serviceID from the Consul
// agent. On success the ID is logged; on failure the agent's error is
// returned unchanged and nothing is logged.
func (sr *ServiceRegistry) Deregister() error {
	if err := sr.client.Agent().ServiceDeregister(sr.serviceID); err != nil {
		return err
	}

	log.Printf("Service deregistered: %s", sr.serviceID)
	return nil
}
// WaitForShutdown blocks until the process receives SIGINT or SIGTERM and
// then deregisters the service.
//
// A deregistration failure is logged rather than dropped, because the
// instance would otherwise stay in the registry until its health check
// expires.
func (sr *ServiceRegistry) WaitForShutdown() {
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	// Release the handler so later signals get their default behavior.
	defer signal.Stop(signalChan)

	<-signalChan
	log.Println("Shutdown signal received")

	if err := sr.Deregister(); err != nil {
		log.Printf("Failed to deregister service: %v", err)
	}
}
// Usage example: register the order service with the local Consul agent and
// keep the process alive until a termination signal arrives.
func main() {
	registry, err := NewServiceRegistry("localhost:8500")
	if err != nil {
		log.Fatal(err)
	}

	if err := registry.Register("order-service", "192.168.1.10", 8080); err != nil {
		log.Fatal(err)
	}

	// Start the HTTP server.
	// NOTE(review): this is left commented out in the example; presumably a
	// real service must serve /health for the registered check to pass.
	// http.HandleFunc("/health", healthCheck)
	// go http.ListenAndServe(":8080", nil)

	// Block until a shutdown signal is received.
	registry.WaitForShutdown()
}
优点:
- 简单直接
- 服务实例完全控制注册逻辑
- 无需额外组件
缺点:
- 每个服务都需要实现注册逻辑
- 耦合了注册中心的客户端库
- 多语言环境需要多套客户端
2. 第三方注册(Third-Party Registration)
定义:由独立的注册组件(Registrar)负责服务注册
流程:
┌──────────────┐
│ Service │
└──────┬───────┘
│ ①启动
↓
┌──────────────┐
│ Registrar │ ←──── 监听服务启动事件
│ (Kubernetes │ (如:容器创建)
│ Service) │
└──────┬───────┘
│ ②注册
↓
┌──────────────┐
│ Registry │
│ (etcd/DNS) │
└──────────────┘
典型实现:
- Kubernetes Service
apiVersion: v1
kind: Service
metadata:
name: order-service
spec:
selector:
app: order-service
ports:
- protocol: TCP
port: 80
targetPort: 8080
# Kubernetes自动注册到内部DNS
# order-service.default.svc.cluster.local → Service的ClusterIP(由kube-proxy转发到后端多个Pod;仅Headless Service会直接解析为多个Pod IP)
- Registrator(Docker)
# 自动监听Docker容器启动,注册到Consul
docker run -d \
--name=registrator \
--volume=/var/run/docker.sock:/tmp/docker.sock \
gliderlabs/registrator:latest \
consul://consul:8500
优点:
- 服务代码无需关心注册逻辑
- 解耦服务和注册中心
- 统一管理
缺点:
- 需要额外组件
- 增加运维复杂度
服务发现模式
1. 客户端发现(Client-Side Discovery)
定义:客户端直接查询注册中心,获取服务实例列表,自己选择实例调用
流程:
┌────────┐
│ Client │
└───┬────┘
│ ①查询服务实例
↓
┌──────────┐
│ Registry │
│ (Consul) │
└────┬─────┘
│ ②返回实例列表
│ [ip1, ip2, ip3]
↓
┌────────┐
│ Client │ ③负载均衡(客户端)
└───┬────┘
│ ④直接调用
↓
┌─────────┐
│ Service │
│ Instance│
└─────────┘
代码示例:
package main
import (
	"errors"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"time"

	"github.com/hashicorp/consul/api"
)
// ServiceDiscovery looks up service instances through a Consul API client.
type ServiceDiscovery struct {
	client *api.Client
}

// NewServiceDiscovery returns a ServiceDiscovery bound to the Consul agent at
// consulAddr, or the error from api.NewClient.
func NewServiceDiscovery(consulAddr string) (*ServiceDiscovery, error) {
	cfg := api.DefaultConfig()
	cfg.Address = consulAddr

	consulClient, err := api.NewClient(cfg)
	if err != nil {
		return nil, err
	}

	sd := new(ServiceDiscovery)
	sd.client = consulClient
	return sd, nil
}
// DiscoverService returns the "host:port" addresses of all instances of
// serviceName that currently pass their health checks.
//
// When an instance was registered without its own address, the address of
// the node it runs on is used instead; otherwise callers would receive an
// unusable ":port" entry.
//
// It returns the Consul client's error unchanged if the query fails. The
// result is an empty (non-nil) slice when no healthy instance exists.
func (sd *ServiceDiscovery) DiscoverService(serviceName string) ([]string, error) {
	// passingOnly=true restricts the result to healthy instances.
	entries, _, err := sd.client.Health().Service(serviceName, "", true, nil)
	if err != nil {
		return nil, err
	}

	instances := make([]string, 0, len(entries))
	for _, entry := range entries {
		host := entry.Service.Address
		if host == "" && entry.Node != nil {
			host = entry.Node.Address
		}
		instances = append(instances, fmt.Sprintf("%s:%d", host, entry.Service.Port))
	}
	return instances, nil
}
// LoadBalancer is a client-side load balancer that keeps a per-service cache
// of instance addresses obtained from a ServiceDiscovery.
type LoadBalancer struct {
	discovery *ServiceDiscovery
	cache     map[string][]string // service name -> cached instance list
	mu        sync.RWMutex
}

// NewLoadBalancer builds a LoadBalancer on top of discovery and starts the
// background goroutine that periodically refreshes the cached instance lists.
// The goroutine is never stopped by this type.
func NewLoadBalancer(discovery *ServiceDiscovery) *LoadBalancer {
	balancer := new(LoadBalancer)
	balancer.discovery = discovery
	balancer.cache = map[string][]string{}

	// Refresh the instance lists periodically.
	go balancer.refreshLoop()

	return balancer
}
// refreshLoop re-queries the registry every 10 seconds for each service name
// present in the cache and replaces the cached instance list on success.
//
// The set of names is copied under the read lock before any lookup. Ranging
// over lb.cache directly would race with the writes ChooseInstance performs,
// and the Go runtime aborts the process on a concurrent map iteration and
// write. Registry calls are made without holding the lock so a slow registry
// does not block callers.
//
// A failed refresh keeps the previous list for that service and is logged.
// The loop never returns.
func (lb *LoadBalancer) refreshLoop() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		lb.mu.RLock()
		names := make([]string, 0, len(lb.cache))
		for name := range lb.cache {
			names = append(names, name)
		}
		lb.mu.RUnlock()

		for _, name := range names {
			instances, err := lb.discovery.DiscoverService(name)
			if err != nil {
				log.Printf("Refreshing instances of %s failed, keeping cached list: %v", name, err)
				continue
			}

			lb.mu.Lock()
			lb.cache[name] = instances
			lb.mu.Unlock()
		}
	}
}
// ChooseInstance picks one instance address of serviceName uniformly at
// random.
//
// The cached list is used when it is non-empty. Otherwise the registry is
// queried and the result (even an empty one) is stored in the cache. It
// returns the registry error if that query fails, or an error when no
// instance is available.
func (lb *LoadBalancer) ChooseInstance(serviceName string) (string, error) {
	lb.mu.RLock()
	candidates, cached := lb.cache[serviceName]
	lb.mu.RUnlock()

	if !cached || len(candidates) == 0 {
		// Nothing usable cached yet: ask the registry.
		fresh, err := lb.discovery.DiscoverService(serviceName)
		if err != nil {
			return "", err
		}

		lb.mu.Lock()
		lb.cache[serviceName] = fresh
		lb.mu.Unlock()

		candidates = fresh
	}

	if n := len(candidates); n > 0 {
		return candidates[rand.Intn(n)], nil
	}
	return "", errors.New("no available instances")
}
// Usage example: pick an instance of order-service ten times through the
// client-side load balancer.
func main() {
	// A failed constructor returns a nil *ServiceDiscovery; continuing would
	// dereference it on the first lookup, so stop here instead.
	discovery, err := NewServiceDiscovery("localhost:8500")
	if err != nil {
		log.Fatal(err)
	}
	lb := NewLoadBalancer(discovery)

	// Call the service.
	for i := 0; i < 10; i++ {
		instance, err := lb.ChooseInstance("order-service")
		if err != nil {
			log.Printf("Error: %v", err)
			continue
		}
		log.Printf("Calling instance: %s", instance)
		// Send the HTTP request to the chosen instance.
		// resp, _ := http.Get(fmt.Sprintf("http://%s/api", instance))
	}
}
优点:
- 客户端直接调用,无额外跳转(性能高)
- 负载均衡灵活(可自定义策略)
- 无单点故障
缺点:
- 客户端逻辑复杂
- 每个客户端都需要实现负载均衡
- 多语言环境需要多套客户端
适用场景:
- 高性能要求
- 同语言技术栈
- 微服务内部调用
2. 服务端发现(Server-Side Discovery)
定义:客户端通过负载均衡器(Load Balancer)访问服务,由负载均衡器查询注册中心
流程:
┌────────┐
│ Client │
└───┬────┘
│ ①请求
↓
┌────────────┐
│Load Balancer│ ②查询注册中心
│ (Nginx) │ ③负载均衡
└───┬─────────┘
│ ④转发请求
↓
┌─────────┐
│ Service │
│ Instance│
└─────────┘
典型实现:
1. AWS ELB + ECS
Client → ELB (Elastic Load Balancer)
→ ECS Service (自动注册到ELB)
→ Container Instances
2. Kubernetes Service
apiVersion: v1
kind: Service
metadata:
name: order-service
spec:
type: LoadBalancer
selector:
app: order-service
ports:
- port: 80
targetPort: 8080
# Client访问:
# http://order-service:80
# Kubernetes自动转发到健康的Pod
3. Nginx + Consul Template
# consul-template动态生成nginx配置
upstream order-service {
    {{ range service "order-service" }}
    server {{ .Address }}:{{ .Port }};
    {{ else }}
    # No healthy instance is registered. nginx rejects an upstream block
    # with no servers, which would make the reload fail, so render a
    # placeholder that refuses connections (clients get 502) instead.
    server 127.0.0.1:65535;
    {{ end }}
}
server {
listen 80;
location / {
proxy_pass http://order-service;
}
}
# 启动consul-template,监听服务变化
consul-template \
-consul-addr=localhost:8500 \
-template="nginx.conf.tpl:nginx.conf:nginx -s reload"
优点:
- 客户端简单(只需知道负载均衡器地址)
- 多语言友好(HTTP标准)
- 统一入口(易于管理)
缺点:
- 负载均衡器是单点(需要HA)
- 额外的网络跳转(性能略低)
- 负载均衡器压力大
适用场景:
- 多语言环境
- 外部客户端访问
- 需要统一网关
主流实现方案
1. Consul
简介:HashiCorp开源的服务发现和配置管理工具
核心特性:
- 服务注册与发现
- 健康检查(HTTP、TCP、脚本)
- KV存储(配置管理)
- 多数据中心支持
- Service Mesh(Consul Connect)
架构:
┌─────────────────────────────────────┐
│ Consul Cluster │
│ ┌────────┐ ┌────────┐ ┌────────┐│
│ │Server 1│ │Server 2│ │Server 3││
│ │(Leader)│ │(Follower)│(Follower)││
│ └────────┘ └────────┘ └────────┘│
└─────────────────────────────────────┘
↑ ↑
│ │
┌──────┴────┐ ┌──────┴────┐
│Consul Agent│ │Consul Agent│
│ (Client) │ │ (Client) │
└──────┬─────┘ └──────┬─────┘
│ │
┌──────┴────┐ ┌──────┴────┐
│ Service A │ │ Service B │
└───────────┘ └───────────┘
完整代码示例:
package main
import (
"encoding/json"
"fmt"
"github.com/hashicorp/consul/api"
"log"
"net/http"
"time"
)
// 服务注册
// ConsulRegistry registers and deregisters one service instance with Consul.
type ConsulRegistry struct {
	client    *api.Client
	serviceID string
	stopCh    chan struct{} // created by the constructor
}

// NewConsulRegistry creates a ConsulRegistry whose client targets the Consul
// agent at consulAddr. The error from api.NewClient is returned unchanged.
func NewConsulRegistry(consulAddr string) (*ConsulRegistry, error) {
	cfg := api.DefaultConfig()
	cfg.Address = consulAddr

	consulClient, err := api.NewClient(cfg)
	if err != nil {
		return nil, err
	}

	registry := &ConsulRegistry{
		client: consulClient,
		stopCh: make(chan struct{}),
	}
	return registry, nil
}
// Register registers this instance with the Consul agent under serviceName,
// advertised at addr:port and labelled with tags.
//
// The instance ID is "<serviceName>-<addr>-<port>". An HTTP health check on
// http://addr:port/health is attached (10s interval, 3s timeout). The ID is
// stored on the receiver only after the agent accepts the registration.
//
// It returns the agent's error, wrapped with the service ID, on failure.
func (cr *ConsulRegistry) Register(serviceName, addr string, port int, tags []string) error {
	serviceID := fmt.Sprintf("%s-%s-%d", serviceName, addr, port)

	registration := &api.AgentServiceRegistration{
		ID:      serviceID,
		Name:    serviceName,
		Tags:    tags,
		Address: addr,
		Port:    port,
		Check: &api.AgentServiceCheck{
			HTTP:     fmt.Sprintf("http://%s:%d/health", addr, port),
			Interval: "10s",
			Timeout:  "3s",
			// Consul enforces a minimum of one minute for this setting, so
			// a shorter value such as "30s" is not honored as written.
			DeregisterCriticalServiceAfter: "1m",
		},
	}

	if err := cr.client.Agent().ServiceRegister(registration); err != nil {
		return fmt.Errorf("registering service %q: %w", serviceID, err)
	}

	cr.serviceID = serviceID
	log.Printf("Service registered: %s", serviceID)

	// A heartbeat goroutine is optional here: the HTTP check above already
	// lets the agent probe the instance.
	// go cr.sendHeartbeat()
	return nil
}
// Deregister closes the registry's stop channel and removes the registered
// instance from the Consul agent, returning the agent's error if any.
//
// Calling it more than once is tolerated: closing an already-closed channel
// panics, so the channel is closed only if it is still open. The guard
// covers repeated sequential calls only; it does not make concurrent calls
// safe.
func (cr *ConsulRegistry) Deregister() error {
	select {
	case <-cr.stopCh:
		// Already closed by an earlier call.
	default:
		close(cr.stopCh)
	}
	return cr.client.Agent().ServiceDeregister(cr.serviceID)
}
// 服务发现
// ConsulDiscovery queries Consul for service instances.
type ConsulDiscovery struct {
	client *api.Client
}

// NewConsulDiscovery returns a ConsulDiscovery bound to the Consul agent at
// consulAddr, or the error from api.NewClient.
func NewConsulDiscovery(consulAddr string) (*ConsulDiscovery, error) {
	cfg := api.DefaultConfig()
	cfg.Address = consulAddr

	consulClient, err := api.NewClient(cfg)
	if err != nil {
		return nil, err
	}

	cd := new(ConsulDiscovery)
	cd.client = consulClient
	return cd, nil
}
// DiscoverService returns every instance of serviceName that currently
// passes its health checks, converted to ServiceInstance values.
//
// When an instance was registered without its own address, the address of
// the node it runs on is used, so callers never receive an empty Address.
//
// It returns the Consul client's error unchanged if the query fails. The
// result is an empty (non-nil) slice when no healthy instance exists.
func (cd *ConsulDiscovery) DiscoverService(serviceName string) ([]*ServiceInstance, error) {
	// passingOnly=true restricts the result to healthy instances.
	entries, _, err := cd.client.Health().Service(serviceName, "", true, nil)
	if err != nil {
		return nil, err
	}

	instances := make([]*ServiceInstance, 0, len(entries))
	for _, entry := range entries {
		address := entry.Service.Address
		if address == "" && entry.Node != nil {
			address = entry.Node.Address
		}
		instances = append(instances, &ServiceInstance{
			ID:      entry.Service.ID,
			Name:    entry.Service.Service,
			Address: address,
			Port:    entry.Service.Port,
			Tags:    entry.Service.Tags,
		})
	}
	return instances, nil
}
// ServiceInstance is the registry-independent view of one service instance
// as returned by ConsulDiscovery.DiscoverService.
type ServiceInstance struct {
// ID is the instance ID reported by the registry.
ID string
// Name is the service name the instance belongs to.
Name string
// Address is the host or IP the instance is reachable at.
Address string
// Port is the port the instance listens on.
Port int
// Tags are the tags attached to the instance at registration time.
Tags []string
}
// OrderService is the HTTP-facing part of the order service.
type OrderService struct {
	registry *ConsulRegistry
}

// healthCheck answers the registry's health probe with 200 and a small JSON
// document containing the status and the current time (RFC 3339).
func (s *OrderService) healthCheck(w http.ResponseWriter, r *http.Request) {
	// Headers must be set before WriteHeader. Without an explicit type the
	// JSON body would be served as sniffed text/plain.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	err := json.NewEncoder(w).Encode(map[string]string{
		"status": "UP",
		"time":   time.Now().Format(time.RFC3339),
	})
	if err != nil {
		log.Printf("health check: writing response: %v", err)
	}
}

// createOrder is a placeholder for the business logic; it always answers 200
// with a fixed order ID.
func (s *OrderService) createOrder(w http.ResponseWriter, r *http.Request) {
	// Business logic goes here.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	err := json.NewEncoder(w).Encode(map[string]string{
		"order_id": "12345",
		"status":   "created",
	})
	if err != nil {
		log.Printf("create order: writing response: %v", err)
	}
}
// main registers the order service with Consul, serves /health and /order on
// :8080, and deregisters when the process is asked to stop or the HTTP
// server fails.
func main() {
	// Register the service.
	registry, err := NewConsulRegistry("localhost:8500")
	if err != nil {
		log.Fatal(err)
	}
	err = registry.Register(
		"order-service",
		"192.168.1.10",
		8080,
		[]string{"v1", "production"},
	)
	if err != nil {
		log.Fatal(err)
	}

	// Start the HTTP server.
	service := &OrderService{registry: registry}
	http.HandleFunc("/health", service.healthCheck)
	http.HandleFunc("/order", service.createOrder)

	// Report server failures (e.g. the port is already in use) instead of
	// dropping them: a registered instance that is not listening would keep
	// receiving traffic until its health check turns critical.
	serverErr := make(chan error, 1)
	go func() {
		log.Println("Starting HTTP server on :8080")
		serverErr <- http.ListenAndServe(":8080", nil)
	}()

	// Graceful exit.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

	select {
	case <-signalChan:
		log.Println("Shutting down...")
	case err := <-serverErr:
		log.Printf("HTTP server stopped: %v", err)
	}

	if err := registry.Deregister(); err != nil {
		log.Printf("Failed to deregister service: %v", err)
	}
}
2. Eureka
简介:Netflix开源的服务注册与发现组件(Spring Cloud核心组件)
特点:
- AP系统(高可用优先)
- 自我保护机制
- 客户端缓存
- REST API
架构:
┌─────────────────────────────────────┐
│ Eureka Server Cluster │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Eureka 1 │ │ Eureka 2 │ │
│ │ (Peer) │←→│ (Peer) │ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────┘
↑ ↑
│ Register │ Fetch
│ Heartbeat │ Registry
↓ ↓
┌─────────────┐ ┌─────────────┐
│ Service A │ │ Service B │
│ (Provider) │ │ (Consumer) │
└─────────────┘ └─────────────┘
Spring Cloud集成:
// Eureka Server
/**
 * Spring Boot entry point for the Eureka registry server.
 *
 * The class is annotated with {@code @EnableEurekaServer}; it has no logic of
 * its own beyond handing control to {@code SpringApplication.run}.
 */
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
/** Boots the Spring application context with the given command-line arguments. */
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}
// application.yml
server:
port: 8761
eureka:
client:
register-with-eureka: false
fetch-registry: false
// Eureka Client (服务注册)
/**
 * Spring Boot entry point for the order service acting as a discovery client.
 *
 * The class is annotated with {@code @EnableDiscoveryClient}; the registry
 * location and lease timings come from the accompanying application.yml.
 */
@SpringBootApplication
@EnableDiscoveryClient
public class OrderServiceApplication {
/** Boots the Spring application context with the given command-line arguments. */
public static void main(String[] args) {
SpringApplication.run(OrderServiceApplication.class, args);
}
}
// application.yml
spring:
application:
name: order-service
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
instance:
prefer-ip-address: true
lease-renewal-interval-in-seconds: 10 # 心跳间隔
lease-expiration-duration-in-seconds: 30 # 过期时间
Go客户端示例:
// Eureka Go client (uses the REST API).

// EurekaClient is a minimal Eureka client built on the server's REST API.
type EurekaClient struct {
	serverURL string
	client    *http.Client
}

// NewEurekaClient returns a client for the Eureka server at serverURL
// (scheme://host:port, without the /eureka suffix). Requests time out after
// 10 seconds.
func NewEurekaClient(serverURL string) *EurekaClient {
	return &EurekaClient{
		serverURL: serverURL,
		client:    &http.Client{Timeout: 10 * time.Second},
	}
}

// Instance describes one service instance as exchanged with Eureka.
//
// NOTE(review): Eureka's own JSON carries more fields and usually encodes the
// port as an object ({"$": 8080, "@enabled": "true"}). Decoding accepts both
// that form and a plain number; encoding still emits a plain number. Confirm
// the register payload against the target server version.
type Instance struct {
	HostName string `json:"hostName"`
	App      string `json:"app"`
	IPAddr   string `json:"ipAddr"`
	Port     int    `json:"port"`
	Status   string `json:"status"`
}

// UnmarshalJSON decodes an instance whose "port" is either a plain number or
// Eureka's object form with the value under "$" (as a number or a string).
func (i *Instance) UnmarshalJSON(data []byte) error {
	var wire struct {
		HostName string          `json:"hostName"`
		App      string          `json:"app"`
		IPAddr   string          `json:"ipAddr"`
		Port     json.RawMessage `json:"port"`
		Status   string          `json:"status"`
	}
	if err := json.Unmarshal(data, &wire); err != nil {
		return err
	}

	port := 0
	if len(wire.Port) > 0 && string(wire.Port) != "null" {
		p, err := parseEurekaPort(wire.Port)
		if err != nil {
			return err
		}
		port = p
	}

	i.HostName = wire.HostName
	i.App = wire.App
	i.IPAddr = wire.IPAddr
	i.Port = port
	i.Status = wire.Status
	return nil
}

// parseEurekaPort extracts the port number from either a bare JSON number or
// an object of the form {"$": <number or numeric string>, ...}.
func parseEurekaPort(raw json.RawMessage) (int, error) {
	var plain int
	if err := json.Unmarshal(raw, &plain); err == nil {
		return plain, nil
	}

	var wrapped struct {
		Value json.Number `json:"$"`
	}
	if err := json.Unmarshal(raw, &wrapped); err != nil {
		return 0, fmt.Errorf("decoding port %s: %w", raw, err)
	}
	n, err := wrapped.Value.Int64()
	if err != nil {
		return 0, fmt.Errorf("decoding port %s: %w", raw, err)
	}
	return int(n), nil
}

// Register registers instance under its App name. Eureka answers 204 on
// success; any other status is returned as an error, as are a nil instance
// and failures to encode or send the request.
func (ec *EurekaClient) Register(instance *Instance) error {
	if instance == nil {
		return fmt.Errorf("register failed: nil instance")
	}
	url := fmt.Sprintf("%s/eureka/apps/%s", ec.serverURL, instance.App)

	payload, err := json.Marshal(map[string]interface{}{
		"instance": instance,
	})
	if err != nil {
		return fmt.Errorf("encoding instance: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("building register request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := ec.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusNoContent {
		return fmt.Errorf("register failed: %d", resp.StatusCode)
	}
	return nil
}

// SendHeartbeat renews the lease of instanceID under appName.
//
// A non-200 answer is reported as an error. In particular Eureka answers 404
// when it no longer knows the instance, which the caller must see in order to
// re-register.
func (ec *EurekaClient) SendHeartbeat(appName, instanceID string) error {
	url := fmt.Sprintf("%s/eureka/apps/%s/%s", ec.serverURL, appName, instanceID)

	req, err := http.NewRequest(http.MethodPut, url, nil)
	if err != nil {
		return fmt.Errorf("building heartbeat request: %w", err)
	}

	resp, err := ec.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("heartbeat failed: %d", resp.StatusCode)
	}
	return nil
}

// Discover returns the instances registered under appName.
//
// JSON is requested explicitly because Eureka otherwise answers with XML. A
// non-200 status or an undecodable body is returned as an error instead of
// an empty result.
func (ec *EurekaClient) Discover(appName string) ([]*Instance, error) {
	url := fmt.Sprintf("%s/eureka/apps/%s", ec.serverURL, appName)

	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("building discover request: %w", err)
	}
	req.Header.Set("Accept", "application/json")

	resp, err := ec.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("discover %s failed: %d", appName, resp.StatusCode)
	}

	var result struct {
		Application struct {
			Instance []*Instance `json:"instance"`
		} `json:"application"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("decoding instances of %s: %w", appName, err)
	}
	return result.Application.Instance, nil
}
3. Nacos
简介:阿里开源的动态服务发现、配置管理和服务管理平台
特点:
- 服务发现(AP/CP可选)
- 配置管理
- 动态DNS
- 服务健康检查
- 多语言支持
Go SDK示例:
package main
import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
"log"
)
// NacosRegistry wraps a Nacos naming client.
type NacosRegistry struct {
	client naming_client.INamingClient
}

// NewNacosRegistry creates a naming client for the Nacos server at
// serverAddr:port in the "public" namespace and wraps it in a NacosRegistry.
// The error from clients.CreateNamingClient is returned unchanged.
func NewNacosRegistry(serverAddr string, port uint64) (*NacosRegistry, error) {
	serverConfigs := []constant.ServerConfig{
		*constant.NewServerConfig(serverAddr, port),
	}
	clientConfig := constant.ClientConfig{
		NamespaceId:         "public",
		TimeoutMs:           5000,
		NotLoadCacheAtStart: true,
		LogDir:              "/tmp/nacos/log",
		CacheDir:            "/tmp/nacos/cache",
		LogLevel:            "info",
	}

	params := map[string]interface{}{
		"serverConfigs": serverConfigs,
		"clientConfig":  clientConfig,
	}
	namingClient, err := clients.CreateNamingClient(params)
	if err != nil {
		return nil, err
	}

	return &NacosRegistry{client: namingClient}, nil
}
// Register registers an ephemeral, enabled, healthy instance of serviceName
// at ip:port with weight 10 and the metadata version=1.0. It logs the
// service name on success and returns the client's error otherwise.
func (nr *NacosRegistry) Register(serviceName, ip string, port uint64) error {
	param := vo.RegisterInstanceParam{
		Ip:          ip,
		Port:        port,
		ServiceName: serviceName,
		Weight:      10,
		Enable:      true,
		Healthy:     true,
		Ephemeral:   true,
		Metadata:    map[string]string{"version": "1.0"},
	}

	if _, err := nr.client.RegisterInstance(param); err != nil {
		return err
	}

	log.Printf("Service registered: %s", serviceName)
	return nil
}
// Discover returns the healthy instances of serviceName as reported by the
// naming client, together with the client's error.
func (nr *NacosRegistry) Discover(serviceName string) ([]vo.Instance, error) {
	query := vo.SelectInstancesParam{
		ServiceName: serviceName,
		HealthyOnly: true,
	}
	return nr.client.SelectInstances(query)
}
// Subscribe asks the naming client to watch serviceName and invokes callback
// with the converted instance list on every notification. Notifications that
// carry an error are logged and not forwarded to callback.
func (nr *NacosRegistry) Subscribe(serviceName string, callback func([]vo.Instance)) error {
	onChange := func(services []model.SubscribeService, err error) {
		if err != nil {
			log.Printf("Subscribe error: %v", err)
			return
		}

		converted := make([]vo.Instance, 0, len(services))
		for _, svc := range services {
			converted = append(converted, vo.Instance{
				Ip:          svc.Ip,
				Port:        svc.Port,
				ServiceName: svc.ServiceName,
			})
		}
		callback(converted)
	}

	return nr.client.Subscribe(&vo.SubscribeParam{
		ServiceName:       serviceName,
		SubscribeCallback: onChange,
	})
}
// main registers order-service with a local Nacos server, lists its healthy
// instances once, subscribes to changes and then blocks forever.
func main() {
	// A failed constructor returns a nil registry; stop instead of
	// dereferencing it below.
	registry, err := NewNacosRegistry("127.0.0.1", 8848)
	if err != nil {
		log.Fatal(err)
	}

	// Register the service.
	if err := registry.Register("order-service", "192.168.1.10", 8080); err != nil {
		log.Fatal(err)
	}

	// Discover the service.
	instances, err := registry.Discover("order-service")
	if err != nil {
		log.Printf("Discover error: %v", err)
	}
	for _, instance := range instances {
		log.Printf("Instance: %s:%d", instance.Ip, instance.Port)
	}

	// Subscribe to instance changes.
	err = registry.Subscribe("order-service", func(instances []vo.Instance) {
		log.Printf("Service instances changed: %d", len(instances))
	})
	if err != nil {
		log.Printf("Subscribe error: %v", err)
	}

	select {}
}
健康检查机制
1. 心跳检查(Heartbeat)
原理:客户端定期发送心跳到注册中心
Client → Registry: Heartbeat (每10秒)
Registry: 更新最后心跳时间
如果30秒未收到心跳 → 标记为Down
实现:
// HeartbeatRegistry records when each service instance last sent a
// heartbeat. The zero value is ready to use.
type HeartbeatRegistry struct {
	lastHeartbeat map[string]time.Time // service ID -> time of last heartbeat
	mu            sync.RWMutex         // guards lastHeartbeat
}

// Heartbeat records that serviceID was alive at the time of the call.
func (hr *HeartbeatRegistry) Heartbeat(serviceID string) {
	hr.mu.Lock()
	defer hr.mu.Unlock()

	// Writing to a nil map panics, so create it on first use. This keeps a
	// registry declared as HeartbeatRegistry{} usable.
	if hr.lastHeartbeat == nil {
		hr.lastHeartbeat = make(map[string]time.Time)
	}
	hr.lastHeartbeat[serviceID] = time.Now()
}
// CheckHealth sweeps the registry every 10 seconds and removes every service
// whose last heartbeat is more than 30 seconds old, logging each removal. It
// never returns, so run it in its own goroutine.
func (hr *HeartbeatRegistry) CheckHealth() {
	const (
		sweepInterval = 10 * time.Second
		expiry        = 30 * time.Second
	)

	ticker := time.NewTicker(sweepInterval)
	defer ticker.Stop()

	for {
		<-ticker.C

		hr.mu.Lock()
		for id, seen := range hr.lastHeartbeat {
			if time.Since(seen) <= expiry {
				continue
			}
			log.Printf("Service %s is down", id)
			delete(hr.lastHeartbeat, id)
		}
		hr.mu.Unlock()
	}
}
2. HTTP检查
原理:注册中心定期发送HTTP请求到服务的健康检查端点
Registry → Service: GET /health (每10秒)
Service → Registry: 200 OK
如果返回非200或超时 → 标记为Down
代码示例:
// healthCheckHandler is the service-side health endpoint.
//
// It pings the database first and Redis second. The first failing dependency
// yields 503 with {"status":"DOWN","reason":...}. When both respond it
// answers 200 with {"status":"UP","timestamp":<RFC 3339 time>}.
func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
	// respond writes one JSON answer. The Content-Type header is set before
	// WriteHeader because headers changed afterwards are not sent.
	respond := func(code int, body map[string]string) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(code)
		if err := json.NewEncoder(w).Encode(body); err != nil {
			log.Printf("health check: writing response: %v", err)
		}
	}

	// Check the database connection.
	if err := db.Ping(); err != nil {
		respond(http.StatusServiceUnavailable, map[string]string{
			"status": "DOWN",
			"reason": "database unreachable",
		})
		return
	}

	// Check the Redis connection.
	if err := redis.Ping(); err != nil {
		respond(http.StatusServiceUnavailable, map[string]string{
			"status": "DOWN",
			"reason": "redis unreachable",
		})
		return
	}

	respond(http.StatusOK, map[string]string{
		"status":    "UP",
		"timestamp": time.Now().Format(time.RFC3339),
	})
}
// httpCheck is the registry-side probe: it GETs the instance's health check
// URL with a 3-second timeout and reports whether the answer was exactly
// 200 OK. Any request error counts as unhealthy.
func (registry *Registry) httpCheck(instance *Instance) bool {
	probe := http.Client{Timeout: 3 * time.Second}

	resp, err := probe.Get(instance.HealthCheckURL)
	if err != nil {
		return false
	}

	healthy := resp.StatusCode == http.StatusOK
	resp.Body.Close()
	return healthy
}
3. TCP检查
原理:检查TCP端口是否可连接
// tcpCheck reports whether a TCP connection to addr can be established
// within 3 seconds. The connection is closed again immediately.
func (registry *Registry) tcpCheck(addr string) bool {
	if conn, err := net.DialTimeout("tcp", addr, 3*time.Second); err == nil {
		conn.Close()
		return true
	}
	return false
}
实战案例
完整微服务示例
架构:
┌─────────────┐
│ Consul │
│ (Registry) │
└──────┬──────┘
│
┌───┴───┬───────┬────────┐
↓ ↓ ↓ ↓
┌──────┐┌──────┐┌──────┐┌──────┐
│User ││Order ││Stock ││Pay │
│Service│Service│Service│Service│
└──────┘└──────┘└──────┘└──────┘
代码实现:
// MicroService is a small generic service skeleton: it knows its own name
// and address, and owns a registry client, a discovery client and an HTTP
// server.
type MicroService struct {
	name       string
	addr       string
	port       int
	registry   *ConsulRegistry
	discovery  *ConsulDiscovery
	httpServer *http.Server // not set by the constructor
}

// NewMicroService creates the registry and discovery clients for the Consul
// agent at consulAddr and returns a MicroService describing name at
// addr:port. The first client-construction error is returned unchanged.
func NewMicroService(name, addr string, port int, consulAddr string) (*MicroService, error) {
	reg, err := NewConsulRegistry(consulAddr)
	if err != nil {
		return nil, err
	}

	disc, err := NewConsulDiscovery(consulAddr)
	if err != nil {
		return nil, err
	}

	ms := &MicroService{
		name:      name,
		addr:      addr,
		port:      port,
		registry:  reg,
		discovery: disc,
	}
	return ms, nil
}
// Start registers the service and starts its HTTP server in a background
// goroutine. It returns the registration error, if any; a server failure
// after startup terminates the process via log.Fatal.
//
// The server answers /health itself and hands every other path to
// http.DefaultServeMux, so business handlers added with http.HandleFunc or
// http.Handle are actually served. With a private mux that only knows
// /health, such handlers would never be reached.
//
// NOTE(review): anything else registered on http.DefaultServeMux (for
// example by importing net/http/pprof) becomes reachable on this port too.
func (ms *MicroService) Start() error {
	// Register the service.
	if err := ms.registry.Register(ms.name, ms.addr, ms.port, nil); err != nil {
		return err
	}

	// Start the HTTP server.
	mux := http.NewServeMux()
	mux.HandleFunc("/health", ms.healthCheck)
	mux.Handle("/", http.DefaultServeMux)

	ms.httpServer = &http.Server{
		Addr:    fmt.Sprintf(":%d", ms.port),
		Handler: mux,
	}

	go func() {
		log.Printf("Starting %s on %s:%d", ms.name, ms.addr, ms.port)
		if err := ms.httpServer.ListenAndServe(); err != http.ErrServerClosed {
			log.Fatal(err)
		}
	}()
	return nil
}
// Stop deregisters the service and then shuts the HTTP server down, waiting
// up to 5 seconds for in-flight requests.
//
// A deregistration failure is logged and does not prevent the shutdown. It
// is returned if the shutdown itself succeeds, otherwise the shutdown error
// wins. Calling Stop before Start (no HTTP server yet) is safe.
func (ms *MicroService) Stop() error {
	// Deregister first so the registry stops handing out this instance.
	deregErr := ms.registry.Deregister()
	if deregErr != nil {
		log.Printf("Failed to deregister %s: %v", ms.name, deregErr)
	}

	if ms.httpServer == nil {
		return deregErr
	}

	// Shut the HTTP server down.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := ms.httpServer.Shutdown(ctx); err != nil {
		return err
	}
	return deregErr
}
// healthCheck answers 200 with a JSON document holding the status "UP" and
// the service name.
func (ms *MicroService) healthCheck(w http.ResponseWriter, r *http.Request) {
	// Set the type before WriteHeader; otherwise the JSON body is served as
	// sniffed text/plain.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	err := json.NewEncoder(w).Encode(map[string]string{
		"status":  "UP",
		"service": ms.name,
	})
	if err != nil {
		log.Printf("health check: writing response: %v", err)
	}
}
// CallService performs a GET on path against one randomly chosen healthy
// instance of serviceName and returns the response body.
//
// It returns an error when discovery fails, when no instance is available,
// when the request fails or exceeds 5 seconds, or when the instance answers
// with a non-2xx status. An error page is therefore never handed back as if
// it were a successful payload.
func (ms *MicroService) CallService(serviceName, path string) ([]byte, error) {
	// Discover the service.
	instances, err := ms.discovery.DiscoverService(serviceName)
	if err != nil {
		return nil, err
	}
	if len(instances) == 0 {
		return nil, errors.New("no available instances")
	}

	// Pick a random instance.
	instance := instances[rand.Intn(len(instances))]

	// Send the HTTP request. http.Get has no timeout and can hang on an
	// unresponsive instance. A Client with a nil Transport still uses
	// http.DefaultTransport, so connections are pooled as before.
	client := &http.Client{Timeout: 5 * time.Second}
	url := fmt.Sprintf("http://%s:%d%s", instance.Address, instance.Port, path)
	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("%s returned status %d", url, resp.StatusCode)
	}
	return ioutil.ReadAll(resp.Body)
}
// Order service: creates the service skeleton, adds the business handler,
// starts the service and stops it on SIGINT/SIGTERM.
func main() {
	// A failed constructor returns a nil service; stop instead of
	// dereferencing it below.
	service, err := NewMicroService(
		"order-service",
		"192.168.1.10",
		8080,
		"localhost:8500",
	)
	if err != nil {
		log.Fatal(err)
	}

	// Add the business handler (registered on http.DefaultServeMux).
	http.HandleFunc("/order/create", func(w http.ResponseWriter, r *http.Request) {
		// Call the stock service.
		stockData, err := service.CallService("stock-service", "/stock/check")
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		log.Printf("Stock response: %s", stockData)

		// Create the order.
		w.WriteHeader(http.StatusOK)
		json.NewEncoder(w).Encode(map[string]string{
			"order_id": "12345",
			"status":   "created",
		})
	})

	if err := service.Start(); err != nil {
		log.Fatal(err)
	}

	// Graceful exit.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	<-signalChan

	if err := service.Stop(); err != nil {
		log.Printf("Shutdown error: %v", err)
	}
}
面试问答
客户端发现和服务端发现有什么区别?如何选择?
答案:
| 对比维度 | 客户端发现 | 服务端发现 |
|---|---|---|
| 实现复杂度 | 高(客户端需实现负载均衡) | 低(客户端简单) |
| 性能 | 高(直连服务) | 中(多一跳) |
| 单点风险 | 无 | 有(负载均衡器) |
| 多语言支持 | 难(需多套客户端) | 易(HTTP标准) |
| 适用场景 | 内部微服务 | 外部API、多语言 |
选择建议:
内部微服务(同语言):
→ 客户端发现(性能优先)
示例:Dubbo、Spring Cloud Ribbon
外部API、多语言环境:
→ 服务端发现(简单优先)
示例:Kubernetes Service、AWS ELB
如何保证服务发现的高可用?
答案:
1. 注册中心集群部署
Consul/etcd: 3-5节点集群
容忍(n-1)/2个节点故障
示例(5节点):
- 2个节点故障 → 仍可用
- 3个节点故障 → 不可用
2. 客户端缓存
// ResilientDiscovery wraps a ServiceDiscovery with a local cache that is
// consulted when the registry cannot be reached.
type ResilientDiscovery struct {
	discovery *ServiceDiscovery
	cache     map[string][]*Instance // service name -> last known instances
	mu        sync.RWMutex
}

// GetInstances returns the instances of serviceName.
//
// The registry is always asked first; a successful answer refreshes the
// cache and is returned. If the registry call fails, the cached list is
// returned (with a log line), or nil when nothing has been cached yet.
func (rd *ResilientDiscovery) GetInstances(serviceName string) []*Instance {
	// 1. Try the registry.
	fresh, err := rd.discovery.DiscoverService(serviceName)
	if err == nil {
		rd.updateCache(serviceName, fresh)
		return fresh
	}

	// 2. Registry unavailable: fall back to the cache.
	rd.mu.RLock()
	cached, found := rd.cache[serviceName]
	rd.mu.RUnlock()

	if !found {
		return nil
	}
	log.Println("Using cached instances (registry unavailable)")
	return cached
}
3. 健康检查
客户端本地健康检查:
- 快速失败(连接超时)
- 重试其他实例
- 定期刷新实例列表
Consul、Eureka、Nacos有什么区别?
答案:
| 对比维度 | Consul | Eureka | Nacos |
|---|---|---|---|
| CAP | CP | AP | AP/CP可选 |
| 健康检查 | 丰富(HTTP/TCP/脚本) | HTTP心跳 | HTTP/TCP/MySQL |
| KV存储 | 支持 | 不支持 | 支持 |
| 多数据中心 | 支持 | 不支持 | 支持 |
| Spring Cloud | 支持 | 原生支持 | 支持 |
| K8s集成 | 好 | 一般 | 好 |
| UI | 有 | 有 | 有(更友好) |
| 性能 | 高 | 中 | 高 |
选择建议:
Spring Cloud生态 → Eureka(Netflix原生)
多语言、Kubernetes → Consul
国内、配置管理需求强 → Nacos(阿里开源)
服务实例上下线如何避免流量损失?
答案:
问题场景:
服务实例下线:
1. 服务收到SIGTERM信号
2. 立即停止接收新请求
3. 但注册中心可能还未感知(心跳间隔10秒)
4. 客户端继续调用 → 失败
解决方案:
1. 优雅停机(Graceful Shutdown)
// GracefulShutdown takes the instance out of rotation before stopping it:
//
//  1. deregister, so the registry stops handing out this instance;
//  2. wait 15 seconds so clients can refresh their instance lists;
//  3. shut the HTTP server down, allowing in-flight requests up to 30
//     seconds to finish.
//
// Failures in step 1 or 3 are logged. "Shutdown complete" is logged only
// when the server actually drained in time.
func (ms *MicroService) GracefulShutdown() {
	// 1. Deregister (notifies the registry immediately).
	log.Println("Deregistering from registry...")
	if err := ms.registry.Deregister(); err != nil {
		// Keep going: the grace period below still protects most clients.
		log.Printf("Deregistration failed: %v", err)
	}

	// 2. Give clients some time to refresh.
	log.Println("Waiting for clients to refresh...")
	time.Sleep(15 * time.Second)

	// 3. Stop accepting new requests and
	// 4. wait for the requests still being processed.
	log.Println("Stopping HTTP server...")
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if err := ms.httpServer.Shutdown(ctx); err != nil {
		log.Printf("HTTP server shutdown did not complete cleanly: %v", err)
		return
	}
	log.Println("Shutdown complete")
}
2. 健康检查端点控制
// MicroService (excerpt) carries a flag that the health endpoint consults
// while the service is shutting down.
type MicroService struct {
	// ...
	shuttingDown atomic.Bool
}

// healthCheck answers 503 once shutdown has begun and 200 otherwise, so the
// registry's health check takes the instance out of rotation by itself.
func (ms *MicroService) healthCheck(w http.ResponseWriter, r *http.Request) {
	if ms.shuttingDown.Load() {
		// Shutting down: report unavailable.
		w.WriteHeader(http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusOK)
}

// Shutdown marks the service as shutting down, waits 15 seconds for the
// registry to notice the failing health check, and then stops the HTTP
// server.
//
// The server shutdown is bounded to 30 seconds: with a context that never
// expires, a single stuck connection would keep the process alive forever.
// A shutdown error is logged.
func (ms *MicroService) Shutdown() {
	// 1. Mark as shutting down (the health check now returns 503).
	ms.shuttingDown.Store(true)

	// 2. Wait for the registry to notice the failing health check.
	time.Sleep(15 * time.Second)

	// 3. Stop for real.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := ms.httpServer.Shutdown(ctx); err != nil {
		log.Printf("HTTP server shutdown did not complete cleanly: %v", err)
	}
}
3. 客户端重试
// CallWithRetry calls path on serviceName, trying up to maxRetries instances.
//
// Each attempt picks an instance through chooseInstance. An instance whose
// call fails is marked down so that it is not picked again for a while. The
// first successful response is returned; if every attempt fails, the error
// of the last one is returned. A failure to pick an instance aborts
// immediately with that error.
//
// maxRetries below 1 is treated as 1; otherwise the loop would not run and
// the caller would receive (nil, nil), which looks like an empty success.
func (client *ServiceClient) CallWithRetry(serviceName, path string, maxRetries int) ([]byte, error) {
	if maxRetries < 1 {
		maxRetries = 1
	}

	var lastErr error
	for i := 0; i < maxRetries; i++ {
		instance, err := client.chooseInstance(serviceName)
		if err != nil {
			return nil, err
		}

		data, err := client.httpCall(instance, path)
		if err == nil {
			return data, nil
		}
		lastErr = err
		log.Printf("Retry %d/%d failed: %v", i+1, maxRetries, err)

		// Mark the instance as failed (not selected again for a short time).
		client.markInstanceDown(instance)
	}
	return nil, lastErr
}
如何实现服务的灰度发布?
答案:
方案1: 基于Tag的路由
// 注册服务时打标签
registry.Register(
"order-service",
"192.168.1.10",
8080,
[]string{"version=v2", "env=canary"}, // 金丝雀版本
)
// 客户端路由
// VersionRouter routes calls to instances carrying a given version tag.
type VersionRouter struct {
	discovery *ServiceDiscovery
}

// ChooseInstance returns a random instance of serviceName tagged
// "version=<version>".
//
// When no instance carries the requested version, it falls back once to the
// stable version "v1", using the instance list already fetched. If that is
// empty too, or "v1" itself was requested, it returns an error. The fallback
// is a second filter pass, not a recursive call, so a service with no v1
// instances cannot recurse without bound. Discovery errors are returned
// rather than ignored.
func (vr *VersionRouter) ChooseInstance(serviceName, version string) (*Instance, error) {
	const stableVersion = "v1"

	instances, err := vr.discovery.DiscoverService(serviceName)
	if err != nil {
		return nil, err
	}

	// Filter by version, degrading to the stable version if necessary.
	candidates := filterByVersion(instances, version)
	if len(candidates) == 0 && version != stableVersion {
		candidates = filterByVersion(instances, stableVersion)
	}
	if len(candidates) == 0 {
		return nil, errors.New("no available instances")
	}

	// Pick one at random.
	return candidates[rand.Intn(len(candidates))], nil
}

// filterByVersion returns the instances that carry the tag "version=<version>".
func filterByVersion(instances []*Instance, version string) []*Instance {
	wanted := "version=" + version
	matched := make([]*Instance, 0, len(instances))
	for _, instance := range instances {
		for _, tag := range instance.Tags {
			if tag == wanted {
				matched = append(matched, instance)
				break
			}
		}
	}
	return matched
}
// Usage: send roughly 5% of the calls to the canary version v2 and the rest
// to v1.
//
// NOTE(review): discovery is not defined in this snippet; it is presumably
// created beforehand (e.g. with NewServiceDiscovery) — confirm.
func main() {
	router := &VersionRouter{discovery: discovery}

	// 5% of the traffic goes to v2 (canary).
	version := "v1"
	if rand.Float64() < 0.05 {
		version = "v2"
	}

	// Declared once outside the branches: a variable that is only assigned
	// inside each branch and never read does not compile in Go.
	instance, err := router.ChooseInstance("order-service", version)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Routing to %s instance: %+v", version, instance)
}
方案2: Consul的流量分割
# Consul service-router configuration entry for order-service.
# Routes are listed in order: the header match comes first, the catch-all
# path prefix second.
# NOTE(review): the subsets "v1" and "v2" referenced below are presumably
# defined in a separate service-resolver entry for order-service — confirm.
Kind = "service-router"
Name = "order-service"
Routes = [
{
# Requests carrying the header X-Version: v2 go to the v2 subset.
Match {
HTTP {
Header = [
{
Name = "X-Version"
Exact = "v2"
}
]
}
}
Destination {
Service = "order-service"
ServiceSubset = "v2"
}
},
{
# Every other request (any path) goes to the v1 subset.
Match {
HTTP {
PathPrefix = "/"
}
}
Destination {
Service = "order-service"
ServiceSubset = "v1"
}
}
]