撮合引擎高可用
撮合引擎是交易所的核心组件,其可用性直接影响交易服务。本章讨论如何设计高可用的撮合引擎架构。
1. 高可用架构模式
1.1 主从模式 (Master-Slave)
          ┌─────────────┐
          │   Master    │ ← 接收所有订单
          │  撮合引擎   │
          └──────┬──────┘
                 │ 同步日志
    ┌────────────┼────────────┐
    ↓            ↓            ↓
┌───────┐    ┌───────┐    ┌───────┐
│Slave 1│    │Slave 2│    │Slave 3│
└───────┘    └───────┘    └───────┘
特点:
- Master负责处理所有订单
- Slave实时同步Master的操作日志
- Master故障时,Slave升级为新Master
1.2 实现方案
package ha
import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"
)
// Role is the replication role an HAMatchEngine instance currently holds.
type Role string

// The two roles used by this package; their values are the literal strings
// "master" and "slave".
const (
	RoleMaster = Role("master") // accepts orders and writes the operation log
	RoleSlave  = Role("slave")  // follows the operation log; initial role of every instance
)
// HAMatchEngine adds master/slave high availability around a MatchEngine.
// The master accepts orders (SubmitOrder) and appends them to the operation
// log; a slave applies that log (syncLog) and can take over through leader
// election or handleMasterFailure.
//
// NOTE(review): acquireMasterLock reads ha.etcdClient and ha.instanceID, which
// this struct does not declare; an etcd client and a unique node ID need to be
// added before that code path can compile.
type HAMatchEngine struct {
	engine *MatchEngine // wrapped matching engine for one symbol

	role Role         // current role; SubmitOrder/tryBecomeMaster access it under mu
	mu   sync.RWMutex // protects role

	// Operation-log replication.
	logWriter *LogWriter // appends entries when this node is master
	logReader *LogReader // reads entries when this node is slave

	// Heartbeat tracking.
	heartbeat *Heartbeat
	peers     []string // heartbeat targets while this node is master

	// Health flag; checkHeartbeat clears it when the master heartbeat times out.
	isHealthy bool
}
// NewHAMatchEngine builds an HA wrapper around a new MatchEngine for symbol.
// Every instance starts out as a slave and marked healthy; it only becomes
// master through election or failover. peers are the addresses that receive
// heartbeats while this node is master.
func NewHAMatchEngine(symbol string, peers []string) *HAMatchEngine {
	ha := &HAMatchEngine{role: RoleSlave, isHealthy: true}
	ha.engine = NewMatchEngine(symbol)
	ha.logWriter = NewLogWriter()
	ha.logReader = NewLogReader()
	// 3s: presumably the Heartbeat liveness timeout — confirm against Heartbeat.
	ha.heartbeat = NewHeartbeat(3 * time.Second)
	ha.peers = peers
	return ha
}
// Start launches the matching engine, then the HA background loops: leader
// election, heartbeat sending, heartbeat checking and oplog sync. Each loop
// runs in its own goroutine and none of them has a stop signal.
//
// NOTE(review): startSnapshotTask is not launched here; periodic snapshots only
// happen if some other caller starts it.
func (ha *HAMatchEngine) Start() {
	ha.engine.Start()

	background := []func(){
		ha.electLeader,    // role election
		ha.sendHeartbeat,  // master -> peers heartbeat
		ha.checkHeartbeat, // master liveness check
		ha.syncLog,        // slave-side log replication
	}
	for _, loop := range background {
		go loop()
	}
}
// SubmitOrder handles an incoming order; only the master accepts orders.
// It appends an "order_submit" entry to the operation log and then hands the
// order to the matching engine. A non-master returns an error.
//
// The log write must succeed before the order is matched (write-ahead). The
// previous version ignored Write's error, so an order could be matched without
// ever reaching the log; slaves would never see it and a failover would
// silently drop it.
func (ha *HAMatchEngine) SubmitOrder(order *Order) error {
	ha.mu.RLock()
	role := ha.role
	ha.mu.RUnlock()

	if role != RoleMaster {
		return fmt.Errorf("not master, current role: %s", role)
	}

	// SequenceID (and Checksum) are assigned by LogWriter.Write under its own
	// lock; pre-filling it from GetNextSeq was racy and overwritten anyway.
	logEntry := &LogEntry{
		EventType: "order_submit",
		Data:      order,
		Timestamp: time.Now(),
	}
	if err := ha.logWriter.Write(logEntry); err != nil {
		return fmt.Errorf("write oplog for order submit: %w", err)
	}

	// Only match the order once it is durably logged.
	ha.engine.SubmitOrder(order)
	return nil
}
// electLeader checks every 5 seconds whether this slave should take over as
// master because the current master is no longer alive.
// Simplified election; production systems should use Raft/Paxos or a
// distributed lock such as acquireMasterLock.
//
// Fixes: the call used the misspelled tryBecomemaster (no such method), and
// role was read without ha.mu even though tryBecomeMaster writes it under that
// lock, which is a data race.
func (ha *HAMatchEngine) electLeader() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		ha.mu.RLock()
		isSlave := ha.role == RoleSlave
		ha.mu.RUnlock()

		// The master is gone: try to become the new master. The lock is not
		// held here so isMasterAlive/tryBecomeMaster can take it themselves.
		if isSlave && !ha.isMasterAlive() {
			if ha.tryBecomeMaster() {
				ha.promoteToMaster()
			}
		}
	}
}
// tryBecomeMaster switches the local role from slave to master and reports
// whether it did. This is only a local check-and-set under ha.mu; a real
// deployment should back it with a distributed lock (etcd/ZooKeeper).
func (ha *HAMatchEngine) tryBecomeMaster() bool {
	ha.mu.Lock()
	defer ha.mu.Unlock()

	if ha.role != RoleSlave {
		return false
	}
	ha.role = RoleMaster
	log.Println("Promoted to Master")
	return true
}
// promoteToMaster runs the post-election steps after the role has already
// been set to master. It stops reading the replicated log. Nothing else needs
// enabling, because SubmitOrder gates purely on the role.
func (ha *HAMatchEngine) promoteToMaster() {
	reader := ha.logReader
	reader.Stop() // step 1: stop following the oplog

	// step 2: SubmitOrder accepts orders from here on.
	log.Println("Now accepting orders as Master")
}
// sendHeartbeat sends a heartbeat to every peer once per second while this
// instance is master. Ticks are ignored while it is a slave. It never returns.
func (ha *HAMatchEngine) sendHeartbeat() {
	t := time.NewTicker(time.Second)
	defer t.Stop()

	for {
		<-t.C

		ha.mu.RLock()
		isMaster := ha.role == RoleMaster
		ha.mu.RUnlock()
		if !isMaster {
			continue
		}
		for i := range ha.peers {
			sendHeartbeatToPeer(ha.peers[i])
		}
	}
}
// checkHeartbeat runs every 5 seconds and, while this node is a slave, marks
// it unhealthy when the master's heartbeat has timed out.
//
// Fixes:
//   - isHealthy is now written under ha.mu (the same lock that guards role)
//     instead of being mutated from this goroutine without synchronization.
//   - The check is skipped while this node is master. sendHeartbeat only
//     transmits while master, so a master presumably never records incoming
//     heartbeats (depends on Heartbeat — confirm). Without the guard it would
//     log "Master heartbeat timeout" about itself and mark itself unhealthy
//     after promotion.
func (ha *HAMatchEngine) checkHeartbeat() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		ha.mu.RLock()
		isSlave := ha.role == RoleSlave
		ha.mu.RUnlock()
		if !isSlave {
			continue
		}

		if !ha.heartbeat.IsAlive() {
			log.Println("Master heartbeat timeout")
			ha.mu.Lock()
			ha.isHealthy = false
			ha.mu.Unlock()
		}
	}
}
// syncLog is the slave-side replication loop. While this node is a slave it
// pulls the next available oplog entry, if any, and applies it to the local
// engine. It polls every 10ms and never returns.
func (ha *HAMatchEngine) syncLog() {
	const pollInterval = 10 * time.Millisecond

	for {
		ha.mu.RLock()
		following := ha.role == RoleSlave
		ha.mu.RUnlock()

		if following {
			if entry := ha.logReader.Read(); entry != nil {
				ha.applyLogEntry(entry)
			}
		}
		time.Sleep(pollInterval)
	}
}
// applyLogEntry applies one replicated oplog entry to the local engine.
//
// Entries delivered by LogReader are decoded from JSON into LogEntry, whose
// Data field is interface{}. encoding/json stores a JSON object there as
// map[string]interface{}, not *Order. The previous unconditional
// entry.Data.(*Order) assertion therefore panicked on every replicated order.
// When the payload is not already an *Order, it is now re-encoded and decoded
// into one. Entries with missing or undecodable payloads are logged and
// skipped.
func (ha *HAMatchEngine) applyLogEntry(entry *LogEntry) {
	switch entry.EventType {
	case "order_submit":
		var order *Order
		switch data := entry.Data.(type) {
		case *Order:
			order = data
		case nil:
			log.Printf("Log entry %d: order_submit without order data, skipping", entry.SequenceID)
			return
		default:
			raw, err := json.Marshal(data)
			if err == nil {
				order = new(Order)
				err = json.Unmarshal(raw, order)
			}
			if err != nil {
				log.Printf("Log entry %d: cannot decode order: %v", entry.SequenceID, err)
				return
			}
		}
		ha.engine.SubmitOrder(order)
	case "order_cancel":
		// TODO: handle order cancellation.
	}
}
2. 操作日志 (Oplog)
2.1 日志结构
// LogEntry is one record of the operation log (oplog). LogWriter writes each
// entry as a single JSON line. There are no struct tags, so the field names
// are also the JSON keys.
type LogEntry struct {
	SequenceID int64       // globally increasing sequence number, assigned by LogWriter.Write
	EventType  string      // event kind, e.g. "order_submit" or "order_cancel"
	Data       interface{} // event payload (an *Order for "order_submit" when written)
	Timestamp  time.Time   // time the entry was created
	Checksum   uint32      // integrity checksum, set by LogWriter.Write
}
// LogWriter appends LogEntry records to the local oplog file
// ("match_engine.log"), one JSON document per line, and gives each entry a
// strictly increasing SequenceID.
type LogWriter struct {
	file       *os.File
	sequenceID int64 // last sequence ID handed out by Write
	mu         sync.Mutex
}

// NewLogWriter opens "match_engine.log" in append mode, creating it if needed.
//
// The sequence counter resumes from the highest SequenceID already in the
// file. Restarting at 0 would reissue IDs 1, 2, ... that already exist in the
// log. LogReader skips entries whose SequenceID is not above the last one it
// saw, so those new entries would never be replicated.
//
// If the file cannot be opened, the error is logged (it used to be discarded)
// and the writer keeps a nil file, so every Write returns an error.
func NewLogWriter() *LogWriter {
	const path = "match_engine.log"

	file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		log.Printf("open oplog %s for writing: %v", path, err)
	}
	return &LogWriter{
		file:       file,
		sequenceID: lastLoggedSequence(path),
	}
}

// lastLoggedSequence returns the highest SequenceID found in the oplog at
// path, or 0 if the file is missing or has no decodable entries. It reads the
// whole file once, so startup cost grows with the log until it is rotated.
func lastLoggedSequence(path string) int64 {
	f, err := os.Open(path)
	if err != nil {
		return 0
	}
	defer f.Close()

	var last int64
	r := bufio.NewReader(f) // no line-length limit, unlike bufio.Scanner
	for {
		line, err := r.ReadBytes('\n')
		if len(line) > 0 {
			// Only the sequence number matters here; ignore the payload.
			var rec struct{ SequenceID int64 }
			if json.Unmarshal(line, &rec) == nil && rec.SequenceID > last {
				last = rec.SequenceID
			}
		}
		if err != nil {
			return last
		}
	}
}

// Write assigns the next sequence number and a checksum to entry, appends it
// as one JSON line, and fsyncs the file. The entry is therefore durable before
// Write returns nil.
//
// A sequence number is consumed even when the write fails. A gap is harmless
// to readers. Reusing the number would not be: after a failed Sync the line may
// already be in the file, and reuse would create a duplicate ID.
func (lw *LogWriter) Write(entry *LogEntry) error {
	lw.mu.Lock()
	defer lw.mu.Unlock()

	if lw.file == nil {
		return fmt.Errorf("oplog file is not open")
	}

	lw.sequenceID++
	entry.SequenceID = lw.sequenceID
	entry.Checksum = calculateChecksum(entry)

	data, err := json.Marshal(entry)
	if err != nil {
		return fmt.Errorf("marshal oplog entry %d: %w", entry.SequenceID, err)
	}
	if _, err := lw.file.Write(append(data, '\n')); err != nil {
		return fmt.Errorf("append oplog entry %d: %w", entry.SequenceID, err)
	}
	// Sync is what makes this a write-ahead log. The previous version ignored
	// its error and reported success for entries that might not survive a crash.
	if err := lw.file.Sync(); err != nil {
		return fmt.Errorf("sync oplog entry %d: %w", entry.SequenceID, err)
	}
	return nil
}

// GetNextSeq returns the sequence ID the next Write will assign. Under
// concurrent writers this is only a hint; another Write may claim it first.
func (lw *LogWriter) GetNextSeq() int64 {
	lw.mu.Lock()
	defer lw.mu.Unlock()
	return lw.sequenceID + 1
}
2.2 日志读取与回放
// LogReader tails the local oplog file ("match_engine.log") written by
// LogWriter. It returns entries one at a time, in file order, skipping any
// whose SequenceID is not above the last one it handed out.
//
// It keeps a single bufio.Reader for its whole lifetime. The previous version
// built a new bufio.Scanner on every Read call. A Scanner reads a whole buffer
// ahead from the file, so every line after the first new one in that buffer
// was thrown away with the Scanner and never delivered.
type LogReader struct {
	file         *os.File
	reader       *bufio.Reader // persistent buffered view of file; created on first use
	pending      []byte        // incomplete trailing line seen at EOF, completed by a later call
	lastSequence int64         // highest SequenceID returned by Read or applied by Replay
	stopped      bool          // set by Stop; Read returns nil afterwards
	mu           sync.Mutex
}

// NewLogReader opens "match_engine.log" for reading from the beginning.
// If the file cannot be opened, the error is logged and the reader keeps a nil
// file. os.File methods return an error on a nil receiver, so Read and Replay
// then find nothing instead of panicking.
func NewLogReader() *LogReader {
	file, err := os.Open("match_engine.log")
	if err != nil {
		log.Printf("open oplog for reading: %v", err)
	}
	return &LogReader{file: file}
}

// Read returns the next entry whose SequenceID is greater than every entry
// returned so far. It returns nil if no complete new entry is available yet or
// the reader has been stopped. Undecodable lines are skipped, and new entries
// that fail verifyChecksum are logged and skipped.
func (lr *LogReader) Read() *LogEntry {
	lr.mu.Lock()
	defer lr.mu.Unlock()

	if lr.stopped {
		return nil
	}
	if lr.reader == nil {
		lr.reader = bufio.NewReader(lr.file)
	}
	for {
		line, ok := lr.nextLine()
		if !ok {
			return nil
		}
		var entry LogEntry
		if err := json.Unmarshal(line, &entry); err != nil {
			continue
		}
		// Only hand out entries not yet delivered (or replayed).
		if entry.SequenceID <= lr.lastSequence {
			continue
		}
		if !verifyChecksum(&entry) {
			log.Printf("Checksum mismatch for entry %d", entry.SequenceID)
			continue
		}
		lr.lastSequence = entry.SequenceID
		return &entry
	}
}

// Stop makes every later Read return nil. promoteToMaster calls it once this
// node stops following the log. Replay is not affected.
func (lr *LogReader) Stop() {
	lr.mu.Lock()
	lr.stopped = true
	lr.mu.Unlock()
}

// Replay rescans the oplog from the start and applies every entry with
// SequenceID >= fromSeq to engine. Afterwards the reader sits at the end of the
// file and lastSequence covers the replayed entries, so a later Read does not
// deliver them again.
//
// Fixes over the previous version:
//   - It holds lr.mu, because it moves the file offset that Read also uses.
//   - Undecodable lines are skipped rather than processed as zero entries.
//   - The JSON-decoded payload is converted back into *Order. After decoding,
//     Data holds a map, so the bare entry.Data.(*Order) assertion always
//     panicked.
//
// NOTE(review): unlike Read, Replay does not call verifyChecksum (unchanged).
// Consider adding it once verifyChecksum is confirmed to accept JSON-decoded
// entries.
func (lr *LogReader) Replay(fromSeq int64, engine *MatchEngine) {
	lr.mu.Lock()
	defer lr.mu.Unlock()

	if _, err := lr.file.Seek(0, 0); err != nil { // whence 0 == io.SeekStart
		log.Printf("Replay: seek oplog: %v", err)
		return
	}
	// The file offset moved, so any buffered bytes or partial line are stale.
	lr.reader = bufio.NewReader(lr.file)
	lr.pending = nil

	for {
		line, ok := lr.nextLine()
		if !ok {
			return
		}
		var entry LogEntry
		if err := json.Unmarshal(line, &entry); err != nil || entry.SequenceID < fromSeq {
			continue
		}
		if entry.SequenceID > lr.lastSequence {
			lr.lastSequence = entry.SequenceID
		}
		switch entry.EventType {
		case "order_submit":
			order, err := orderFromLogData(entry.Data)
			if err != nil {
				log.Printf("Replay: skipping entry %d: %v", entry.SequenceID, err)
				continue
			}
			engine.SubmitOrder(order)
		case "order_cancel":
			// TODO: handle order cancellation.
		}
	}
}

// nextLine returns the next complete '\n'-terminated line. On EOF (or any read
// error) it keeps the partial bytes in lr.pending and reports false. A line the
// writer is still appending is then completed on a later call instead of being
// lost. bufio.Reader retries the underlying file after returning EOF, which is
// what lets the reader follow a growing file. The caller must hold lr.mu.
func (lr *LogReader) nextLine() ([]byte, bool) {
	chunk, err := lr.reader.ReadBytes('\n')
	lr.pending = append(lr.pending, chunk...)
	if err != nil {
		return nil, false
	}
	line := lr.pending
	lr.pending = nil
	return line, true
}

// orderFromLogData converts a LogEntry.Data payload back into an *Order.
// Entries built in memory already hold an *Order. Entries decoded from the JSON
// oplog hold a generic value, which is re-encoded and decoded into an Order.
func orderFromLogData(data interface{}) (*Order, error) {
	switch d := data.(type) {
	case *Order:
		return d, nil
	case nil:
		return nil, fmt.Errorf("missing order data")
	}
	raw, err := json.Marshal(data)
	if err != nil {
		return nil, err
	}
	order := new(Order)
	if err := json.Unmarshal(raw, order); err != nil {
		return nil, err
	}
	return order, nil
}
3. 快照 (Snapshot)
为了加快恢复速度,定期保存撮合引擎的完整状态。
3.1 快照结构
type (
	// Snapshot is a point-in-time copy of the order book, tagged with the oplog
	// sequence number it corresponds to. Recovery restores it and replays the
	// log from SequenceID+1.
	Snapshot struct {
		SequenceID int64           // oplog sequence number covered by this snapshot
		OrderBook  *OrderBookState // order book contents
		Timestamp  time.Time       // when the snapshot was taken
	}

	// OrderBookState is the serializable form of one symbol's order book.
	OrderBookState struct {
		Symbol string
		Bids   []PriceLevelState // buy-side price levels
		Asks   []PriceLevelState // sell-side price levels
	}

	// PriceLevelState holds the resting orders at one price.
	PriceLevelState struct {
		Price  float64
		Orders []*Order
	}
)
3.2 创建快照
// takeSnapshot captures the current order book and tags it with the last
// sequence ID assigned by the log writer. It persists the snapshot via
// saveSnapshot and returns it.
//
// NOTE(review): the sequence ID and the order book are read separately, with
// no lock spanning both. An order logged and matched in between ends up in the
// book but above SequenceID, so recovery applies it twice. An order logged but
// not yet matched is below SequenceID yet missing from the book, so recovery
// loses it. Also, on a slave the writer's counter presumably stays 0, since
// slaves apply entries via applyLogEntry, which does not write the log.
// Confirm snapshots are meant to be taken only on the master.
func (ha *HAMatchEngine) takeSnapshot() *Snapshot {
	lastSeq := ha.logWriter.GetNextSeq() - 1
	state := ha.engine.OrderBook.GetState()

	snap := new(Snapshot)
	snap.SequenceID = lastSeq
	snap.OrderBook = state
	snap.Timestamp = time.Now()

	ha.saveSnapshot(snap)
	return snap
}
// saveSnapshot writes snapshot as JSON to "snapshot_<SequenceID>.dat" in the
// working directory.
//
// The data is written to "<name>.tmp", fsynced, and then renamed into place.
// A crash mid-write therefore never leaves a truncated file under the final
// name. loadLatestSnapshot globs "snapshot_*.dat", which does not match the
// ".tmp" suffix. The previous version ignored marshal and write errors and
// logged "Snapshot saved" even when nothing was written.
func (ha *HAMatchEngine) saveSnapshot(snapshot *Snapshot) {
	filename := fmt.Sprintf("snapshot_%d.dat", snapshot.SequenceID)

	data, err := json.Marshal(snapshot)
	if err != nil {
		log.Printf("Snapshot %s not saved: marshal: %v", filename, err)
		return
	}

	tmp := filename + ".tmp"
	f, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		log.Printf("Snapshot %s not saved: %v", filename, err)
		return
	}
	_, err = f.Write(data)
	if err == nil {
		err = f.Sync()
	}
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	if err == nil {
		err = os.Rename(tmp, filename)
	}
	if err != nil {
		os.Remove(tmp) // best-effort cleanup of the partial temp file
		log.Printf("Snapshot %s not saved: %v", filename, err)
		return
	}
	log.Printf("Snapshot saved: %s", filename)
}
// startSnapshotTask takes a snapshot every 10 minutes. It never returns, so run
// it in its own goroutine.
//
// Fix: time.Ticker is a struct type, not a constructor, so
// `time.Ticker(10 * time.Minute)` does not compile. time.NewTicker creates
// and starts the ticker.
func (ha *HAMatchEngine) startSnapshotTask() {
	ticker := time.NewTicker(10 * time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		ha.takeSnapshot()
	}
}
3.3 从快照恢复
// recoverFromSnapshot rebuilds engine state. It restores the newest snapshot's
// order book, then replays every oplog entry after the snapshot's sequence
// number. Errors from loading the snapshot, including "no snapshot found",
// are returned unchanged.
//
// NOTE(review): with no snapshot on disk this returns an error, and
// handleMasterFailure then aborts the failover. A fresh cluster cannot fail
// over until its first snapshot is written; consider replaying the full log
// onto an empty book instead.
func (ha *HAMatchEngine) recoverFromSnapshot() error {
	snap, err := ha.loadLatestSnapshot()
	if err != nil {
		return err
	}

	firstUnapplied := snap.SequenceID + 1
	ha.engine.OrderBook.RestoreState(snap.OrderBook)
	ha.logReader.Replay(firstUnapplied, ha.engine)

	log.Printf("Recovered from snapshot %d", snap.SequenceID)
	return nil
}
// loadLatestSnapshot loads the snapshot with the highest sequence number among
// the "snapshot_<seq>.dat" files in the working directory.
//
// Fixes over the previous version:
//   - Files are ordered by the number in their name. A plain string sort puts
//     "snapshot_9.dat" after "snapshot_10.dat", so recovery picked an older
//     snapshot once sequence numbers gained a digit.
//   - Read and decode errors are no longer ignored. Previously a failed read
//     returned a zero Snapshot (SequenceID 0, nil OrderBook) as success.
//   - If the newest file is unreadable or corrupt, the next-newest is tried.
//     Recovering from an older snapshot is still correct as long as the log
//     after it is retained.
func (ha *HAMatchEngine) loadLatestSnapshot() (*Snapshot, error) {
	files, err := filepath.Glob("snapshot_*.dat")
	if err != nil {
		return nil, err
	}

	type candidate struct {
		seq  int64
		path string
	}
	candidates := make([]candidate, 0, len(files))
	for _, f := range files {
		var seq int64
		if _, err := fmt.Sscanf(filepath.Base(f), "snapshot_%d.dat", &seq); err != nil {
			continue // not a name produced by saveSnapshot
		}
		candidates = append(candidates, candidate{seq: seq, path: f})
	}
	if len(candidates) == 0 {
		return nil, fmt.Errorf("no snapshot found")
	}

	// Newest (highest sequence number) first.
	sort.Slice(candidates, func(i, j int) bool { return candidates[i].seq > candidates[j].seq })

	var lastErr error
	for _, c := range candidates {
		data, err := os.ReadFile(c.path)
		if err != nil {
			lastErr = err
			log.Printf("Skipping snapshot %s: %v", c.path, err)
			continue
		}
		var snapshot Snapshot
		if err := json.Unmarshal(data, &snapshot); err != nil {
			lastErr = fmt.Errorf("decode %s: %w", c.path, err)
			log.Printf("Skipping snapshot %s: %v", c.path, err)
			continue
		}
		return &snapshot, nil
	}
	return nil, fmt.Errorf("no usable snapshot: %w", lastErr)
}
4. 故障切换流程
4.1 自动切换
Master故障检测
↓
Slave检测到Master心跳超时
↓
Slave竞选新Master(使用分布式锁)
↓
选举成功的Slave提升为Master
↓
加载最新快照
↓
回放日志(从快照序列号的下一条开始,即 SequenceID+1)
↓
开始接收新订单
4.2 实现代码
// handleMasterFailure attempts a failover on this node. It acquires the master
// lock, stops applying replicated entries, rebuilds state from the newest
// snapshot plus the log, and then switches the role to master.
//
// Fix: the log reader is now stopped before recovery. While the role is still
// slave, syncLog keeps applying entries from the same oplog that Replay is
// about to re-apply. Without stopping it first, entries could be applied twice
// and RestoreState would race with live log application.
//
// NOTE(review): if recovery fails, this node still holds the master lock but
// stays a slave (and no longer follows the log). The cluster then has no
// serving master until the lock is released or its lease lapses. Consider
// releasing the lock on that path.
func (ha *HAMatchEngine) handleMasterFailure() {
	log.Println("Master failure detected, starting failover...")

	// 1. Try to take the master role.
	if !ha.acquireMasterLock() {
		log.Println("Failed to acquire master lock, another instance became master")
		return
	}

	// 2. Stop following the log before rebuilding state from it.
	ha.logReader.Stop()

	// 3. Restore from snapshot and replay the log.
	if err := ha.recoverFromSnapshot(); err != nil {
		log.Printf("Failed to recover from snapshot: %v", err)
		return
	}

	// 4. Switch role; SubmitOrder accepts orders from now on.
	ha.mu.Lock()
	ha.role = RoleMaster
	ha.mu.Unlock()

	log.Println("Failover completed, now serving as Master")
}
// acquireMasterLock implements the master lock as an etcd distributed lock.
// It tries to create the "master_lock" key, bound to a 10-second lease, in a
// single transaction. The put only happens if the key does not exist yet
// (CreateRevision == 0). It reports whether this instance now holds the lock.
//
// Fixes over the previous version:
//   - Grant and Commit errors are checked. Previously a failed call left lease
//     or resp nil, and the next field access panicked.
//   - On success the lease is kept alive in the background. Without KeepAlive
//     the lease expired after 10s and the key vanished. Another node could
//     then take the lock while this one still acted as master (split brain).
//     The keepalive uses a context that outlives this call, so it only stops,
//     and the lease only lapses, when the process goes away.
//
// NOTE(review): the key is identical for every symbol. If several
// HAMatchEngine instances (one per symbol) share an etcd cluster, they contend
// for a single lock; consider including the symbol in the key.
func (ha *HAMatchEngine) acquireMasterLock() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	lease, err := ha.etcdClient.Grant(ctx, 10)
	if err != nil {
		log.Printf("Grant master lease: %v", err)
		return false
	}

	resp, err := ha.etcdClient.Txn(ctx).
		If(clientv3.Compare(clientv3.CreateRevision("master_lock"), "=", 0)).
		Then(clientv3.OpPut("master_lock", ha.instanceID, clientv3.WithLease(lease.ID))).
		Else(clientv3.OpGet("master_lock")).
		Commit()
	if err != nil {
		log.Printf("Master lock transaction: %v", err)
		return false
	}
	if !resp.Succeeded {
		return false // someone else holds the lock; our unused lease simply expires
	}

	keepAlive, err := ha.etcdClient.KeepAlive(context.Background(), lease.ID)
	if err != nil {
		// Holding the key without a keepalive would silently expire in 10s;
		// give it up now (best effort) so another node can take over.
		log.Printf("Keep master lease alive: %v", err)
		ha.etcdClient.Revoke(ctx, lease.ID)
		return false
	}
	// Drain keepalive responses so they do not pile up in the channel.
	go func() {
		for range keepAlive {
		}
	}()
	return true
}
5. 数据一致性保证
5.1 WAL (Write-Ahead Logging)
确保所有操作先写日志,再执行。
// SubmitOrderWithWAL submits an order with write-ahead logging. The
// "order_submit" entry must be durably appended to the oplog before the order
// reaches the matching engine, so no order is matched without first being
// recorded in the log.
//
// Like SubmitOrder, it only runs on the master. The previous version had no
// role check, so a slave could append its own entries to the oplog and match
// orders locally, diverging from the master it is supposed to mirror.
func (ha *HAMatchEngine) SubmitOrderWithWAL(order *Order) error {
	ha.mu.RLock()
	role := ha.role
	ha.mu.RUnlock()
	if role != RoleMaster {
		return fmt.Errorf("not master, current role: %s", role)
	}

	// 1. Write the WAL first; Write fills in SequenceID and Checksum.
	logEntry := &LogEntry{
		EventType: "order_submit",
		Data:      order,
		Timestamp: time.Now(),
	}
	if err := ha.logWriter.Write(logEntry); err != nil {
		return err
	}

	// 2. Only then apply the operation.
	ha.engine.SubmitOrder(order)
	return nil
}
5.2 幂等性保证
确保日志重放不会产生重复成交。
// IdempotentEngine wraps a MatchEngine so that each order ID is submitted at
// most once. This keeps log replay from producing duplicate trades when an
// entry is applied more than once.
//
// NOTE(review): processedOrders grows without bound; a long-running process
// should prune it (for example, after a snapshot).
type IdempotentEngine struct {
	*MatchEngine
	processedOrders map[string]bool // orderID -> already submitted
	mu              sync.RWMutex
}

// SubmitOrder forwards order to the embedded engine unless its OrderID has
// already been processed, in which case it logs and skips the order.
//
// Fix: the map is now created lazily. No constructor initializes it, so a
// literal like IdempotentEngine{MatchEngine: e} has a nil map, and the first
// assignment to it panicked.
func (ie *IdempotentEngine) SubmitOrder(order *Order) {
	ie.mu.Lock()
	defer ie.mu.Unlock()

	// Reading a nil map is safe and reports false.
	if ie.processedOrders[order.OrderID] {
		log.Printf("Order %s already processed, skipping", order.OrderID)
		return
	}

	ie.MatchEngine.SubmitOrder(order)

	if ie.processedOrders == nil {
		ie.processedOrders = make(map[string]bool)
	}
	ie.processedOrders[order.OrderID] = true
}
6. 监控指标
6.1 关键指标
// HAMetrics is a point-in-time view of an HA node's state, as returned by
// GetMetrics.
type HAMetrics struct {
	Role string // "master" or "slave"

	// Log replication.
	LogSequenceID     int64
	LogReplicationLag int64 // master sequence ID minus slave sequence ID
	LogWriteRate      float64

	// Heartbeat.
	LastHeartbeatTime  time.Time
	HeartbeatMissCount int

	// Snapshot.
	LastSnapshotTime time.Time
	SnapshotSize     int64
}
// GetMetrics returns this node's HA metrics. Fields not set below are left at
// their zero values.
//
// Fixes:
//   - The role is read under ha.mu. It is written under that lock, so an
//     unlocked read raced with failover.
//   - LogSequenceID now reports the last sequence ID this node has.
//     - On the master it is the last one written: GetNextSeq()-1, the same
//       convention takeSnapshot uses. Reporting GetNextSeq() overstated it
//       by one.
//     - On a slave it is the last one read from the log. The writer is not
//       advanced by SubmitOrder on a slave, so the old value was useless for
//       LogReplicationLag (master sequence minus slave sequence).
func (ha *HAMatchEngine) GetMetrics() *HAMetrics {
	ha.mu.RLock()
	role := ha.role
	ha.mu.RUnlock()

	var seq int64
	if role == RoleMaster {
		seq = ha.logWriter.GetNextSeq() - 1
	} else {
		ha.logReader.mu.Lock()
		seq = ha.logReader.lastSequence
		ha.logReader.mu.Unlock()
	}

	return &HAMetrics{
		Role:              string(role),
		LogSequenceID:     seq,
		LastHeartbeatTime: ha.heartbeat.GetLastTime(),
		// ...
	}
}
6.2 告警规则
alerts:
  - name: MasterDown
    condition: heartbeat_miss_count > 3
    severity: critical
    message: "Master is down"
  - name: HighReplicationLag
    condition: log_replication_lag > 1000
    severity: warning
    message: "Slave replication lag is high"
  - name: SnapshotTooOld
    condition: time_since_last_snapshot > 1h
    severity: warning
    message: "Snapshot is too old"
7. 最佳实践
7.1 配置建议
ha:
  # 最小集群规模
  min_instances: 3
  # 心跳间隔
  heartbeat_interval: 1s
  heartbeat_timeout: 3s
  # 日志配置
  log_flush_interval: 100ms
  log_rotation_size: 1GB
  # 快照配置
  snapshot_interval: 10m
  snapshot_retention: 5
7.2 注意事项
- 避免脑裂:使用奇数个节点(3、5、7),并使用多数派选举;基于租约的Master锁必须持续续约(KeepAlive),Master一旦失去租约应立即停止接收订单
- 数据持久化:日志和快照都应写入持久化存储(如SSD)
- 监控告警:实时监控Master状态,及时发现故障
- 定期演练:定期进行故障切换演练,确保流程正确
小结
撮合引擎高可用设计的核心要点:
- 主从架构:Master处理订单,Slave实时同步
- 操作日志:WAL保证数据不丢失
- 快照机制:加快故障恢复速度
- 自动切换:Master故障时自动选举新Master
- 幂等保证:日志回放不产生重复成交
下一章将讨论清算系统的设计与实现。