第3章:分布式协调
什么是分布式协调
定义
分布式协调服务:在分布式系统中,提供配置管理、命名服务、分布式同步、组服务等功能的基础设施。
为什么需要分布式协调
单机场景:
Application
↓
Local Config File (config.yml)
↓
简单直接,重启应用生效
分布式场景问题:
Service A (Node1) → config.yml (version 1.0)
Service A (Node2) → config.yml (version 1.0)
Service A (Node3) → config.yml (version 1.1) 配置不一致!
问题:
1. 配置不一致 → 行为不一致
2. 配置变更需要重启 → 影响可用性
3. 无法动态感知 → 无法实时更新
4. 难以管理 → 运维复杂
分布式协调方案:
┌─────────────────────────────────┐
│ ZooKeeper / etcd Cluster │
│ (统一配置存储 + Watch通知) │
└─────────────────────────────────┘
↓ ↓ ↓
Node1 Node2 Node3
(Watch) (Watch) (Watch)
配置变更流程:
1. 管理员更新配置到ZK/etcd
2. ZK/etcd通知所有Watch的节点
3. 节点收到通知,重新加载配置
4. 无需重启,实时生效
核心功能
1. 配置管理(Configuration Management)
场景:数据库连接串、限流阈值、开关配置
// 传统方式(不推荐)
const (
DBHost = "mysql.prod.com" // 硬编码,修改需重新发布
MaxQPS = 1000
)
// 分布式协调方式(推荐)
type ConfigCenter struct {
zkConn *zk.Conn
}
func (c *ConfigCenter) GetConfig(key string) (string, error) {
data, _, err := c.zkConn.Get("/config/" + key)
return string(data), err
}
func (c *ConfigCenter) WatchConfig(key string, callback func(string)) {
for {
data, _, eventCh, err := c.zkConn.GetW("/config/" + key)
if err != nil {
continue
}
callback(string(data)) // 首次读取
// 等待变更通知
event := <-eventCh
if event.Type == zk.EventNodeDataChanged {
// 配置变更,递归监听
}
}
}
2. 服务注册与发现(Service Registry)
场景:微服务动态上下线
服务注册:
OrderService (192.168.1.10:8080) → ZK: /services/order/node1
OrderService (192.168.1.11:8080) → ZK: /services/order/node2
服务发现:
UserService → ZK查询 /services/order/*
→ 获取所有可用实例列表
→ 负载均衡调用
3. 分布式锁(Distributed Lock)
场景:保证多个节点互斥访问共享资源
ZooKeeper临时顺序节点:
/locks/my-lock/0000000001 ← Node1 (最小序号,获得锁)
/locks/my-lock/0000000002 ← Node2 (等待)
/locks/my-lock/0000000003 ← Node3 (等待)
Node1释放锁 → Node2自动获得锁
4. Leader选举(Leader Election)
场景:分布式系统中选出主节点
Kafka Controller选举:
Broker1 → 创建 /controller (临时节点) → 成为Leader
Broker2 → 创建失败 → 成为Follower
Broker3 → 创建失败 → 成为Follower
Broker1宕机 → 临时节点删除 → Broker2/3竞争创建 → 新Leader产生
ZooKeeper详解
简介
ZooKeeper:Apache开源的分布式协调服务,源于Google Chubby论文
特点:
- 强一致性(CP系统)
- 高可用(集群部署)
- 顺序保证(全局有序)
- 原子性(操作要么成功要么失败)
ZAB协议(ZooKeeper Atomic Broadcast)
ZAB vs Paxos:
- ZAB是Paxos的工程实现
- 针对ZooKeeper场景优化
核心流程:
1. Leader选举:
┌────────────────────────────────┐
│ 所有节点启动,都是LOOKING状态 │
└────────────────────────────────┘
↓
┌────────────────────────────────┐
│ 投票给自己,广播投票信息 │
│ (myid, zxid) │
└────────────────────────────────┘
↓
┌────────────────────────────────┐
│ 接收其他节点投票,比较: │
│ 1. zxid更大的优先(数据更新) │
│ 2. zxid相同,myid更大的优先 │
└────────────────────────────────┘
↓
┌────────────────────────────────┐
│ 获得超过半数投票 → Leader │
│ 其他节点 → Follower │
└────────────────────────────────┘
2. 数据同步:
Leader → Follower同步最新数据
3. 消息广播:
Client写请求 → Leader → 2PC广播 → 过半确认 → 提交
2PC广播流程:
// Leader接收客户端写请求
func (l *Leader) HandleWrite(request WriteRequest) {
// 1. 生成全局递增的zxid
zxid := l.GenerateZxid()
// 2. 发送Proposal给所有Follower
proposal := Proposal{
Zxid: zxid,
Data: request.Data,
}
ackCount := 1 // Leader自己先ACK
for _, follower := range l.followers {
go func(f *Follower) {
if f.SendProposal(proposal) {
ackCount++
}
}(follower)
}
// 3. 等待过半ACK
if ackCount > len(l.followers)/2 {
// 4. 发送Commit给所有Follower
for _, follower := range l.followers {
follower.SendCommit(zxid)
}
// 5. 返回客户端成功
return Success
}
return Failed
}
ZXID(ZooKeeper Transaction ID):
ZXID = 64位整数
↓
高32位: epoch(Leader纪元)
低32位: counter(递增计数器)
示例:
Zxid = 0x0000000100000001
↑ ↑
epoch=1 counter=1
作用:
1. 全局有序(单调递增)
2. 识别数据新旧
3. Leader选举依据
数据模型
树形结构:
/ (根节点)
├── /services (服务注册目录)
│ ├── /order
│ │ ├── node1 (临时节点)
│ │ └── node2 (临时节点)
│ └── /user
│ └── node1
├── /config (配置管理)
│ ├── /db
│ │ └── host (持久节点)
│ └── /cache
│ └── ttl
└── /locks (分布式锁)
└── /my-lock
├── 0000000001 (临时顺序节点)
└── 0000000002
节点类型:
| 类型 | 说明 | 应用场景 |
|---|---|---|
| 持久节点 | 创建后一直存在,除非主动删除 | 配置存储 |
| 临时节点 | 客户端断开连接后自动删除 | 服务注册、分布式锁 |
| 持久顺序节点 | 自动追加序号,持久 | - |
| 临时顺序节点 | 自动追加序号,临时 | 分布式锁、选举 |
代码示例:
package main
import (
"github.com/samuel/go-zookeeper/zk"
"time"
)
type ZKClient struct {
conn *zk.Conn
}
func NewZKClient(servers []string) (*ZKClient, error) {
conn, _, err := zk.Connect(servers, 10*time.Second)
if err != nil {
return nil, err
}
return &ZKClient{conn: conn}, nil
}
// 创建持久节点(配置存储)
func (c *ZKClient) CreatePersistent(path, data string) error {
_, err := c.conn.Create(
path,
[]byte(data),
0, // flags=0 表示持久节点
zk.WorldACL(zk.PermAll),
)
return err
}
// 创建临时节点(服务注册)
func (c *ZKClient) CreateEphemeral(path, data string) error {
_, err := c.conn.Create(
path,
[]byte(data),
zk.FlagEphemeral, // 临时节点
zk.WorldACL(zk.PermAll),
)
return err
}
// 创建临时顺序节点(分布式锁)
func (c *ZKClient) CreateEphemeralSequential(path, data string) (string, error) {
return c.conn.Create(
path,
[]byte(data),
zk.FlagEphemeral|zk.FlagSequence, // 临时+顺序
zk.WorldACL(zk.PermAll),
)
}
Watch机制
定义:客户端可以监听ZNode的变化,当节点数据/子节点发生变化时,ZooKeeper会主动通知客户端
特点:
- 一次性触发:Watch触发后需要重新设置
- 异步通知:回调方式通知客户端
- 顺序保证:事件通知顺序与操作顺序一致
Watch类型:
1. GetW: 监听节点数据变化
Event: EventNodeDataChanged
2. ExistsW: 监听节点创建/删除/数据变化
Event: EventNodeCreated, EventNodeDeleted, EventNodeDataChanged
3. ChildrenW: 监听子节点列表变化
Event: EventNodeChildrenChanged
代码示例:
// 配置热更新
type ConfigWatcher struct {
zkClient *ZKClient
cache map[string]string
}
func (w *ConfigWatcher) WatchConfig(key string) {
path := "/config/" + key
for {
// GetW: 获取数据并设置Watch
data, stat, eventCh, err := w.zkClient.conn.GetW(path)
if err != nil {
time.Sleep(1 * time.Second)
continue
}
// 更新缓存
w.cache[key] = string(data)
log.Printf("Config updated: %s = %s (version=%d)",
key, string(data), stat.Version)
// 等待变更事件
event := <-eventCh
log.Printf("Received event: %v", event.Type)
// Watch是一次性的,需要重新设置(循环继续)
}
}
// 服务发现
type ServiceDiscovery struct {
zkClient *ZKClient
services map[string][]string // serviceName -> [instances]
}
func (sd *ServiceDiscovery) WatchService(serviceName string) {
path := "/services/" + serviceName
for {
// ChildrenW: 获取子节点并设置Watch
children, stat, eventCh, err := sd.zkClient.conn.ChildrenW(path)
if err != nil {
time.Sleep(1 * time.Second)
continue
}
// 更新服务实例列表
instances := make([]string, 0)
for _, child := range children {
data, _, _ := sd.zkClient.conn.Get(path + "/" + child)
instances = append(instances, string(data))
}
sd.services[serviceName] = instances
log.Printf("Service instances updated: %v", instances)
// 等待子节点变更事件
event := <-eventCh
if event.Type == zk.EventNodeChildrenChanged {
log.Printf("Service instances changed")
}
}
}
Watch实现原理:
Client端:
┌─────────────────────────────────┐
│ WatchManager │
│ - 维护Watch注册表 │
│ - path -> []Watcher │
└─────────────────────────────────┘
↓
getW("/config/db")
↓
┌─────────────────────────────────┐
│ 发送请求给Server │
│ Request: GET + Watch flag │
└─────────────────────────────────┘
Server端:
┌─────────────────────────────────┐
│ WatchManager │
│ - path -> Set<Session> │
│ - 记录哪些客户端监听哪些路径 │
└─────────────────────────────────┘
↓
数据变更时
↓
┌─────────────────────────────────┐
│ 触发Watch │
│ - 查找监听该path的所有Session │
│ - 发送WatchEvent给客户端 │
│ - 删除Watch(一次性) │
└─────────────────────────────────┘
ZooKeeper集群
部署模式:
推荐集群规模:
- 3节点:容忍1个故障(3/2+1=2)
- 5节点:容忍2个故障(5/2+1=3)
- 7节点:容忍3个故障(7/2+1=4)
不推荐偶数节点:
- 4节点和3节点容忍度相同(都是1个)
- 浪费资源
读写流程:
读请求(性能高):
Client → 任意节点(Leader/Follower)→ 直接返回
(可能读到稍旧的数据)
写请求(强一致):
Client → Follower → 转发给Leader
→ Leader → 2PC广播 → 过半ACK → 提交 → 返回客户端
顺序:
Client A: write(x=1) → zxid=1
Client B: write(x=2) → zxid=2
Client C: write(x=3) → zxid=3
所有节点看到的顺序一致:1 → 2 → 3
会话机制:
// 客户端连接
conn, sessionEvent, err := zk.Connect(
[]string{"zk1:2181", "zk2:2181", "zk3:2181"},
10*time.Second, // sessionTimeout
)
// 监听会话事件
go func() {
for event := range sessionEvent {
switch event.State {
case zk.StateConnecting:
log.Println("Connecting to ZK...")
case zk.StateConnected:
log.Println("Connected to ZK")
case zk.StateHasSession:
log.Println("Session established")
case zk.StateDisconnected:
log.Println("Disconnected, will retry...")
case zk.StateExpired:
log.Println("Session expired, need reconnect")
// 临时节点会被删除,需要重新注册
}
}
}()
Session过期机制:
1. 客户端定期发送心跳(ping)给Server
↓
2. Server更新Session的lastAccessTime
↓
3. 如果超过sessionTimeout未收到心跳
↓
4. Server标记Session为Expired
↓
5. 删除该Session创建的所有临时节点
↓
6. 触发Watch通知其他客户端
etcd详解
简介
etcd:CoreOS开源的分布式键值存储,使用Go语言实现
特点:
- 强一致性(Raft算法)
- 高可用(集群部署)
- 简单API(HTTP/gRPC)
- Watch支持(流式推送)
- 租约机制(Lease)
与ZooKeeper的区别:
- 更现代的设计(2013 vs 2008)
- 更简洁的API
- 更好的性能(gRPC vs 自定义协议)
- Kubernetes的默认选择
Raft协议
Raft vs Paxos:
- Raft更易理解(Strong Leader)
- Paxos理论完备,实现复杂
核心角色:
Leader(领导者):
- 处理所有客户端请求
- 复制日志给Follower
- 定期发送心跳
Follower(跟随者):
- 被动接收日志
- 转发请求给Leader
- 参与投票
Candidate(候选者):
- Follower超时未收到心跳 → 转为Candidate
- 发起选举
- 获得多数票 → 成为Leader
选举流程:
初始状态:所有节点都是Follower
┌─────────────────────────────────┐
│ Follower (term=0) │
└─────────────────────────────────┘
↓ (超时未收到心跳)
┌─────────────────────────────────┐
│ Candidate (term=1) │
│ - 投票给自己 │
│ - 向其他节点请求投票 │
└─────────────────────────────────┘
↓
获得多数票?
↙ ↘
Yes No
↓ ↓
Leader 重新选举
选举约束:
1. 一个term内,每个节点只能投一票
2. 先到先得(First-Come-First-Served)
3. Candidate的日志必须至少和Follower一样新
日志复制:
type LogEntry struct {
Term int64 // Leader的term
Index int64 // 日志索引
Command string // 实际命令
}
// Leader接收客户端写请求
func (l *Leader) HandleWrite(cmd string) error {
// 1. 追加到本地日志
entry := LogEntry{
Term: l.currentTerm,
Index: l.getLastLogIndex() + 1,
Command: cmd,
}
l.log = append(l.log, entry)
// 2. 并行复制给所有Follower
successCount := 1 // Leader自己
for _, follower := range l.followers {
go func(f *Follower) {
if f.AppendEntries(entry) {
successCount++
}
}(follower)
}
// 3. 等待多数确认
if successCount > len(l.followers)/2 {
// 4. 提交日志
l.commitIndex = entry.Index
// 5. 应用到状态机
l.applyToStateMachine(entry)
return nil
}
return errors.New("replication failed")
}
Raft保证:
1. Election Safety(选举安全):
一个term内最多只有一个Leader
2. Leader Append-Only(Leader只追加):
Leader的日志只能追加,不能删除或覆盖
3. Log Matching(日志匹配):
如果两个日志条目有相同的index和term,
则它们存储相同的命令,
且之前的所有日志条目都相同
4. Leader Completeness(Leader完整性):
如果某个日志条目在某个term被提交,
则该条目必然出现在更大term的Leader的日志中
5. State Machine Safety(状态机安全):
如果某个节点已应用某个日志条目到状态机,
则其他节点不会在相同index应用不同的命令
数据模型
扁平键值模型(与ZK的树形结构不同):
Key-Value Store:
/services/order/node1 → "192.168.1.10:8080"
/services/order/node2 → "192.168.1.11:8080"
/config/db/host → "mysql.prod.com"
/config/db/port → "3306"
特点:
- 简单直接
- 前缀查询
- 范围扫描
版本机制:
每个Key都有版本信息:
{
"key": "/config/db/host",
"value": "mysql.prod.com",
"create_revision": 100, // 创建时的revision
"mod_revision": 150, // 最后修改时的revision
"version": 5, // 该key的修改次数
"lease": 12345 // 租约ID(可选)
}
全局Revision:
- 单调递增
- 类似ZK的zxid
- 用于MVCC(多版本并发控制)
代码示例:
package main
import (
"context"
"go.etcd.io/etcd/client/v3"
"time"
)
type EtcdClient struct {
client *clientv3.Client
}
func NewEtcdClient(endpoints []string) (*EtcdClient, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
return &EtcdClient{client: client}, nil
}
// 写入数据
func (c *EtcdClient) Put(key, value string) error {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, err := c.client.Put(ctx, key, value)
return err
}
// 读取数据
func (c *EtcdClient) Get(key string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
resp, err := c.client.Get(ctx, key)
if err != nil {
return "", err
}
if len(resp.Kvs) == 0 {
return "", nil
}
return string(resp.Kvs[0].Value), nil
}
// 前缀查询
func (c *EtcdClient) GetPrefix(prefix string) (map[string]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
resp, err := c.client.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
result := make(map[string]string)
for _, kv := range resp.Kvs {
result[string(kv.Key)] = string(kv.Value)
}
return result, nil
}
// CAS(Compare-And-Swap)操作
func (c *EtcdClient) CAS(key, oldValue, newValue string) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// 事务:如果value=oldValue,则更新为newValue
txn := c.client.Txn(ctx)
txn = txn.If(
clientv3.Compare(clientv3.Value(key), "=", oldValue),
).Then(
clientv3.OpPut(key, newValue),
)
resp, err := txn.Commit()
if err != nil {
return false, err
}
return resp.Succeeded, nil
}
Lease租约机制
定义:Lease是etcd的租约机制,用于实现临时节点(类似ZK的Ephemeral)
特点:
- Key绑定Lease
- Lease过期 → 绑定的Key自动删除
- 支持KeepAlive续约
代码示例:
// 服务注册(使用Lease实现临时节点)
type ServiceRegistry struct {
client *clientv3.Client
leaseID clientv3.LeaseID
stopCh chan struct{}
}
func (sr *ServiceRegistry) Register(serviceName, addr string, ttl int64) error {
ctx := context.Background()
// 1. 创建租约
leaseResp, err := sr.client.Grant(ctx, ttl)
if err != nil {
return err
}
sr.leaseID = leaseResp.ID
// 2. 注册服务(绑定租约)
key := "/services/" + serviceName + "/" + addr
_, err = sr.client.Put(ctx, key, addr, clientv3.WithLease(sr.leaseID))
if err != nil {
return err
}
// 3. 自动续约
sr.stopCh = make(chan struct{})
go sr.keepAlive()
return nil
}
func (sr *ServiceRegistry) keepAlive() {
ctx := context.Background()
// KeepAlive返回一个channel,定期发送续约响应
keepAliveCh, err := sr.client.KeepAlive(ctx, sr.leaseID)
if err != nil {
log.Printf("KeepAlive error: %v", err)
return
}
for {
select {
case resp := <-keepAliveCh:
if resp == nil {
log.Println("Lease expired")
return
}
log.Printf("Lease %d keepalive, TTL=%d", resp.ID, resp.TTL)
case <-sr.stopCh:
// 主动撤销租约
sr.client.Revoke(ctx, sr.leaseID)
return
}
}
}
func (sr *ServiceRegistry) Unregister() {
close(sr.stopCh)
}
Lease vs Session:
| 特性 | etcd Lease | ZK Session |
|---|---|---|
| 实现方式 | 显式创建Lease | 隐式Session |
| 续约方式 | KeepAlive主动续约 | 心跳自动续约 |
| 灵活性 | 高(可单独管理) | 低(绑定连接) |
| 复杂度 | 稍高 | 简单 |
Watch机制
与ZK Watch的区别:
| 特性 | etcd Watch | ZK Watch |
|---|---|---|
| 触发次数 | 持续监听 | 一次性 |
| 实现方式 | gRPC流式推送 | 事件通知 |
| 历史回溯 | 支持(通过revision) | 不支持 |
| 性能 | 高 | 中等 |
代码示例:
// 配置热更新(持续监听)
type ConfigWatcher struct {
client *clientv3.Client
cache map[string]string
}
func (w *ConfigWatcher) WatchConfig(key string) {
ctx := context.Background()
// Watch返回一个channel,持续推送事件
watchCh := w.client.Watch(ctx, key)
for watchResp := range watchCh {
for _, event := range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
log.Printf("Config updated: %s = %s",
string(event.Kv.Key), string(event.Kv.Value))
w.cache[string(event.Kv.Key)] = string(event.Kv.Value)
case mvccpb.DELETE:
log.Printf("Config deleted: %s", string(event.Kv.Key))
delete(w.cache, string(event.Kv.Key))
}
}
}
}
// 服务发现(监听前缀)
type ServiceDiscovery struct {
client *clientv3.Client
services map[string][]string
}
func (sd *ServiceDiscovery) WatchService(serviceName string) {
ctx := context.Background()
prefix := "/services/" + serviceName + "/"
// 1. 首次获取所有实例
resp, _ := sd.client.Get(ctx, prefix, clientv3.WithPrefix())
instances := make([]string, 0)
for _, kv := range resp.Kvs {
instances = append(instances, string(kv.Value))
}
sd.services[serviceName] = instances
// 2. 监听前缀(持续)
watchCh := sd.client.Watch(ctx, prefix, clientv3.WithPrefix())
for watchResp := range watchCh {
for _, event := range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
// 新实例上线
addr := string(event.Kv.Value)
sd.services[serviceName] = append(sd.services[serviceName], addr)
log.Printf("Service instance added: %s", addr)
case mvccpb.DELETE:
// 实例下线
addr := string(event.Kv.Value)
sd.removeInstance(serviceName, addr)
log.Printf("Service instance removed: %s", addr)
}
}
}
}
func (sd *ServiceDiscovery) removeInstance(serviceName, addr string) {
instances := sd.services[serviceName]
for i, instance := range instances {
if instance == addr {
sd.services[serviceName] = append(instances[:i], instances[i+1:]...)
break
}
}
}
历史版本查询:
// 查询某个revision时的数据(时间旅行)
func (c *EtcdClient) GetAtRevision(key string, revision int64) (string, error) {
ctx := context.Background()
resp, err := c.client.Get(ctx, key, clientv3.WithRev(revision))
if err != nil {
return "", err
}
if len(resp.Kvs) == 0 {
return "", nil
}
return string(resp.Kvs[0].Value), nil
}
// 监听从某个revision开始的变更
func (c *EtcdClient) WatchFromRevision(key string, revision int64) {
ctx := context.Background()
// 从指定revision开始监听
watchCh := c.client.Watch(ctx, key, clientv3.WithRev(revision))
for watchResp := range watchCh {
for _, event := range watchResp.Events {
log.Printf("Event at revision %d: %v",
event.Kv.ModRevision, event.Type)
}
}
}
ZooKeeper vs etcd
全面对比
| 对比维度 | ZooKeeper | etcd |
|---|---|---|
| 开发语言 | Java | Go |
| 发布时间 | 2008年 | 2013年 |
| 共识算法 | ZAB(Paxos变种) | Raft |
| 数据模型 | 树形结构(ZNode) | 扁平KV |
| 协议 | 自定义协议 | HTTP/gRPC |
| Watch机制 | 一次性,需重新注册 | 持续监听 |
| 临时节点 | Session绑定 | Lease机制 |
| 客户端库 | 多语言支持 | Go优先,多语言支持 |
| 性能 | 中等(万级QPS) | 高(万级-十万级QPS) |
| 运维复杂度 | 较高(JVM调优) | 较低 |
| 生态 | Hadoop、Kafka、HBase | Kubernetes、Service Mesh |
适用场景
ZooKeeper适合:
- Java技术栈
场景:
- Kafka(Controller选举、Broker管理)
- HBase(RegionServer管理)
- Dubbo(服务注册发现)
- 大数据生态(Hadoop、Storm)
原因:
- 同为Java,集成方便
- 成熟稳定(10+ years)
- 大量现有案例
- 需要树形结构
场景:
- 权限管理(ACL树)
- 配置管理(层级配置)
- 命名服务(路径即名称)
优势:
- 树形结构更直观
- 父子关系明确
- 批量操作方便
etcd适合:
- Kubernetes生态
场景:
- Kubernetes(存储集群状态)
- Service Mesh(Istio配置)
- Cloud Native应用
原因:
- Kubernetes默认选择
- gRPC性能高
- Watch机制更强大
- 高性能要求
场景:
- 高频配置变更
- 大量Watch监听
- 微服务注册发现
优势:
- gRPC协议效率高
- 持续Watch,无需重新注册
- 更好的并发性能
- 简单运维
优势:
- 单一二进制文件
- 无JVM内存调优
- 配置简单
- 容器化友好
性能对比
写性能测试(3节点集群):
| 操作类型 | ZooKeeper | etcd |
|---|---|---|
| 单次写入延迟 | ~5ms | ~3ms |
| 写入QPS | 10,000 | 30,000+ |
| 批量写入 | 不支持 | 支持(Txn) |
读性能测试:
| 操作类型 | ZooKeeper | etcd |
|---|---|---|
| 单次读取延迟 | ~1ms | ~0.5ms |
| 读取QPS | 50,000+ | 100,000+ |
| 范围查询 | 不支持 | 支持 |
Watch性能:
| 对比维度 | ZooKeeper | etcd |
|---|---|---|
| Watch数量 | 10,000+ | 100,000+ |
| 事件延迟 | ~10ms | ~5ms |
| 历史回溯 | 不支持 | 支持 |
应用场景
1. 配置中心
场景:统一管理配置,动态更新
// 配置中心客户端(etcd实现)
type ConfigCenter struct {
client *clientv3.Client
cache sync.Map // 本地缓存
}
func NewConfigCenter(endpoints []string) (*ConfigCenter, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
cc := &ConfigCenter{client: client}
// 启动时加载所有配置
cc.loadAllConfigs()
// 监听配置变更
go cc.watchConfigs()
return cc, nil
}
func (cc *ConfigCenter) loadAllConfigs() error {
ctx := context.Background()
resp, err := cc.client.Get(ctx, "/config/", clientv3.WithPrefix())
if err != nil {
return err
}
for _, kv := range resp.Kvs {
cc.cache.Store(string(kv.Key), string(kv.Value))
}
return nil
}
func (cc *ConfigCenter) watchConfigs() {
ctx := context.Background()
watchCh := cc.client.Watch(ctx, "/config/", clientv3.WithPrefix())
for watchResp := range watchCh {
for _, event := range watchResp.Events {
key := string(event.Kv.Key)
switch event.Type {
case mvccpb.PUT:
value := string(event.Kv.Value)
cc.cache.Store(key, value)
log.Printf("Config updated: %s = %s", key, value)
// 触发回调(通知应用层)
cc.notifyConfigChange(key, value)
case mvccpb.DELETE:
cc.cache.Delete(key)
log.Printf("Config deleted: %s", key)
}
}
}
}
// 获取配置(从本地缓存)
func (cc *ConfigCenter) GetString(key string) string {
if value, ok := cc.cache.Load("/config/" + key); ok {
return value.(string)
}
return ""
}
func (cc *ConfigCenter) GetInt(key string) int {
value := cc.GetString(key)
if value == "" {
return 0
}
intVal, _ := strconv.Atoi(value)
return intVal
}
// 使用示例
func main() {
cc, _ := NewConfigCenter([]string{"localhost:2379"})
// 读取配置
dbHost := cc.GetString("db.host")
maxQPS := cc.GetInt("rate_limit.max_qps")
log.Printf("DB Host: %s, Max QPS: %d", dbHost, maxQPS)
// 配置变更会自动更新到本地缓存
select {}
}
2. 服务注册与发现
场景:微服务动态上下线
// 服务注册
type ServiceRegistry struct {
client *clientv3.Client
leaseID clientv3.LeaseID
stopCh chan struct{}
}
func (sr *ServiceRegistry) Register(serviceName, addr string, ttl int64) error {
ctx := context.Background()
// 创建租约
leaseResp, err := sr.client.Grant(ctx, ttl)
if err != nil {
return err
}
sr.leaseID = leaseResp.ID
// 注册服务
key := fmt.Sprintf("/services/%s/%s", serviceName, addr)
_, err = sr.client.Put(ctx, key, addr, clientv3.WithLease(sr.leaseID))
if err != nil {
return err
}
// 自动续约
sr.stopCh = make(chan struct{})
go sr.keepAlive()
log.Printf("Service registered: %s -> %s", serviceName, addr)
return nil
}
func (sr *ServiceRegistry) keepAlive() {
ctx := context.Background()
keepAliveCh, _ := sr.client.KeepAlive(ctx, sr.leaseID)
for {
select {
case <-keepAliveCh:
// 续约成功
case <-sr.stopCh:
sr.client.Revoke(ctx, sr.leaseID)
return
}
}
}
func (sr *ServiceRegistry) Unregister() {
close(sr.stopCh)
}
// 服务发现
type ServiceDiscovery struct {
client *clientv3.Client
services sync.Map // serviceName -> []string
callbacks sync.Map // serviceName -> []func([]string)
}
func NewServiceDiscovery(endpoints []string) (*ServiceDiscovery, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
return &ServiceDiscovery{client: client}, nil
}
func (sd *ServiceDiscovery) Discover(serviceName string) []string {
prefix := "/services/" + serviceName + "/"
// 首次获取
ctx := context.Background()
resp, _ := sd.client.Get(ctx, prefix, clientv3.WithPrefix())
instances := make([]string, 0)
for _, kv := range resp.Kvs {
instances = append(instances, string(kv.Value))
}
sd.services.Store(serviceName, instances)
// 监听变更
go sd.watchService(serviceName)
return instances
}
func (sd *ServiceDiscovery) watchService(serviceName string) {
prefix := "/services/" + serviceName + "/"
ctx := context.Background()
watchCh := sd.client.Watch(ctx, prefix, clientv3.WithPrefix())
for watchResp := range watchCh {
instances := sd.getCurrentInstances(serviceName)
for _, event := range watchResp.Events {
addr := string(event.Kv.Value)
switch event.Type {
case mvccpb.PUT:
instances = append(instances, addr)
log.Printf("Service instance added: %s", addr)
case mvccpb.DELETE:
instances = sd.removeInstance(instances, addr)
log.Printf("Service instance removed: %s", addr)
}
}
sd.services.Store(serviceName, instances)
sd.notifyCallbacks(serviceName, instances)
}
}
func (sd *ServiceDiscovery) OnChange(serviceName string, callback func([]string)) {
callbacks, _ := sd.callbacks.LoadOrStore(serviceName, []func([]string){})
callbacks = append(callbacks.([]func([]string)), callback)
sd.callbacks.Store(serviceName, callbacks)
}
// 使用示例
func main() {
// 服务注册
registry := &ServiceRegistry{client: etcdClient}
registry.Register("order-service", "192.168.1.10:8080", 10)
// 服务发现
discovery, _ := NewServiceDiscovery([]string{"localhost:2379"})
instances := discovery.Discover("order-service")
fmt.Printf("Order service instances: %v\n", instances)
// 监听服务变更
discovery.OnChange("order-service", func(instances []string) {
fmt.Printf("Instances updated: %v\n", instances)
})
}
3. Leader选举
场景:分布式系统中选出主节点
// Leader选举(etcd实现)
type LeaderElection struct {
client *clientv3.Client
session *concurrency.Session
election *concurrency.Election
isLeader bool
callbacks struct {
onElected func()
onLost func()
}
}
func NewLeaderElection(endpoints []string, electionKey string) (*LeaderElection, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
// 创建Session
session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
if err != nil {
return nil, err
}
// 创建Election对象
election := concurrency.NewElection(session, electionKey)
return &LeaderElection{
client: client,
session: session,
election: election,
}, nil
}
// 竞选Leader
func (le *LeaderElection) Campaign(ctx context.Context, value string) error {
// Campaign会阻塞直到成为Leader
if err := le.election.Campaign(ctx, value); err != nil {
return err
}
le.isLeader = true
log.Printf("Elected as leader: %s", value)
// 触发回调
if le.callbacks.onElected != nil {
le.callbacks.onElected()
}
// 监听Leader变更
go le.watchLeader(ctx)
return nil
}
func (le *LeaderElection) watchLeader(ctx context.Context) {
observeCh := le.election.Observe(ctx)
for resp := range observeCh {
if string(resp.Kvs[0].Value) != le.election.Key() {
// Leader变更
le.isLeader = false
log.Println("Lost leadership")
if le.callbacks.onLost != nil {
le.callbacks.onLost()
}
}
}
}
// 主动放弃Leader
func (le *LeaderElection) Resign(ctx context.Context) error {
le.isLeader = false
return le.election.Resign(ctx)
}
func (le *LeaderElection) IsLeader() bool {
return le.isLeader
}
func (le *LeaderElection) OnElected(callback func()) {
le.callbacks.onElected = callback
}
func (le *LeaderElection) OnLost(callback func()) {
le.callbacks.onLost = callback
}
// 使用示例
func main() {
nodeID := "node-1"
election, _ := NewLeaderElection(
[]string{"localhost:2379"},
"/election/master",
)
// 设置回调
election.OnElected(func() {
fmt.Println("I am the leader now!")
// 执行Leader逻辑
})
election.OnLost(func() {
fmt.Println("I lost leadership")
// 停止Leader逻辑
})
// 参与竞选
ctx := context.Background()
election.Campaign(ctx, nodeID)
}
实战案例
案例1:分布式配置中心(完整实现)
需求:
- 集中管理配置
- 配置热更新
- 多环境支持(dev、test、prod)
- 配置版本管理
架构设计:
┌─────────────────────────────────────────┐
│ Management Console │
│ (配置管理界面) │
└─────────────────────────────────────────┘
↓ (HTTP API)
┌─────────────────────────────────────────┐
│ Config Center Server │
│ - RESTful API │
│ - 权限校验 │
│ - 审计日志 │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ etcd Cluster │
│ /config/{env}/{app}/{key} │
│ /config/prod/order/db.host │
└─────────────────────────────────────────┘
↑ (Watch)
┌──────────┬──────────┬──────────┬─────────┐
│ Service1 │ Service2 │ Service3 │ ... │
│ (Client) │ (Client) │ (Client) │ │
└──────────┴──────────┴──────────┴─────────┘
代码实现:
// 配置中心服务端
type ConfigServer struct {
etcdClient *clientv3.Client
}
// 创建/更新配置
func (s *ConfigServer) SetConfig(env, app, key, value string) error {
ctx := context.Background()
configKey := fmt.Sprintf("/config/%s/%s/%s", env, app, key)
// 记录审计日志
s.auditLog("SET", configKey, value)
_, err := s.etcdClient.Put(ctx, configKey, value)
return err
}
// 获取配置
func (s *ConfigServer) GetConfig(env, app, key string) (string, error) {
ctx := context.Background()
configKey := fmt.Sprintf("/config/%s/%s/%s", env, app, key)
resp, err := s.etcdClient.Get(ctx, configKey)
if err != nil {
return "", err
}
if len(resp.Kvs) == 0 {
return "", errors.New("config not found")
}
return string(resp.Kvs[0].Value), nil
}
// 配置中心客户端
type ConfigClient struct {
etcdClient *clientv3.Client
env string
app string
cache sync.Map
listeners []ConfigListener
}
type ConfigListener interface {
OnConfigChange(key, value string)
}
func NewConfigClient(endpoints []string, env, app string) (*ConfigClient, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
cc := &ConfigClient{
etcdClient: client,
env: env,
app: app,
}
// 加载所有配置
cc.loadConfigs()
// 监听配置变更
go cc.watchConfigs()
return cc, nil
}
func (cc *ConfigClient) loadConfigs() error {
ctx := context.Background()
prefix := fmt.Sprintf("/config/%s/%s/", cc.env, cc.app)
resp, err := cc.etcdClient.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return err
}
for _, kv := range resp.Kvs {
key := strings.TrimPrefix(string(kv.Key), prefix)
cc.cache.Store(key, string(kv.Value))
}
log.Printf("Loaded %d configs", len(resp.Kvs))
return nil
}
func (cc *ConfigClient) watchConfigs() {
ctx := context.Background()
prefix := fmt.Sprintf("/config/%s/%s/", cc.env, cc.app)
watchCh := cc.etcdClient.Watch(ctx, prefix, clientv3.WithPrefix())
for watchResp := range watchCh {
for _, event := range watchResp.Events {
key := strings.TrimPrefix(string(event.Kv.Key), prefix)
value := string(event.Kv.Value)
switch event.Type {
case mvccpb.PUT:
cc.cache.Store(key, value)
log.Printf("Config updated: %s = %s", key, value)
// 通知监听器
for _, listener := range cc.listeners {
listener.OnConfigChange(key, value)
}
}
}
}
}
func (cc *ConfigClient) Get(key string) string {
if value, ok := cc.cache.Load(key); ok {
return value.(string)
}
return ""
}
func (cc *ConfigClient) GetInt(key string, defaultValue int) int {
value := cc.Get(key)
if value == "" {
return defaultValue
}
intVal, _ := strconv.Atoi(value)
return intVal
}
func (cc *ConfigClient) AddListener(listener ConfigListener) {
cc.listeners = append(cc.listeners, listener)
}
// 使用示例
type MyApp struct {
configClient *ConfigClient
maxQPS int
}
func (app *MyApp) OnConfigChange(key, value string) {
if key == "max_qps" {
newQPS, _ := strconv.Atoi(value)
app.maxQPS = newQPS
log.Printf("QPS limit updated to %d", newQPS)
}
}
func main() {
// 创建配置客户端
configClient, _ := NewConfigClient(
[]string{"localhost:2379"},
"prod",
"order-service",
)
app := &MyApp{
configClient: configClient,
maxQPS: configClient.GetInt("max_qps", 1000),
}
// 注册监听器
configClient.AddListener(app)
// 读取配置
dbHost := configClient.Get("db.host")
dbPort := configClient.GetInt("db.port", 3306)
log.Printf("DB: %s:%d, Max QPS: %d", dbHost, dbPort, app.maxQPS)
select {}
}
面试问答
ZooKeeper的ZAB协议和Raft有什么区别?
答案:
相似之处:
- 都是基于Leader的共识算法
- 都需要多数派确认
- 都保证强一致性
核心区别:
| 对比维度 | ZAB | Raft |
|---|---|---|
| 设计目标 | 为ZooKeeper定制 | 通用共识算法 |
| Leader选举 | 基于zxid+myid | 基于term+日志完整性 |
| 日志复制 | 可以乱序,最终排序 | 严格有序 |
| 状态 | Looking/Following/Leading | Follower/Candidate/Leader |
| 理解难度 | 较难 | 相对容易 |
代码对比:
// ZAB选举规则
func (node *ZKNode) CompareVote(vote1, vote2 Vote) Vote {
// 1. zxid更大的优先(数据更新)
if vote1.Zxid > vote2.Zxid {
return vote1
}
if vote2.Zxid > vote1.Zxid {
return vote2
}
// 2. zxid相同,myid更大的优先
if vote1.MyId > vote2.MyId {
return vote1
}
return vote2
}
// Raft选举规则
func (node *RaftNode) RequestVote(req VoteRequest) bool {
// 1. term更大的优先
if req.Term < node.currentTerm {
return false
}
// 2. 已经投过票,拒绝
if node.votedFor != "" && node.votedFor != req.CandidateId {
return false
}
// 3. 候选者的日志必须至少和自己一样新
if !node.isLogUpToDate(req.LastLogIndex, req.LastLogTerm) {
return false
}
return true
}
etcd的Watch机制是如何实现的?
答案:
实现原理:
1. 客户端建立gRPC流式连接:
Client → etcd Server (Watch RPC)
2. etcd维护Watch表:
WatchableStore:
- key -> []WatchStream
- 记录哪些客户端监听哪些key
3. 数据变更时:
Write → Apply to State Machine
→ Notify WatchableStore
→ Push Event to WatchStream
→ Client收到事件
4. MVCC支持历史版本:
每次变更生成新revision
Watch可以从任意revision开始
代码示例:
// etcd Watch服务端实现(简化版)
type WatchableStore struct {
mu sync.RWMutex
watchers map[string][]*WatchStream // key -> streams
revisions map[int64]*Event // revision -> event
}
func (ws *WatchableStore) Watch(key string, startRevision int64) *WatchStream {
ws.mu.Lock()
defer ws.mu.Unlock()
stream := &WatchStream{
key: key,
eventC: make(chan *Event, 100),
}
ws.watchers[key] = append(ws.watchers[key], stream)
// 发送历史事件(从startRevision开始)
go func() {
for rev := startRevision; ; rev++ {
if event, ok := ws.revisions[rev]; ok {
if event.Key == key {
stream.eventC <- event
}
} else {
break
}
}
}()
return stream
}
func (ws *WatchableStore) Notify(key string, event *Event) {
ws.mu.RLock()
defer ws.mu.RUnlock()
// 保存事件(MVCC)
ws.revisions[event.Revision] = event
// 通知所有Watch该key的stream
for _, stream := range ws.watchers[key] {
select {
case stream.eventC <- event:
default:
// channel满了,跳过(客户端消费慢)
}
}
}
与ZK Watch的对比:
// ZK Watch(一次性)
for {
data, _, eventCh, _ := zkConn.GetW("/config/key")
handleData(data)
event := <-eventCh
// 需要重新设置Watch(循环继续)
}
// etcd Watch(持续)
watchCh := etcdClient.Watch(ctx, "/config/key")
for watchResp := range watchCh {
for _, event := range watchResp.Events {
handleEvent(event)
}
// 无需重新设置,持续接收
}
如何保证分布式协调服务的高可用?
答案:
1. 集群部署(多数派)
推荐部署:
- 3节点:容忍1个故障
- 5节点:容忍2个故障
- 7节点:容忍3个故障
示例:
┌─────────────────────────────────┐
│ ZK Cluster (5 nodes) │
│ Node1 (Leader) │
│ Node2, Node3, Node4, Node5 │
└─────────────────────────────────┘
故障场景:
- Node1宕机 → 自动选举新Leader
- Node1+Node2宕机 → 仍有多数派(3/5)
- Node1+Node2+Node3宕机 → 不可用(2/5 < 3)
2. 客户端重连机制
type ResilientClient struct {
endpoints []string
client *clientv3.Client
mu sync.RWMutex
}
func (rc *ResilientClient) ensureConnected() error {
rc.mu.Lock()
defer rc.mu.Unlock()
// 检查连接状态
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, err := rc.client.Get(ctx, "health_check")
if err == nil {
return nil // 连接正常
}
// 重新连接
log.Println("Reconnecting to etcd...")
newClient, err := clientv3.New(clientv3.Config{
Endpoints: rc.endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return err
}
rc.client.Close()
rc.client = newClient
log.Println("Reconnected successfully")
return nil
}
func (rc *ResilientClient) Get(key string) (string, error) {
// 自动重连
if err := rc.ensureConnected(); err != nil {
return "", err
}
ctx := context.Background()
resp, err := rc.client.Get(ctx, key)
if err != nil {
return "", err
}
if len(resp.Kvs) == 0 {
return "", nil
}
return string(resp.Kvs[0].Value), nil
}
3. 数据备份与恢复
# etcd快照备份
ETCDCTL_API=3 etcdctl snapshot save snapshot.db \
--endpoints=https://127.0.0.1:2379 \
--cacert=/etc/etcd/ca.crt \
--cert=/etc/etcd/server.crt \
--key=/etc/etcd/server.key
# 恢复
ETCDCTL_API=3 etcdctl snapshot restore snapshot.db \
--name=node1 \
--initial-cluster=node1=http://localhost:2380 \
--initial-advertise-peer-urls=http://localhost:2380
4. 监控告警
# Prometheus告警规则
groups:
- name: etcd
rules:
- alert: EtcdClusterUnavailable
expr: up{job="etcd"} == 0
for: 1m
annotations:
summary: "etcd cluster unavailable"
- alert: EtcdHighLatency
expr: histogram_quantile(0.99, rate(etcd_disk_wal_fsync_duration_seconds_bucket[5m])) > 0.1
for: 5m
annotations:
summary: "etcd high latency"
- alert: EtcdInsufficientMembers
expr: count(up{job="etcd"} == 1) < 3
for: 1m
annotations:
summary: "etcd insufficient members"
ZooKeeper的Session过期和etcd的Lease过期有什么区别?
答案:
| 对比维度 | ZooKeeper Session | etcd Lease |
|---|---|---|
| 绑定方式 | 隐式绑定连接 | 显式创建Lease |
| 续约方式 | 自动心跳 | 主动KeepAlive |
| 过期处理 | 删除所有临时节点 | 删除绑定的Key |
| 灵活性 | 低(一个连接一个Session) | 高(多个Key可共享Lease) |
| 生命周期 | 与连接绑定 | 独立管理 |
ZooKeeper Session:
// Session自动管理
conn, sessionEvent, _ := zk.Connect(
[]string{"localhost:2181"},
10*time.Second, // sessionTimeout
)
// 自动心跳,无需手动处理
// 连接断开 → Session过期 → 临时节点删除
// 监听Session事件
go func() {
for event := range sessionEvent {
switch event.State {
case zk.StateConnected:
log.Println("Connected")
case zk.StateExpired:
log.Println("Session expired")
// 需要重新连接,重新注册临时节点
}
}
}()
etcd Lease:
// 显式管理Lease
type ServiceRegistry struct {
client *clientv3.Client
leases map[string]clientv3.LeaseID // 多个Key可用不同Lease
}
func (sr *ServiceRegistry) Register(key, value string, ttl int64) error {
ctx := context.Background()
// 创建Lease
leaseResp, _ := sr.client.Grant(ctx, ttl)
leaseID := leaseResp.ID
// 绑定Key到Lease
sr.client.Put(ctx, key, value, clientv3.WithLease(leaseID))
// 主动续约
keepAliveCh, _ := sr.client.KeepAlive(ctx, leaseID)
go func() {
for range keepAliveCh {
// 续约成功
}
}()
sr.leases[key] = leaseID
return nil
}
// 灵活控制:可以单独撤销某个Lease
func (sr *ServiceRegistry) Unregister(key string) {
if leaseID, ok := sr.leases[key]; ok {
sr.client.Revoke(context.Background(), leaseID)
delete(sr.leases, key)
}
}
选择建议:
- ZooKeeper:适合简单场景,自动管理更方便
- etcd:适合复杂场景,需要精细控制生命周期
如何优化分布式协调服务的性能?
答案:
1. 读写分离
// etcd支持从任意节点读取(可能读到旧数据)
func (c *Client) Get(key string, allowStale bool) (string, error) {
ctx := context.Background()
opts := []clientv3.OpOption{}
if allowStale {
// 从本地节点读取(可能不是Leader)
opts = append(opts, clientv3.WithSerializable())
}
resp, err := c.client.Get(ctx, key, opts...)
// ...
}
// 使用建议:
// - 强一致性要求:不使用Serializable
// - 可以容忍短暂不一致:使用Serializable(性能提升10倍)
2. 批量操作
// etcd事务(原子批量操作)
func (c *Client) BatchPut(kvs map[string]string) error {
ctx := context.Background()
ops := make([]clientv3.Op, 0, len(kvs))
for k, v := range kvs {
ops = append(ops, clientv3.OpPut(k, v))
}
// 一次性提交多个写入
_, err := c.client.Txn(ctx).Then(ops...).Commit()
return err
}
// 性能对比:
// 逐个Put:1000次 → 5秒
// 批量Put:1000次 → 0.5秒(10倍提升)
3. 本地缓存
// 配置中心客户端缓存
type CachedConfigClient struct {
etcdClient *clientv3.Client
cache *lru.Cache // LRU缓存
}
func (cc *CachedConfigClient) Get(key string) (string, error) {
// 1. 先查缓存
if value, ok := cc.cache.Get(key); ok {
return value.(string), nil
}
// 2. 缓存未命中,查etcd
ctx := context.Background()
resp, err := cc.etcdClient.Get(ctx, key)
if err != nil {
return "", err
}
value := string(resp.Kvs[0].Value)
// 3. 更新缓存
cc.cache.Add(key, value)
return value, nil
}
// Watch机制更新缓存
func (cc *CachedConfigClient) watchAndUpdate() {
ctx := context.Background()
watchCh := cc.etcdClient.Watch(ctx, "/config/", clientv3.WithPrefix())
for watchResp := range watchCh {
for _, event := range watchResp.Events {
key := string(event.Kv.Key)
switch event.Type {
case mvccpb.PUT:
cc.cache.Add(key, string(event.Kv.Value))
case mvccpb.DELETE:
cc.cache.Remove(key)
}
}
}
}
// 缓存命中率:95% → 响应时间从5ms降到0.1ms
4. 连接池
// etcd客户端连接池
type ClientPool struct {
clients []*clientv3.Client
index int
mu sync.Mutex
}
func NewClientPool(endpoints []string, poolSize int) (*ClientPool, error) {
clients := make([]*clientv3.Client, poolSize)
for i := 0; i < poolSize; i++ {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
clients[i] = client
}
return &ClientPool{clients: clients}, nil
}
func (p *ClientPool) GetClient() *clientv3.Client {
p.mu.Lock()
defer p.mu.Unlock()
client := p.clients[p.index]
p.index = (p.index + 1) % len(p.clients)
return client
}
// 性能提升:单连接10K QPS → 连接池(5个)40K QPS
5. 压缩与清理
# etcd定期压缩历史版本(避免空间膨胀)
ETCDCTL_API=3 etcdctl compact 1000000
# 碎片整理
ETCDCTL_API=3 etcdctl defrag