HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 交易所技术完整体系

    • 交易所技术完整体系
    • 交易所技术架构总览
    • 交易基础概念
    • 撮合引擎原理
    • 撮合引擎实现-内存撮合
    • 撮合引擎优化 - 延迟与吞吐
    • 撮合引擎高可用
    • 清算系统设计
    • 风控系统设计
    • 资金管理系统
    • 行情系统设计
    • 去中心化交易所(DEX)设计
    • 合约交易系统
    • 数据库设计与优化
    • 缓存与消息队列
    • 用户系统与KYC
    • 交易所API设计
    • 监控与告警系统
    • 安全防护与攻防
    • 高可用架构设计
    • 压力测试与性能优化
    • 项目实战-完整交易所实现

缓存与消息队列

1. Redis缓存架构

1.1 缓存层级

┌─────────────────────────────────────────────────────────┐
│                      应用层                              │
└────────────────────┬────────────────────────────────────┘
                     │
┌────────────────────┴────────────────────────────────────┐
│                  L1缓存:本地内存                         │
│              (LRU, 容量10MB, TTL 1s)                     │
└────────────────────┬────────────────────────────────────┘
                     │ Cache Miss
┌────────────────────┴────────────────────────────────────┐
│                  L2缓存:Redis                           │
│            (容量100GB, TTL 60s - 1hour)                  │
└────────────────────┬────────────────────────────────────┘
                     │ Cache Miss
┌────────────────────┴────────────────────────────────────┐
│                  数据库:MySQL                           │
└─────────────────────────────────────────────────────────┘

1.2 缓存实现

本地缓存(LRU):

package cache

import (
    "container/list"
    "sync"
    "time"
)

type CacheItem struct {
    key       string
    value     interface{}
    expiresAt time.Time
}

type LRUCache struct {
    capacity int
    items    map[string]*list.Element
    list     *list.List
    mu       sync.RWMutex
}

func NewLRUCache(capacity int) *LRUCache {
    return &LRUCache{
        capacity: capacity,
        items:    make(map[string]*list.Element),
        list:     list.New(),
    }
}

func (lru *LRUCache) Get(key string) (interface{}, bool) {
    lru.mu.Lock()
    defer lru.mu.Unlock()

    elem, exists := lru.items[key]
    if !exists {
        return nil, false
    }

    item := elem.Value.(*CacheItem)

    // 检查是否过期
    if time.Now().After(item.expiresAt) {
        lru.removeElement(elem)
        return nil, false
    }

    // 移到链表头部(最近使用)
    lru.list.MoveToFront(elem)

    return item.value, true
}

func (lru *LRUCache) Set(key string, value interface{}, ttl time.Duration) {
    lru.mu.Lock()
    defer lru.mu.Unlock()

    // 如果已存在,更新并移到头部
    if elem, exists := lru.items[key]; exists {
        lru.list.MoveToFront(elem)
        item := elem.Value.(*CacheItem)
        item.value = value
        item.expiresAt = time.Now().Add(ttl)
        return
    }

    // 新增项
    item := &CacheItem{
        key:       key,
        value:     value,
        expiresAt: time.Now().Add(ttl),
    }

    elem := lru.list.PushFront(item)
    lru.items[key] = elem

    // 超过容量,删除最久未使用的项
    if lru.list.Len() > lru.capacity {
        oldest := lru.list.Back()
        if oldest != nil {
            lru.removeElement(oldest)
        }
    }
}

func (lru *LRUCache) removeElement(elem *list.Element) {
    lru.list.Remove(elem)
    item := elem.Value.(*CacheItem)
    delete(lru.items, item.key)
}

多级缓存服务:

type CacheService struct {
    l1Cache *LRUCache
    redis   *redis.Client
    db      *sql.DB
}

func NewCacheService(redisClient *redis.Client, db *sql.DB) *CacheService {
    return &CacheService{
        l1Cache: NewLRUCache(10000),
        redis:   redisClient,
        db:      db,
    }
}

func (cs *CacheService) GetTicker(symbol string) (*Ticker, error) {
    cacheKey := fmt.Sprintf("ticker:%s", symbol)

    // 1. 尝试L1缓存
    if value, ok := cs.l1Cache.Get(cacheKey); ok {
        return value.(*Ticker), nil
    }

    // 2. 尝试Redis
    var ticker Ticker
    data, err := cs.redis.Get(context.Background(), cacheKey).Result()
    if err == nil {
        json.Unmarshal([]byte(data), &ticker)
        // 写入L1缓存
        cs.l1Cache.Set(cacheKey, &ticker, 1*time.Second)
        return &ticker, nil
    }

    // 3. 从数据库查询
    ticker, err = cs.queryTickerFromDB(symbol)
    if err != nil {
        return nil, err
    }

    // 4. 回写缓存
    tickerJSON, _ := json.Marshal(ticker)
    cs.redis.Set(context.Background(), cacheKey, tickerJSON, 5*time.Second)
    cs.l1Cache.Set(cacheKey, &ticker, 1*time.Second)

    return &ticker, nil
}

2. 缓存策略

2.1 Cache-Aside(旁路缓存)

读取流程:

func (cs *CacheService) GetUserBalance(userID int64, currency string) (*Balance, error) {
    cacheKey := fmt.Sprintf("balance:%d:%s", userID, currency)

    // 1. 先查缓存
    data, err := cs.redis.Get(context.Background(), cacheKey).Result()
    if err == nil {
        var balance Balance
        json.Unmarshal([]byte(data), &balance)
        return &balance, nil
    }

    // 2. 缓存未命中,查数据库
    balance, err := cs.queryBalanceFromDB(userID, currency)
    if err != nil {
        return nil, err
    }

    // 3. 写入缓存
    balanceJSON, _ := json.Marshal(balance)
    cs.redis.Set(context.Background(), cacheKey, balanceJSON, 60*time.Second)

    return balance, nil
}

更新流程:

func (cs *CacheService) UpdateBalance(userID int64, currency string, delta float64) error {
    // 1. 更新数据库
    err := cs.updateBalanceInDB(userID, currency, delta)
    if err != nil {
        return err
    }

    // 2. 删除缓存(而不是更新缓存)
    cacheKey := fmt.Sprintf("balance:%d:%s", userID, currency)
    cs.redis.Del(context.Background(), cacheKey)

    return nil
}

2.2 Write-Through(直写缓存)

func (cs *CacheService) UpdateBalanceWriteThrough(userID int64, currency string, delta float64) error {
    // 1. 同时更新数据库和缓存
    balance, err := cs.updateBalanceInDB(userID, currency, delta)
    if err != nil {
        return err
    }

    // 2. 更新缓存
    cacheKey := fmt.Sprintf("balance:%d:%s", userID, currency)
    balanceJSON, _ := json.Marshal(balance)
    cs.redis.Set(context.Background(), cacheKey, balanceJSON, 60*time.Second)

    return nil
}

2.3 Write-Behind(异步回写)

type WriteBuffer struct {
    updates chan *BalanceUpdate
    redis   *redis.Client
    db      *sql.DB
}

type BalanceUpdate struct {
    UserID   int64
    Currency string
    Delta    float64
}

func NewWriteBuffer(redis *redis.Client, db *sql.DB) *WriteBuffer {
    wb := &WriteBuffer{
        updates: make(chan *BalanceUpdate, 10000),
        redis:   redis,
        db:      db,
    }

    // 启动后台写入协程
    go wb.flushWorker()

    return wb
}

func (wb *WriteBuffer) UpdateBalance(userID int64, currency string, delta float64) {
    // 1. 先更新缓存(快速返回)
    cacheKey := fmt.Sprintf("balance:%d:%s", userID, currency)
    wb.redis.IncrByFloat(context.Background(), cacheKey, delta)

    // 2. 异步更新数据库
    wb.updates <- &BalanceUpdate{
        UserID:   userID,
        Currency: currency,
        Delta:    delta,
    }
}

func (wb *WriteBuffer) flushWorker() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    buffer := make([]*BalanceUpdate, 0, 100)

    for {
        select {
        case update := <-wb.updates:
            buffer = append(buffer, update)

            // 批量写入
            if len(buffer) >= 100 {
                wb.flushToDB(buffer)
                buffer = buffer[:0]
            }

        case <-ticker.C:
            // 定时刷新
            if len(buffer) > 0 {
                wb.flushToDB(buffer)
                buffer = buffer[:0]
            }
        }
    }
}

func (wb *WriteBuffer) flushToDB(updates []*BalanceUpdate) {
    // 批量更新数据库
    for _, update := range updates {
        wb.db.Exec(`
            UPDATE account_balances
            SET available = available + ?
            WHERE user_id = ? AND currency = ?
        `, update.Delta, update.UserID, update.Currency)
    }
}

3. 缓存常见问题

3.1 缓存穿透

问题:查询不存在的数据,缓存和数据库都没有,导致每次都查数据库。

解决方案1:布隆过滤器

import "github.com/bits-and-blooms/bloom/v3"

type CacheWithBloom struct {
    cache *redis.Client
    bloom *bloom.BloomFilter
    db    *sql.DB
}

func NewCacheWithBloom(cache *redis.Client, db *sql.DB) *CacheWithBloom {
    // 创建布隆过滤器:预期100万个元素,误判率0.01%
    bf := bloom.NewWithEstimates(1000000, 0.0001)

    // 初始化:将所有存在的userID加入布隆过滤器
    rows, _ := db.Query("SELECT id FROM users")
    defer rows.Close()

    for rows.Next() {
        var userID int64
        rows.Scan(&userID)
        bf.Add([]byte(fmt.Sprintf("%d", userID)))
    }

    return &CacheWithBloom{
        cache: cache,
        bloom: bf,
        db:    db,
    }
}

func (cwb *CacheWithBloom) GetUser(userID int64) (*User, error) {
    key := []byte(fmt.Sprintf("%d", userID))

    // 1. 布隆过滤器判断:如果不存在,直接返回
    if !cwb.bloom.Test(key) {
        return nil, errors.New("user not found")
    }

    // 2. 查缓存
    cacheKey := fmt.Sprintf("user:%d", userID)
    data, err := cwb.cache.Get(context.Background(), cacheKey).Result()
    if err == nil {
        var user User
        json.Unmarshal([]byte(data), &user)
        return &user, nil
    }

    // 3. 查数据库
    user, err := cwb.queryUserFromDB(userID)
    if err != nil {
        return nil, err
    }

    // 4. 写入缓存
    userJSON, _ := json.Marshal(user)
    cwb.cache.Set(context.Background(), cacheKey, userJSON, 600*time.Second)

    return user, nil
}

解决方案2:缓存空值

func (cs *CacheService) GetUserWithNullCache(userID int64) (*User, error) {
    cacheKey := fmt.Sprintf("user:%d", userID)

    // 1. 查缓存
    data, err := cs.redis.Get(context.Background(), cacheKey).Result()
    if err == nil {
        if data == "null" {
            // 缓存的空值
            return nil, errors.New("user not found")
        }

        var user User
        json.Unmarshal([]byte(data), &user)
        return &user, nil
    }

    // 2. 查数据库
    user, err := cs.queryUserFromDB(userID)
    if err != nil {
        // 数据库中不存在,缓存空值(短TTL)
        cs.redis.Set(context.Background(), cacheKey, "null", 60*time.Second)
        return nil, err
    }

    // 3. 写入缓存
    userJSON, _ := json.Marshal(user)
    cs.redis.Set(context.Background(), cacheKey, userJSON, 600*time.Second)

    return user, nil
}

3.2 缓存击穿

问题:热点数据过期瞬间,大量请求同时查数据库。

解决方案:分布式锁 + 双重检查

func (cs *CacheService) GetTickerWithLock(symbol string) (*Ticker, error) {
    cacheKey := fmt.Sprintf("ticker:%s", symbol)

    // 1. 查缓存
    data, err := cs.redis.Get(context.Background(), cacheKey).Result()
    if err == nil {
        var ticker Ticker
        json.Unmarshal([]byte(data), &ticker)
        return &ticker, nil
    }

    // 2. 缓存未命中,获取分布式锁
    lockKey := fmt.Sprintf("lock:ticker:%s", symbol)
    locked, err := cs.redis.SetNX(context.Background(), lockKey, "1", 10*time.Second).Result()

    if locked {
        // 获取锁成功,查询数据库
        defer cs.redis.Del(context.Background(), lockKey)

        // 双重检查:其他协程可能已经写入缓存
        data, err := cs.redis.Get(context.Background(), cacheKey).Result()
        if err == nil {
            var ticker Ticker
            json.Unmarshal([]byte(data), &ticker)
            return &ticker, nil
        }

        // 查询数据库
        ticker, err := cs.queryTickerFromDB(symbol)
        if err != nil {
            return nil, err
        }

        // 写入缓存
        tickerJSON, _ := json.Marshal(ticker)
        cs.redis.Set(context.Background(), cacheKey, tickerJSON, 5*time.Second)

        return ticker, nil
    } else {
        // 获取锁失败,等待一段时间后重试
        time.Sleep(50 * time.Millisecond)
        return cs.GetTickerWithLock(symbol)
    }
}

3.3 缓存雪崩

问题:大量缓存同时过期,导致数据库压力暴增。

解决方案1:随机TTL

func (cs *CacheService) SetWithRandomTTL(key string, value interface{}, baseTTL time.Duration) {
    // 添加随机时间:baseTTL ± 20%
    randomOffset := time.Duration(rand.Int63n(int64(baseTTL) / 5))
    ttl := baseTTL + randomOffset - baseTTL/10

    valueJSON, _ := json.Marshal(value)
    cs.redis.Set(context.Background(), key, valueJSON, ttl)
}

解决方案2:双层过期策略

type CacheValue struct {
    Data      interface{} `json:"data"`
    ExpiresAt int64       `json:"expires_at"` // 逻辑过期时间
}

func (cs *CacheService) SetWithLogicalExpire(key string, value interface{}, ttl time.Duration) {
    cacheValue := &CacheValue{
        Data:      value,
        ExpiresAt: time.Now().Add(ttl).Unix(),
    }

    valueJSON, _ := json.Marshal(cacheValue)

    // 物理过期时间设置为逻辑过期时间的2倍
    cs.redis.Set(context.Background(), key, valueJSON, ttl*2)
}

func (cs *CacheService) GetWithLogicalExpire(key string) (interface{}, error) {
    data, err := cs.redis.Get(context.Background(), key).Result()
    if err != nil {
        return nil, err
    }

    var cacheValue CacheValue
    json.Unmarshal([]byte(data), &cacheValue)

    // 检查逻辑过期时间
    if time.Now().Unix() > cacheValue.ExpiresAt {
        // 逻辑过期,异步更新缓存
        go cs.refreshCache(key)

        // 返回旧数据(避免击穿)
        return cacheValue.Data, nil
    }

    return cacheValue.Data, nil
}

4. Kafka消息队列

4.1 Kafka架构

┌──────────────────────────────────────────────────────────┐
│                       Producer                            │
│  撮合引擎、清算系统、风控系统                              │
└────────────┬────────────┬─────────────┬──────────────────┘
             │            │             │
             ↓            ↓             ↓
┌────────────────────────────────────────────────────────┐
│                     Kafka Cluster                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐    │
│  │   Topic:    │  │   Topic:    │  │   Topic:    │    │
│  │   trades    │  │   orders    │  │   balances  │    │
│  │ (3 parts)   │  │ (5 parts)   │  │ (5 parts)   │    │
│  └─────────────┘  └─────────────┘  └─────────────┘    │
└────────────┬────────────┬─────────────┬──────────────┘
             │            │             │
             ↓            ↓             ↓
┌────────────────────────────────────────────────────────┐
│                      Consumers                          │
│  行情服务、推送服务、数据归档、风控分析                  │
└──────────────────────────────────────────────────────┘

4.2 Producer实现

package kafka

import (
    "context"
    "encoding/json"
    "github.com/segmentio/kafka-go"
)

type TradeProducer struct {
    writer *kafka.Writer
}

func NewTradeProducer(brokers []string) *TradeProducer {
    writer := &kafka.Writer{
        Addr:     kafka.TCP(brokers...),
        Topic:    "trades",
        Balancer: &kafka.Hash{}, // 按key分区
    }

    return &TradeProducer{writer: writer}
}

func (tp *TradeProducer) PublishTrade(trade *Trade) error {
    tradeJSON, err := json.Marshal(trade)
    if err != nil {
        return err
    }

    // 使用symbol作为key,保证同一交易对的消息有序
    message := kafka.Message{
        Key:   []byte(trade.Symbol),
        Value: tradeJSON,
    }

    return tp.writer.WriteMessages(context.Background(), message)
}

func (tp *TradeProducer) Close() error {
    return tp.writer.Close()
}

批量发送优化:

type BatchTradeProducer struct {
    writer  *kafka.Writer
    buffer  chan *Trade
    batchSize int
}

func NewBatchTradeProducer(brokers []string, batchSize int) *BatchTradeProducer {
    writer := &kafka.Writer{
        Addr:     kafka.TCP(brokers...),
        Topic:    "trades",
        Balancer: &kafka.Hash{},
        BatchSize: batchSize,
        BatchTimeout: 10 * time.Millisecond,
    }

    btp := &BatchTradeProducer{
        writer:    writer,
        buffer:    make(chan *Trade, 10000),
        batchSize: batchSize,
    }

    go btp.flushWorker()

    return btp
}

func (btp *BatchTradeProducer) PublishTrade(trade *Trade) {
    btp.buffer <- trade
}

func (btp *BatchTradeProducer) flushWorker() {
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()

    batch := make([]kafka.Message, 0, btp.batchSize)

    for {
        select {
        case trade := <-btp.buffer:
            tradeJSON, _ := json.Marshal(trade)
            message := kafka.Message{
                Key:   []byte(trade.Symbol),
                Value: tradeJSON,
            }
            batch = append(batch, message)

            // 达到批量大小,立即发送
            if len(batch) >= btp.batchSize {
                btp.writer.WriteMessages(context.Background(), batch...)
                batch = batch[:0]
            }

        case <-ticker.C:
            // 定时刷新
            if len(batch) > 0 {
                btp.writer.WriteMessages(context.Background(), batch...)
                batch = batch[:0]
            }
        }
    }
}

4.3 Consumer实现

type TradeConsumer struct {
    reader *kafka.Reader
}

func NewTradeConsumer(brokers []string, groupID string) *TradeConsumer {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  brokers,
        Topic:    "trades",
        GroupID:  groupID,
        MinBytes: 10e3, // 10KB
        MaxBytes: 10e6, // 10MB
    })

    return &TradeConsumer{reader: reader}
}

func (tc *TradeConsumer) Start(handler func(*Trade) error) error {
    for {
        message, err := tc.reader.ReadMessage(context.Background())
        if err != nil {
            return err
        }

        var trade Trade
        err = json.Unmarshal(message.Value, &trade)
        if err != nil {
            // 记录错误,继续处理下一条
            log.Printf("unmarshal error: %v", err)
            continue
        }

        // 处理消息
        err = handler(&trade)
        if err != nil {
            log.Printf("handler error: %v", err)
            // 根据业务决定是否重试
        }

        // 提交offset
        tc.reader.CommitMessages(context.Background(), message)
    }
}

func (tc *TradeConsumer) Close() error {
    return tc.reader.Close()
}

并发消费:

type ParallelTradeConsumer struct {
    reader    *kafka.Reader
    workers   int
    tradesChan chan *Trade
}

func NewParallelTradeConsumer(brokers []string, groupID string, workers int) *ParallelTradeConsumer {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: brokers,
        Topic:   "trades",
        GroupID: groupID,
    })

    ptc := &ParallelTradeConsumer{
        reader:    reader,
        workers:   workers,
        tradesChan: make(chan *Trade, 10000),
    }

    // 启动多个worker
    for i := 0; i < workers; i++ {
        go ptc.worker(i)
    }

    return ptc
}

func (ptc *ParallelTradeConsumer) Start() error {
    for {
        message, err := ptc.reader.FetchMessage(context.Background())
        if err != nil {
            return err
        }

        var trade Trade
        err = json.Unmarshal(message.Value, &trade)
        if err != nil {
            log.Printf("unmarshal error: %v", err)
            ptc.reader.CommitMessages(context.Background(), message)
            continue
        }

        // 发送到worker处理
        ptc.tradesChan <- &trade

        // 提交offset
        ptc.reader.CommitMessages(context.Background(), message)
    }
}

func (ptc *ParallelTradeConsumer) worker(id int) {
    for trade := range ptc.tradesChan {
        // 处理trade
        err := ptc.processTrade(trade)
        if err != nil {
            log.Printf("worker %d: process error: %v", id, err)
        }
    }
}

func (ptc *ParallelTradeConsumer) processTrade(trade *Trade) error {
    // 业务逻辑:更新行情、推送WebSocket等
    return nil
}

4.4 消息顺序性保证

问题:同一用户的订单消息必须有序处理。

解决方案:按userID分区

type OrderProducer struct {
    writer *kafka.Writer
}

func NewOrderProducer(brokers []string) *OrderProducer {
    writer := &kafka.Writer{
        Addr:     kafka.TCP(brokers...),
        Topic:    "orders",
        // 使用自定义分区器:按userID分区
        Balancer: &UserBalancer{},
    }

    return &OrderProducer{writer: writer}
}

// 自定义分区器
type UserBalancer struct{}

func (ub *UserBalancer) Balance(msg kafka.Message, partitions ...int) int {
    // 从key中提取userID,分配到对应分区
    userID := string(msg.Key)
    hash := fnv.New32a()
    hash.Write([]byte(userID))

    return int(hash.Sum32()) % len(partitions)
}

// 发送订单消息
func (op *OrderProducer) PublishOrder(order *Order) error {
    orderJSON, _ := json.Marshal(order)

    message := kafka.Message{
        Key:   []byte(fmt.Sprintf("%d", order.UserID)), // 使用userID作为key
        Value: orderJSON,
    }

    return op.writer.WriteMessages(context.Background(), message)
}

4.5 消息幂等性

问题:消费者重启后可能重复消费消息。

解决方案1:唯一ID去重

type IdempotentConsumer struct {
    reader      *kafka.Reader
    processedIDs *redis.Client
}

func (ic *IdempotentConsumer) ProcessTrade(trade *Trade) error {
    // 1. 检查是否已处理
    key := fmt.Sprintf("processed:trade:%s", trade.TradeID)
    exists, err := ic.processedIDs.Exists(context.Background(), key).Result()

    if exists > 0 {
        // 已处理,跳过
        log.Printf("trade %s already processed", trade.TradeID)
        return nil
    }

    // 2. 处理业务逻辑
    err = ic.handleTrade(trade)
    if err != nil {
        return err
    }

    // 3. 标记为已处理(TTL 7天)
    ic.processedIDs.Set(context.Background(), key, "1", 7*24*time.Hour)

    return nil
}

解决方案2:数据库唯一约束

CREATE TABLE processed_trades (
    trade_id VARCHAR(64) PRIMARY KEY,
    processed_at DATETIME NOT NULL,
    INDEX idx_processed_at (processed_at)
) ENGINE=InnoDB;

-- 插入时利用主键约束去重
INSERT IGNORE INTO processed_trades (trade_id, processed_at)
VALUES (?, NOW());

5. 实战:成交流处理

5.1 完整流程

撮合引擎 → Kafka(trades) → 消费者 → [行情聚合、WebSocket推送、数据持久化]

5.2 实现

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/go-redis/redis/v8"
    "github.com/segmentio/kafka-go"
)

type TradeStreamProcessor struct {
    kafka    *kafka.Reader
    redis    *redis.Client
    wsServer *WebSocketServer
}

func NewTradeStreamProcessor(brokers []string, redisAddr string) *TradeStreamProcessor {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: brokers,
        Topic:   "trades",
        GroupID: "trade-processor",
    })

    redisClient := redis.NewClient(&redis.Options{
        Addr: redisAddr,
    })

    return &TradeStreamProcessor{
        kafka:    reader,
        redis:    redisClient,
        wsServer: NewWebSocketServer(),
    }
}

func (tsp *TradeStreamProcessor) Start() {
    for {
        message, err := tsp.kafka.ReadMessage(context.Background())
        if err != nil {
            log.Printf("read error: %v", err)
            continue
        }

        var trade Trade
        err = json.Unmarshal(message.Value, &trade)
        if err != nil {
            log.Printf("unmarshal error: %v", err)
            continue
        }

        // 并发处理多个任务
        go tsp.updateTicker(&trade)
        go tsp.updateKline(&trade)
        go tsp.updateDepth(&trade)
        go tsp.pushToWebSocket(&trade)
    }
}

// 更新Ticker
func (tsp *TradeStreamProcessor) updateTicker(trade *Trade) {
    key := fmt.Sprintf("ticker:%s", trade.Symbol)

    // 更新最新价
    tsp.redis.HSet(context.Background(), key, "lastPrice", trade.Price)

    // 累加成交量
    tsp.redis.HIncrByFloat(context.Background(), key, "volume", trade.Quantity)
    tsp.redis.HIncrByFloat(context.Background(), key, "quoteVolume", trade.Amount)

    // 更新24h高低价
    tsp.redis.Eval(context.Background(), `
        local high = redis.call('HGET', KEYS[1], 'highPrice')
        if not high or tonumber(ARGV[1]) > tonumber(high) then
            redis.call('HSET', KEYS[1], 'highPrice', ARGV[1])
        end

        local low = redis.call('HGET', KEYS[1], 'lowPrice')
        if not low or tonumber(ARGV[1]) < tonumber(low) then
            redis.call('HSET', KEYS[1], 'lowPrice', ARGV[1])
        end
    `, []string{key}, trade.Price)

    // 设置过期时间
    tsp.redis.Expire(context.Background(), key, 24*time.Hour)
}

// 更新K线
func (tsp *TradeStreamProcessor) updateKline(trade *Trade) {
    intervals := []string{"1m", "5m", "15m", "1h", "4h", "1d"}

    for _, interval := range intervals {
        key := fmt.Sprintf("kline:%s:%s", trade.Symbol, interval)

        // 获取当前K线的时间窗口
        timestamp := tsp.getKlineTimestamp(trade.Timestamp, interval)

        // 使用Lua脚本原子更新K线
        tsp.redis.Eval(context.Background(), `
            local key = KEYS[1]
            local timestamp = ARGV[1]
            local price = tonumber(ARGV[2])
            local quantity = tonumber(ARGV[3])

            -- 获取当前K线
            local kline = redis.call('HGETALL', key)
            local klineMap = {}
            for i = 1, #kline, 2 do
                klineMap[kline[i]] = kline[i+1]
            end

            if klineMap['timestamp'] == timestamp then
                -- 更新现有K线
                local high = tonumber(klineMap['high'])
                local low = tonumber(klineMap['low'])
                local volume = tonumber(klineMap['volume'])

                redis.call('HSET', key, 'close', price)
                redis.call('HSET', key, 'high', math.max(high, price))
                redis.call('HSET', key, 'low', math.min(low, price))
                redis.call('HINCRBY', key, 'volume', quantity)
            else
                -- 新建K线
                redis.call('HSET', key, 'timestamp', timestamp)
                redis.call('HSET', key, 'open', price)
                redis.call('HSET', key, 'high', price)
                redis.call('HSET', key, 'low', price)
                redis.call('HSET', key, 'close', price)
                redis.call('HSET', key, 'volume', quantity)
            end

            redis.call('EXPIRE', key, 86400)
        `, []string{key}, timestamp, trade.Price, trade.Quantity)
    }
}

// 更新深度(减少对应档位的量)
func (tsp *TradeStreamProcessor) updateDepth(trade *Trade) {
    // 买单成交,减少卖盘深度
    askKey := fmt.Sprintf("depth:ask:%s", trade.Symbol)
    tsp.redis.ZIncrBy(context.Background(), askKey, -trade.Quantity, fmt.Sprintf("%f", trade.Price))

    // 移除数量为0的档位
    tsp.redis.ZRemRangeByScore(context.Background(), askKey, "-inf", "0")
}

// 推送WebSocket
func (tsp *TradeStreamProcessor) pushToWebSocket(trade *Trade) {
    tradeMsg := map[string]interface{}{
        "event":     "trade",
        "symbol":    trade.Symbol,
        "tradeId":   trade.TradeID,
        "price":     trade.Price,
        "quantity":  trade.Quantity,
        "timestamp": trade.Timestamp.Unix(),
    }

    tsp.wsServer.Broadcast(trade.Symbol, tradeMsg)
}

func (tsp *TradeStreamProcessor) getKlineTimestamp(t time.Time, interval string) int64 {
    switch interval {
    case "1m":
        return t.Truncate(time.Minute).Unix()
    case "5m":
        return t.Truncate(5 * time.Minute).Unix()
    case "15m":
        return t.Truncate(15 * time.Minute).Unix()
    case "1h":
        return t.Truncate(time.Hour).Unix()
    case "4h":
        return t.Truncate(4 * time.Hour).Unix()
    case "1d":
        return t.Truncate(24 * time.Hour).Unix()
    default:
        return t.Unix()
    }
}

func main() {
    processor := NewTradeStreamProcessor(
        []string{"localhost:9092"},
        "localhost:6379",
    )

    log.Println("Trade stream processor started")
    processor.Start()
}

小结

本章介绍了缓存与消息队列的设计:

  1. Redis缓存架构:多级缓存(本地 + Redis)、LRU实现
  2. 缓存策略:Cache-Aside、Write-Through、Write-Behind
  3. 缓存问题:穿透、击穿、雪崩的解决方案
  4. Kafka消息队列:Producer/Consumer实现、批量发送、并发消费
  5. 消息顺序性:分区策略保证顺序
  6. 消息幂等性:唯一ID去重、数据库约束
  7. 实战案例:成交流实时处理

下一章将讲解用户系统与KYC认证。

Prev
数据库设计与优化
Next
用户系统与KYC