HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 交易所技术完整体系

    • 交易所技术完整体系
    • 交易所技术架构总览
    • 交易基础概念
    • 撮合引擎原理
    • 撮合引擎实现-内存撮合
    • 撮合引擎优化 - 延迟与吞吐
    • 撮合引擎高可用
    • 清算系统设计
    • 风控系统设计
    • 资金管理系统
    • 行情系统设计
    • 去中心化交易所(DEX)设计
    • 合约交易系统
    • 数据库设计与优化
    • 缓存与消息队列
    • 用户系统与KYC
    • 交易所API设计
    • 监控与告警系统
    • 安全防护与攻防
    • 高可用架构设计
    • 压力测试与性能优化
    • 项目实战-完整交易所实现

撮合引擎高可用

撮合引擎是交易所的核心组件,其可用性直接影响交易服务。本章讨论如何设计高可用的撮合引擎架构。

1. 高可用架构模式

1.1 主从模式 (Master-Slave)

        ┌─────────────┐
        │   Master    │ ← 接收所有订单
        │  撮合引擎    │
        └──────┬──────┘
               │ 同步日志
        ┌──────┴───────┬──────────┐
        ↓              ↓          ↓
    ┌───────┐      ┌───────┐  ┌───────┐
    │Slave 1│      │Slave 2│  │Slave 3│
    └───────┘      └───────┘  └───────┘

特点:

  • Master负责处理所有订单
  • Slave实时同步Master的操作日志
  • Master故障时,Slave升级为新Master

1.2 实现方案

package ha

import (
    "sync"
    "time"
)

type Role string

const (
    RoleMaster Role = "master"
    RoleSlave  Role = "slave"
)

type HAMatchEngine struct {
    engine     *MatchEngine
    role       Role
    mu         sync.RWMutex

    // 日志复制
    logWriter  *LogWriter
    logReader  *LogReader

    // 心跳检测
    heartbeat  *Heartbeat
    peers      []string

    // 状态
    isHealthy  bool
}

func NewHAMatchEngine(symbol string, peers []string) *HAMatchEngine {
    return &HAMatchEngine{
        engine:    NewMatchEngine(symbol),
        role:      RoleSlave, // 默认Slave
        logWriter: NewLogWriter(),
        logReader: NewLogReader(),
        heartbeat: NewHeartbeat(3 * time.Second),
        peers:     peers,
        isHealthy: true,
    }
}

func (ha *HAMatchEngine) Start() {
    // 启动引擎
    ha.engine.Start()

    // 启动角色选举
    go ha.electLeader()

    // 启动心跳
    go ha.sendHeartbeat()
    go ha.checkHeartbeat()

    // 启动日志同步
    go ha.syncLog()
}

// 处理订单(只有Master)
func (ha *HAMatchEngine) SubmitOrder(order *Order) error {
    ha.mu.RLock()
    role := ha.role
    ha.mu.RUnlock()

    if role != RoleMaster {
        return fmt.Errorf("not master, current role: %s", role)
    }

    // 写入操作日志
    logEntry := &LogEntry{
        SequenceID: ha.logWriter.GetNextSeq(),
        EventType:  "order_submit",
        Data:       order,
        Timestamp:  time.Now(),
    }
    ha.logWriter.Write(logEntry)

    // 提交到撮合引擎
    ha.engine.SubmitOrder(order)

    return nil
}

// Leader选举(简化版,生产环境应使用Raft/Paxos)
func (ha *HAMatchEngine) electLeader() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        // 检查当前Master是否存活
        if ha.role == RoleSlave && !ha.isMasterAlive() {
            // Master挂了,尝试成为新Master
            if ha.tryBecomemaster() {
                ha.promoteToMaster()
            }
        }
    }
}

func (ha *HAMatchEngine) tryBecomeMaster() bool {
    // 这里应该实现分布式锁(如基于etcd/zookeeper)
    // 简化实现:使用本地标志
    ha.mu.Lock()
    defer ha.mu.Unlock()

    if ha.role == RoleSlave {
        ha.role = RoleMaster
        log.Println("Promoted to Master")
        return true
    }

    return false
}

func (ha *HAMatchEngine) promoteToMaster() {
    // 1. 停止日志读取
    ha.logReader.Stop()

    // 2. 开始接收订单
    log.Println("Now accepting orders as Master")
}

// 心跳检测
func (ha *HAMatchEngine) sendHeartbeat() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        ha.mu.RLock()
        role := ha.role
        ha.mu.RUnlock()

        if role == RoleMaster {
            // 发送心跳给所有Slave
            for _, peer := range ha.peers {
                sendHeartbeatToPeer(peer)
            }
        }
    }
}

func (ha *HAMatchEngine) checkHeartbeat() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        if !ha.heartbeat.IsAlive() {
            log.Println("Master heartbeat timeout")
            ha.isHealthy = false
        }
    }
}

// 日志同步
func (ha *HAMatchEngine) syncLog() {
    for {
        ha.mu.RLock()
        role := ha.role
        ha.mu.RUnlock()

        if role == RoleSlave {
            // 从Master读取日志
            logEntry := ha.logReader.Read()
            if logEntry != nil {
                ha.applyLogEntry(logEntry)
            }
        }

        time.Sleep(10 * time.Millisecond)
    }
}

func (ha *HAMatchEngine) applyLogEntry(entry *LogEntry) {
    switch entry.EventType {
    case "order_submit":
        order := entry.Data.(*Order)
        ha.engine.SubmitOrder(order)
    case "order_cancel":
        // 处理撤单
    }
}

2. 操作日志 (Oplog)

2.1 日志结构

type LogEntry struct {
    SequenceID int64       // 全局递增序列号
    EventType  string      // 事件类型
    Data       interface{} // 事件数据
    Timestamp  time.Time
    Checksum   uint32      // 校验和
}

type LogWriter struct {
    file       *os.File
    sequenceID int64
    mu         sync.Mutex
}

func NewLogWriter() *LogWriter {
    file, _ := os.OpenFile("match_engine.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)

    return &LogWriter{
        file:       file,
        sequenceID: 0,
    }
}

func (lw *LogWriter) Write(entry *LogEntry) error {
    lw.mu.Lock()
    defer lw.mu.Unlock()

    // 分配序列号
    lw.sequenceID++
    entry.SequenceID = lw.sequenceID

    // 计算校验和
    entry.Checksum = calculateChecksum(entry)

    // 序列化
    data, err := json.Marshal(entry)
    if err != nil {
        return err
    }

    // 写入文件
    _, err = lw.file.Write(append(data, '\n'))
    if err != nil {
        return err
    }

    // 刷盘(确保持久化)
    lw.file.Sync()

    return nil
}

func (lw *LogWriter) GetNextSeq() int64 {
    lw.mu.Lock()
    defer lw.mu.Unlock()
    return lw.sequenceID + 1
}

2.2 日志读取与回放

type LogReader struct {
    file         *os.File
    lastSequence int64
    mu           sync.Mutex
}

func NewLogReader() *LogReader {
    file, _ := os.Open("match_engine.log")

    return &LogReader{
        file:         file,
        lastSequence: 0,
    }
}

func (lr *LogReader) Read() *LogEntry {
    lr.mu.Lock()
    defer lr.mu.Unlock()

    scanner := bufio.NewScanner(lr.file)
    for scanner.Scan() {
        line := scanner.Text()

        var entry LogEntry
        if err := json.Unmarshal([]byte(line), &entry); err != nil {
            continue
        }

        // 只返回新日志
        if entry.SequenceID > lr.lastSequence {
            // 验证校验和
            if !verifyChecksum(&entry) {
                log.Printf("Checksum mismatch for entry %d", entry.SequenceID)
                continue
            }

            lr.lastSequence = entry.SequenceID
            return &entry
        }
    }

    return nil
}

// 从指定序列号开始回放
func (lr *LogReader) Replay(fromSeq int64, engine *MatchEngine) {
    lr.file.Seek(0, 0) // 从头开始
    scanner := bufio.NewScanner(lr.file)

    for scanner.Scan() {
        line := scanner.Text()

        var entry LogEntry
        json.Unmarshal([]byte(line), &entry)

        if entry.SequenceID >= fromSeq {
            // 重放日志
            switch entry.EventType {
            case "order_submit":
                order := entry.Data.(*Order)
                engine.SubmitOrder(order)
            case "order_cancel":
                // 处理撤单
            }
        }
    }
}

3. 快照 (Snapshot)

为了加快恢复速度,定期保存撮合引擎的完整状态。

3.1 快照结构

type Snapshot struct {
    SequenceID  int64            // 快照对应的日志序列号
    OrderBook   *OrderBookState  // 订单簿状态
    Timestamp   time.Time
}

type OrderBookState struct {
    Symbol string
    Bids   []PriceLevelState
    Asks   []PriceLevelState
}

type PriceLevelState struct {
    Price  float64
    Orders []*Order
}

3.2 创建快照

func (ha *HAMatchEngine) takeSnapshot() *Snapshot {
    // 获取当前日志序列号
    sequenceID := ha.logWriter.GetNextSeq() - 1

    // 获取订单簿状态
    orderBookState := ha.engine.OrderBook.GetState()

    snapshot := &Snapshot{
        SequenceID: sequenceID,
        OrderBook:  orderBookState,
        Timestamp:  time.Now(),
    }

    // 持久化快照
    ha.saveSnapshot(snapshot)

    return snapshot
}

func (ha *HAMatchEngine) saveSnapshot(snapshot *Snapshot) {
    filename := fmt.Sprintf("snapshot_%d.dat", snapshot.SequenceID)

    data, _ := json.Marshal(snapshot)
    os.WriteFile(filename, data, 0644)

    log.Printf("Snapshot saved: %s", filename)
}

// 定期创建快照
func (ha *HAMatchEngine) startSnapshotTask() {
    ticker := time.Ticker(10 * time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        ha.takeSnapshot()
    }
}

3.3 从快照恢复

func (ha *HAMatchEngine) recoverFromSnapshot() error {
    // 1. 找到最新快照
    snapshot, err := ha.loadLatestSnapshot()
    if err != nil {
        return err
    }

    // 2. 恢复订单簿状态
    ha.engine.OrderBook.RestoreState(snapshot.OrderBook)

    // 3. 回放快照之后的日志
    ha.logReader.Replay(snapshot.SequenceID+1, ha.engine)

    log.Printf("Recovered from snapshot %d", snapshot.SequenceID)

    return nil
}

func (ha *HAMatchEngine) loadLatestSnapshot() (*Snapshot, error) {
    files, _ := filepath.Glob("snapshot_*.dat")

    if len(files) == 0 {
        return nil, fmt.Errorf("no snapshot found")
    }

    // 排序,找到最新的
    sort.Strings(files)
    latestFile := files[len(files)-1]

    data, _ := os.ReadFile(latestFile)

    var snapshot Snapshot
    json.Unmarshal(data, &snapshot)

    return &snapshot, nil
}

4. 故障切换流程

4.1 自动切换

Master故障检测
    ↓
Slave检测到Master心跳超时
    ↓
Slave竞选新Master(使用分布式锁)
    ↓
选举成功的Slave提升为Master
    ↓
加载最新快照
    ↓
回放日志(从快照序列号开始)
    ↓
开始接收新订单

4.2 实现代码

func (ha *HAMatchEngine) handleMasterFailure() {
    log.Println("Master failure detected, starting failover...")

    // 1. 尝试获取Master角色
    if !ha.acquireMasterLock() {
        log.Println("Failed to acquire master lock, another instance became master")
        return
    }

    // 2. 从快照恢复
    if err := ha.recoverFromSnapshot(); err != nil {
        log.Printf("Failed to recover from snapshot: %v", err)
        return
    }

    // 3. 切换角色
    ha.mu.Lock()
    ha.role = RoleMaster
    ha.mu.Unlock()

    // 4. 开始接收订单
    log.Println("Failover completed, now serving as Master")
}

// 使用etcd实现分布式锁
func (ha *HAMatchEngine) acquireMasterLock() bool {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // 使用etcd的租约和事务实现
    lease, _ := ha.etcdClient.Grant(ctx, 10)
    txn := ha.etcdClient.Txn(ctx)

    txn.If(clientv3.Compare(clientv3.CreateRevision("master_lock"), "=", 0)).
        Then(clientv3.OpPut("master_lock", ha.instanceID, clientv3.WithLease(lease.ID))).
        Else(clientv3.OpGet("master_lock"))

    resp, _ := txn.Commit()

    return resp.Succeeded
}

5. 数据一致性保证

5.1 WAL (Write-Ahead Logging)

确保所有操作先写日志,再执行。

func (ha *HAMatchEngine) SubmitOrderWithWAL(order *Order) error {
    // 1. 先写WAL
    logEntry := &LogEntry{
        EventType: "order_submit",
        Data:      order,
        Timestamp: time.Now(),
    }

    if err := ha.logWriter.Write(logEntry); err != nil {
        return err
    }

    // 2. 再执行操作
    ha.engine.SubmitOrder(order)

    return nil
}

5.2 幂等性保证

确保日志重放不会产生重复成交。

type IdempotentEngine struct {
    *MatchEngine
    processedOrders map[string]bool // orderID -> processed
    mu              sync.RWMutex
}

func (ie *IdempotentEngine) SubmitOrder(order *Order) {
    ie.mu.Lock()
    defer ie.mu.Unlock()

    // 检查是否已处理
    if ie.processedOrders[order.OrderID] {
        log.Printf("Order %s already processed, skipping", order.OrderID)
        return
    }

    // 处理订单
    ie.MatchEngine.SubmitOrder(order)

    // 记录已处理
    ie.processedOrders[order.OrderID] = true
}

6. 监控指标

6.1 关键指标

type HAMetrics struct {
    // 角色
    Role string

    // 日志复制
    LogSequenceID       int64
    LogReplicationLag   int64 // Master序列号 - Slave序列号
    LogWriteRate        float64

    // 心跳
    LastHeartbeatTime   time.Time
    HeartbeatMissCount  int

    // 快照
    LastSnapshotTime    time.Time
    SnapshotSize        int64
}

func (ha *HAMatchEngine) GetMetrics() *HAMetrics {
    return &HAMetrics{
        Role:              string(ha.role),
        LogSequenceID:     ha.logWriter.GetNextSeq(),
        LastHeartbeatTime: ha.heartbeat.GetLastTime(),
        // ...
    }
}

6.2 告警规则

alerts:
  - name: MasterDown
    condition: heartbeat_miss_count > 3
    severity: critical
    message: "Master is down"

  - name: HighReplicationLag
    condition: log_replication_lag > 1000
    severity: warning
    message: "Slave replication lag is high"

  - name: SnapshotTooOld
    condition: time_since_last_snapshot > 1h
    severity: warning
    message: "Snapshot is too old"

7. 最佳实践

7.1 配置建议

ha:
  # 最小集群规模
  min_instances: 3

  # 心跳间隔
  heartbeat_interval: 1s
  heartbeat_timeout: 3s

  # 日志配置
  log_flush_interval: 100ms
  log_rotation_size: 1GB

  # 快照配置
  snapshot_interval: 10m
  snapshot_retention: 5

7.2 注意事项

  1. 避免脑裂:使用奇数个节点(3、5、7),并使用多数派选举
  2. 数据持久化:日志和快照都应写入持久化存储(如SSD)
  3. 监控告警:实时监控Master状态,及时发现故障
  4. 定期演练:定期进行故障切换演练,确保流程正确

小结

撮合引擎高可用设计的核心要点:

  • 主从架构:Master处理订单,Slave实时同步
  • 操作日志:WAL保证数据不丢失
  • 快照机制:加快故障恢复速度
  • 自动切换:Master故障时自动选举新Master
  • 幂等保证:日志回放不产生重复成交

下一章将讨论清算系统的设计与实现。

Prev
撮合引擎优化 - 延迟与吞吐
Next
清算系统设计