高可用架构设计

交易所宕机是灾难性的:当BTC暴跌时,用户无法登录、无法平仓,客服电话被打爆,即使只是45分钟的故障,也可能导致平台赔偿超过50万美元。对于承诺99.9% SLA的交易所来说,高可用性是核心竞争力。

本章讲解如何设计高可用架构,将可用性从99.9%提升到99.99%(年宕机时间从8.76小时降到52.6分钟),并分享交易所高可用架构的设计思路和实战经验。

1. 高可用架构原则

1.1 核心指标

| 指标 | 含义 | 计算方式 | 目标 |
| --- | --- | --- | --- |
| Availability | 可用性 | (总时间 - 宕机时间) / 总时间 | 99.99% |
| MTBF | 平均故障间隔时间 | 总运行时间 / 故障次数 | >720h |
| MTTR | 平均恢复时间 | 总故障时间 / 故障次数 | <5min |
| RPO | 恢复点目标 | 可接受的数据丢失量 | <1min |
| RTO | 恢复时间目标 | 可接受的服务中断时间 | <2min |

可用性等级:

  • 99.9% (3个9) = 年宕机8.76小时
  • 99.99% (4个9) = 年宕机52.6分钟
  • 99.999% (5个9) = 年宕机5.26分钟
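
这些年宕机时长可以直接由可用性百分比换算得到。下面是一个换算示意(函数名为说明而设,结果为约值):

// downtimePerYear 根据可用性目标换算每年允许的宕机时长
func downtimePerYear(availability float64) time.Duration {
	year := 365 * 24 * time.Hour
	return time.Duration((1 - availability) * float64(year))
}

// downtimePerYear(0.999)   ≈ 8.76小时
// downtimePerYear(0.9999)  ≈ 52.6分钟
// downtimePerYear(0.99999) ≈ 5.26分钟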

1.2 架构设计原则

  1. 无单点故障 (No Single Point of Failure)
  2. 故障隔离 (Failure Isolation)
  3. 快速故障检测 (Fast Failure Detection)
  4. 自动故障转移 (Automatic Failover)
  5. 优雅降级 (Graceful Degradation)

2. 整体架构

2.1 多地域部署

全球架构:
├── 美国东部 (us-east)
│   ├── 主数据中心
│   ├── 完整服务
│   └── 读写流量
│
├── 美国西部 (us-west)
│   ├── 备份数据中心
│   ├── 完整服务
│   └── 读流量 + 故障切换
│
├── 新加坡 (ap-southeast)
│   ├── 亚洲数据中心
│   ├── 完整服务
│   └── 亚洲用户流量
│
└── 欧洲 (eu-west)
    ├── 欧洲数据中心
    ├── 完整服务
    └── 欧洲用户流量

2.2 单数据中心架构

                    用户
                     ↓
            Global Load Balancer (DNS)
                     ↓
        ┌────────────┴────────────┐
        ↓                         ↓
    Region A                  Region B
        ↓                         ↓
    CDN + WAF                CDN + WAF
        ↓                         ↓
  Load Balancer            Load Balancer
        ↓                         ↓
    ┌───┴───┐                 ┌───┴───┐
    ↓       ↓                 ↓       ↓
  Web×3   API×5             Web×3   API×5
    ↓       ↓                 ↓       ↓
    ├───────┴─────────────────┴───────┤
    ↓                                 ↓
WebSocket×10                    WebSocket×10
    ↓                                 ↓
    ├─────────────┬───────────────────┤
    ↓             ↓                   ↓
 Matching×3   Account×5         Market×5
    ↓             ↓                   ↓
    └─────────────┴───────────────────┘
                  ↓
        ┌─────────┴─────────┐
        ↓                   ↓
    Master DB           Slave DB×3
        ↓                   ↓
    Redis Cluster     Cache Cluster
        ↓                   ↓
    Kafka Cluster     Message Queue

3. 负载均衡

3.1 DNS负载均衡

// GeoDNS配置
type GeoDNSConfig struct {
	Regions map[string]*RegionConfig
}

type RegionConfig struct {
	Name      string
	Endpoints []string
	Priority  int
	Weight    int
	HealthCheck *HealthCheckConfig
}

type HealthCheckConfig struct {
	Interval    time.Duration
	Timeout     time.Duration
	UnhealthyThreshold int
	HealthyThreshold   int
}

// DNS解析器
type GeoDNSResolver struct {
	config *GeoDNSConfig
	health map[string]bool
	mu     sync.RWMutex
}

func (r *GeoDNSResolver) Resolve(clientIP string) string {
	// 1. 根据客户端IP确定最近的区域
	region := r.findNearestRegion(clientIP)

	// 2. 获取该区域的健康端点
	endpoints := r.getHealthyEndpoints(region)

	// 3. 按权重选择端点
	if len(endpoints) == 0 {
		// 如果本区域无健康端点,返回其他区域
		endpoints = r.getAnyHealthyEndpoint()
	}

	// 4. 加权随机选择
	return r.weightedRandomSelect(endpoints)
}

func (r *GeoDNSResolver) findNearestRegion(clientIP string) string {
	// 使用GeoIP数据库查找最近的区域
	// 实现略
	return "us-east"
}

func (r *GeoDNSResolver) getHealthyEndpoints(region string) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()

	config := r.config.Regions[region]
	healthy := make([]string, 0)

	for _, endpoint := range config.Endpoints {
		if r.health[endpoint] {
			healthy = append(healthy, endpoint)
		}
	}

	return healthy
}

// 健康检查
func (r *GeoDNSResolver) StartHealthCheck(ctx context.Context) {
	for region, config := range r.config.Regions {
		for _, endpoint := range config.Endpoints {
			go r.healthCheckLoop(ctx, region, endpoint, config.HealthCheck)
		}
	}
}

func (r *GeoDNSResolver) healthCheckLoop(ctx context.Context, region, endpoint string, config *HealthCheckConfig) {
	ticker := time.NewTicker(config.Interval)
	defer ticker.Stop()

	consecutiveFailures := 0
	consecutiveSuccesses := 0

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			healthy := r.checkEndpoint(endpoint, config.Timeout)

			if healthy {
				consecutiveFailures = 0
				consecutiveSuccesses++

				if consecutiveSuccesses >= config.HealthyThreshold {
					r.setHealth(endpoint, true)
				}
			} else {
				consecutiveSuccesses = 0
				consecutiveFailures++

				if consecutiveFailures >= config.UnhealthyThreshold {
					r.setHealth(endpoint, false)
					// 告警
					alerter.Alert("Endpoint unhealthy", map[string]string{
						"region":   region,
						"endpoint": endpoint,
					})
				}
			}
		}
	}
}

func (r *GeoDNSResolver) checkEndpoint(endpoint string, timeout time.Duration) bool {
	client := &http.Client{Timeout: timeout}
	resp, err := client.Get("https://" + endpoint + "/health")
	if err != nil {
		return false
	}
	defer resp.Body.Close()

	return resp.StatusCode == 200
}
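
上面的解析器还引用了几个正文省略的辅助方法(setHealth、getAnyHealthyEndpoint、weightedRandomSelect)。下面是一种可能的补全示意(需要引入 math/rand;这里假设端点之间等权,weightedRandomSelect 退化为均匀随机,若要按权重选择可在端点级别增加权重字段):

// setHealth 在持有写锁的情况下更新端点健康状态
func (r *GeoDNSResolver) setHealth(endpoint string, healthy bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.health == nil {
		r.health = make(map[string]bool)
	}
	r.health[endpoint] = healthy
}

// getAnyHealthyEndpoint 在所有区域中收集健康端点,用于本区域全部不可用时的兜底
func (r *GeoDNSResolver) getAnyHealthyEndpoint() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()

	healthy := make([]string, 0)
	for _, region := range r.config.Regions {
		for _, endpoint := range region.Endpoints {
			if r.health[endpoint] {
				healthy = append(healthy, endpoint)
			}
		}
	}
	return healthy
}

// weightedRandomSelect 此处简化为均匀随机选择
func (r *GeoDNSResolver) weightedRandomSelect(endpoints []string) string {
	if len(endpoints) == 0 {
		return ""
	}
	return endpoints[rand.Intn(len(endpoints))]
}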

3.2 应用层负载均衡

package loadbalancer

import (
	"hash/crc32"
	"math/rand"
	"sync"
	"sync/atomic"
)

// 负载均衡策略
type Strategy string

const (
	RoundRobin       Strategy = "round_robin"
	LeastConnection  Strategy = "least_connection"
	IPHash           Strategy = "ip_hash"
	WeightedRandom   Strategy = "weighted_random"
)

// 后端服务器
type Backend struct {
	URL         string
	Weight      int
	Connections int32 // 当前连接数
	Healthy     bool
}

// 负载均衡器
type LoadBalancer struct {
	backends []*Backend
	strategy Strategy
	index    uint32 // 用于轮询
	mu       sync.RWMutex
}

func NewLoadBalancer(strategy Strategy) *LoadBalancer {
	return &LoadBalancer{
		backends: make([]*Backend, 0),
		strategy: strategy,
	}
}

func (lb *LoadBalancer) AddBackend(backend *Backend) {
	lb.mu.Lock()
	defer lb.mu.Unlock()
	lb.backends = append(lb.backends, backend)
}

func (lb *LoadBalancer) SelectBackend(clientIP string) *Backend {
	switch lb.strategy {
	case RoundRobin:
		return lb.roundRobin()
	case LeastConnection:
		return lb.leastConnection()
	case IPHash:
		return lb.ipHash(clientIP)
	case WeightedRandom:
		return lb.weightedRandom()
	default:
		return lb.roundRobin()
	}
}

// 轮询
func (lb *LoadBalancer) roundRobin() *Backend {
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	if len(lb.backends) == 0 {
		return nil
	}

	// 只选择健康的后端
	healthyBackends := lb.getHealthyBackends()
	if len(healthyBackends) == 0 {
		return nil
	}

	index := atomic.AddUint32(&lb.index, 1)
	return healthyBackends[index%uint32(len(healthyBackends))]
}

// 最少连接
func (lb *LoadBalancer) leastConnection() *Backend {
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	healthyBackends := lb.getHealthyBackends()
	if len(healthyBackends) == 0 {
		return nil
	}

	var selected *Backend
	minConnections := int32(1<<31 - 1)

	for _, backend := range healthyBackends {
		connections := atomic.LoadInt32(&backend.Connections)
		if connections < minConnections {
			minConnections = connections
			selected = backend
		}
	}

	return selected
}

// IP哈希
func (lb *LoadBalancer) ipHash(clientIP string) *Backend {
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	healthyBackends := lb.getHealthyBackends()
	if len(healthyBackends) == 0 {
		return nil
	}

	hash := crc32.ChecksumIEEE([]byte(clientIP))
	index := hash % uint32(len(healthyBackends))

	return healthyBackends[index]
}

// 加权随机
func (lb *LoadBalancer) weightedRandom() *Backend {
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	healthyBackends := lb.getHealthyBackends()
	if len(healthyBackends) == 0 {
		return nil
	}

	// 计算总权重
	totalWeight := 0
	for _, backend := range healthyBackends {
		totalWeight += backend.Weight
	}

	// 权重全为0时退化为均匀随机,避免 rand.Intn(0) panic
	if totalWeight <= 0 {
		return healthyBackends[rand.Intn(len(healthyBackends))]
	}

	// 随机选择
	r := rand.Intn(totalWeight)
	for _, backend := range healthyBackends {
		r -= backend.Weight
		if r < 0 {
			return backend
		}
	}

	return healthyBackends[0]
}

func (lb *LoadBalancer) getHealthyBackends() []*Backend {
	healthy := make([]*Backend, 0)
	for _, backend := range lb.backends {
		if backend.Healthy {
			healthy = append(healthy, backend)
		}
	}
	return healthy
}

// 记录连接
func (lb *LoadBalancer) AcquireConnection(backend *Backend) {
	atomic.AddInt32(&backend.Connections, 1)
}

func (lb *LoadBalancer) ReleaseConnection(backend *Backend) {
	atomic.AddInt32(&backend.Connections, -1)
}
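
LoadBalancer 的一个使用示意(后端地址与 clientIP 均为示例值),实际还需配合健康检查定期更新 Backend.Healthy:

lb := loadbalancer.NewLoadBalancer(loadbalancer.LeastConnection)
lb.AddBackend(&loadbalancer.Backend{URL: "10.0.0.1:8080", Weight: 2, Healthy: true})
lb.AddBackend(&loadbalancer.Backend{URL: "10.0.0.2:8080", Weight: 1, Healthy: true})

backend := lb.SelectBackend(clientIP)
if backend == nil {
	// 无可用后端:返回503或触发降级
	return
}

lb.AcquireConnection(backend)
defer lb.ReleaseConnection(backend)
// 将请求反向代理到 backend.URL ...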

4. 数据库高可用

4.1 主从复制

// MySQL主从配置
type MySQLCluster struct {
	master *sql.DB
	slaves []*sql.DB
	index  uint32
}

func NewMySQLCluster(masterDSN string, slaveDSNs []string) (*MySQLCluster, error) {
	// 连接主库
	master, err := sql.Open("mysql", masterDSN)
	if err != nil {
		return nil, err
	}

	// 连接从库
	slaves := make([]*sql.DB, len(slaveDSNs))
	for i, dsn := range slaveDSNs {
		slave, err := sql.Open("mysql", dsn)
		if err != nil {
			return nil, err
		}
		slaves[i] = slave
	}

	return &MySQLCluster{
		master: master,
		slaves: slaves,
	}, nil
}

// 写操作走主库
func (c *MySQLCluster) Exec(query string, args ...interface{}) (sql.Result, error) {
	return c.master.Exec(query, args...)
}

// 读操作走从库(轮询)
func (c *MySQLCluster) Query(query string, args ...interface{}) (*sql.Rows, error) {
	slave := c.selectSlave()
	return slave.Query(query, args...)
}

func (c *MySQLCluster) selectSlave() *sql.DB {
	if len(c.slaves) == 0 {
		return c.master // 如果没有从库,走主库
	}

	index := atomic.AddUint32(&c.index, 1)
	return c.slaves[index%uint32(len(c.slaves))]
}

// 主从切换
func (c *MySQLCluster) Failover() error {
	if len(c.slaves) == 0 {
		return fmt.Errorf("no slave available")
	}

	// 1. 选择一个从库作为新主库
	newMaster := c.slaves[0]

	// 2. 将新主库设置为可写
	_, err := newMaster.Exec("SET GLOBAL read_only = OFF")
	if err != nil {
		return err
	}

	// 3. 将旧主库降级为从库
	oldMaster := c.master
	_, _ = oldMaster.Exec("SET GLOBAL read_only = ON")

	// 4. 更新配置
	c.master = newMaster
	c.slaves = append(c.slaves[1:], oldMaster)

	log.Println("Database failover completed")

	return nil
}

4.2 自动故障检测与切换

type DBHealthChecker struct {
	cluster *MySQLCluster
	ticker  *time.Ticker
}

func NewDBHealthChecker(cluster *MySQLCluster, interval time.Duration) *DBHealthChecker {
	return &DBHealthChecker{
		cluster: cluster,
		ticker:  time.NewTicker(interval),
	}
}

func (hc *DBHealthChecker) Start(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-hc.ticker.C:
			hc.checkMaster()
		}
	}
}

func (hc *DBHealthChecker) checkMaster() {
	// 检查主库是否可用
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	err := hc.cluster.master.PingContext(ctx)
	if err != nil {
		log.Printf("Master database unhealthy: %v", err)

		// 尝试故障切换
		if err := hc.cluster.Failover(); err != nil {
			log.Printf("Failover failed: %v", err)
			// 发送紧急告警
			alerter.Alert("Database failover failed", map[string]string{
				"error": err.Error(),
			})
		} else {
			// 发送告警通知
			alerter.Alert("Database failover successful", nil)
		}
	}
}
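
一个把主从集群和健康检查串起来的使用示意(DSN 为示例值)。另外注意:Failover 直接修改 c.master,与并发执行的 Exec/Query 之间存在数据竞争,生产环境应加读写锁保护,或交由 MHA、Orchestrator 等外部组件完成切换:

cluster, err := NewMySQLCluster(
	"app:password@tcp(10.0.1.10:3306)/exchange", // 主库
	[]string{
		"app:password@tcp(10.0.1.11:3306)/exchange", // 从库1
		"app:password@tcp(10.0.1.12:3306)/exchange", // 从库2
	},
)
if err != nil {
	log.Fatal(err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// 每5秒探测一次主库,连续失败则触发 Failover
checker := NewDBHealthChecker(cluster, 5*time.Second)
go checker.Start(ctx)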

5. 缓存高可用

5.1 Redis Cluster

package cache

import (
	"context"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
)

type RedisCluster struct {
	client *redis.ClusterClient
}

func NewRedisCluster(addrs []string) *RedisCluster {
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:        addrs,
		PoolSize:     100,
		MinIdleConns: 10,
		MaxRetries:   3,
		DialTimeout:  5 * time.Second,
		ReadTimeout:  3 * time.Second,
		WriteTimeout: 3 * time.Second,
	})

	return &RedisCluster{client: client}
}

func (rc *RedisCluster) Get(ctx context.Context, key string) (string, error) {
	return rc.client.Get(ctx, key).Result()
}

func (rc *RedisCluster) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
	return rc.client.Set(ctx, key, value, expiration).Err()
}

// 数据库访问接口(此处假设返回字符串值,与Redis层保持一致)
type Database interface {
	Get(key string) (string, error)
}

// 多级缓存
type MultiLevelCache struct {
	l1 *sync.Map     // 本地缓存(进程内)
	l2 *RedisCluster // Redis集群
	l3 Database      // 数据库
}

func (mlc *MultiLevelCache) Get(ctx context.Context, key string) (string, error) {
	// L1: 本地缓存
	if value, ok := mlc.l1.Load(key); ok {
		return value.(string), nil
	}

	// L2: Redis
	value, err := mlc.l2.Get(ctx, key)
	if err == nil {
		mlc.l1.Store(key, value)
		return value, nil
	}

	// L3: 数据库
	value, err = mlc.l3.Get(key)
	if err != nil {
		return "", err
	}

	// 回填缓存
	mlc.l2.Set(ctx, key, value, 5*time.Minute)
	mlc.l1.Store(key, value)

	return value, nil
}

5.2 缓存一致性

// 旁路缓存模式 (Cache-Aside)
func UpdateUser(userID string, user *User) error {
	// 1. 先更新数据库
	err := db.UpdateUser(userID, user)
	if err != nil {
		return err
	}

	// 2. 删除缓存(而非更新)
	cache.Delete("user:" + userID)

	return nil
}

func GetUser(userID string) (*User, error) {
	key := "user:" + userID

	// 1. 先查缓存
	if value, err := cache.Get(key); err == nil {
		var user User
		if err := json.Unmarshal([]byte(value), &user); err == nil {
			return &user, nil
		}
		// 缓存内容损坏,继续回源数据库
	}

	// 2. 缓存未命中,查数据库
	user, err := db.GetUser(userID)
	if err != nil {
		return nil, err
	}

	// 3. 写入缓存
	data, _ := json.Marshal(user)
	cache.Set(key, data, 5*time.Minute)

	return user, nil
}
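
Cache-Aside 在并发场景下仍可能短暂不一致:读请求在缓存未命中时读到旧的库值,若恰好在写库、删缓存之后才把旧值回填,缓存里就会留下脏数据。一种常见的缓解手段是延迟双删,下面是一个简化示意(500ms 的延迟为假设值,应大于一次读请求回填缓存的耗时):

func UpdateUserWithDoubleDelete(userID string, user *User) error {
	key := "user:" + userID

	// 1. 先删一次缓存
	cache.Delete(key)

	// 2. 更新数据库
	if err := db.UpdateUser(userID, user); err != nil {
		return err
	}

	// 3. 延迟后再删一次,清理并发读回填的旧值
	time.AfterFunc(500*time.Millisecond, func() {
		cache.Delete(key)
	})

	return nil
}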

6. 消息队列高可用

6.1 Kafka集群

type KafkaCluster struct {
	producer sarama.SyncProducer
	consumer sarama.ConsumerGroup
}

func NewKafkaCluster(brokers []string) (*KafkaCluster, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return nil, err
	}

	return &KafkaCluster{producer: producer}, nil
}

func (kc *KafkaCluster) Publish(topic string, message []byte) error {
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(message),
	}

	_, _, err := kc.producer.SendMessage(msg)
	return err
}

// 消费者自动重连
type ResilientConsumer struct {
	brokers []string
	topic   string
	group   string
	handler sarama.ConsumerGroupHandler // 消费回调,需实现 sarama.ConsumerGroupHandler 接口
}

func (rc *ResilientConsumer) Start(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			err := rc.consume(ctx)
			if err != nil {
				log.Printf("Consumer error: %v, reconnecting...", err)
				time.Sleep(5 * time.Second)
			}
		}
	}
}

func (rc *ResilientConsumer) consume(ctx context.Context) error {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true

	consumer, err := sarama.NewConsumerGroup(rc.brokers, rc.group, config)
	if err != nil {
		return err
	}
	defer consumer.Close()

	return consumer.Consume(ctx, []string{rc.topic}, rc.handler)
}
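
仅靠生产端 acks=all 还不够,主题本身需要足够的副本数并配合 min.insync.replicas,否则单个 broker 宕机仍可能丢消息或不可写。下面是用 sarama 管理接口创建高可用主题的一种示意(broker 地址、主题名、分区数均为示例值):

cfg := sarama.NewConfig()
cfg.Version = sarama.V2_1_0_0 // 管理接口需要声明broker版本

admin, err := sarama.NewClusterAdmin(brokers, cfg)
if err != nil {
	return err
}
defer admin.Close()

minISR := "2" // 至少2个副本写入成功才算提交,配合 acks=all 使用
err = admin.CreateTopic("trade-events", &sarama.TopicDetail{
	NumPartitions:     16,
	ReplicationFactor: 3, // 每个分区3副本,容忍单broker故障
	ConfigEntries: map[string]*string{
		"min.insync.replicas": &minISR,
	},
}, false)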

7. 服务熔断与降级

7.1 熔断器

package circuitbreaker

import (
	"errors"
	"log"
	"sync"
	"time"
)

type State int

const (
	StateClosed State = iota
	StateOpen
	StateHalfOpen
)

type CircuitBreaker struct {
	maxFailures  int
	timeout      time.Duration
	state        State
	failures     int
	lastFailTime time.Time
	mu           sync.RWMutex
}

func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		maxFailures: maxFailures,
		timeout:     timeout,
		state:       StateClosed,
	}
}

func (cb *CircuitBreaker) Call(fn func() error) error {
	cb.mu.Lock()

	// 检查是否需要从Open转到HalfOpen
	if cb.state == StateOpen && time.Since(cb.lastFailTime) > cb.timeout {
		cb.state = StateHalfOpen
		cb.failures = 0
	}

	// 如果是Open状态,直接拒绝
	if cb.state == StateOpen {
		cb.mu.Unlock()
		return errors.New("circuit breaker is open")
	}

	cb.mu.Unlock()

	// 执行函数
	err := fn()

	cb.mu.Lock()
	defer cb.mu.Unlock()

	if err != nil {
		cb.failures++
		cb.lastFailTime = time.Now()

		// 失败次数达到阈值,打开熔断器
		if cb.failures >= cb.maxFailures {
			cb.state = StateOpen
			log.Println("Circuit breaker opened")
		}

		return err
	}

	// 成功,重置计数器并关闭熔断器
	if cb.state == StateHalfOpen {
		cb.state = StateClosed
		log.Println("Circuit breaker closed")
	}
	cb.failures = 0

	return nil
}
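
使用示意:把对下游服务(如行情服务)的调用包在熔断器里,熔断打开或调用失败时快速走降级路径。其中 marketClient、getCachedTicker、Ticker 均为示例名称:

var marketBreaker = circuitbreaker.NewCircuitBreaker(5, 30*time.Second)

func getTicker(symbol string) (*Ticker, error) {
	var ticker *Ticker
	err := marketBreaker.Call(func() error {
		var callErr error
		ticker, callErr = marketClient.GetTicker(symbol)
		return callErr
	})
	if err != nil {
		// 熔断打开或调用失败:退化为返回缓存的行情快照
		return getCachedTicker(symbol)
	}
	return ticker, nil
}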

7.2 服务降级

// 降级策略
type DegradationManager struct {
	enabled map[string]bool
	mu      sync.RWMutex
}

func (dm *DegradationManager) GetOrderBook(symbol string) (*OrderBook, error) {
	// 检查是否降级
	if dm.isDegraded("orderbook") {
		// 返回缓存数据
		return dm.getCachedOrderBook(symbol)
	}

	// 正常流程
	return dm.getRealTimeOrderBook(symbol)
}

func (dm *DegradationManager) isDegraded(feature string) bool {
	dm.mu.RLock()
	defer dm.mu.RUnlock()
	return dm.enabled[feature]
}

// 自动降级
func (dm *DegradationManager) AutoDegrade(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// 检查系统负载
			cpuUsage := getSystemCPUUsage()
			memUsage := getSystemMemUsage()

			if cpuUsage > 90 || memUsage > 90 {
				// 启用降级
				dm.enableDegradation("orderbook")
				dm.enableDegradation("kline")
				log.Println("System overloaded, degradation enabled")
			} else if cpuUsage < 70 && memUsage < 70 {
				// 解除降级
				dm.disableDegradation("orderbook")
				dm.disableDegradation("kline")
			}
		}
	}
}
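
AutoDegrade 用到的开关方法在正文中被省略,一种可能的实现如下(顺带兜底初始化 map,避免空指针):

func (dm *DegradationManager) enableDegradation(feature string) {
	dm.mu.Lock()
	defer dm.mu.Unlock()
	if dm.enabled == nil {
		dm.enabled = make(map[string]bool)
	}
	dm.enabled[feature] = true
}

func (dm *DegradationManager) disableDegradation(feature string) {
	dm.mu.Lock()
	defer dm.mu.Unlock()
	delete(dm.enabled, feature)
}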

8. 监控告警

8.1 健康检查

type HealthChecker struct {
	checks map[string]HealthCheck
}

type HealthCheck func() error

func NewHealthChecker() *HealthChecker {
	return &HealthChecker{checks: make(map[string]HealthCheck)}
}

func (hc *HealthChecker) AddCheck(name string, check HealthCheck) {
	hc.checks[name] = check
}

func (hc *HealthChecker) CheckAll() map[string]error {
	results := make(map[string]error)

	for name, check := range hc.checks {
		results[name] = check()
	}

	return results
}

// HTTP健康检查端点
func (s *APIServer) handleHealthCheck(w http.ResponseWriter, r *http.Request) {
	results := healthChecker.CheckAll()

	// error 直接序列化为JSON不可读,转成字符串返回
	allHealthy := true
	response := make(map[string]string, len(results))
	for name, err := range results {
		if err != nil {
			allHealthy = false
			response[name] = err.Error()
		} else {
			response[name] = "ok"
		}
	}

	status := http.StatusOK
	if !allHealthy {
		status = http.StatusServiceUnavailable
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	json.NewEncoder(w).Encode(response)
}
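
健康检查项的注册示意(依赖对象沿用前文的数据库与Redis组件,变量名为示例):

// healthChecker 为包级变量,与上面的 handleHealthCheck 共用
healthChecker = NewHealthChecker()

// 数据库连通性
healthChecker.AddCheck("mysql", func() error {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return dbCluster.master.PingContext(ctx)
})

// Redis连通性
healthChecker.AddCheck("redis", func() error {
	return redisCluster.client.Ping(context.Background()).Err()
})

// 暴露 /health 路由
http.HandleFunc("/health", apiServer.handleHealthCheck)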

8.2 Prometheus监控

var (
	requestCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "http_requests_total",
			Help: "Total number of HTTP requests",
		},
		[]string{"method", "endpoint", "status"},
	)

	requestDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "http_request_duration_seconds",
			Help:    "HTTP request latency",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"method", "endpoint"},
	)
)

func init() {
	prometheus.MustRegister(requestCounter)
	prometheus.MustRegister(requestDuration)
}

// 监控中间件
func MetricsMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()

		// 记录响应状态
		rw := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}

		next.ServeHTTP(rw, r)

		duration := time.Since(start).Seconds()

		requestCounter.WithLabelValues(r.Method, r.URL.Path, fmt.Sprintf("%d", rw.statusCode)).Inc()
		requestDuration.WithLabelValues(r.Method, r.URL.Path).Observe(duration)
	})
}
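
中间件里用到的 responseWriter 是一个捕获状态码的包装类型,正文未给出,下面是常见写法;同时需要暴露 /metrics 抓取端点(promhttp 来自 github.com/prometheus/client_golang/prometheus/promhttp)。另外注意:直接用 r.URL.Path 作为 label,在路径里含订单ID等参数时会造成标签基数膨胀,实际应改用路由模板:

// 包装 http.ResponseWriter 以记录响应状态码
type responseWriter struct {
	http.ResponseWriter
	statusCode int
}

func (rw *responseWriter) WriteHeader(code int) {
	rw.statusCode = code
	rw.ResponseWriter.WriteHeader(code)
}

// 暴露Prometheus抓取端点
func registerMetricsEndpoint(mux *http.ServeMux) {
	mux.Handle("/metrics", promhttp.Handler())
}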

小结

高可用架构设计的核心要点:

  1. 无单点故障:多地域部署、主从复制、集群部署
  2. 负载均衡:DNS、应用层、数据库读写分离
  3. 故障检测:健康检查、自动故障切换
  4. 服务保护:熔断器、限流、降级
  5. 监控告警:Prometheus、健康检查端点

下一章将讨论交易所的压力测试和性能优化实战。
