行情系统设计
1. 行情数据类型
1.1 Ticker数据
实时价格快照,包含24小时统计信息。
// Ticker is a real-time price snapshot for one symbol together with its
// rolling 24-hour statistics.
type Ticker struct {
Symbol string `json:"symbol"` // Trading pair identifier
LastPrice float64 `json:"lastPrice"` // Last traded price
BidPrice float64 `json:"bidPrice"` // Best bid price
AskPrice float64 `json:"askPrice"` // Best ask price
OpenPrice float64 `json:"openPrice"` // 24h opening price
HighPrice float64 `json:"highPrice"` // 24h highest price
LowPrice float64 `json:"lowPrice"` // 24h lowest price
Volume float64 `json:"volume"` // 24h traded volume (base quantity)
QuoteVolume float64 `json:"quoteVolume"` // 24h traded turnover (price * quantity)
PriceChange float64 `json:"priceChange"` // 24h absolute price change
PriceChangePercent float64 `json:"priceChangePercent"` // 24h price change in percent
Count int64 `json:"count"` // 24h number of trades
Timestamp time.Time `json:"timestamp"` // Time the snapshot was produced
}
1.2 深度数据
订单簿的聚合视图。
// Depth is an aggregated view of the order book for one symbol.
type Depth struct {
Symbol string `json:"symbol"`
Bids [][2]float64 `json:"bids"` // [[price, quantity], ...]
Asks [][2]float64 `json:"asks"` // Same [price, quantity] layout as Bids
Timestamp time.Time `json:"timestamp"`
}
// 示例(对外JSON格式:价格和数量以字符串表示,timestamp为毫秒时间戳。上面的Depth结构体按默认方式序列化会得到数字和RFC 3339时间字符串,需要自定义序列化才能输出此格式)
{
"symbol": "BTC/USDT",
"bids": [
["50000.00", "1.234"],
["49999.00", "0.567"],
["49998.00", "2.345"]
],
"asks": [
["50001.00", "0.876"],
["50002.00", "1.234"],
["50003.00", "0.543"]
],
"timestamp": 1640000000000
}
1.3 成交记录
最近的成交明细。
// Trade is a single executed trade.
type Trade struct {
TradeID string `json:"tradeId"`
Symbol string `json:"symbol"`
Price float64 `json:"price"`
Quantity float64 `json:"quantity"`
Side string `json:"side"` // "buy" or "sell"
Timestamp time.Time `json:"timestamp"`
}
1.4 K线数据
// Kline is one candlestick (OHLCV bar) of a symbol for a fixed interval.
type Kline struct {
Symbol string `json:"symbol"`
Interval string `json:"interval"` // "1m", "5m", "1h", "1d"
OpenTime time.Time `json:"openTime"` // Start of the interval
CloseTime time.Time `json:"closeTime"` // End of the interval
Open float64 `json:"open"` // Price of the first trade
High float64 `json:"high"` // Highest traded price
Low float64 `json:"low"` // Lowest traded price
Close float64 `json:"close"` // Price of the most recent trade
Volume float64 `json:"volume"` // Sum of traded quantity
QuoteVolume float64 `json:"quoteVolume"` // Sum of price * quantity
TradeCount int `json:"tradeCount"` // Number of trades
}
2. 行情聚合
2.1 实时聚合
从撮合引擎的成交流中实时计算行情数据。
package market
import (
"sync"
"time"
)
// MarketDataAggregator owns the per-symbol ticker aggregators and the
// per-symbol, per-interval K-line aggregators. updateTicker takes mu around
// its access to the tickers map.
type MarketDataAggregator struct {
// Ticker aggregators, keyed by symbol.
tickers map[string]*TickerAggregator
// K-line aggregators.
klines map[string]map[string]*KlineAggregator // symbol -> interval -> aggregator
mu sync.RWMutex
}
// NewMarketDataAggregator returns an aggregator whose ticker and K-line
// registries are allocated and empty.
func NewMarketDataAggregator() *MarketDataAggregator {
	mda := new(MarketDataAggregator)
	mda.tickers = map[string]*TickerAggregator{}
	mda.klines = map[string]map[string]*KlineAggregator{}
	return mda
}
// OnTrade handles one trade event by feeding it to the ticker aggregation
// and then to the K-line aggregation.
//
// A nil trade is ignored: the downstream updaters dereference the trade
// immediately, so passing nil through would panic the caller's goroutine.
func (mda *MarketDataAggregator) OnTrade(trade *Trade) {
	if trade == nil {
		return
	}
	// Update the ticker.
	mda.updateTicker(trade)
	// Update the K-lines.
	mda.updateKlines(trade)
}
// updateTicker routes trade to the TickerAggregator registered for
// trade.Symbol, creating and registering that aggregator on first use.
//
// mda.mu only guards the tickers map, so it is released before the
// aggregator runs. TickerAggregator.AddTrade takes the aggregator's own lock
// and rescans its whole 24h window; holding the map lock across that call
// would serialize trades of unrelated symbols behind each other.
func (mda *MarketDataAggregator) updateTicker(trade *Trade) {
	mda.mu.Lock()
	agg, exists := mda.tickers[trade.Symbol]
	if !exists {
		agg = NewTickerAggregator(trade.Symbol)
		mda.tickers[trade.Symbol] = agg
	}
	mda.mu.Unlock()

	agg.AddTrade(trade)
}
// TickerAggregator accumulates the trades of one symbol and derives the
// fields of a Ticker from them. mu is taken by AddTrade and GetTicker.
type TickerAggregator struct {
Symbol string
lastPrice float64 // Price of the most recently added trade
bidPrice float64 // Copied into Ticker.BidPrice; no writer is visible in this section
askPrice float64 // Copied into Ticker.AskPrice; no writer is visible in this section
// Rolling 24-hour window of trades.
window *TimeWindow
openPrice float64 // Price of the earliest item in the window
highPrice float64 // Highest price in the window
lowPrice float64 // Lowest price in the window
volume float64 // Sum of quantities in the window
quoteVolume float64 // Sum of price * quantity in the window
tradeCount int64 // Number of items in the window
mu sync.RWMutex
}
// NewTickerAggregator returns an aggregator for symbol backed by a fresh
// 24-hour TimeWindow.
func NewTickerAggregator(symbol string) *TickerAggregator {
	agg := new(TickerAggregator)
	agg.Symbol = symbol
	agg.window = NewTimeWindow(24 * time.Hour)
	return agg
}
// AddTrade records trade as the latest price, appends it to the rolling
// window and recomputes the 24h statistics, all under ta.mu.
func (ta *TickerAggregator) AddTrade(trade *Trade) {
	ta.mu.Lock()
	defer ta.mu.Unlock()

	ta.lastPrice = trade.Price

	sample := new(WindowItem)
	sample.Price = trade.Price
	sample.Quantity = trade.Quantity
	sample.Time = trade.Timestamp
	ta.window.Add(sample)

	// Every trade triggers a full rescan of the window.
	ta.recalculate()
}
// recalculate rebuilds the open/high/low/volume/turnover/count fields from
// the items currently returned by the window. When the window yields no
// items the previous values are left untouched.
func (ta *TickerAggregator) recalculate() {
	samples := ta.window.GetAll()
	if len(samples) == 0 {
		return
	}

	// The first item of the window supplies the opening price and seeds the
	// high/low scan.
	first := samples[0].Price
	hi, lo := first, first

	var (
		vol, quoteVol float64
		trades        int64
	)
	for i := range samples {
		price, qty := samples[i].Price, samples[i].Quantity
		if price > hi {
			hi = price
		}
		if price < lo {
			lo = price
		}
		vol += qty
		quoteVol += price * qty
		trades++
	}

	ta.openPrice = first
	ta.highPrice, ta.lowPrice = hi, lo
	ta.volume, ta.quoteVolume = vol, quoteVol
	ta.tradeCount = trades
}
// GetTicker returns a Ticker built from the aggregator's current fields
// under a read lock, stamped with the current time. The percentage change
// is reported as 0 unless the opening price is positive.
func (ta *TickerAggregator) GetTicker() *Ticker {
	ta.mu.RLock()
	defer ta.mu.RUnlock()

	out := &Ticker{
		Symbol:      ta.Symbol,
		LastPrice:   ta.lastPrice,
		BidPrice:    ta.bidPrice,
		AskPrice:    ta.askPrice,
		OpenPrice:   ta.openPrice,
		HighPrice:   ta.highPrice,
		LowPrice:    ta.lowPrice,
		Volume:      ta.volume,
		QuoteVolume: ta.quoteVolume,
		Count:       ta.tradeCount,
	}

	out.PriceChange = ta.lastPrice - ta.openPrice
	if ta.openPrice > 0 {
		out.PriceChangePercent = out.PriceChange / ta.openPrice * 100
	}
	out.Timestamp = time.Now()
	return out
}
// WindowItem is one trade sample stored in a TimeWindow.
type WindowItem struct {
	Price    float64
	Quantity float64
	Time     time.Time
}

// TimeWindow keeps items in insertion order and exposes the ones whose Time
// lies within the last `duration`. A background goroutine started by
// NewTimeWindow evicts expired items once a minute; call Stop to end it.
type TimeWindow struct {
	duration time.Duration
	items    []*WindowItem
	mu       sync.RWMutex

	done     chan struct{} // closed by Stop to terminate the cleanup goroutine
	stopOnce sync.Once
}

// NewTimeWindow returns an empty window covering the given duration and
// starts its periodic cleanup goroutine.
func NewTimeWindow(duration time.Duration) *TimeWindow {
	tw := &TimeWindow{
		duration: duration,
		items:    make([]*WindowItem, 0),
		done:     make(chan struct{}),
	}
	// Periodically drop expired data.
	go tw.cleanup()
	return tw
}

// Stop terminates the cleanup goroutine. It is safe to call more than once
// and on a window that was not built by NewTimeWindow. The window remains
// usable afterwards; GetAll still filters by time, but expired items are no
// longer evicted from storage.
func (tw *TimeWindow) Stop() {
	tw.stopOnce.Do(func() {
		if tw.done != nil {
			close(tw.done)
		}
	})
}

// Add appends item to the window.
func (tw *TimeWindow) Add(item *WindowItem) {
	tw.mu.Lock()
	defer tw.mu.Unlock()
	tw.items = append(tw.items, item)
}

// GetAll returns, in insertion order, the items whose Time is strictly after
// now-duration. The returned slice is a fresh copy of the pointers.
func (tw *TimeWindow) GetAll() []*WindowItem {
	tw.mu.RLock()
	defer tw.mu.RUnlock()

	cutoff := time.Now().Add(-tw.duration)
	// Sized for the whole window so the filter loop never has to grow it.
	result := make([]*WindowItem, 0, len(tw.items))
	for _, item := range tw.items {
		if item.Time.After(cutoff) {
			result = append(result, item)
		}
	}
	return result
}

// cleanup evicts expired items once a minute until Stop is called.
func (tw *TimeWindow) cleanup() {
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-tw.done:
			return
		case <-ticker.C:
			tw.evictExpired(time.Now())
		}
	}
}

// evictExpired drops the leading run of items that are not after
// now-duration, the same boundary GetAll uses. It stops at the first live
// item, so an expired item sitting behind a live one stays in storage until
// the items ahead of it expire too (GetAll still filters it out).
func (tw *TimeWindow) evictExpired(now time.Time) {
	tw.mu.Lock()
	defer tw.mu.Unlock()

	cutoff := now.Add(-tw.duration)
	expired := 0
	for expired < len(tw.items) && !tw.items[expired].Time.After(cutoff) {
		expired++
	}
	if expired == 0 {
		return
	}
	// Re-slicing alone keeps the dropped pointers reachable through the
	// backing array; nil them so the items can be collected.
	for i := 0; i < expired; i++ {
		tw.items[i] = nil
	}
	tw.items = tw.items[expired:]
}
2.2 K线聚合
// KlineAggregator builds the K-lines of one symbol for one interval.
type KlineAggregator struct {
Symbol string
Interval string
currentKline *Kline // K-line currently being built; nil when none is open
completed []*Kline // Finished K-lines, oldest first
mu sync.RWMutex
}
// NewKlineAggregator returns an aggregator for the given symbol and interval
// and launches the goroutine that rolls K-lines over on a timer.
func NewKlineAggregator(symbol, interval string) *KlineAggregator {
	agg := new(KlineAggregator)
	agg.Symbol, agg.Interval = symbol, interval
	agg.completed = []*Kline{}

	// Background timer that switches K-lines per period.
	go agg.startTimer()
	return agg
}
// AddTrade folds trade into the current K-line under ka.mu. If no K-line is
// open, or the trade's timestamp calls for a new one, the current K-line is
// finished and a new one is opened first.
func (ka *KlineAggregator) AddTrade(trade *Trade) {
	ka.mu.Lock()
	defer ka.mu.Unlock()

	rollOver := ka.currentKline == nil
	if !rollOver {
		rollOver = ka.shouldStartNewKline(trade.Timestamp)
	}
	if rollOver {
		ka.finishCurrentKline()
		ka.startNewKline(trade)
	}

	// Apply the trade to whichever K-line is now current.
	ka.updateKline(trade)
}
// shouldStartNewKline reports whether a trade at timestamp belongs to a
// later interval than the current K-line. It returns true when no K-line is
// open.
//
// The decision compares aligned interval starts rather than CloseTime.
// startNewKline sets CloseTime to one millisecond before the interval end,
// so a trade in that final millisecond would count as "after CloseTime"
// and open a second K-line with the same OpenTime. Trades older than the
// current interval are still folded into the current K-line, as before.
func (ka *KlineAggregator) shouldStartNewKline(timestamp time.Time) bool {
	if ka.currentKline == nil {
		return true
	}
	intervalDuration := parseInterval(ka.Interval)
	return alignTime(timestamp, intervalDuration).After(ka.currentKline.OpenTime)
}
// startNewKline replaces the current K-line with an empty one for the
// interval containing trade.Timestamp. All four OHLC prices start at the
// trade's price and the volume counters start at zero; the trade itself is
// not counted here.
func (ka *KlineAggregator) startNewKline(trade *Trade) {
	span := parseInterval(ka.Interval)

	// The K-line covers [opensAt, opensAt+span); CloseTime is recorded as one
	// millisecond before the end.
	opensAt := alignTime(trade.Timestamp, span)
	closesAt := opensAt.Add(span).Add(-time.Millisecond)

	price := trade.Price
	k := new(Kline)
	k.Symbol, k.Interval = ka.Symbol, ka.Interval
	k.OpenTime, k.CloseTime = opensAt, closesAt
	k.Open, k.High, k.Low, k.Close = price, price, price, price
	k.Volume, k.QuoteVolume, k.TradeCount = 0, 0, 0

	ka.currentKline = k
}
// updateKline applies trade to the current K-line in place: it widens
// High/Low if needed, moves Close to the trade price and adds the trade to
// the volume, turnover and trade count.
func (ka *KlineAggregator) updateKline(trade *Trade) {
	k := ka.currentKline
	price, qty := trade.Price, trade.Quantity

	// OHLC
	if price > k.High {
		k.High = price
	}
	if price < k.Low {
		k.Low = price
	}
	k.Close = price

	// Volume counters
	k.Volume += qty
	k.QuoteVolume += price * qty
	k.TradeCount++
}
// finishCurrentKline appends the current K-line, if there is one, to the
// completed list (trimmed to the most recent 1000 entries) and passes it to
// saveKline. It does not clear ka.currentKline.
func (ka *KlineAggregator) finishCurrentKline() {
	finished := ka.currentKline
	if finished == nil {
		return
	}

	ka.completed = append(ka.completed, finished)
	// Keep only the latest 1000 K-lines in memory.
	if len(ka.completed) > 1000 {
		ka.completed = ka.completed[1:]
	}

	// Persist the K-line.
	saveKline(finished)
}
// GetKlines returns up to limit of the most recent completed K-lines, oldest
// first, followed by the in-progress K-line when one is open. The result
// can therefore hold limit+1 entries. A negative limit is treated as 0.
//
// The in-progress entry is a snapshot copy: updateKline mutates
// ka.currentKline in place under ka.mu, so handing out the live pointer
// would let callers read it while it is being written.
func (ka *KlineAggregator) GetKlines(limit int) []*Kline {
	ka.mu.RLock()
	defer ka.mu.RUnlock()

	// A negative limit would push start past len(ka.completed) and make the
	// slice expression below panic.
	if limit < 0 {
		limit = 0
	}
	start := len(ka.completed) - limit
	if start < 0 {
		start = 0
	}

	result := make([]*Kline, 0, len(ka.completed)-start+1)
	// Completed K-lines.
	result = append(result, ka.completed[start:]...)
	// Plus the current K-line.
	if ka.currentKline != nil {
		snapshot := *ka.currentKline
		result = append(result, &snapshot)
	}
	return result
}
// startTimer closes the current K-line once its interval has ended, so that
// a K-line is finished even when no later trade arrives to roll it over.
//
// It wakes on interval boundaries (as computed by alignTime) instead of
// every interval counted from construction time. A ticker phased from
// construction fires mid-interval, which finishes a K-line early and makes
// the next trade open a second K-line with the same OpenTime. After waking,
// the K-line is closed only if the wall clock is past its CloseTime.
//
// The loop has no exit path; the goroutine runs for the life of the process.
func (ka *KlineAggregator) startTimer() {
	intervalDuration := parseInterval(ka.Interval)
	for {
		now := time.Now()
		nextBoundary := alignTime(now, intervalDuration).Add(intervalDuration)
		time.Sleep(nextBoundary.Sub(now))

		ka.mu.Lock()
		if k := ka.currentKline; k != nil && time.Now().After(k.CloseTime) {
			ka.finishCurrentKline()
			ka.currentKline = nil
		}
		ka.mu.Unlock()
	}
}
// intervalDurations maps each supported interval label to its length.
var intervalDurations = map[string]time.Duration{
	"1m":  1 * time.Minute,
	"5m":  5 * time.Minute,
	"15m": 15 * time.Minute,
	"30m": 30 * time.Minute,
	"1h":  1 * time.Hour,
	"4h":  4 * time.Hour,
	"1d":  24 * time.Hour,
	"1w":  7 * 24 * time.Hour,
}

// parseInterval converts an interval label such as "5m" or "1d" into a
// duration. Unrecognized labels fall back to one minute.
func parseInterval(interval string) time.Duration {
	if d, known := intervalDurations[interval]; known {
		return d
	}
	return 1 * time.Minute
}
// alignTime rounds t down to the start of the interval that contains it,
// with interval boundaries counted from the Unix epoch.
//
// For intervals of one second or more the result has whole-second
// resolution and is built with time.Unix. Intervals shorter than one second
// (or non-positive) used to truncate to a zero-second divisor and panic with
// an integer divide by zero; they are now delegated to Time.Truncate, which
// returns t unchanged for interval <= 0.
//
// Times before the epoch are floored as well. Go's integer division rounds
// toward zero, which previously moved them up to the next boundary.
func alignTime(t time.Time, interval time.Duration) time.Time {
	intervalSec := int64(interval / time.Second)
	if intervalSec <= 0 {
		return t.Truncate(interval)
	}

	unix := t.Unix()
	rem := unix % intervalSec
	if rem < 0 {
		rem += intervalSec
	}
	return time.Unix(unix-rem, 0)
}
3. 深度数据优化
3.1 增量更新
只推送订单簿的变化部分,而非完整深度。
// DepthUpdate is an incremental order-book update: it carries only the price
// levels that changed, not the full depth.
type DepthUpdate struct {
Symbol string `json:"symbol"`
Bids []PriceLevel `json:"bids"` // Changed bid levels
Asks []PriceLevel `json:"asks"` // Changed ask levels
Timestamp time.Time `json:"timestamp"`
}
// PriceLevel is one price level of an incremental depth update, with price
// and quantity carried as strings.
type PriceLevel struct {
Price string `json:"price"`
Quantity string `json:"quantity"` // "0" means the level has been removed
}
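// 示例(注意:此处每个档位写成[price, quantity]数组;上面的PriceLevel按默认方式序列化会得到{"price": ..., "quantity": ...}对象,需要自定义序列化才能输出数组格式)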
{
"symbol": "BTC/USDT",
"bids": [
["50000.00", "1.5"], // 更新
["49999.00", "0"] // 删除
],
"asks": [
["50001.00", "2.3"] // 更新
]
}
3.2 深度合并
客户端维护本地订单簿,应用增量更新。
/**
 * Client-side order book that is seeded from a full snapshot and then kept
 * current by applying incremental updates.
 *
 * Both sides are Maps keyed by the price exactly as received, with the
 * parsed numeric quantity as the value.
 */
class OrderBookManager {
  constructor() {
    this.bids = new Map(); // price -> quantity
    this.asks = new Map();
  }

  /**
   * Replaces the whole book with the levels of a full depth snapshot.
   * @param {{bids: Array, asks: Array}} snapshot lists of [price, qty] pairs
   */
  initSnapshot(snapshot) {
    this.bids.clear();
    this.asks.clear();

    const loadInto = (book) => ([price, qty]) => {
      book.set(price, parseFloat(qty));
    };
    snapshot.bids.forEach(loadInto(this.bids));
    snapshot.asks.forEach(loadInto(this.asks));
  }

  /**
   * Applies an incremental update. A level whose quantity parses to 0 is
   * removed; any other level is inserted or overwritten.
   * @param {{bids: Array, asks: Array}} update lists of [price, qty] pairs
   */
  applyUpdate(update) {
    const mergeInto = (book) => ([price, qty]) => {
      const quantity = parseFloat(qty);
      if (quantity === 0) {
        book.delete(price);
        return;
      }
      book.set(price, quantity);
    };
    update.bids.forEach(mergeInto(this.bids));
    update.asks.forEach(mergeInto(this.asks));
  }

  /**
   * Returns the best `levels` entries of each side as [price, quantity]
   * pairs: bids sorted by descending price, asks by ascending price.
   * @param {number} [levels=20]
   */
  getDepth(levels = 20) {
    const byPriceDesc = (a, b) => parseFloat(b[0]) - parseFloat(a[0]);
    const byPriceAsc = (a, b) => parseFloat(a[0]) - parseFloat(b[0]);

    const bids = [...this.bids.entries()].sort(byPriceDesc).slice(0, levels);
    const asks = [...this.asks.entries()].sort(byPriceAsc).slice(0, levels);
    return { bids, asks };
  }
}
3.3 深度聚合
按价格档位聚合订单簿。
// GetAggregatedDepth walks the bid side starting from ob.Bids.GetBest() and
// merges price levels into buckets of width tickSize, returning at most
// `levels` bid buckets as [price, quantity] pairs.
//
// As written, only bids are produced: the ask side is elided in this
// snippet, so Asks is returned empty, and Timestamp is never set.
// NOTE(review): tickSize is used as a divisor without validation; confirm
// that callers always pass a positive value.
func (ob *OrderBook) GetAggregatedDepth(tickSize float64, levels int) *Depth {
depth := &Depth{
Symbol: ob.Symbol,
Bids: make([][2]float64, 0),
Asks: make([][2]float64, 0),
}
// Aggregate the bid side.
// currentPrice == 0 doubles as the "no bucket started yet" marker.
currentPrice := 0.0
currentQty := 0.0
count := 0
node := ob.Bids.GetBest()
for node != nil && count < levels {
// Bucket price: round the level price down to a multiple of tickSize.
aggPrice := math.Floor(node.Price/tickSize) * tickSize
if currentPrice == 0 {
currentPrice = aggPrice
}
if aggPrice == currentPrice {
// Same bucket: add the unfilled quantity of every order at this level.
for _, order := range node.Level.Orders {
currentQty += order.Quantity - order.Filled
}
} else {
// Bucket changed: emit the finished bucket (only if it holds quantity),
// then start the new one with this level's orders.
if currentQty > 0 {
depth.Bids = append(depth.Bids, [2]float64{currentPrice, currentQty})
count++
}
currentPrice = aggPrice
currentQty = 0
for _, order := range node.Level.Orders {
currentQty += order.Quantity - order.Filled
}
}
// Advance to the following node (presumably the next-best price level;
// confirm against the order-book structure).
node = node.Forward[0]
}
// Emit the last bucket, unless the level limit has already been reached.
if currentQty > 0 && count < levels {
depth.Bids = append(depth.Bids, [2]float64{currentPrice, currentQty})
}
// Aggregate the ask side (analogous logic).
// ...
return depth
}
4. 行情推送
4.1 WebSocket推送架构
package ws
import (
"sync"
"github.com/gorilla/websocket"
)
// MarketDataPusher fans published messages out to the clients subscribed to
// each topic.
type MarketDataPusher struct {
// Subscription registry.
subscriptions map[string]map[*Client]bool // topic -> clients
// Queue of messages waiting to be broadcast.
messageChan chan *Message
mu sync.RWMutex
}
// Client is one WebSocket connection together with its outbound queue and
// the set of topics it is subscribed to.
type Client struct {
conn *websocket.Conn
send chan []byte // Serialized messages waiting to be written by writePump
topics map[string]bool // Topics this client is subscribed to
}
// Message is a payload queued for broadcast on a topic.
type Message struct {
Topic string
Data interface{}
}
// NewMarketDataPusher returns a pusher with an empty subscription registry
// and a 10000-slot message queue, and starts its broadcast goroutine.
func NewMarketDataPusher() *MarketDataPusher {
	pusher := new(MarketDataPusher)
	pusher.subscriptions = map[string]map[*Client]bool{}
	pusher.messageChan = make(chan *Message, 10000)

	go pusher.broadcast()
	return pusher
}
// Subscribe registers client for topic, creating the topic's subscriber set
// on first use, and records the topic on the client. Both updates happen
// under mdp.mu.
func (mdp *MarketDataPusher) Subscribe(client *Client, topic string) {
	mdp.mu.Lock()
	defer mdp.mu.Unlock()

	subscribers := mdp.subscriptions[topic]
	if subscribers == nil {
		subscribers = make(map[*Client]bool)
		mdp.subscriptions[topic] = subscribers
	}
	subscribers[client] = true
	client.topics[topic] = true
}
// Unsubscribe removes client from topic and clears the topic from the
// client's own topic set. Unsubscribing from a topic the client is not
// subscribed to is a no-op.
//
// When the last subscriber of a topic leaves, the topic's entry is removed
// from the registry as well. Otherwise every topic that was ever subscribed
// would keep an empty set alive.
func (mdp *MarketDataPusher) Unsubscribe(client *Client, topic string) {
	mdp.mu.Lock()
	defer mdp.mu.Unlock()

	if clients, exists := mdp.subscriptions[topic]; exists {
		delete(clients, client)
		if len(clients) == 0 {
			delete(mdp.subscriptions, topic)
		}
	}
	delete(client.topics, topic)
}
// Publish queues data for broadcast on topic by sending a Message on the
// pusher's message channel. The send blocks while the channel is full.
func (mdp *MarketDataPusher) Publish(topic string, data interface{}) {
	msg := new(Message)
	msg.Topic, msg.Data = topic, data
	mdp.messageChan <- msg
}
// broadcast drains the message queue and delivers each message to every
// client subscribed to its topic. It returns when messageChan is closed.
//
// The recipients are copied into a slice while mdp.mu is held. Subscribe and
// Unsubscribe mutate the per-topic map under that lock, so ranging over the
// map after releasing it is a concurrent map iteration and write.
//
// Delivery is best effort: a client whose send buffer is full misses the
// message instead of stalling the other subscribers.
func (mdp *MarketDataPusher) broadcast() {
	for msg := range mdp.messageChan {
		mdp.mu.RLock()
		subscribers := mdp.subscriptions[msg.Topic]
		recipients := make([]*Client, 0, len(subscribers))
		for client := range subscribers {
			recipients = append(recipients, client)
		}
		mdp.mu.RUnlock()

		if len(recipients) == 0 {
			continue
		}

		// Serialize once and share the bytes across all recipients.
		data := serializeMessage(msg)

		for _, client := range recipients {
			select {
			case client.send <- data:
			default:
				// Client send buffer is full; skip this client.
			}
		}
	}
}
// writePump is the client's write loop. It writes queued messages to the
// WebSocket connection and sends a ping every 54 seconds. It returns when
// c.send is closed or any write fails.
//
// Changes from the previous version:
//   - the connection is closed when the pump exits, so a failed write no
//     longer leaves the socket open;
//   - the write deadline is set before the close frame too, so a stalled
//     peer cannot block shutdown indefinitely;
//   - errors from writing into the frame are checked instead of discarded.
func (c *Client) writePump() {
	ticker := time.NewTicker(54 * time.Second)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if !ok {
				// The send channel was closed: tell the peer we are done.
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			if _, err := w.Write(message); err != nil {
				return
			}

			// Batch the messages already queued into the same frame,
			// separated by newlines.
			n := len(c.send)
			for i := 0; i < n; i++ {
				if _, err := w.Write([]byte{'\n'}); err != nil {
					return
				}
				if _, err := w.Write(<-c.send); err != nil {
					return
				}
			}

			if err := w.Close(); err != nil {
				return
			}

		case <-ticker.C:
			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
4.2 订阅主题
// Topic name formats. The {symbol}, {levels} and {interval} parts are
// placeholders shown literally in these constants; the trailing comments give
// a concrete topic name for each format.
const (
TopicTicker = "ticker.{symbol}" // ticker.BTC/USDT
TopicDepth = "depth.{symbol}@{levels}" // depth.BTC/USDT@20
TopicTrade = "trade.{symbol}" // trade.BTC/USDT
TopicKline = "kline.{symbol}@{interval}" // kline.BTC/USDT@1m
)
// 客户端订阅示例
{
"method": "subscribe",
"params": [
"ticker.BTC/USDT",
"depth.BTC/USDT@20",
"trade.BTC/USDT",
"kline.BTC/USDT@1m"
]
}
// 服务端推送示例
// Ticker推送
{
"topic": "ticker.BTC/USDT",
"data": {
"symbol": "BTC/USDT",
"lastPrice": "50000.00",
"volume": "1234.56",
...
}
}
// 深度推送
{
"topic": "depth.BTC/USDT@20",
"data": {
"bids": [["50000.00", "1.5"]],
"asks": [["50001.00", "2.3"]]
}
}
// 成交推送
{
"topic": "trade.BTC/USDT",
"data": {
"tradeId": "123456",
"price": "50000.00",
"quantity": "0.1",
"side": "buy",
"timestamp": 1640000000000
}
}
5. 行情缓存
5.1 Redis缓存策略
// MarketDataCache stores market data in Redis.
type MarketDataCache struct {
redis *redis.Client
}
// NewMarketDataCache returns a cache backed by a Redis client created for
// redisAddr.
func NewMarketDataCache(redisAddr string) *MarketDataCache {
	opts := &redis.Options{Addr: redisAddr}
	cache := new(MarketDataCache)
	cache.redis = redis.NewClient(opts)
	return cache
}
// SetTicker caches ticker as JSON under "ticker:<symbol>" with a 5-second
// TTL.
//
// It returns the marshalling error instead of discarding it. json.Marshal
// fails on NaN and infinite floats, and ignoring that would write an empty
// value to Redis.
func (mdc *MarketDataCache) SetTicker(ticker *Ticker) error {
	key := fmt.Sprintf("ticker:%s", ticker.Symbol)
	data, err := json.Marshal(ticker)
	if err != nil {
		return fmt.Errorf("marshal ticker %q: %w", ticker.Symbol, err)
	}
	return mdc.redis.Set(context.Background(), key, data, 5*time.Second).Err()
}
// GetTicker loads the ticker cached under "ticker:<symbol>".
//
// Errors from the Redis read are returned unchanged. A value that cannot be
// decoded is reported as an error too; previously the decode error was
// ignored and a zero-value Ticker came back with a nil error.
func (mdc *MarketDataCache) GetTicker(symbol string) (*Ticker, error) {
	key := fmt.Sprintf("ticker:%s", symbol)
	data, err := mdc.redis.Get(context.Background(), key).Bytes()
	if err != nil {
		return nil, err
	}

	var ticker Ticker
	if err := json.Unmarshal(data, &ticker); err != nil {
		return nil, fmt.Errorf("unmarshal ticker %q: %w", symbol, err)
	}
	return &ticker, nil
}
// SetDepth caches depth as JSON under "depth:<symbol>" with a 1-second TTL.
//
// It returns the marshalling error instead of discarding it, so an
// unencodable value (for example a NaN price) is never written as empty
// data.
func (mdc *MarketDataCache) SetDepth(depth *Depth) error {
	key := fmt.Sprintf("depth:%s", depth.Symbol)
	data, err := json.Marshal(depth)
	if err != nil {
		return fmt.Errorf("marshal depth %q: %w", depth.Symbol, err)
	}
	return mdc.redis.Set(context.Background(), key, data, 1*time.Second).Err()
}
// SetKlines caches klines as a JSON array under "kline:<symbol>:<interval>"
// with a 1-minute TTL.
//
// It returns the marshalling error instead of discarding it, so an
// unencodable value is never written as empty data.
func (mdc *MarketDataCache) SetKlines(symbol, interval string, klines []*Kline) error {
	key := fmt.Sprintf("kline:%s:%s", symbol, interval)
	data, err := json.Marshal(klines)
	if err != nil {
		return fmt.Errorf("marshal klines %q/%q: %w", symbol, interval, err)
	}
	return mdc.redis.Set(context.Background(), key, data, 1*time.Minute).Err()
}
5.2 多级缓存
// TieredCache layers three stores that GetTicker consults in order.
type TieredCache struct {
l1 *sync.Map // Local in-memory cache
l2 *redis.Client // Redis cache
l3 Database // Database
}
// GetTicker looks symbol up in the local cache, then Redis, then the
// database, and back-fills the faster tiers on the way out.
//
// Any Redis error, not only a miss, falls through to the database. Entries
// stored in the local cache by this method carry no expiry.
func (tc *TieredCache) GetTicker(symbol string) (*Ticker, error) {
	// L1: local cache.
	if cached, hit := tc.l1.Load(symbol); hit {
		return cached.(*Ticker), nil
	}

	// L2: Redis.
	if fromRedis, err := tc.getTickerFromRedis(symbol); err == nil {
		tc.l1.Store(symbol, fromRedis)
		return fromRedis, nil
	}

	// L3: database.
	fromDB, err := tc.getTickerFromDB(symbol)
	if err != nil {
		return nil, err
	}

	// Back-fill both cache tiers.
	tc.setTickerToRedis(symbol, fromDB)
	tc.l1.Store(symbol, fromDB)
	return fromDB, nil
}
6. 性能优化
6.1 批量推送
// BatchPusher buffers messages per topic and forwards them to the wrapped
// pusher in batches of batchSize.
type BatchPusher struct {
pusher *MarketDataPusher
buffer map[string][]interface{} // topic -> messages
batchSize int
mu sync.Mutex
}
// Publish appends data to the topic's pending batch and flushes the batch
// once it holds at least batchSize messages. The whole operation, including
// the flush, runs under bp.mu.
func (bp *BatchPusher) Publish(topic string, data interface{}) {
	bp.mu.Lock()
	defer bp.mu.Unlock()

	pending := bp.buffer[topic]
	if pending == nil {
		pending = make([]interface{}, 0, bp.batchSize)
	}
	pending = append(pending, data)
	bp.buffer[topic] = pending

	if len(pending) < bp.batchSize {
		return
	}
	bp.flush(topic)
}
// flush hands the topic's buffered messages to the wrapped pusher as one
// slice and leaves a fresh empty buffer in their place. It does no locking
// of its own; Publish calls it with bp.mu held.
func (bp *BatchPusher) flush(topic string) {
	batch := bp.buffer[topic]
	// Swap in a new buffer before publishing so the batch is owned solely by
	// the message being sent.
	bp.buffer[topic] = make([]interface{}, 0, bp.batchSize)

	// Push the whole batch as a single message.
	bp.pusher.Publish(topic, batch)
}
6.2 限流
// RateLimitedPusher wraps a MarketDataPusher and waits on limiter before
// each publish.
type RateLimitedPusher struct {
pusher *MarketDataPusher
limiter *rate.Limiter
}
// Publish waits on the limiter and then forwards the message to the wrapped
// pusher. The result of the wait is not inspected, so the message is
// forwarded even if the wait returns an error.
func (rlp *RateLimitedPusher) Publish(topic string, data interface{}) {
	ctx := context.Background()
	// Wait for a token.
	_ = rlp.limiter.Wait(ctx)

	rlp.pusher.Publish(topic, data)
}
小结
行情系统设计的核心要点:
- 实时聚合:从成交流计算Ticker和K线,从订单簿生成深度
- 增量更新:只推送变化部分,减少带宽
- WebSocket推送:主题订阅模式
- 多级缓存:内存 + Redis + 数据库
- 性能优化:批量推送、限流
下一章将讨论合约交易系统的设计与实现。