High Availability Architecture Design
Exchange downtime is catastrophic. When BTC is crashing and users cannot log in or close their positions, the support lines get flooded; even a 45-minute outage can end up costing the platform more than $500,000 in compensation. For an exchange that promises a 99.9% SLA, high availability is a core competitive advantage.
This chapter explains how to design a high-availability architecture that raises availability from 99.9% to 99.99% (cutting annual downtime from 8.76 hours to 52.6 minutes), and shares the design approach and hands-on lessons behind an exchange's HA architecture.
1. High Availability Principles
1.1 Core Metrics
| Metric | Meaning | Formula | Target |
|---|---|---|---|
| Availability | Fraction of time the service is up | (total time - downtime) / total time | 99.99% |
| MTBF | Mean time between failures | total uptime / number of failures | >720h |
| MTTR | Mean time to recovery | total failure time / number of failures | <5min |
| RPO | Recovery point objective | maximum acceptable data loss | <1min |
| RTO | Recovery time objective | maximum acceptable service interruption | <2min |
Availability tiers (the downtime-budget arithmetic is sketched below):
- 99.9% (three nines) = 8.76 hours of downtime per year
- 99.99% (four nines) = 52.6 minutes of downtime per year
- 99.999% (five nines) = 5.26 minutes of downtime per year
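To make the mapping from a nines target to a downtime budget concrete, here is a minimal sketch of the arithmetic (assuming a 365-day year and ignoring leap days):
package main

import (
    "fmt"
    "time"
)

// downtimeBudget returns the maximum downtime per year allowed by an
// availability target, e.g. 0.9999 for "four nines".
func downtimeBudget(availability float64) time.Duration {
    year := 365 * 24 * time.Hour
    return time.Duration((1 - availability) * float64(year))
}

func main() {
    for _, a := range []float64{0.999, 0.9999, 0.99999} {
        fmt.Printf("%.3f%% -> %v per year\n", a*100, downtimeBudget(a).Round(time.Second))
    }
}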
1.2 Architecture Design Principles
- No single point of failure
- Failure isolation
- Fast failure detection
- Automatic failover
- Graceful degradation
2. Overall Architecture
2.1 Multi-Region Deployment
Global topology:
├── US East (us-east)
│   ├── Primary data center
│   ├── Full service stack
│   └── Read/write traffic
│
├── US West (us-west)
│   ├── Standby data center
│   ├── Full service stack
│   └── Read traffic + failover target
│
├── Singapore (ap-southeast)
│   ├── Asia data center
│   ├── Full service stack
│   └── Asian user traffic
│
└── Europe (eu-west)
    ├── Europe data center
    ├── Full service stack
    └── European user traffic
2.2 End-to-End Service Architecture
Users
↓
Global Load Balancer (DNS)
↓
┌────────────┴────────────┐
↓ ↓
Region A Region B
↓ ↓
CDN + WAF CDN + WAF
↓ ↓
Load Balancer Load Balancer
↓ ↓
┌───┴───┐ ┌───┴───┐
↓ ↓ ↓ ↓
Web×3 API×5 Web×3 API×5
↓ ↓ ↓ ↓
├───────┴─────────────────┴───────┤
↓ ↓
WebSocket×10 WebSocket×10
↓ ↓
├─────────────┬───────────────────┤
↓ ↓ ↓
Matching×3 Account×5 Market×5
↓ ↓ ↓
└─────────────┴───────────────────┘
↓
┌─────────┴─────────┐
↓ ↓
Master DB Slave DB×3
↓ ↓
Redis Cluster Cache Cluster
↓ ↓
Kafka Cluster Message Queue
3. Load Balancing
3.1 DNS Load Balancing
// GeoDNS configuration
type GeoDNSConfig struct {
Regions map[string]*RegionConfig
}
type RegionConfig struct {
Name string
Endpoints []string
Priority int
Weight int
HealthCheck *HealthCheckConfig
}
type HealthCheckConfig struct {
Interval time.Duration
Timeout time.Duration
UnhealthyThreshold int
HealthyThreshold int
}
// DNS resolver
type GeoDNSResolver struct {
config *GeoDNSConfig
health map[string]bool
mu sync.RWMutex
}
func (r *GeoDNSResolver) Resolve(clientIP string) string {
// 1. Determine the nearest region from the client IP
region := r.findNearestRegion(clientIP)
// 2. Collect the healthy endpoints in that region
endpoints := r.getHealthyEndpoints(region)
// 3. If this region has no healthy endpoint, fall back to any other region
if len(endpoints) == 0 {
endpoints = r.getAnyHealthyEndpoint()
}
// 4. Weighted random selection among the candidates
return r.weightedRandomSelect(endpoints)
}
func (r *GeoDNSResolver) findNearestRegion(clientIP string) string {
// Look up the nearest region using a GeoIP database
// (implementation omitted)
return "us-east"
}
func (r *GeoDNSResolver) getHealthyEndpoints(region string) []string {
r.mu.RLock()
defer r.mu.RUnlock()
config := r.config.Regions[region]
healthy := make([]string, 0)
for _, endpoint := range config.Endpoints {
if r.health[endpoint] {
healthy = append(healthy, endpoint)
}
}
return healthy
}
// Health checks
func (r *GeoDNSResolver) StartHealthCheck(ctx context.Context) {
for region, config := range r.config.Regions {
for _, endpoint := range config.Endpoints {
go r.healthCheckLoop(ctx, region, endpoint, config.HealthCheck)
}
}
}
func (r *GeoDNSResolver) healthCheckLoop(ctx context.Context, region, endpoint string, config *HealthCheckConfig) {
ticker := time.NewTicker(config.Interval)
defer ticker.Stop()
consecutiveFailures := 0
consecutiveSuccesses := 0
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
healthy := r.checkEndpoint(endpoint, config.Timeout)
if healthy {
consecutiveFailures = 0
consecutiveSuccesses++
if consecutiveSuccesses >= config.HealthyThreshold {
r.setHealth(endpoint, true)
}
} else {
consecutiveSuccesses = 0
consecutiveFailures++
if consecutiveFailures >= config.UnhealthyThreshold {
r.setHealth(endpoint, false)
// Raise an alert
alerter.Alert("Endpoint unhealthy", map[string]string{
"region": region,
"endpoint": endpoint,
})
}
}
}
}
}
func (r *GeoDNSResolver) checkEndpoint(endpoint string, timeout time.Duration) bool {
client := &http.Client{Timeout: timeout}
resp, err := client.Get("https://" + endpoint + "/health")
if err != nil {
return false
}
defer resp.Body.Close()
return resp.StatusCode == 200
}
3.2 Application-Layer Load Balancing
package loadbalancer
import (
"hash/crc32"
"math/rand"
"sync"
"sync/atomic"
)
// Load balancing strategies
type Strategy string
const (
RoundRobin Strategy = "round_robin"
LeastConnection Strategy = "least_connection"
IPHash Strategy = "ip_hash"
WeightedRandom Strategy = "weighted_random"
)
// Backend server
type Backend struct {
URL string
Weight int
Connections int32 // current connection count
Healthy bool
}
// Load balancer
type LoadBalancer struct {
backends []*Backend
strategy Strategy
index uint32 // round-robin counter
mu sync.RWMutex
}
func NewLoadBalancer(strategy Strategy) *LoadBalancer {
return &LoadBalancer{
backends: make([]*Backend, 0),
strategy: strategy,
}
}
func (lb *LoadBalancer) AddBackend(backend *Backend) {
lb.mu.Lock()
defer lb.mu.Unlock()
lb.backends = append(lb.backends, backend)
}
func (lb *LoadBalancer) SelectBackend(clientIP string) *Backend {
switch lb.strategy {
case RoundRobin:
return lb.roundRobin()
case LeastConnection:
return lb.leastConnection()
case IPHash:
return lb.ipHash(clientIP)
case WeightedRandom:
return lb.weightedRandom()
default:
return lb.roundRobin()
}
}
// Round robin
func (lb *LoadBalancer) roundRobin() *Backend {
lb.mu.RLock()
defer lb.mu.RUnlock()
if len(lb.backends) == 0 {
return nil
}
// Only consider healthy backends
healthyBackends := lb.getHealthyBackends()
if len(healthyBackends) == 0 {
return nil
}
index := atomic.AddUint32(&lb.index, 1)
return healthyBackends[index%uint32(len(healthyBackends))]
}
// Least connections
func (lb *LoadBalancer) leastConnection() *Backend {
lb.mu.RLock()
defer lb.mu.RUnlock()
healthyBackends := lb.getHealthyBackends()
if len(healthyBackends) == 0 {
return nil
}
var selected *Backend
minConnections := int32(1<<31 - 1)
for _, backend := range healthyBackends {
connections := atomic.LoadInt32(&backend.Connections)
if connections < minConnections {
minConnections = connections
selected = backend
}
}
return selected
}
// IP hash
func (lb *LoadBalancer) ipHash(clientIP string) *Backend {
lb.mu.RLock()
defer lb.mu.RUnlock()
healthyBackends := lb.getHealthyBackends()
if len(healthyBackends) == 0 {
return nil
}
hash := crc32.ChecksumIEEE([]byte(clientIP))
index := hash % uint32(len(healthyBackends))
return healthyBackends[index]
}
// Weighted random
func (lb *LoadBalancer) weightedRandom() *Backend {
lb.mu.RLock()
defer lb.mu.RUnlock()
healthyBackends := lb.getHealthyBackends()
if len(healthyBackends) == 0 {
return nil
}
// Compute the total weight
totalWeight := 0
for _, backend := range healthyBackends {
totalWeight += backend.Weight
}
if totalWeight <= 0 {
// All weights are zero: fall back to a uniform random pick
return healthyBackends[rand.Intn(len(healthyBackends))]
}
// Pick a point in [0, totalWeight) and walk the weights
r := rand.Intn(totalWeight)
for _, backend := range healthyBackends {
r -= backend.Weight
if r < 0 {
return backend
}
}
return healthyBackends[0]
}
func (lb *LoadBalancer) getHealthyBackends() []*Backend {
healthy := make([]*Backend, 0)
for _, backend := range lb.backends {
if backend.Healthy {
healthy = append(healthy, backend)
}
}
return healthy
}
// Connection accounting
func (lb *LoadBalancer) AcquireConnection(backend *Backend) {
atomic.AddInt32(&backend.Connections, 1)
}
func (lb *LoadBalancer) ReleaseConnection(backend *Backend) {
atomic.AddInt32(&backend.Connections, -1)
}
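A short usage sketch (the backend URLs and `clientIP` are placeholders):
lb := loadbalancer.NewLoadBalancer(loadbalancer.LeastConnection)
lb.AddBackend(&loadbalancer.Backend{URL: "http://api-1:8080", Weight: 2, Healthy: true})
lb.AddBackend(&loadbalancer.Backend{URL: "http://api-2:8080", Weight: 1, Healthy: true})

backend := lb.SelectBackend(clientIP)
if backend == nil {
    // No healthy backend: fail fast or serve a degraded response
    return
}
lb.AcquireConnection(backend)
defer lb.ReleaseConnection(backend)
// ...proxy the request to backend.URL...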
4. Database High Availability
4.1 Master-Slave Replication
// MySQL master/slave cluster
type MySQLCluster struct {
master *sql.DB
slaves []*sql.DB
index uint32
}
func NewMySQLCluster(masterDSN string, slaveDSNs []string) (*MySQLCluster, error) {
// Connect to the master
master, err := sql.Open("mysql", masterDSN)
if err != nil {
return nil, err
}
// Connect to the slaves
slaves := make([]*sql.DB, len(slaveDSNs))
for i, dsn := range slaveDSNs {
slave, err := sql.Open("mysql", dsn)
if err != nil {
return nil, err
}
slaves[i] = slave
}
return &MySQLCluster{
master: master,
slaves: slaves,
}, nil
}
// Writes go to the master
func (c *MySQLCluster) Exec(query string, args ...interface{}) (sql.Result, error) {
return c.master.Exec(query, args...)
}
// Reads go to the slaves (round robin)
func (c *MySQLCluster) Query(query string, args ...interface{}) (*sql.Rows, error) {
slave := c.selectSlave()
return slave.Query(query, args...)
}
func (c *MySQLCluster) selectSlave() *sql.DB {
if len(c.slaves) == 0 {
return c.master // fall back to the master when there are no slaves
}
index := atomic.AddUint32(&c.index, 1)
return c.slaves[index%uint32(len(c.slaves))]
}
// Master failover
func (c *MySQLCluster) Failover() error {
if len(c.slaves) == 0 {
return fmt.Errorf("no slave available")
}
// 1. Promote one of the slaves to be the new master
newMaster := c.slaves[0]
// 2. Make the new master writable
_, err := newMaster.Exec("SET GLOBAL read_only = OFF")
if err != nil {
return err
}
// 3. Demote the old master to read-only (best effort; it may already be unreachable)
oldMaster := c.master
_, _ = oldMaster.Exec("SET GLOBAL read_only = ON")
// 4. Update the topology; in production the old master should pass a health
// check before it is returned to the read pool
c.master = newMaster
c.slaves = append(c.slaves[1:], oldMaster)
log.Println("Database failover completed")
return nil
}
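Wiring the cluster up and splitting reads from writes might look like this (the DSNs, table names, and variables are illustrative):
cluster, err := NewMySQLCluster(
    "app:secret@tcp(db-master:3306)/exchange",
    []string{
        "app:secret@tcp(db-slave-1:3306)/exchange",
        "app:secret@tcp(db-slave-2:3306)/exchange",
    },
)
if err != nil {
    log.Fatal(err)
}

// Writes hit the master, reads are spread across the slaves.
_, err = cluster.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, accountID)
rows, err := cluster.Query("SELECT id, balance FROM accounts WHERE user_id = ?", userID)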
4.2 Automatic Failure Detection and Failover
type DBHealthChecker struct {
cluster *MySQLCluster
ticker *time.Ticker
}
func NewDBHealthChecker(cluster *MySQLCluster, interval time.Duration) *DBHealthChecker {
return &DBHealthChecker{
cluster: cluster,
ticker: time.NewTicker(interval),
}
}
func (hc *DBHealthChecker) Start(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-hc.ticker.C:
hc.checkMaster()
}
}
}
func (hc *DBHealthChecker) checkMaster() {
// Check whether the master is reachable
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
err := hc.cluster.master.PingContext(ctx)
if err != nil {
log.Printf("Master database unhealthy: %v", err)
// Attempt a failover
if err := hc.cluster.Failover(); err != nil {
log.Printf("Failover failed: %v", err)
// Send a critical alert
alerter.Alert("Database failover failed", map[string]string{
"error": err.Error(),
})
} else {
// Send a notification
alerter.Alert("Database failover successful", nil)
}
}
}
5. Cache High Availability
5.1 Redis Cluster
package cache
import (
"context"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
type RedisCluster struct {
client *redis.ClusterClient
}
func NewRedisCluster(addrs []string) *RedisCluster {
client := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs,
PoolSize: 100,
MinIdleConns: 10,
MaxRetries: 3,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
})
return &RedisCluster{client: client}
}
func (rc *RedisCluster) Get(ctx context.Context, key string) (string, error) {
return rc.client.Get(ctx, key).Result()
}
func (rc *RedisCluster) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
return rc.client.Set(ctx, key, value, expiration).Err()
}
// Multi-level cache
type MultiLevelCache struct {
l1 *sync.Map // local in-process cache
l2 *RedisCluster // Redis cluster
l3 Database // database (see the interface sketch below)
}
func (mlc *MultiLevelCache) Get(ctx context.Context, key string) (interface{}, error) {
// L1: local in-process cache
if value, ok := mlc.l1.Load(key); ok {
return value, nil
}
// L2: Redis cluster
if value, err := mlc.l2.Get(ctx, key); err == nil {
mlc.l1.Store(key, value)
return value, nil
}
// L3: database
value, err := mlc.l3.Get(key)
if err != nil {
return nil, err
}
// Backfill both cache levels
mlc.l2.Set(ctx, key, value, 5*time.Minute)
mlc.l1.Store(key, value)
return value, nil
}
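The `Database` dependency that backs L3 is not defined above; a minimal interface it is assumed to satisfy:
// Database is the L3 fallback behind the two cache levels: anything that can
// fetch a value by key (a MySQL DAO, a document store, etc.) fits here.
type Database interface {
    Get(key string) (interface{}, error)
}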
5.2 Cache Consistency
// Cache-aside pattern
func UpdateUser(userID string, user *User) error {
// 1. Update the database first
err := db.UpdateUser(userID, user)
if err != nil {
return err
}
// 2. Invalidate the cache (rather than updating it)
cache.Delete("user:" + userID)
return nil
}
func GetUser(userID string) (*User, error) {
key := "user:" + userID
// 1. Check the cache first
if value, err := cache.Get(key); err == nil {
var user User
json.Unmarshal([]byte(value), &user)
return &user, nil
}
// 2. Cache miss: read from the database
user, err := db.GetUser(userID)
if err != nil {
return nil, err
}
// 3. Populate the cache
data, _ := json.Marshal(user)
cache.Set(key, data, 5*time.Minute)
return user, nil
}
6. Message Queue High Availability
6.1 Kafka Cluster
type KafkaCluster struct {
producer sarama.SyncProducer
consumer sarama.ConsumerGroup
}
func NewKafkaCluster(brokers []string) (*KafkaCluster, error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to acknowledge
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
return nil, err
}
return &KafkaCluster{producer: producer}, nil
}
func (kc *KafkaCluster) Publish(topic string, message []byte) error {
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(message),
}
_, _, err := kc.producer.SendMessage(msg)
return err
}
// Consumer with automatic reconnection
type ResilientConsumer struct {
brokers []string
topic string
group string
handler sarama.ConsumerGroupHandler // see the adapter sketch below for a sample implementation
}
func (rc *ResilientConsumer) Start(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
err := rc.consume(ctx)
if err != nil {
log.Printf("Consumer error: %v, reconnecting...", err)
time.Sleep(5 * time.Second)
}
}
}
}
func (rc *ResilientConsumer) consume(ctx context.Context) error {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
consumer, err := sarama.NewConsumerGroup(rc.brokers, rc.group, config)
if err != nil {
return err
}
defer consumer.Close()
return consumer.Consume(ctx, []string{rc.topic}, rc.handler)
}
7. Circuit Breaking and Service Degradation
7.1 Circuit Breaker
package circuitbreaker
import (
"errors"
"log"
"sync"
"time"
)
type State int
const (
StateClosed State = iota
StateOpen
StateHalfOpen
)
type CircuitBreaker struct {
maxFailures int
timeout time.Duration
state State
failures int
lastFailTime time.Time
mu sync.RWMutex
}
func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
timeout: timeout,
state: StateClosed,
}
}
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mu.Lock()
// Move from Open to Half-Open once the timeout has elapsed
if cb.state == StateOpen && time.Since(cb.lastFailTime) > cb.timeout {
cb.state = StateHalfOpen
cb.failures = 0
}
// While Open, reject calls immediately
if cb.state == StateOpen {
cb.mu.Unlock()
return errors.New("circuit breaker is open")
}
cb.mu.Unlock()
// Execute the protected call
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failures++
cb.lastFailTime = time.Now()
// Trip the breaker once failures reach the threshold
if cb.failures >= cb.maxFailures {
cb.state = StateOpen
log.Println("Circuit breaker opened")
}
return err
}
// Success: reset the counter and close the breaker
if cb.state == StateHalfOpen {
cb.state = StateClosed
log.Println("Circuit breaker closed")
}
cb.failures = 0
return nil
}
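A short usage sketch wrapping a downstream call (the quote-service URL and the `serveCachedTicker` fallback are illustrative):
cb := circuitbreaker.NewCircuitBreaker(5, 30*time.Second)

err := cb.Call(func() error {
    resp, err := http.Get("http://quote-service/ticker/BTCUSDT")
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("quote service returned %d", resp.StatusCode)
    }
    return nil
})
if err != nil {
    // Either the call failed or the breaker is open: fall back to cached data.
    serveCachedTicker()
}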
7.2 Service Degradation
// Degradation strategy
type DegradationManager struct {
enabled map[string]bool
mu sync.RWMutex
}
func (dm *DegradationManager) GetOrderBook(symbol string) (*OrderBook, error) {
// Check whether this feature is degraded
if dm.isDegraded("orderbook") {
// Serve cached data instead
return dm.getCachedOrderBook(symbol)
}
// Normal path
return dm.getRealTimeOrderBook(symbol)
}
func (dm *DegradationManager) isDegraded(feature string) bool {
dm.mu.RLock()
defer dm.mu.RUnlock()
return dm.enabled[feature]
}
// Automatic degradation
func (dm *DegradationManager) AutoDegrade(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Check system load
cpuUsage := getSystemCPUUsage()
memUsage := getSystemMemUsage()
if cpuUsage > 90 || memUsage > 90 {
// Enable degradation
dm.enableDegradation("orderbook")
dm.enableDegradation("kline")
log.Println("System overloaded, degradation enabled")
} else if cpuUsage < 70 && memUsage < 70 {
// Lift degradation
dm.disableDegradation("orderbook")
dm.disableDegradation("kline")
}
}
}
}
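`enableDegradation`, `disableDegradation`, and the constructor are not shown above; a minimal sketch:
func NewDegradationManager() *DegradationManager {
    return &DegradationManager{enabled: make(map[string]bool)}
}

func (dm *DegradationManager) enableDegradation(feature string) {
    dm.mu.Lock()
    defer dm.mu.Unlock()
    dm.enabled[feature] = true
}

func (dm *DegradationManager) disableDegradation(feature string) {
    dm.mu.Lock()
    defer dm.mu.Unlock()
    dm.enabled[feature] = false
}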
8. Monitoring and Alerting
8.1 Health Checks
type HealthChecker struct {
checks map[string]HealthCheck
}
type HealthCheck func() error
func NewHealthChecker() *HealthChecker {
return &HealthChecker{checks: make(map[string]HealthCheck)}
}
func (hc *HealthChecker) AddCheck(name string, check HealthCheck) {
hc.checks[name] = check
}
func (hc *HealthChecker) CheckAll() map[string]error {
results := make(map[string]error)
for name, check := range hc.checks {
results[name] = check()
}
return results
}
// HTTP health check endpoint
func (s *APIServer) handleHealthCheck(w http.ResponseWriter, r *http.Request) {
results := healthChecker.CheckAll()
allHealthy := true
// error values don't marshal to useful JSON, so convert them to strings
body := make(map[string]string, len(results))
for name, err := range results {
if err != nil {
allHealthy = false
body[name] = err.Error()
} else {
body[name] = "ok"
}
}
status := http.StatusOK
if !allHealthy {
status = http.StatusServiceUnavailable
}
w.WriteHeader(status)
json.NewEncoder(w).Encode(body)
}
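Registering the checks at startup might look like this (the `db` and `redisClient` handles are assumptions):
healthChecker := NewHealthChecker()

healthChecker.AddCheck("mysql", func() error {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    return db.PingContext(ctx)
})

healthChecker.AddCheck("redis", func() error {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    return redisClient.Ping(ctx).Err()
})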
8.2 Prometheus Monitoring
var (
requestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total number of HTTP requests",
},
[]string{"method", "endpoint", "status"},
)
requestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "HTTP request latency",
Buckets: prometheus.DefBuckets,
},
[]string{"method", "endpoint"},
)
)
func init() {
prometheus.MustRegister(requestCounter)
prometheus.MustRegister(requestDuration)
}
// Metrics middleware
func MetricsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// Wrap the ResponseWriter so the status code can be recorded (see the wrapper sketch below)
rw := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}
next.ServeHTTP(rw, r)
duration := time.Since(start).Seconds()
requestCounter.WithLabelValues(r.Method, r.URL.Path, fmt.Sprintf("%d", rw.statusCode)).Inc()
requestDuration.WithLabelValues(r.Method, r.URL.Path).Observe(duration)
})
}
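The `responseWriter` wrapper referenced in the middleware is a small helper that records the status code the handler writes; a minimal sketch:
// responseWriter wraps http.ResponseWriter so the middleware can observe
// the status code that the handler actually sent.
type responseWriter struct {
    http.ResponseWriter
    statusCode int
}

func (rw *responseWriter) WriteHeader(code int) {
    rw.statusCode = code
    rw.ResponseWriter.WriteHeader(code)
}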
Summary
The key points of high-availability architecture design:
- No single point of failure: multi-region deployment, master-slave replication, clustered services
- Load balancing: DNS, application layer, database read/write splitting
- Failure detection: health checks, automatic failover
- Service protection: circuit breakers, rate limiting, degradation
- Monitoring and alerting: Prometheus, health check endpoints
The next chapter covers stress testing and hands-on performance optimization for the exchange.