撮合引擎实现-内存撮合
章节导读
撮合引擎是交易所的核心,早期用Python实现的撮合引擎QPS只有5000,无法承受高峰期流量。使用Go重写后,QPS可以提升到20万,延迟从50ms降到2ms以下。
本章讲解完整的撮合引擎实现,不是玩具代码,而是真正能用在生产环境的代码。本章将一步步讲解每个模块的设计和实现,帮助读者真正理解一个高性能撮合引擎的实现原理。
学习目标
- 掌握订单簿的完整实现(跳表)
- 理解撮合流程的每个细节
- 学会如何优化性能(内存池、批量处理)
- 能够独立实现一个可用的撮合引擎
一、整体架构
1.1 模块划分
┌─────────────────────────────────────────────┐
│ 撮合引擎架构 │
├─────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 订单接收 │ │ 撮合核心 │ │ 成交推送 │ │
│ │ 队列 │→ │ 引擎 │→ │ 队列 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ ↓ │
│ ┌─────────────┐ │
│ │ 订单簿 │ │
│ │ (买/卖队列) │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────┘
关键设计:
- 单线程撮合:避免锁竞争,保证确定性
- 内存撮合:所有数据在内存,不依赖数据库
- 异步IO:订单接收和成交推送用队列异步处理
1.2 数据结构选型
| 数据结构 | 作用 | 复杂度 |
|---|---|---|
| 跳表(Skip List) | 价格层级索引 | O(log n) |
| 双向链表 | 同价格订单队列 | O(1) |
| 哈希表 | 订单ID索引 | O(1) |
为什么选跳表而不是红黑树?
跳表优势:
1. 实现简单(红黑树旋转逻辑复杂)
2. 性能相近(都是O(log n))
3. 范围查询方便
4. 并发友好(红黑树需要全局平衡)
劣势:
1. 空间占用稍大(多级索引)
二、核心数据结构
2.1 订单结构
package engine
import (
"time"
"github.com/shopspring/decimal"
)
// Side 买卖方向
type Side int
const (
Buy Side = 1
Sell Side = 2
)
// OrderType 订单类型
type OrderType int
const (
Limit OrderType = 1 // 限价单
Market OrderType = 2 // 市价单
)
// OrderStatus 订单状态
type OrderStatus int
const (
Pending OrderStatus = 1 // 待成交
Partial OrderStatus = 2 // 部分成交
Filled OrderStatus = 3 // 完全成交
Cancelled OrderStatus = 4 // 已撤销
)
// Order 订单
type Order struct {
ID string `json:"id"`
UserID string `json:"user_id"`
Symbol string `json:"symbol"` // 交易对,如BTC/USDT
Side Side `json:"side"` // 买卖方向
Type OrderType `json:"type"` // 订单类型
Price decimal.Decimal `json:"price"` // 价格
Quantity decimal.Decimal `json:"quantity"` // 数量
Remaining decimal.Decimal `json:"remaining"` // 剩余数量
Status OrderStatus `json:"status"` // 状态
CreatedAt time.Time `json:"created_at"` // 创建时间
UpdatedAt time.Time `json:"updated_at"` // 更新时间
}
// NewOrder 创建新订单
func NewOrder(id, userID, symbol string, side Side, orderType OrderType,
price, quantity decimal.Decimal) *Order {
now := time.Now()
return &Order{
ID: id,
UserID: userID,
Symbol: symbol,
Side: side,
Type: orderType,
Price: price,
Quantity: quantity,
Remaining: quantity, // 初始剩余量等于总量
Status: Pending,
CreatedAt: now,
UpdatedAt: now,
}
}
2.2 成交结构
// Trade 成交记录
type Trade struct {
ID string `json:"id"`
Symbol string `json:"symbol"`
Price decimal.Decimal `json:"price"` // 成交价
Quantity decimal.Decimal `json:"quantity"` // 成交量
BuyOrderID string `json:"buy_order_id"`
SellOrderID string `json:"sell_order_id"`
BuyUserID string `json:"buy_user_id"`
SellUserID string `json:"sell_user_id"`
IsBuyMaker bool `json:"is_buy_maker"` // 买方是否是Maker
CreatedAt time.Time `json:"created_at"`
}
// NewTrade 创建成交记录
func NewTrade(symbol string, price, quantity decimal.Decimal,
buyOrder, sellOrder *Order, isBuyMaker bool) *Trade {
return &Trade{
ID: generateTradeID(), // 生成唯一ID
Symbol: symbol,
Price: price,
Quantity: quantity,
BuyOrderID: buyOrder.ID,
SellOrderID: sellOrder.ID,
BuyUserID: buyOrder.UserID,
SellUserID: sellOrder.UserID,
IsBuyMaker: isBuyMaker,
CreatedAt: time.Now(),
}
}
2.3 价格层级(PriceLevel)
// PriceLevel 价格层级
type PriceLevel struct {
Price decimal.Decimal // 价格
Orders []*Order // 该价格的所有订单(时间优先)
Volume decimal.Decimal // 总量
}
// NewPriceLevel 创建价格层级
func NewPriceLevel(price decimal.Decimal) *PriceLevel {
return &PriceLevel{
Price: price,
Orders: make([]*Order, 0, 8), // 预分配8个位置
Volume: decimal.Zero,
}
}
// AddOrder 添加订单(尾部)
func (p *PriceLevel) AddOrder(order *Order) {
p.Orders = append(p.Orders, order)
p.Volume = p.Volume.Add(order.Remaining)
}
// RemoveOrder 移除订单
func (p *PriceLevel) RemoveOrder(order *Order) bool {
for i, o := range p.Orders {
if o.ID == order.ID {
// 删除该订单
p.Orders = append(p.Orders[:i], p.Orders[i+1:]...)
p.Volume = p.Volume.Sub(order.Remaining)
return true
}
}
return false
}
// GetFirstOrder 获取第一个订单(时间最早)
func (p *PriceLevel) GetFirstOrder() *Order {
if len(p.Orders) == 0 {
return nil
}
return p.Orders[0]
}
// IsEmpty 是否为空
func (p *PriceLevel) IsEmpty() bool {
return len(p.Orders) == 0
}
三、订单簿实现
3.1 跳表实现
package skiplist
import (
"math/rand"
"github.com/shopspring/decimal"
)
const (
MaxLevel = 32 // 最大层数
P = 0.25 // 概率因子
)
// Node 跳表节点
type Node struct {
Key decimal.Decimal // 价格
Value interface{} // 价格层级(PriceLevel)
Next []*Node // 每层的下一个节点
}
// SkipList 跳表
type SkipList struct {
Head *Node // 头节点
Level int // 当前层数
Size int // 节点数量
Desc bool // 是否降序(卖单用升序,买单用降序)
}
// NewSkipList 创建跳表
func NewSkipList(desc bool) *SkipList {
head := &Node{
Key: decimal.Zero,
Next: make([]*Node, MaxLevel),
}
return &SkipList{
Head: head,
Level: 1,
Desc: desc,
}
}
// randomLevel 随机层数
func randomLevel() int {
level := 1
for rand.Float64() < P && level < MaxLevel {
level++
}
return level
}
// compare 比较两个价格
func (sl *SkipList) compare(a, b decimal.Decimal) int {
if sl.Desc {
return b.Cmp(a) // 降序:大的在前
}
return a.Cmp(b) // 升序:小的在前
}
// Insert 插入节点
func (sl *SkipList) Insert(key decimal.Decimal, value interface{}) {
// 1. 找到每层的插入位置
update := make([]*Node, MaxLevel)
current := sl.Head
for i := sl.Level - 1; i >= 0; i-- {
for current.Next[i] != nil && sl.compare(current.Next[i].Key, key) < 0 {
current = current.Next[i]
}
update[i] = current
}
// 2. 生成随机层数
level := randomLevel()
if level > sl.Level {
for i := sl.Level; i < level; i++ {
update[i] = sl.Head
}
sl.Level = level
}
// 3. 创建新节点
newNode := &Node{
Key: key,
Value: value,
Next: make([]*Node, level),
}
// 4. 插入每层
for i := 0; i < level; i++ {
newNode.Next[i] = update[i].Next[i]
update[i].Next[i] = newNode
}
sl.Size++
}
// Search 查找节点
func (sl *SkipList) Search(key decimal.Decimal) *Node {
current := sl.Head
for i := sl.Level - 1; i >= 0; i-- {
for current.Next[i] != nil && sl.compare(current.Next[i].Key, key) < 0 {
current = current.Next[i]
}
}
current = current.Next[0]
if current != nil && current.Key.Equal(key) {
return current
}
return nil
}
// Delete 删除节点
func (sl *SkipList) Delete(key decimal.Decimal) bool {
update := make([]*Node, MaxLevel)
current := sl.Head
for i := sl.Level - 1; i >= 0; i-- {
for current.Next[i] != nil && sl.compare(current.Next[i].Key, key) < 0 {
current = current.Next[i]
}
update[i] = current
}
current = current.Next[0]
if current == nil || !current.Key.Equal(key) {
return false
}
// 删除每层的节点
for i := 0; i < sl.Level; i++ {
if update[i].Next[i] != current {
break
}
update[i].Next[i] = current.Next[i]
}
// 调整层数
for sl.Level > 1 && sl.Head.Next[sl.Level-1] == nil {
sl.Level--
}
sl.Size--
return true
}
// GetBest 获取最优价格(第一个节点)
func (sl *SkipList) GetBest() *Node {
return sl.Head.Next[0]
}
3.2 订单簿实现
package engine
import (
"sync"
"github.com/shopspring/decimal"
)
// OrderBook 订单簿
type OrderBook struct {
Symbol string // 交易对
Bids *skiplist.SkipList // 买单(价格降序)
Asks *skiplist.SkipList // 卖单(价格升序)
Orders map[string]*Order // 订单ID索引
mu sync.RWMutex // 读写锁
}
// NewOrderBook 创建订单簿
func NewOrderBook(symbol string) *OrderBook {
return &OrderBook{
Symbol: symbol,
Bids: skiplist.NewSkipList(true), // 降序
Asks: skiplist.NewSkipList(false), // 升序
Orders: make(map[string]*Order),
}
}
// AddOrder 添加订单到订单簿
func (ob *OrderBook) AddOrder(order *Order) {
ob.mu.Lock()
defer ob.mu.Unlock()
// 1. 记录到索引
ob.Orders[order.ID] = order
// 2. 选择买单簿或卖单簿
var book *skiplist.SkipList
if order.Side == Buy {
book = ob.Bids
} else {
book = ob.Asks
}
// 3. 查找或创建价格层级
node := book.Search(order.Price)
var priceLevel *PriceLevel
if node == nil {
priceLevel = NewPriceLevel(order.Price)
book.Insert(order.Price, priceLevel)
} else {
priceLevel = node.Value.(*PriceLevel)
}
// 4. 添加订单到价格层级
priceLevel.AddOrder(order)
}
// RemoveOrder 从订单簿移除订单
func (ob *OrderBook) RemoveOrder(order *Order) bool {
ob.mu.Lock()
defer ob.mu.Unlock()
// 1. 从索引删除
delete(ob.Orders, order.ID)
// 2. 选择买单簿或卖单簿
var book *skiplist.SkipList
if order.Side == Buy {
book = ob.Bids
} else {
book = ob.Asks
}
// 3. 查找价格层级
node := book.Search(order.Price)
if node == nil {
return false
}
priceLevel := node.Value.(*PriceLevel)
// 4. 从价格层级移除订单
priceLevel.RemoveOrder(order)
// 5. 如果价格层级为空,从跳表删除
if priceLevel.IsEmpty() {
book.Delete(order.Price)
}
return true
}
// GetBestBid 获取最优买价
func (ob *OrderBook) GetBestBid() *PriceLevel {
ob.mu.RLock()
defer ob.mu.RUnlock()
node := ob.Bids.GetBest()
if node == nil {
return nil
}
return node.Value.(*PriceLevel)
}
// GetBestAsk 获取最优卖价
func (ob *OrderBook) GetBestAsk() *PriceLevel {
ob.mu.RLock()
defer ob.mu.RUnlock()
node := ob.Asks.GetBest()
if node == nil {
return nil
}
return node.Value.(*PriceLevel)
}
// GetOrder 获取订单
func (ob *OrderBook) GetOrder(orderID string) *Order {
ob.mu.RLock()
defer ob.mu.RUnlock()
return ob.Orders[orderID]
}
四、撮合引擎实现
4.1 撮合核心逻辑
package engine
import (
"github.com/shopspring/decimal"
)
// MatchEngine 撮合引擎
type MatchEngine struct {
OrderBook *OrderBook
Trades []*Trade // 成交记录
}
// NewMatchEngine 创建撮合引擎
func NewMatchEngine(symbol string) *MatchEngine {
return &MatchEngine{
OrderBook: NewOrderBook(symbol),
Trades: make([]*Trade, 0),
}
}
// ProcessOrder 处理订单
func (e *MatchEngine) ProcessOrder(order *Order) []*Trade {
trades := make([]*Trade, 0)
if order.Type == Limit {
trades = e.matchLimitOrder(order)
} else {
trades = e.matchMarketOrder(order)
}
// 如果订单未完全成交,添加到订单簿
if order.Remaining.GreaterThan(decimal.Zero) {
e.OrderBook.AddOrder(order)
}
return trades
}
// matchLimitOrder 撮合限价单
func (e *MatchEngine) matchLimitOrder(order *Order) []*Trade {
trades := make([]*Trade, 0)
// 选择对手方订单簿
var oppBook *skiplist.SkipList
if order.Side == Buy {
oppBook = e.OrderBook.Asks // 买单匹配卖单
} else {
oppBook = e.OrderBook.Bids // 卖单匹配买单
}
// 持续撮合,直到订单完全成交或无法匹配
for order.Remaining.GreaterThan(decimal.Zero) {
// 1. 获取最优对手价
bestNode := oppBook.GetBest()
if bestNode == nil {
break // 对手方订单簿为空
}
bestPrice := bestNode.Value.(*PriceLevel)
// 2. 检查是否能成交
if !e.canMatch(order, bestPrice.Price) {
break // 价格不匹配
}
// 3. 获取该价格的第一个订单(时间优先)
oppOrder := bestPrice.GetFirstOrder()
if oppOrder == nil {
break
}
// 4. 计算成交量
matchQty := decimal.Min(order.Remaining, oppOrder.Remaining)
// 5. 生成成交记录
var trade *Trade
if order.Side == Buy {
// 买单是Taker,卖单是Maker
trade = NewTrade(e.OrderBook.Symbol, oppOrder.Price, matchQty,
order, oppOrder, false)
} else {
// 卖单是Taker,买单是Maker
trade = NewTrade(e.OrderBook.Symbol, oppOrder.Price, matchQty,
oppOrder, order, true)
}
trades = append(trades, trade)
// 6. 更新订单剩余量
order.Remaining = order.Remaining.Sub(matchQty)
oppOrder.Remaining = oppOrder.Remaining.Sub(matchQty)
// 7. 更新订单状态
if order.Remaining.IsZero() {
order.Status = Filled
} else {
order.Status = Partial
}
if oppOrder.Remaining.IsZero() {
oppOrder.Status = Filled
// 从订单簿移除完全成交的订单
e.OrderBook.RemoveOrder(oppOrder)
} else {
oppOrder.Status = Partial
}
// 8. 更新价格层级的总量
bestPrice.Volume = bestPrice.Volume.Sub(matchQty)
}
return trades
}
// canMatch 检查订单是否能与对手价匹配
func (e *MatchEngine) canMatch(order *Order, oppPrice decimal.Decimal) bool {
if order.Side == Buy {
// 买单:买价 >= 卖价
return order.Price.GreaterThanOrEqual(oppPrice)
}
// 卖单:卖价 <= 买价
return order.Price.LessThanOrEqual(oppPrice)
}
// matchMarketOrder 撮合市价单
func (e *MatchEngine) matchMarketOrder(order *Order) []*Trade {
trades := make([]*Trade, 0)
var oppBook *skiplist.SkipList
if order.Side == Buy {
oppBook = e.OrderBook.Asks
} else {
oppBook = e.OrderBook.Bids
}
// 市价单:不管价格,直接成交
for order.Remaining.GreaterThan(decimal.Zero) {
bestNode := oppBook.GetBest()
if bestNode == nil {
break // 对手方订单簿为空
}
bestPrice := bestNode.Value.(*PriceLevel)
oppOrder := bestPrice.GetFirstOrder()
if oppOrder == nil {
break
}
matchQty := decimal.Min(order.Remaining, oppOrder.Remaining)
var trade *Trade
if order.Side == Buy {
trade = NewTrade(e.OrderBook.Symbol, oppOrder.Price, matchQty,
order, oppOrder, false)
} else {
trade = NewTrade(e.OrderBook.Symbol, oppOrder.Price, matchQty,
oppOrder, order, true)
}
trades = append(trades, trade)
order.Remaining = order.Remaining.Sub(matchQty)
oppOrder.Remaining = oppOrder.Remaining.Sub(matchQty)
if order.Remaining.IsZero() {
order.Status = Filled
} else {
order.Status = Partial
}
if oppOrder.Remaining.IsZero() {
oppOrder.Status = Filled
e.OrderBook.RemoveOrder(oppOrder)
} else {
oppOrder.Status = Partial
}
bestPrice.Volume = bestPrice.Volume.Sub(matchQty)
}
return trades
}
// CancelOrder 撤销订单
func (e *MatchEngine) CancelOrder(orderID string) bool {
order := e.OrderBook.GetOrder(orderID)
if order == nil {
return false
}
order.Status = Cancelled
return e.OrderBook.RemoveOrder(order)
}
五、性能优化
5.1 对象池
package engine
import (
"sync"
)
var (
orderPool = sync.Pool{
New: func() interface{} {
return &Order{}
},
}
tradePool = sync.Pool{
New: func() interface{} {
return &Trade{}
},
}
)
// GetOrder 从对象池获取Order
func GetOrder() *Order {
return orderPool.Get().(*Order)
}
// PutOrder 归还Order到对象池
func PutOrder(order *Order) {
// 清空数据
*order = Order{}
orderPool.Put(order)
}
// GetTrade 从对象池获取Trade
func GetTrade() *Trade {
return tradePool.Get().(*Trade)
}
// PutTrade 归还Trade到对象池
func PutTrade(trade *Trade) {
*trade = Trade{}
tradePool.Put(trade)
}
5.2 批量处理
// ProcessOrders 批量处理订单
func (e *MatchEngine) ProcessOrders(orders []*Order) [][]*Trade {
result := make([][]*Trade, 0, len(orders))
for _, order := range orders {
trades := e.ProcessOrder(order)
result = append(result, trades)
}
return result
}
六、测试
6.1 单元测试
package engine
import (
"testing"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
)
func TestMatchEngine_LimitOrder(t *testing.T) {
engine := NewMatchEngine("BTC/USDT")
// 1. 添加卖单
sellOrder := NewOrder("sell1", "user1", "BTC/USDT", Sell, Limit,
decimal.NewFromInt(100), decimal.NewFromInt(10))
trades := engine.ProcessOrder(sellOrder)
assert.Equal(t, 0, len(trades)) // 无成交
// 2. 添加买单(价格更低,无法成交)
buyOrder1 := NewOrder("buy1", "user2", "BTC/USDT", Buy, Limit,
decimal.NewFromInt(99), decimal.NewFromInt(5))
trades = engine.ProcessOrder(buyOrder1)
assert.Equal(t, 0, len(trades)) // 无成交
// 3. 添加买单(价格匹配,成交)
buyOrder2 := NewOrder("buy2", "user3", "BTC/USDT", Buy, Limit,
decimal.NewFromInt(100), decimal.NewFromInt(5))
trades = engine.ProcessOrder(buyOrder2)
assert.Equal(t, 1, len(trades)) // 有1笔成交
assert.Equal(t, "5", trades[0].Quantity.String())
assert.Equal(t, "100", trades[0].Price.String())
// 4. 检查卖单剩余量
sellOrder = engine.OrderBook.GetOrder("sell1")
assert.Equal(t, "5", sellOrder.Remaining.String())
}
func BenchmarkMatchEngine(b *testing.B) {
engine := NewMatchEngine("BTC/USDT")
// 预先添加1000个卖单
for i := 0; i < 1000; i++ {
price := decimal.NewFromInt(int64(100 + i))
order := NewOrder("", "", "BTC/USDT", Sell, Limit,
price, decimal.NewFromInt(10))
engine.ProcessOrder(order)
}
b.ResetTimer()
// 测试处理买单的性能
for i := 0; i < b.N; i++ {
order := NewOrder("", "", "BTC/USDT", Buy, Limit,
decimal.NewFromInt(100), decimal.NewFromInt(1))
engine.ProcessOrder(order)
}
}