HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 分布式架构模式

    • 分布式架构模式手册
    • 第1章:分布式一致性
    • 第2章:分布式锁
    • 第3章:分布式协调
    • 第4章:服务发现与注册
    • 第5章:负载均衡
    • 第6章:熔断降级
    • 第7章:DDD领域驱动设计
    • 第8章:CQRS与Event Sourcing

第3章:分布式协调

什么是分布式协调

定义

分布式协调服务:在分布式系统中,提供配置管理、命名服务、分布式同步、组服务等功能的基础设施。

为什么需要分布式协调

单机场景:

Application
    ↓
Local Config File (config.yml)
    ↓
简单直接,重启应用生效

分布式场景问题:

Service A (Node1) → config.yml (version 1.0)
Service A (Node2) → config.yml (version 1.0)
Service A (Node3) → config.yml (version 1.1)  配置不一致!

问题:
1. 配置不一致 → 行为不一致
2. 配置变更需要重启 → 影响可用性
3. 无法动态感知 → 无法实时更新
4. 难以管理 → 运维复杂

分布式协调方案:

┌─────────────────────────────────┐
│   ZooKeeper / etcd Cluster      │
│   (统一配置存储 + Watch通知)    │
└─────────────────────────────────┘
      ↓          ↓          ↓
  Node1       Node2       Node3
  (Watch)     (Watch)     (Watch)

配置变更流程:
1. 管理员更新配置到ZK/etcd
2. ZK/etcd通知所有Watch的节点
3. 节点收到通知,重新加载配置
4. 无需重启,实时生效 

核心功能

1. 配置管理(Configuration Management)

场景:数据库连接串、限流阈值、开关配置

// 传统方式(不推荐)
const (
    DBHost = "mysql.prod.com"  // 硬编码,修改需重新发布
    MaxQPS = 1000
)

// 分布式协调方式(推荐)
type ConfigCenter struct {
    zkConn *zk.Conn
}

func (c *ConfigCenter) GetConfig(key string) (string, error) {
    data, _, err := c.zkConn.Get("/config/" + key)
    return string(data), err
}

func (c *ConfigCenter) WatchConfig(key string, callback func(string)) {
    for {
        data, _, eventCh, err := c.zkConn.GetW("/config/" + key)
        if err != nil {
            continue
        }

        callback(string(data))  // 首次读取

        // 等待变更通知
        event := <-eventCh
        if event.Type == zk.EventNodeDataChanged {
            // 配置变更,递归监听
        }
    }
}

2. 服务注册与发现(Service Registry)

场景:微服务动态上下线

服务注册:
OrderService (192.168.1.10:8080) → ZK: /services/order/node1
OrderService (192.168.1.11:8080) → ZK: /services/order/node2

服务发现:
UserService → ZK查询 /services/order/*
           → 获取所有可用实例列表
           → 负载均衡调用

3. 分布式锁(Distributed Lock)

场景:保证多个节点互斥访问共享资源

ZooKeeper临时顺序节点:
/locks/my-lock/0000000001  ← Node1 (最小序号,获得锁)
/locks/my-lock/0000000002  ← Node2 (等待)
/locks/my-lock/0000000003  ← Node3 (等待)

Node1释放锁 → Node2自动获得锁

4. Leader选举(Leader Election)

场景:分布式系统中选出主节点

Kafka Controller选举:
Broker1 → 创建 /controller (临时节点) → 成为Leader 
Broker2 → 创建失败 → 成为Follower
Broker3 → 创建失败 → 成为Follower

Broker1宕机 → 临时节点删除 → Broker2/3竞争创建 → 新Leader产生

ZooKeeper详解

简介

ZooKeeper:Apache开源的分布式协调服务,源于Google Chubby论文

特点:

  • 强一致性(CP系统)
  • 高可用(集群部署)
  • 顺序保证(全局有序)
  • 原子性(操作要么成功要么失败)

ZAB协议(ZooKeeper Atomic Broadcast)

ZAB vs Paxos:

  • ZAB是Paxos的工程实现
  • 针对ZooKeeper场景优化

核心流程:

1. Leader选举:
   ┌────────────────────────────────┐
   │  所有节点启动,都是LOOKING状态  │
   └────────────────────────────────┘
              ↓
   ┌────────────────────────────────┐
   │  投票给自己,广播投票信息       │
   │  (myid, zxid)                  │
   └────────────────────────────────┘
              ↓
   ┌────────────────────────────────┐
   │  接收其他节点投票,比较:       │
   │  1. zxid更大的优先(数据更新)  │
   │  2. zxid相同,myid更大的优先    │
   └────────────────────────────────┘
              ↓
   ┌────────────────────────────────┐
   │  获得超过半数投票 → Leader      │
   │  其他节点 → Follower            │
   └────────────────────────────────┘

2. 数据同步:
   Leader → Follower同步最新数据

3. 消息广播:
   Client写请求 → Leader → 2PC广播 → 过半确认 → 提交

2PC广播流程:

// Leader接收客户端写请求
func (l *Leader) HandleWrite(request WriteRequest) {
    // 1. 生成全局递增的zxid
    zxid := l.GenerateZxid()

    // 2. 发送Proposal给所有Follower
    proposal := Proposal{
        Zxid: zxid,
        Data: request.Data,
    }

    ackCount := 1  // Leader自己先ACK
    for _, follower := range l.followers {
        go func(f *Follower) {
            if f.SendProposal(proposal) {
                ackCount++
            }
        }(follower)
    }

    // 3. 等待过半ACK
    if ackCount > len(l.followers)/2 {
        // 4. 发送Commit给所有Follower
        for _, follower := range l.followers {
            follower.SendCommit(zxid)
        }

        // 5. 返回客户端成功
        return Success
    }

    return Failed
}

ZXID(ZooKeeper Transaction ID):

ZXID = 64位整数
    ↓
高32位: epoch(Leader纪元)
低32位: counter(递增计数器)

示例:
Zxid = 0x0000000100000001
       ↑         ↑
       epoch=1   counter=1

作用:
1. 全局有序(单调递增)
2. 识别数据新旧
3. Leader选举依据

数据模型

树形结构:

/                           (根节点)
├── /services               (服务注册目录)
│   ├── /order
│   │   ├── node1 (临时节点)
│   │   └── node2 (临时节点)
│   └── /user
│       └── node1
├── /config                 (配置管理)
│   ├── /db
│   │   └── host (持久节点)
│   └── /cache
│       └── ttl
└── /locks                  (分布式锁)
    └── /my-lock
        ├── 0000000001 (临时顺序节点)
        └── 0000000002

节点类型:

类型说明应用场景
持久节点创建后一直存在,除非主动删除配置存储
临时节点客户端断开连接后自动删除服务注册、分布式锁
持久顺序节点自动追加序号,持久-
临时顺序节点自动追加序号,临时分布式锁、选举

代码示例:

package main

import (
    "github.com/samuel/go-zookeeper/zk"
    "time"
)

type ZKClient struct {
    conn *zk.Conn
}

func NewZKClient(servers []string) (*ZKClient, error) {
    conn, _, err := zk.Connect(servers, 10*time.Second)
    if err != nil {
        return nil, err
    }
    return &ZKClient{conn: conn}, nil
}

// 创建持久节点(配置存储)
func (c *ZKClient) CreatePersistent(path, data string) error {
    _, err := c.conn.Create(
        path,
        []byte(data),
        0,  // flags=0 表示持久节点
        zk.WorldACL(zk.PermAll),
    )
    return err
}

// 创建临时节点(服务注册)
func (c *ZKClient) CreateEphemeral(path, data string) error {
    _, err := c.conn.Create(
        path,
        []byte(data),
        zk.FlagEphemeral,  // 临时节点
        zk.WorldACL(zk.PermAll),
    )
    return err
}

// 创建临时顺序节点(分布式锁)
func (c *ZKClient) CreateEphemeralSequential(path, data string) (string, error) {
    return c.conn.Create(
        path,
        []byte(data),
        zk.FlagEphemeral|zk.FlagSequence,  // 临时+顺序
        zk.WorldACL(zk.PermAll),
    )
}

Watch机制

定义:客户端可以监听ZNode的变化,当节点数据/子节点发生变化时,ZooKeeper会主动通知客户端

特点:

  • 一次性触发:Watch触发后需要重新设置
  • 异步通知:回调方式通知客户端
  • 顺序保证:事件通知顺序与操作顺序一致

Watch类型:

1. GetW: 监听节点数据变化
   Event: EventNodeDataChanged

2. ExistsW: 监听节点创建/删除/数据变化
   Event: EventNodeCreated, EventNodeDeleted, EventNodeDataChanged

3. ChildrenW: 监听子节点列表变化
   Event: EventNodeChildrenChanged

代码示例:

// 配置热更新
type ConfigWatcher struct {
    zkClient *ZKClient
    cache    map[string]string
}

func (w *ConfigWatcher) WatchConfig(key string) {
    path := "/config/" + key

    for {
        // GetW: 获取数据并设置Watch
        data, stat, eventCh, err := w.zkClient.conn.GetW(path)
        if err != nil {
            time.Sleep(1 * time.Second)
            continue
        }

        // 更新缓存
        w.cache[key] = string(data)
        log.Printf("Config updated: %s = %s (version=%d)",
            key, string(data), stat.Version)

        // 等待变更事件
        event := <-eventCh
        log.Printf("Received event: %v", event.Type)

        // Watch是一次性的,需要重新设置(循环继续)
    }
}

// 服务发现
type ServiceDiscovery struct {
    zkClient *ZKClient
    services map[string][]string  // serviceName -> [instances]
}

func (sd *ServiceDiscovery) WatchService(serviceName string) {
    path := "/services/" + serviceName

    for {
        // ChildrenW: 获取子节点并设置Watch
        children, stat, eventCh, err := sd.zkClient.conn.ChildrenW(path)
        if err != nil {
            time.Sleep(1 * time.Second)
            continue
        }

        // 更新服务实例列表
        instances := make([]string, 0)
        for _, child := range children {
            data, _, _ := sd.zkClient.conn.Get(path + "/" + child)
            instances = append(instances, string(data))
        }

        sd.services[serviceName] = instances
        log.Printf("Service instances updated: %v", instances)

        // 等待子节点变更事件
        event := <-eventCh
        if event.Type == zk.EventNodeChildrenChanged {
            log.Printf("Service instances changed")
        }
    }
}

Watch实现原理:

Client端:
┌─────────────────────────────────┐
│  WatchManager                   │
│  - 维护Watch注册表              │
│  - path -> []Watcher            │
└─────────────────────────────────┘
          ↓
    getW("/config/db")
          ↓
┌─────────────────────────────────┐
│  发送请求给Server               │
│  Request: GET + Watch flag      │
└─────────────────────────────────┘

Server端:
┌─────────────────────────────────┐
│  WatchManager                   │
│  - path -> Set<Session>         │
│  - 记录哪些客户端监听哪些路径   │
└─────────────────────────────────┘
          ↓
    数据变更时
          ↓
┌─────────────────────────────────┐
│  触发Watch                      │
│  - 查找监听该path的所有Session  │
│  - 发送WatchEvent给客户端       │
│  - 删除Watch(一次性)          │
└─────────────────────────────────┘

ZooKeeper集群

部署模式:

推荐集群规模:
- 3节点:容忍1个故障(3/2+1=2)
- 5节点:容忍2个故障(5/2+1=3)
- 7节点:容忍3个故障(7/2+1=4)

不推荐偶数节点:
- 4节点和3节点容忍度相同(都是1个)
- 浪费资源

读写流程:

读请求(性能高):
Client → 任意节点(Leader/Follower)→ 直接返回
         (可能读到稍旧的数据)

写请求(强一致):
Client → Follower → 转发给Leader
      → Leader → 2PC广播 → 过半ACK → 提交 → 返回客户端

顺序:
Client A: write(x=1) → zxid=1
Client B: write(x=2) → zxid=2
Client C: write(x=3) → zxid=3

所有节点看到的顺序一致:1 → 2 → 3

会话机制:

// 客户端连接
conn, sessionEvent, err := zk.Connect(
    []string{"zk1:2181", "zk2:2181", "zk3:2181"},
    10*time.Second,  // sessionTimeout
)

// 监听会话事件
go func() {
    for event := range sessionEvent {
        switch event.State {
        case zk.StateConnecting:
            log.Println("Connecting to ZK...")
        case zk.StateConnected:
            log.Println("Connected to ZK")
        case zk.StateHasSession:
            log.Println("Session established")
        case zk.StateDisconnected:
            log.Println("Disconnected, will retry...")
        case zk.StateExpired:
            log.Println("Session expired, need reconnect")
            // 临时节点会被删除,需要重新注册
        }
    }
}()

Session过期机制:

1. 客户端定期发送心跳(ping)给Server
   ↓
2. Server更新Session的lastAccessTime
   ↓
3. 如果超过sessionTimeout未收到心跳
   ↓
4. Server标记Session为Expired
   ↓
5. 删除该Session创建的所有临时节点
   ↓
6. 触发Watch通知其他客户端

etcd详解

简介

etcd:CoreOS开源的分布式键值存储,使用Go语言实现

特点:

  • 强一致性(Raft算法)
  • 高可用(集群部署)
  • 简单API(HTTP/gRPC)
  • Watch支持(流式推送)
  • 租约机制(Lease)

与ZooKeeper的区别:

  • 更现代的设计(2013 vs 2008)
  • 更简洁的API
  • 更好的性能(gRPC vs 自定义协议)
  • Kubernetes的默认选择

Raft协议

Raft vs Paxos:

  • Raft更易理解(Strong Leader)
  • Paxos理论完备,实现复杂

核心角色:

Leader(领导者):
- 处理所有客户端请求
- 复制日志给Follower
- 定期发送心跳

Follower(跟随者):
- 被动接收日志
- 转发请求给Leader
- 参与投票

Candidate(候选者):
- Follower超时未收到心跳 → 转为Candidate
- 发起选举
- 获得多数票 → 成为Leader

选举流程:

初始状态:所有节点都是Follower
┌─────────────────────────────────┐
│  Follower (term=0)              │
└─────────────────────────────────┘
          ↓ (超时未收到心跳)
┌─────────────────────────────────┐
│  Candidate (term=1)             │
│  - 投票给自己                    │
│  - 向其他节点请求投票            │
└─────────────────────────────────┘
          ↓
    获得多数票?
     ↙       ↘
    Yes      No
     ↓        ↓
  Leader  重新选举

选举约束:
1. 一个term内,每个节点只能投一票
2. 先到先得(First-Come-First-Served)
3. Candidate的日志必须至少和Follower一样新

日志复制:

type LogEntry struct {
    Term    int64   // Leader的term
    Index   int64   // 日志索引
    Command string  // 实际命令
}

// Leader接收客户端写请求
func (l *Leader) HandleWrite(cmd string) error {
    // 1. 追加到本地日志
    entry := LogEntry{
        Term:    l.currentTerm,
        Index:   l.getLastLogIndex() + 1,
        Command: cmd,
    }
    l.log = append(l.log, entry)

    // 2. 并行复制给所有Follower
    successCount := 1  // Leader自己
    for _, follower := range l.followers {
        go func(f *Follower) {
            if f.AppendEntries(entry) {
                successCount++
            }
        }(follower)
    }

    // 3. 等待多数确认
    if successCount > len(l.followers)/2 {
        // 4. 提交日志
        l.commitIndex = entry.Index

        // 5. 应用到状态机
        l.applyToStateMachine(entry)

        return nil
    }

    return errors.New("replication failed")
}

Raft保证:

1. Election Safety(选举安全):
   一个term内最多只有一个Leader

2. Leader Append-Only(Leader只追加):
   Leader的日志只能追加,不能删除或覆盖

3. Log Matching(日志匹配):
   如果两个日志条目有相同的index和term,
   则它们存储相同的命令,
   且之前的所有日志条目都相同

4. Leader Completeness(Leader完整性):
   如果某个日志条目在某个term被提交,
   则该条目必然出现在更大term的Leader的日志中

5. State Machine Safety(状态机安全):
   如果某个节点已应用某个日志条目到状态机,
   则其他节点不会在相同index应用不同的命令

数据模型

扁平键值模型(与ZK的树形结构不同):

Key-Value Store:
/services/order/node1 → "192.168.1.10:8080"
/services/order/node2 → "192.168.1.11:8080"
/config/db/host       → "mysql.prod.com"
/config/db/port       → "3306"

特点:
- 简单直接
- 前缀查询
- 范围扫描

版本机制:

每个Key都有版本信息:
{
    "key": "/config/db/host",
    "value": "mysql.prod.com",
    "create_revision": 100,    // 创建时的revision
    "mod_revision": 150,       // 最后修改时的revision
    "version": 5,              // 该key的修改次数
    "lease": 12345             // 租约ID(可选)
}

全局Revision:
- 单调递增
- 类似ZK的zxid
- 用于MVCC(多版本并发控制)

代码示例:

package main

import (
    "context"
    "go.etcd.io/etcd/client/v3"
    "time"
)

type EtcdClient struct {
    client *clientv3.Client
}

func NewEtcdClient(endpoints []string) (*EtcdClient, error) {
    client, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return nil, err
    }
    return &EtcdClient{client: client}, nil
}

// 写入数据
func (c *EtcdClient) Put(key, value string) error {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    _, err := c.client.Put(ctx, key, value)
    return err
}

// 读取数据
func (c *EtcdClient) Get(key string) (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    resp, err := c.client.Get(ctx, key)
    if err != nil {
        return "", err
    }

    if len(resp.Kvs) == 0 {
        return "", nil
    }

    return string(resp.Kvs[0].Value), nil
}

// 前缀查询
func (c *EtcdClient) GetPrefix(prefix string) (map[string]string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    resp, err := c.client.Get(ctx, prefix, clientv3.WithPrefix())
    if err != nil {
        return nil, err
    }

    result := make(map[string]string)
    for _, kv := range resp.Kvs {
        result[string(kv.Key)] = string(kv.Value)
    }

    return result, nil
}

// CAS(Compare-And-Swap)操作
func (c *EtcdClient) CAS(key, oldValue, newValue string) (bool, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    // 事务:如果value=oldValue,则更新为newValue
    txn := c.client.Txn(ctx)
    txn = txn.If(
        clientv3.Compare(clientv3.Value(key), "=", oldValue),
    ).Then(
        clientv3.OpPut(key, newValue),
    )

    resp, err := txn.Commit()
    if err != nil {
        return false, err
    }

    return resp.Succeeded, nil
}

Lease租约机制

定义:Lease是etcd的租约机制,用于实现临时节点(类似ZK的Ephemeral)

特点:

  • Key绑定Lease
  • Lease过期 → 绑定的Key自动删除
  • 支持KeepAlive续约

代码示例:

// 服务注册(使用Lease实现临时节点)
type ServiceRegistry struct {
    client   *clientv3.Client
    leaseID  clientv3.LeaseID
    stopCh   chan struct{}
}

func (sr *ServiceRegistry) Register(serviceName, addr string, ttl int64) error {
    ctx := context.Background()

    // 1. 创建租约
    leaseResp, err := sr.client.Grant(ctx, ttl)
    if err != nil {
        return err
    }
    sr.leaseID = leaseResp.ID

    // 2. 注册服务(绑定租约)
    key := "/services/" + serviceName + "/" + addr
    _, err = sr.client.Put(ctx, key, addr, clientv3.WithLease(sr.leaseID))
    if err != nil {
        return err
    }

    // 3. 自动续约
    sr.stopCh = make(chan struct{})
    go sr.keepAlive()

    return nil
}

func (sr *ServiceRegistry) keepAlive() {
    ctx := context.Background()

    // KeepAlive返回一个channel,定期发送续约响应
    keepAliveCh, err := sr.client.KeepAlive(ctx, sr.leaseID)
    if err != nil {
        log.Printf("KeepAlive error: %v", err)
        return
    }

    for {
        select {
        case resp := <-keepAliveCh:
            if resp == nil {
                log.Println("Lease expired")
                return
            }
            log.Printf("Lease %d keepalive, TTL=%d", resp.ID, resp.TTL)

        case <-sr.stopCh:
            // 主动撤销租约
            sr.client.Revoke(ctx, sr.leaseID)
            return
        }
    }
}

func (sr *ServiceRegistry) Unregister() {
    close(sr.stopCh)
}

Lease vs Session:

特性etcd LeaseZK Session
实现方式显式创建Lease隐式Session
续约方式KeepAlive主动续约心跳自动续约
灵活性高(可单独管理)低(绑定连接)
复杂度稍高简单

Watch机制

与ZK Watch的区别:

特性etcd WatchZK Watch
触发次数持续监听一次性
实现方式gRPC流式推送事件通知
历史回溯支持(通过revision)不支持
性能高中等

代码示例:

// 配置热更新(持续监听)
type ConfigWatcher struct {
    client *clientv3.Client
    cache  map[string]string
}

func (w *ConfigWatcher) WatchConfig(key string) {
    ctx := context.Background()

    // Watch返回一个channel,持续推送事件
    watchCh := w.client.Watch(ctx, key)

    for watchResp := range watchCh {
        for _, event := range watchResp.Events {
            switch event.Type {
            case mvccpb.PUT:
                log.Printf("Config updated: %s = %s",
                    string(event.Kv.Key), string(event.Kv.Value))
                w.cache[string(event.Kv.Key)] = string(event.Kv.Value)

            case mvccpb.DELETE:
                log.Printf("Config deleted: %s", string(event.Kv.Key))
                delete(w.cache, string(event.Kv.Key))
            }
        }
    }
}

// 服务发现(监听前缀)
type ServiceDiscovery struct {
    client   *clientv3.Client
    services map[string][]string
}

func (sd *ServiceDiscovery) WatchService(serviceName string) {
    ctx := context.Background()
    prefix := "/services/" + serviceName + "/"

    // 1. 首次获取所有实例
    resp, _ := sd.client.Get(ctx, prefix, clientv3.WithPrefix())
    instances := make([]string, 0)
    for _, kv := range resp.Kvs {
        instances = append(instances, string(kv.Value))
    }
    sd.services[serviceName] = instances

    // 2. 监听前缀(持续)
    watchCh := sd.client.Watch(ctx, prefix, clientv3.WithPrefix())

    for watchResp := range watchCh {
        for _, event := range watchResp.Events {
            switch event.Type {
            case mvccpb.PUT:
                // 新实例上线
                addr := string(event.Kv.Value)
                sd.services[serviceName] = append(sd.services[serviceName], addr)
                log.Printf("Service instance added: %s", addr)

            case mvccpb.DELETE:
                // 实例下线
                addr := string(event.Kv.Value)
                sd.removeInstance(serviceName, addr)
                log.Printf("Service instance removed: %s", addr)
            }
        }
    }
}

func (sd *ServiceDiscovery) removeInstance(serviceName, addr string) {
    instances := sd.services[serviceName]
    for i, instance := range instances {
        if instance == addr {
            sd.services[serviceName] = append(instances[:i], instances[i+1:]...)
            break
        }
    }
}

历史版本查询:

// 查询某个revision时的数据(时间旅行)
func (c *EtcdClient) GetAtRevision(key string, revision int64) (string, error) {
    ctx := context.Background()

    resp, err := c.client.Get(ctx, key, clientv3.WithRev(revision))
    if err != nil {
        return "", err
    }

    if len(resp.Kvs) == 0 {
        return "", nil
    }

    return string(resp.Kvs[0].Value), nil
}

// 监听从某个revision开始的变更
func (c *EtcdClient) WatchFromRevision(key string, revision int64) {
    ctx := context.Background()

    // 从指定revision开始监听
    watchCh := c.client.Watch(ctx, key, clientv3.WithRev(revision))

    for watchResp := range watchCh {
        for _, event := range watchResp.Events {
            log.Printf("Event at revision %d: %v",
                event.Kv.ModRevision, event.Type)
        }
    }
}

ZooKeeper vs etcd

全面对比

对比维度ZooKeeperetcd
开发语言JavaGo
发布时间2008年2013年
共识算法ZAB(Paxos变种)Raft
数据模型树形结构(ZNode)扁平KV
协议自定义协议HTTP/gRPC
Watch机制一次性,需重新注册持续监听
临时节点Session绑定Lease机制
客户端库多语言支持Go优先,多语言支持
性能中等(万级QPS)高(万级-十万级QPS)
运维复杂度较高(JVM调优)较低
生态Hadoop、Kafka、HBaseKubernetes、Service Mesh

适用场景

ZooKeeper适合:

  1. Java技术栈
场景:
- Kafka(Controller选举、Broker管理)
- HBase(RegionServer管理)
- Dubbo(服务注册发现)
- 大数据生态(Hadoop、Storm)

原因:
- 同为Java,集成方便
- 成熟稳定(10+ years)
- 大量现有案例
  1. 需要树形结构
场景:
- 权限管理(ACL树)
- 配置管理(层级配置)
- 命名服务(路径即名称)

优势:
- 树形结构更直观
- 父子关系明确
- 批量操作方便

etcd适合:

  1. Kubernetes生态
场景:
- Kubernetes(存储集群状态)
- Service Mesh(Istio配置)
- Cloud Native应用

原因:
- Kubernetes默认选择
- gRPC性能高
- Watch机制更强大
  1. 高性能要求
场景:
- 高频配置变更
- 大量Watch监听
- 微服务注册发现

优势:
- gRPC协议效率高
- 持续Watch,无需重新注册
- 更好的并发性能
  1. 简单运维
优势:
- 单一二进制文件
- 无JVM内存调优
- 配置简单
- 容器化友好

性能对比

写性能测试(3节点集群):

操作类型ZooKeeperetcd
单次写入延迟~5ms~3ms
写入QPS10,00030,000+
批量写入不支持支持(Txn)

读性能测试:

操作类型ZooKeeperetcd
单次读取延迟~1ms~0.5ms
读取QPS50,000+100,000+
范围查询不支持支持

Watch性能:

对比维度ZooKeeperetcd
Watch数量10,000+100,000+
事件延迟~10ms~5ms
历史回溯不支持支持

应用场景

1. 配置中心

场景:统一管理配置,动态更新

// 配置中心客户端(etcd实现)
type ConfigCenter struct {
    client *clientv3.Client
    cache  sync.Map  // 本地缓存
}

func NewConfigCenter(endpoints []string) (*ConfigCenter, error) {
    client, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return nil, err
    }

    cc := &ConfigCenter{client: client}

    // 启动时加载所有配置
    cc.loadAllConfigs()

    // 监听配置变更
    go cc.watchConfigs()

    return cc, nil
}

func (cc *ConfigCenter) loadAllConfigs() error {
    ctx := context.Background()
    resp, err := cc.client.Get(ctx, "/config/", clientv3.WithPrefix())
    if err != nil {
        return err
    }

    for _, kv := range resp.Kvs {
        cc.cache.Store(string(kv.Key), string(kv.Value))
    }

    return nil
}

func (cc *ConfigCenter) watchConfigs() {
    ctx := context.Background()
    watchCh := cc.client.Watch(ctx, "/config/", clientv3.WithPrefix())

    for watchResp := range watchCh {
        for _, event := range watchResp.Events {
            key := string(event.Kv.Key)

            switch event.Type {
            case mvccpb.PUT:
                value := string(event.Kv.Value)
                cc.cache.Store(key, value)
                log.Printf("Config updated: %s = %s", key, value)

                // 触发回调(通知应用层)
                cc.notifyConfigChange(key, value)

            case mvccpb.DELETE:
                cc.cache.Delete(key)
                log.Printf("Config deleted: %s", key)
            }
        }
    }
}

// 获取配置(从本地缓存)
func (cc *ConfigCenter) GetString(key string) string {
    if value, ok := cc.cache.Load("/config/" + key); ok {
        return value.(string)
    }
    return ""
}

func (cc *ConfigCenter) GetInt(key string) int {
    value := cc.GetString(key)
    if value == "" {
        return 0
    }
    intVal, _ := strconv.Atoi(value)
    return intVal
}

// 使用示例
func main() {
    cc, _ := NewConfigCenter([]string{"localhost:2379"})

    // 读取配置
    dbHost := cc.GetString("db.host")
    maxQPS := cc.GetInt("rate_limit.max_qps")

    log.Printf("DB Host: %s, Max QPS: %d", dbHost, maxQPS)

    // 配置变更会自动更新到本地缓存
    select {}
}

2. 服务注册与发现

场景:微服务动态上下线

// 服务注册
type ServiceRegistry struct {
    client  *clientv3.Client
    leaseID clientv3.LeaseID
    stopCh  chan struct{}
}

func (sr *ServiceRegistry) Register(serviceName, addr string, ttl int64) error {
    ctx := context.Background()

    // 创建租约
    leaseResp, err := sr.client.Grant(ctx, ttl)
    if err != nil {
        return err
    }
    sr.leaseID = leaseResp.ID

    // 注册服务
    key := fmt.Sprintf("/services/%s/%s", serviceName, addr)
    _, err = sr.client.Put(ctx, key, addr, clientv3.WithLease(sr.leaseID))
    if err != nil {
        return err
    }

    // 自动续约
    sr.stopCh = make(chan struct{})
    go sr.keepAlive()

    log.Printf("Service registered: %s -> %s", serviceName, addr)
    return nil
}

func (sr *ServiceRegistry) keepAlive() {
    ctx := context.Background()
    keepAliveCh, _ := sr.client.KeepAlive(ctx, sr.leaseID)

    for {
        select {
        case <-keepAliveCh:
            // 续约成功
        case <-sr.stopCh:
            sr.client.Revoke(ctx, sr.leaseID)
            return
        }
    }
}

func (sr *ServiceRegistry) Unregister() {
    close(sr.stopCh)
}

// 服务发现
type ServiceDiscovery struct {
    client    *clientv3.Client
    services  sync.Map  // serviceName -> []string
    callbacks sync.Map  // serviceName -> []func([]string)
}

func NewServiceDiscovery(endpoints []string) (*ServiceDiscovery, error) {
    client, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return nil, err
    }

    return &ServiceDiscovery{client: client}, nil
}

func (sd *ServiceDiscovery) Discover(serviceName string) []string {
    prefix := "/services/" + serviceName + "/"

    // 首次获取
    ctx := context.Background()
    resp, _ := sd.client.Get(ctx, prefix, clientv3.WithPrefix())

    instances := make([]string, 0)
    for _, kv := range resp.Kvs {
        instances = append(instances, string(kv.Value))
    }

    sd.services.Store(serviceName, instances)

    // 监听变更
    go sd.watchService(serviceName)

    return instances
}

func (sd *ServiceDiscovery) watchService(serviceName string) {
    prefix := "/services/" + serviceName + "/"
    ctx := context.Background()

    watchCh := sd.client.Watch(ctx, prefix, clientv3.WithPrefix())

    for watchResp := range watchCh {
        instances := sd.getCurrentInstances(serviceName)

        for _, event := range watchResp.Events {
            addr := string(event.Kv.Value)

            switch event.Type {
            case mvccpb.PUT:
                instances = append(instances, addr)
                log.Printf("Service instance added: %s", addr)

            case mvccpb.DELETE:
                instances = sd.removeInstance(instances, addr)
                log.Printf("Service instance removed: %s", addr)
            }
        }

        sd.services.Store(serviceName, instances)
        sd.notifyCallbacks(serviceName, instances)
    }
}

func (sd *ServiceDiscovery) OnChange(serviceName string, callback func([]string)) {
    callbacks, _ := sd.callbacks.LoadOrStore(serviceName, []func([]string){})
    callbacks = append(callbacks.([]func([]string)), callback)
    sd.callbacks.Store(serviceName, callbacks)
}

// 使用示例
func main() {
    // 服务注册
    registry := &ServiceRegistry{client: etcdClient}
    registry.Register("order-service", "192.168.1.10:8080", 10)

    // 服务发现
    discovery, _ := NewServiceDiscovery([]string{"localhost:2379"})
    instances := discovery.Discover("order-service")

    fmt.Printf("Order service instances: %v\n", instances)

    // 监听服务变更
    discovery.OnChange("order-service", func(instances []string) {
        fmt.Printf("Instances updated: %v\n", instances)
    })
}

3. Leader选举

场景:分布式系统中选出主节点

// Leader选举(etcd实现)
type LeaderElection struct {
    client    *clientv3.Client
    session   *concurrency.Session
    election  *concurrency.Election
    isLeader  bool
    callbacks struct {
        onElected func()
        onLost    func()
    }
}

func NewLeaderElection(endpoints []string, electionKey string) (*LeaderElection, error) {
    client, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return nil, err
    }

    // 创建Session
    session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
    if err != nil {
        return nil, err
    }

    // 创建Election对象
    election := concurrency.NewElection(session, electionKey)

    return &LeaderElection{
        client:   client,
        session:  session,
        election: election,
    }, nil
}

// 竞选Leader
func (le *LeaderElection) Campaign(ctx context.Context, value string) error {
    // Campaign会阻塞直到成为Leader
    if err := le.election.Campaign(ctx, value); err != nil {
        return err
    }

    le.isLeader = true
    log.Printf("Elected as leader: %s", value)

    // 触发回调
    if le.callbacks.onElected != nil {
        le.callbacks.onElected()
    }

    // 监听Leader变更
    go le.watchLeader(ctx)

    return nil
}

func (le *LeaderElection) watchLeader(ctx context.Context) {
    observeCh := le.election.Observe(ctx)

    for resp := range observeCh {
        if string(resp.Kvs[0].Value) != le.election.Key() {
            // Leader变更
            le.isLeader = false
            log.Println("Lost leadership")

            if le.callbacks.onLost != nil {
                le.callbacks.onLost()
            }
        }
    }
}

// 主动放弃Leader
func (le *LeaderElection) Resign(ctx context.Context) error {
    le.isLeader = false
    return le.election.Resign(ctx)
}

func (le *LeaderElection) IsLeader() bool {
    return le.isLeader
}

func (le *LeaderElection) OnElected(callback func()) {
    le.callbacks.onElected = callback
}

func (le *LeaderElection) OnLost(callback func()) {
    le.callbacks.onLost = callback
}

// 使用示例
func main() {
    nodeID := "node-1"

    election, _ := NewLeaderElection(
        []string{"localhost:2379"},
        "/election/master",
    )

    // 设置回调
    election.OnElected(func() {
        fmt.Println("I am the leader now!")
        // 执行Leader逻辑
    })

    election.OnLost(func() {
        fmt.Println("I lost leadership")
        // 停止Leader逻辑
    })

    // 参与竞选
    ctx := context.Background()
    election.Campaign(ctx, nodeID)
}

实战案例

案例1:分布式配置中心(完整实现)

需求:

  • 集中管理配置
  • 配置热更新
  • 多环境支持(dev、test、prod)
  • 配置版本管理

架构设计:

┌─────────────────────────────────────────┐
│           Management Console            │
│       (配置管理界面)                     │
└─────────────────────────────────────────┘
                  ↓ (HTTP API)
┌─────────────────────────────────────────┐
│         Config Center Server            │
│  - RESTful API                          │
│  - 权限校验                              │
│  - 审计日志                              │
└─────────────────────────────────────────┘
                  ↓
┌─────────────────────────────────────────┐
│              etcd Cluster               │
│  /config/{env}/{app}/{key}              │
│  /config/prod/order/db.host             │
└─────────────────────────────────────────┘
                  ↑ (Watch)
┌──────────┬──────────┬──────────┬─────────┐
│ Service1 │ Service2 │ Service3 │ ...     │
│ (Client) │ (Client) │ (Client) │         │
└──────────┴──────────┴──────────┴─────────┘

代码实现:

// 配置中心服务端
type ConfigServer struct {
    etcdClient *clientv3.Client
}

// 创建/更新配置
func (s *ConfigServer) SetConfig(env, app, key, value string) error {
    ctx := context.Background()

    configKey := fmt.Sprintf("/config/%s/%s/%s", env, app, key)

    // 记录审计日志
    s.auditLog("SET", configKey, value)

    _, err := s.etcdClient.Put(ctx, configKey, value)
    return err
}

// 获取配置
func (s *ConfigServer) GetConfig(env, app, key string) (string, error) {
    ctx := context.Background()

    configKey := fmt.Sprintf("/config/%s/%s/%s", env, app, key)

    resp, err := s.etcdClient.Get(ctx, configKey)
    if err != nil {
        return "", err
    }

    if len(resp.Kvs) == 0 {
        return "", errors.New("config not found")
    }

    return string(resp.Kvs[0].Value), nil
}

// 配置中心客户端
type ConfigClient struct {
    etcdClient *clientv3.Client
    env        string
    app        string
    cache      sync.Map
    listeners  []ConfigListener
}

type ConfigListener interface {
    OnConfigChange(key, value string)
}

func NewConfigClient(endpoints []string, env, app string) (*ConfigClient, error) {
    client, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return nil, err
    }

    cc := &ConfigClient{
        etcdClient: client,
        env:        env,
        app:        app,
    }

    // 加载所有配置
    cc.loadConfigs()

    // 监听配置变更
    go cc.watchConfigs()

    return cc, nil
}

func (cc *ConfigClient) loadConfigs() error {
    ctx := context.Background()
    prefix := fmt.Sprintf("/config/%s/%s/", cc.env, cc.app)

    resp, err := cc.etcdClient.Get(ctx, prefix, clientv3.WithPrefix())
    if err != nil {
        return err
    }

    for _, kv := range resp.Kvs {
        key := strings.TrimPrefix(string(kv.Key), prefix)
        cc.cache.Store(key, string(kv.Value))
    }

    log.Printf("Loaded %d configs", len(resp.Kvs))
    return nil
}

func (cc *ConfigClient) watchConfigs() {
    ctx := context.Background()
    prefix := fmt.Sprintf("/config/%s/%s/", cc.env, cc.app)

    watchCh := cc.etcdClient.Watch(ctx, prefix, clientv3.WithPrefix())

    for watchResp := range watchCh {
        for _, event := range watchResp.Events {
            key := strings.TrimPrefix(string(event.Kv.Key), prefix)
            value := string(event.Kv.Value)

            switch event.Type {
            case mvccpb.PUT:
                cc.cache.Store(key, value)
                log.Printf("Config updated: %s = %s", key, value)

                // 通知监听器
                for _, listener := range cc.listeners {
                    listener.OnConfigChange(key, value)
                }
            }
        }
    }
}

func (cc *ConfigClient) Get(key string) string {
    if value, ok := cc.cache.Load(key); ok {
        return value.(string)
    }
    return ""
}

func (cc *ConfigClient) GetInt(key string, defaultValue int) int {
    value := cc.Get(key)
    if value == "" {
        return defaultValue
    }
    intVal, _ := strconv.Atoi(value)
    return intVal
}

func (cc *ConfigClient) AddListener(listener ConfigListener) {
    cc.listeners = append(cc.listeners, listener)
}

// 使用示例
type MyApp struct {
    configClient *ConfigClient
    maxQPS       int
}

func (app *MyApp) OnConfigChange(key, value string) {
    if key == "max_qps" {
        newQPS, _ := strconv.Atoi(value)
        app.maxQPS = newQPS
        log.Printf("QPS limit updated to %d", newQPS)
    }
}

func main() {
    // 创建配置客户端
    configClient, _ := NewConfigClient(
        []string{"localhost:2379"},
        "prod",
        "order-service",
    )

    app := &MyApp{
        configClient: configClient,
        maxQPS:       configClient.GetInt("max_qps", 1000),
    }

    // 注册监听器
    configClient.AddListener(app)

    // 读取配置
    dbHost := configClient.Get("db.host")
    dbPort := configClient.GetInt("db.port", 3306)

    log.Printf("DB: %s:%d, Max QPS: %d", dbHost, dbPort, app.maxQPS)

    select {}
}

面试问答

ZooKeeper的ZAB协议和Raft有什么区别?

答案:

相似之处:

  • 都是基于Leader的共识算法
  • 都需要多数派确认
  • 都保证强一致性

核心区别:

对比维度ZABRaft
设计目标为ZooKeeper定制通用共识算法
Leader选举基于zxid+myid基于term+日志完整性
日志复制可以乱序,最终排序严格有序
状态Looking/Following/LeadingFollower/Candidate/Leader
理解难度较难相对容易

代码对比:

// ZAB选举规则
func (node *ZKNode) CompareVote(vote1, vote2 Vote) Vote {
    // 1. zxid更大的优先(数据更新)
    if vote1.Zxid > vote2.Zxid {
        return vote1
    }
    if vote2.Zxid > vote1.Zxid {
        return vote2
    }

    // 2. zxid相同,myid更大的优先
    if vote1.MyId > vote2.MyId {
        return vote1
    }
    return vote2
}

// Raft选举规则
func (node *RaftNode) RequestVote(req VoteRequest) bool {
    // 1. term更大的优先
    if req.Term < node.currentTerm {
        return false
    }

    // 2. 已经投过票,拒绝
    if node.votedFor != "" && node.votedFor != req.CandidateId {
        return false
    }

    // 3. 候选者的日志必须至少和自己一样新
    if !node.isLogUpToDate(req.LastLogIndex, req.LastLogTerm) {
        return false
    }

    return true
}

etcd的Watch机制是如何实现的?

答案:

实现原理:

1. 客户端建立gRPC流式连接:
   Client → etcd Server (Watch RPC)

2. etcd维护Watch表:
   WatchableStore:
   - key -> []WatchStream
   - 记录哪些客户端监听哪些key

3. 数据变更时:
   Write → Apply to State Machine
        → Notify WatchableStore
        → Push Event to WatchStream
        → Client收到事件

4. MVCC支持历史版本:
   每次变更生成新revision
   Watch可以从任意revision开始

代码示例:

// etcd Watch服务端实现(简化版)
type WatchableStore struct {
    mu       sync.RWMutex
    watchers map[string][]*WatchStream  // key -> streams
    revisions map[int64]*Event          // revision -> event
}

func (ws *WatchableStore) Watch(key string, startRevision int64) *WatchStream {
    ws.mu.Lock()
    defer ws.mu.Unlock()

    stream := &WatchStream{
        key:    key,
        eventC: make(chan *Event, 100),
    }

    ws.watchers[key] = append(ws.watchers[key], stream)

    // 发送历史事件(从startRevision开始)
    go func() {
        for rev := startRevision; ; rev++ {
            if event, ok := ws.revisions[rev]; ok {
                if event.Key == key {
                    stream.eventC <- event
                }
            } else {
                break
            }
        }
    }()

    return stream
}

func (ws *WatchableStore) Notify(key string, event *Event) {
    ws.mu.RLock()
    defer ws.mu.RUnlock()

    // 保存事件(MVCC)
    ws.revisions[event.Revision] = event

    // 通知所有Watch该key的stream
    for _, stream := range ws.watchers[key] {
        select {
        case stream.eventC <- event:
        default:
            // channel满了,跳过(客户端消费慢)
        }
    }
}

与ZK Watch的对比:

// ZK Watch(一次性)
for {
    data, _, eventCh, _ := zkConn.GetW("/config/key")
    handleData(data)

    event := <-eventCh
    // 需要重新设置Watch(循环继续)
}

// etcd Watch(持续)
watchCh := etcdClient.Watch(ctx, "/config/key")
for watchResp := range watchCh {
    for _, event := range watchResp.Events {
        handleEvent(event)
    }
    // 无需重新设置,持续接收
}

如何保证分布式协调服务的高可用?

答案:

1. 集群部署(多数派)

推荐部署:
- 3节点:容忍1个故障
- 5节点:容忍2个故障
- 7节点:容忍3个故障

示例:
┌─────────────────────────────────┐
│  ZK Cluster (5 nodes)           │
│  Node1 (Leader)                 │
│  Node2, Node3, Node4, Node5     │
└─────────────────────────────────┘

故障场景:
- Node1宕机 → 自动选举新Leader
- Node1+Node2宕机 → 仍有多数派(3/5)
- Node1+Node2+Node3宕机 → 不可用(2/5 < 3)

2. 客户端重连机制

type ResilientClient struct {
    endpoints []string
    client    *clientv3.Client
    mu        sync.RWMutex
}

func (rc *ResilientClient) ensureConnected() error {
    rc.mu.Lock()
    defer rc.mu.Unlock()

    // 检查连接状态
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    _, err := rc.client.Get(ctx, "health_check")
    if err == nil {
        return nil  // 连接正常
    }

    // 重新连接
    log.Println("Reconnecting to etcd...")
    newClient, err := clientv3.New(clientv3.Config{
        Endpoints:   rc.endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return err
    }

    rc.client.Close()
    rc.client = newClient
    log.Println("Reconnected successfully")

    return nil
}

func (rc *ResilientClient) Get(key string) (string, error) {
    // 自动重连
    if err := rc.ensureConnected(); err != nil {
        return "", err
    }

    ctx := context.Background()
    resp, err := rc.client.Get(ctx, key)
    if err != nil {
        return "", err
    }

    if len(resp.Kvs) == 0 {
        return "", nil
    }

    return string(resp.Kvs[0].Value), nil
}

3. 数据备份与恢复

# etcd快照备份
ETCDCTL_API=3 etcdctl snapshot save snapshot.db \
  --endpoints=https://127.0.0.1:2379 \
  --cacert=/etc/etcd/ca.crt \
  --cert=/etc/etcd/server.crt \
  --key=/etc/etcd/server.key

# 恢复
ETCDCTL_API=3 etcdctl snapshot restore snapshot.db \
  --name=node1 \
  --initial-cluster=node1=http://localhost:2380 \
  --initial-advertise-peer-urls=http://localhost:2380

4. 监控告警

# Prometheus告警规则
groups:
  - name: etcd
    rules:
      - alert: EtcdClusterUnavailable
        expr: up{job="etcd"} == 0
        for: 1m
        annotations:
          summary: "etcd cluster unavailable"

      - alert: EtcdHighLatency
        expr: histogram_quantile(0.99, rate(etcd_disk_wal_fsync_duration_seconds_bucket[5m])) > 0.1
        for: 5m
        annotations:
          summary: "etcd high latency"

      - alert: EtcdInsufficientMembers
        expr: count(up{job="etcd"} == 1) < 3
        for: 1m
        annotations:
          summary: "etcd insufficient members"

ZooKeeper的Session过期和etcd的Lease过期有什么区别?

答案:

对比维度ZooKeeper Sessionetcd Lease
绑定方式隐式绑定连接显式创建Lease
续约方式自动心跳主动KeepAlive
过期处理删除所有临时节点删除绑定的Key
灵活性低(一个连接一个Session)高(多个Key可共享Lease)
生命周期与连接绑定独立管理

ZooKeeper Session:

// Session自动管理
conn, sessionEvent, _ := zk.Connect(
    []string{"localhost:2181"},
    10*time.Second,  // sessionTimeout
)

// 自动心跳,无需手动处理
// 连接断开 → Session过期 → 临时节点删除

// 监听Session事件
go func() {
    for event := range sessionEvent {
        switch event.State {
        case zk.StateConnected:
            log.Println("Connected")
        case zk.StateExpired:
            log.Println("Session expired")
            // 需要重新连接,重新注册临时节点
        }
    }
}()

etcd Lease:

// 显式管理Lease
type ServiceRegistry struct {
    client  *clientv3.Client
    leases  map[string]clientv3.LeaseID  // 多个Key可用不同Lease
}

func (sr *ServiceRegistry) Register(key, value string, ttl int64) error {
    ctx := context.Background()

    // 创建Lease
    leaseResp, _ := sr.client.Grant(ctx, ttl)
    leaseID := leaseResp.ID

    // 绑定Key到Lease
    sr.client.Put(ctx, key, value, clientv3.WithLease(leaseID))

    // 主动续约
    keepAliveCh, _ := sr.client.KeepAlive(ctx, leaseID)
    go func() {
        for range keepAliveCh {
            // 续约成功
        }
    }()

    sr.leases[key] = leaseID
    return nil
}

// 灵活控制:可以单独撤销某个Lease
func (sr *ServiceRegistry) Unregister(key string) {
    if leaseID, ok := sr.leases[key]; ok {
        sr.client.Revoke(context.Background(), leaseID)
        delete(sr.leases, key)
    }
}

选择建议:

  • ZooKeeper:适合简单场景,自动管理更方便
  • etcd:适合复杂场景,需要精细控制生命周期

如何优化分布式协调服务的性能?

答案:

1. 读写分离

// etcd支持从任意节点读取(可能读到旧数据)
func (c *Client) Get(key string, allowStale bool) (string, error) {
    ctx := context.Background()

    opts := []clientv3.OpOption{}
    if allowStale {
        // 从本地节点读取(可能不是Leader)
        opts = append(opts, clientv3.WithSerializable())
    }

    resp, err := c.client.Get(ctx, key, opts...)
    // ...
}

// 使用建议:
// - 强一致性要求:不使用Serializable
// - 可以容忍短暂不一致:使用Serializable(性能提升10倍)

2. 批量操作

// etcd事务(原子批量操作)
func (c *Client) BatchPut(kvs map[string]string) error {
    ctx := context.Background()

    ops := make([]clientv3.Op, 0, len(kvs))
    for k, v := range kvs {
        ops = append(ops, clientv3.OpPut(k, v))
    }

    // 一次性提交多个写入
    _, err := c.client.Txn(ctx).Then(ops...).Commit()
    return err
}

// 性能对比:
// 逐个Put:1000次 → 5秒
// 批量Put:1000次 → 0.5秒(10倍提升)

3. 本地缓存

// 配置中心客户端缓存
type CachedConfigClient struct {
    etcdClient *clientv3.Client
    cache      *lru.Cache  // LRU缓存
}

func (cc *CachedConfigClient) Get(key string) (string, error) {
    // 1. 先查缓存
    if value, ok := cc.cache.Get(key); ok {
        return value.(string), nil
    }

    // 2. 缓存未命中,查etcd
    ctx := context.Background()
    resp, err := cc.etcdClient.Get(ctx, key)
    if err != nil {
        return "", err
    }

    value := string(resp.Kvs[0].Value)

    // 3. 更新缓存
    cc.cache.Add(key, value)

    return value, nil
}

// Watch机制更新缓存
func (cc *CachedConfigClient) watchAndUpdate() {
    ctx := context.Background()
    watchCh := cc.etcdClient.Watch(ctx, "/config/", clientv3.WithPrefix())

    for watchResp := range watchCh {
        for _, event := range watchResp.Events {
            key := string(event.Kv.Key)

            switch event.Type {
            case mvccpb.PUT:
                cc.cache.Add(key, string(event.Kv.Value))
            case mvccpb.DELETE:
                cc.cache.Remove(key)
            }
        }
    }
}

// 缓存命中率:95% → 响应时间从5ms降到0.1ms

4. 连接池

// etcd客户端连接池
type ClientPool struct {
    clients []*clientv3.Client
    index   int
    mu      sync.Mutex
}

func NewClientPool(endpoints []string, poolSize int) (*ClientPool, error) {
    clients := make([]*clientv3.Client, poolSize)

    for i := 0; i < poolSize; i++ {
        client, err := clientv3.New(clientv3.Config{
            Endpoints:   endpoints,
            DialTimeout: 5 * time.Second,
        })
        if err != nil {
            return nil, err
        }
        clients[i] = client
    }

    return &ClientPool{clients: clients}, nil
}

func (p *ClientPool) GetClient() *clientv3.Client {
    p.mu.Lock()
    defer p.mu.Unlock()

    client := p.clients[p.index]
    p.index = (p.index + 1) % len(p.clients)

    return client
}

// 性能提升:单连接10K QPS → 连接池(5个)40K QPS

5. 压缩与清理

# etcd定期压缩历史版本(避免空间膨胀)
ETCDCTL_API=3 etcdctl compact 1000000

# 碎片整理
ETCDCTL_API=3 etcdctl defrag

Prev
第2章:分布式锁
Next
第4章:服务发现与注册