HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 交易所技术完整体系

    • 交易所技术完整体系
    • 交易所技术架构总览
    • 交易基础概念
    • 撮合引擎原理
    • 撮合引擎实现-内存撮合
    • 撮合引擎优化 - 延迟与吞吐
    • 撮合引擎高可用
    • 清算系统设计
    • 风控系统设计
    • 资金管理系统
    • 行情系统设计
    • 去中心化交易所(DEX)设计
    • 合约交易系统
    • 数据库设计与优化
    • 缓存与消息队列
    • 用户系统与KYC
    • 交易所API设计
    • 监控与告警系统
    • 安全防护与攻防
    • 高可用架构设计
    • 压力测试与性能优化
    • 项目实战-完整交易所实现

项目实战-完整交易所实现

1. 项目架构总览

1.1 技术栈

后端:
- Go 1.21+
- MySQL 8.0
- Redis 7.0
- Kafka 3.5
- WebSocket

前端:
- Vue 3
- TypeScript
- TradingView (K线图)

基础设施:
- Docker / Kubernetes
- Prometheus + Grafana
- ELK Stack
- Nginx

1.2 目录结构

exchange/
├── cmd/
│   ├── api/              # API服务
│   ├── matching/         # 撮合引擎
│   ├── market/           # 行情服务
│   ├── settlement/       # 清算服务
│   └── websocket/        # WebSocket服务
├── internal/
│   ├── domain/           # 领域模型
│   │   ├── order/
│   │   ├── trade/
│   │   ├── account/
│   │   └── position/
│   ├── repository/       # 数据访问层
│   │   ├── mysql/
│   │   └── redis/
│   ├── service/          # 业务逻辑层
│   │   ├── matching/
│   │   ├── risk/
│   │   └── wallet/
│   └── infrastructure/   # 基础设施
│       ├── cache/
│       ├── mq/
│       └── config/
├── api/                  # API定义
│   └── proto/            # gRPC proto
├── web/                  # 前端代码
│   ├── src/
│   ├── public/
│   └── package.json
├── scripts/              # 部署脚本
│   ├── deploy.sh
│   └── migrate.sh
├── docker-compose.yml
├── go.mod
└── README.md

2. 核心模块实现

2.1 订单模型

// internal/domain/order/order.go
package order

import (
    "time"
)

// Order is the domain model for a single order. The JSON tags define the
// field names used whenever an order is marshalled or unmarshalled.
//
// NOTE(review): prices, quantities and amounts are float64 and therefore
// subject to binary rounding; consider a fixed-point or decimal type.
type Order struct {
    // NOTE(review): ID is presumably the storage primary key while OrderID is
    // the public identifier — confirm against the repository layer.
    ID           int64     `json:"id"`
    OrderID      string    `json:"order_id"`
    UserID       int64     `json:"user_id"`
    Symbol       string    `json:"symbol"`
    Side         Side      `json:"side"`
    Type         Type      `json:"type"`
    Price        float64   `json:"price"`
    Quantity     float64   `json:"quantity"`
    // FilledQty and FilledAmount accumulate as the order is matched;
    // AvgPrice is FilledAmount divided by FilledQty.
    FilledQty    float64   `json:"filled_qty"`
    FilledAmount float64   `json:"filled_amount"`
    AvgPrice     float64   `json:"avg_price"`
    Status       Status    `json:"status"`
    TimeInForce  TIF       `json:"time_in_force"`
    CreatedAt    time.Time `json:"created_at"`
    UpdatedAt    time.Time `json:"updated_at"`
}

// Side is the direction of an order, serialized as a lowercase string.
type Side string

// Order sides.
const (
    SideBuy  Side = "buy"
    SideSell Side = "sell"
)

// Type is the pricing mode of an order, serialized as a lowercase string.
type Type string

// Order types.
const (
    TypeLimit  Type = "limit"
    TypeMarket Type = "market"
)

// Status is the lifecycle state of an order, serialized as a lowercase string.
type Status string

// Order statuses.
const (
    StatusPending   Status = "pending"
    StatusPartial   Status = "partial"
    StatusFilled    Status = "filled"
    StatusCancelled Status = "cancelled"
    StatusRejected  Status = "rejected"
)

// TIF is the time-in-force policy of an order, serialized as an uppercase
// abbreviation.
type TIF string

// Time-in-force policies.
const (
    TIFGTC TIF = "GTC" // Good Till Cancel
    TIFIOC TIF = "IOC" // Immediate or Cancel
    TIFFOK TIF = "FOK" // Fill or Kill
)

2.2 撮合引擎服务

// cmd/matching/main.go
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"

    "exchange/internal/service/matching"
    "exchange/internal/infrastructure/mq"
)

// main wires up the matching service: it creates one engine per symbol,
// feeds them from the "orders" topic and, on SIGINT/SIGTERM, stops every
// engine and waits for each to finish the orders already queued before the
// process exits.
func main() {
    // Load configuration.
    cfg := loadConfig()

    // Set up the Kafka consumer and producer.
    consumer := mq.NewKafkaConsumer(cfg.Kafka.Brokers, "matching-engine")
    producer := mq.NewKafkaProducer(cfg.Kafka.Brokers)

    // Create one matching engine per trading pair.
    symbols := []string{"BTCUSDT", "ETHUSDT", "BNBUSDT"}
    engines := make(map[string]*matching.Engine, len(symbols))

    // Every engine goroutine signals on done once Start has returned, i.e.
    // once its queue has been closed and fully drained.
    done := make(chan struct{}, len(symbols))

    for _, symbol := range symbols {
        engine := matching.NewEngine(symbol, producer)
        engines[symbol] = engine
        go func(e *matching.Engine) {
            e.Start()
            done <- struct{}{}
        }(engine)
    }

    // Consume incoming orders.
    go consumeOrders(consumer, engines)

    // Graceful shutdown: block until a termination signal arrives.
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan

    log.Println("Shutting down matching engine...")
    for _, engine := range engines {
        engine.Stop()
    }

    // Returning right after Stop would kill the process while orders are
    // still queued; wait for every engine to drain first.
    for range engines {
        <-done
    }
}

// consumeOrders reads messages from the "orders" topic, decodes each one as
// an order and hands it to the engine registered for the order's symbol.
// Messages that fail to decode, or that name a symbol without an engine, are
// logged and skipped. The function returns when the message channel closes.
func consumeOrders(consumer *mq.KafkaConsumer, engines map[string]*matching.Engine) {
    for msg := range consumer.Messages("orders") {
        var o order.Order
        // A payload that does not decode must not reach the engine as a
        // zero-value order.
        if err := json.Unmarshal(msg.Value, &o); err != nil {
            log.Printf("Skipping malformed order message: %v", err)
            continue
        }

        engine, exists := engines[o.Symbol]
        if !exists {
            log.Printf("Unknown symbol: %s", o.Symbol)
            continue
        }

        engine.AddOrder(&o)
    }
}

2.3 撮合引擎核心

// internal/service/matching/engine.go
package matching

import (
    "encoding/json"
    "errors"
    "log"
    "sync"
    "time"

    "exchange/internal/domain/order"
    "exchange/internal/domain/trade"
    "exchange/internal/infrastructure/mq"
)

// Engine matches orders for one symbol. Orders enter through orderChan and
// are processed one at a time by the goroutine running Start.
type Engine struct {
    symbol    string
    orderBook *OrderBook
    orderChan chan *order.Order // inbound queue filled by AddOrder, drained by Start
    producer  *mq.KafkaProducer // receives trade and order-update messages
    mu        sync.RWMutex      // guards stopped
    stopped   bool              // set by Stop; AddOrder drops orders once true
}

// NewEngine builds an Engine for symbol that publishes through producer.
// It starts with an empty order book and an inbound queue buffered to 10000
// orders; queued orders are only processed while Start is running.
func NewEngine(symbol string, producer *mq.KafkaProducer) *Engine {
    e := new(Engine)
    e.symbol = symbol
    e.producer = producer
    e.orderBook = NewOrderBook()
    e.orderChan = make(chan *order.Order, 10000)
    return e
}

// Start processes queued orders sequentially. It blocks until the inbound
// queue has been closed and every remaining order has been handled.
func (e *Engine) Start() {
    for {
        next, open := <-e.orderChan
        if !open {
            return
        }
        e.processOrder(next)
    }
}

// Stop marks the engine as stopped and closes the inbound queue, which lets
// Start return once the orders already queued have been processed.
// Calling Stop more than once is a no-op; closing the queue a second time
// would otherwise panic.
func (e *Engine) Stop() {
    e.mu.Lock()
    defer e.mu.Unlock()

    if e.stopped {
        return
    }
    e.stopped = true
    close(e.orderChan)
}

// AddOrder queues order for matching. Orders submitted after Stop are
// silently dropped. The call blocks while the queue is full.
//
// The read lock is held across the send so that Stop, which needs the write
// lock to set the stopped flag, cannot close the queue between the stopped
// check and the send; sending on a closed channel would panic. As a
// consequence, Stop waits for in-flight sends, which only complete while
// Start is draining the queue.
func (e *Engine) AddOrder(order *order.Order) {
    e.mu.RLock()
    defer e.mu.RUnlock()

    if e.stopped {
        return
    }
    e.orderChan <- order
}

// processOrder runs one order through the pipeline: validate, match against
// the book, publish the resulting trades, settle the order's final status
// and publish the order update.
//
// The parameter is named o rather than order so that the order package
// (order.StatusFilled, order.TIFIOC, ...) stays reachable inside the body.
//
// NOTE(review): FOK is treated like IOC here — match may already have
// executed partial fills before the remainder is cancelled, so all-or-nothing
// is not enforced.
func (e *Engine) processOrder(o *order.Order) {
    // 1. Validate the order.
    if err := e.validateOrder(o); err != nil {
        e.rejectOrder(o, err)
        return
    }

    // 2. Try to match it against the book.
    trades := e.match(o)

    // 3. Publish the resulting trades.
    for _, t := range trades {
        e.publishTrade(t)
    }

    // 4. Settle whatever is left.
    switch {
    case o.FilledQty >= o.Quantity:
        o.Status = order.StatusFilled
    case o.Type != order.TypeLimit,
        o.TimeInForce == order.TIFIOC,
        o.TimeInForce == order.TIFFOK:
        // Only limit orders may rest: a market order carries no usable price
        // and would sit on the book at price 0. IOC/FOK remainders are
        // cancelled by definition.
        o.Status = order.StatusCancelled
    default:
        e.orderBook.Add(o)
        if o.FilledQty > 0 {
            o.Status = order.StatusPartial
        } else {
            // Nothing matched yet, so the order is not partially filled.
            o.Status = order.StatusPending
        }
    }

    // 5. Publish the order update.
    e.publishOrderUpdate(o)
}

// match executes the incoming (taker) order against the opposite side of the
// book and returns the trades it produced, in execution order.
//
// A buy taker consumes the best asks and a sell taker consumes the best bids.
// Every trade executes at the resting (maker) order's price. For limit
// takers, matching stops as soon as the best opposite price no longer crosses
// the taker's limit. Each maker touched gets its fill fields and status
// updated and an order update published; fully filled makers are removed
// from the book. The taker's fill fields are updated, but its status is left
// to the caller.
//
// The parameter is named taker rather than order so that the order package
// stays reachable inside the body.
func (e *Engine) match(taker *order.Order) []*trade.Trade {
    var trades []*trade.Trade
    takerBuys := taker.Side == order.SideBuy

    for taker.FilledQty < taker.Quantity {
        // Best resting order on the opposite side of the book.
        var maker *order.Order
        if takerBuys {
            maker = e.orderBook.BestAsk()
        } else {
            maker = e.orderBook.BestBid()
        }
        if maker == nil {
            break
        }

        // Limit order: stop when the prices no longer cross.
        if taker.Type == order.TypeLimit {
            crosses := taker.Price >= maker.Price
            if !takerBuys {
                crosses = taker.Price <= maker.Price
            }
            if !crosses {
                break
            }
        }

        // Execute at the maker's price for the smaller remaining quantity.
        matchQty := min(taker.Quantity-taker.FilledQty, maker.Quantity-maker.FilledQty)
        matchPrice := maker.Price

        buyer, seller := taker, maker
        if !takerBuys {
            buyer, seller = maker, taker
        }

        t := &trade.Trade{
            TradeID:      generateTradeID(),
            Symbol:       taker.Symbol,
            BuyOrderID:   buyer.OrderID,
            SellOrderID:  seller.OrderID,
            BuyUserID:    buyer.UserID,
            SellUserID:   seller.UserID,
            Price:        matchPrice,
            Quantity:     matchQty,
            Amount:       matchPrice * matchQty,
            BuyerIsMaker: !takerBuys,
            Timestamp:    time.Now(),
        }
        trades = append(trades, t)

        // Update both orders with the fill.
        taker.FilledQty += matchQty
        taker.FilledAmount += t.Amount
        taker.AvgPrice = taker.FilledAmount / taker.FilledQty

        maker.FilledQty += matchQty
        maker.FilledAmount += t.Amount
        maker.AvgPrice = maker.FilledAmount / maker.FilledQty

        // A fully filled maker leaves the book; a partially filled one stays
        // but still needs its new state published.
        if maker.FilledQty >= maker.Quantity {
            e.orderBook.Remove(maker)
            maker.Status = order.StatusFilled
        } else {
            maker.Status = order.StatusPartial
        }
        e.publishOrderUpdate(maker)
    }

    return trades
}

// publishTrade serializes t as JSON and sends it to the "trades" topic.
// A trade that cannot be serialized is logged and not sent.
//
// NOTE(review): whether producer.Send reports failures is not visible here;
// if it returns an error, it should be handled as well.
func (e *Engine) publishTrade(t *trade.Trade) {
    data, err := json.Marshal(t)
    if err != nil {
        log.Printf("matching[%s]: marshal trade: %v", e.symbol, err)
        return
    }
    e.producer.Send("trades", data)
}

// publishOrderUpdate serializes o as JSON and sends it to the
// "order_updates" topic. An order that cannot be serialized is logged and
// not sent.
//
// NOTE(review): whether producer.Send reports failures is not visible here;
// if it returns an error, it should be handled as well.
func (e *Engine) publishOrderUpdate(o *order.Order) {
    data, err := json.Marshal(o)
    if err != nil {
        log.Printf("matching[%s]: marshal order update: %v", e.symbol, err)
        return
    }
    e.producer.Send("order_updates", data)
}

// validateOrder reports why o cannot be accepted, or nil if it can.
// It requires a positive quantity, a positive price for limit orders, and a
// known side and type. Side and type arrive as raw strings from clients, and
// an unknown side would otherwise be booked as a sell.
//
// The parameter is named o rather than order so that the order package
// stays reachable inside the body.
func (e *Engine) validateOrder(o *order.Order) error {
    if o.Quantity <= 0 {
        return errors.New("invalid quantity")
    }

    if o.Type == order.TypeLimit && o.Price <= 0 {
        return errors.New("invalid price")
    }

    if o.Side != order.SideBuy && o.Side != order.SideSell {
        return errors.New("invalid side")
    }

    if o.Type != order.TypeLimit && o.Type != order.TypeMarket {
        return errors.New("invalid type")
    }

    return nil
}

// rejectOrder marks o as rejected and publishes the update. The reason is
// logged because the published order carries no field for it.
//
// The parameter is named o rather than order so that the order package
// stays reachable inside the body.
func (e *Engine) rejectOrder(o *order.Order, err error) {
    log.Printf("matching[%s]: order %s rejected: %v", e.symbol, o.OrderID, err)
    o.Status = order.StatusRejected
    e.publishOrderUpdate(o)
}

2.4 订单簿实现

// internal/service/matching/orderbook.go
package matching

import (
    "container/list"
    "sync"

    "exchange/internal/domain/order"
)

// OrderBook holds the resting orders of one symbol, split by side.
// All methods take mu, so the book may be used from several goroutines.
type OrderBook struct {
    bids *PriceLevel // buy side; the best bid is the highest price
    asks *PriceLevel // sell side; the best ask is the lowest price
    mu   sync.RWMutex
}

// PriceLevel stores one side of the book: each price maps to the list of
// orders resting at that price, in arrival order (new orders are appended,
// the front is matched first).
//
// The map is unordered, so finding the best price means scanning every key.
type PriceLevel struct {
    levels map[float64]*list.List
}

// NewOrderBook returns an order book whose bid and ask sides are both empty.
func NewOrderBook() *OrderBook {
    emptySide := func() *PriceLevel {
        return &PriceLevel{levels: map[float64]*list.List{}}
    }

    ob := &OrderBook{}
    ob.bids = emptySide()
    ob.asks = emptySide()
    return ob
}

// Add appends o to the back of the queue at its price level, creating the
// level if needed. Buy orders go to the bid side; every other side value
// goes to the ask side.
//
// The parameter is named o rather than order so that the order package
// (order.SideBuy) stays reachable inside the body.
func (ob *OrderBook) Add(o *order.Order) {
    ob.mu.Lock()
    defer ob.mu.Unlock()

    side := ob.asks
    if o.Side == order.SideBuy {
        side = ob.bids
    }

    queue := side.levels[o.Price]
    if queue == nil {
        queue = list.New()
        side.levels[o.Price] = queue
    }

    queue.PushBack(o)
}

// Remove deletes the order with o's OrderID from the price level at o.Price
// on o's side, and drops the level once it is empty. It is a no-op when the
// level or the order is not found.
//
// The parameter is named o rather than order so that the order package
// (order.SideBuy, *order.Order) stays reachable inside the body.
func (ob *OrderBook) Remove(o *order.Order) {
    ob.mu.Lock()
    defer ob.mu.Unlock()

    side := ob.asks
    if o.Side == order.SideBuy {
        side = ob.bids
    }

    queue := side.levels[o.Price]
    if queue == nil {
        return
    }

    for el := queue.Front(); el != nil; el = el.Next() {
        resting, ok := el.Value.(*order.Order)
        if !ok || resting.OrderID != o.OrderID {
            continue
        }

        queue.Remove(el)
        if queue.Len() == 0 {
            delete(side.levels, o.Price)
        }
        return
    }
}

// BestBid returns the oldest order at the highest bid price, or nil when
// there is none. Zero doubles as the "no level" marker, so a level keyed at
// price 0 or below is never selected.
func (ob *OrderBook) BestBid() *order.Order {
    ob.mu.RLock()
    defer ob.mu.RUnlock()

    levels := ob.bids.levels

    // Scan every level for the highest price.
    top := 0.0
    for p := range levels {
        if p > top {
            top = p
        }
    }
    if top == 0 {
        return nil
    }

    if queue := levels[top]; queue.Len() > 0 {
        return queue.Front().Value.(*order.Order)
    }
    return nil
}

// BestAsk returns the oldest order at the lowest ask price, or nil when
// there is none. A negative value marks "no level seen yet".
func (ob *OrderBook) BestAsk() *order.Order {
    ob.mu.RLock()
    defer ob.mu.RUnlock()

    levels := ob.asks.levels

    // Scan every level for the lowest price.
    lowest := float64(-1)
    for p := range levels {
        if lowest < 0 || p < lowest {
            lowest = p
        }
    }
    if lowest < 0 {
        return nil
    }

    if queue := levels[lowest]; queue.Len() > 0 {
        return queue.Front().Value.(*order.Order)
    }
    return nil
}

3. API服务实现

3.1 HTTP API

// cmd/api/main.go
package main

import (
    "log"
    "net/http"

    "github.com/gorilla/mux"

    "exchange/internal/api/handler"
    "exchange/internal/api/middleware"
)

// main builds the HTTP router and serves it on :8080. Market-data endpoints
// are registered on the root router; account and order endpoints live on a
// subrouter that additionally runs the JWT middleware.
func main() {
    type route struct {
        path   string
        method string
        handle func(http.ResponseWriter, *http.Request)
    }

    root := mux.NewRouter()

    // Middleware applied to every request.
    root.Use(middleware.CORS, middleware.Logging, middleware.RateLimiting)

    // Public API.
    publicRoutes := []route{
        {"/api/v1/ticker/{symbol}", "GET", handler.GetTicker},
        {"/api/v1/depth/{symbol}", "GET", handler.GetDepth},
        {"/api/v1/trades/{symbol}", "GET", handler.GetTrades},
        {"/api/v1/klines/{symbol}", "GET", handler.GetKlines},
    }
    for _, rt := range publicRoutes {
        root.HandleFunc(rt.path, rt.handle).Methods(rt.method)
    }

    // Authenticated API.
    private := root.PathPrefix("/api/v1").Subrouter()
    private.Use(middleware.JWTAuth)

    privateRoutes := []route{
        {"/account/balance", "GET", handler.GetBalance},
        {"/orders", "POST", handler.PlaceOrder},
        {"/orders/{orderId}", "DELETE", handler.CancelOrder},
        {"/orders", "GET", handler.GetOrders},
        {"/trades", "GET", handler.GetMyTrades},
    }
    for _, rt := range privateRoutes {
        private.HandleFunc(rt.path, rt.handle).Methods(rt.method)
    }

    log.Println("API server listening on :8080")
    if err := http.ListenAndServe(":8080", root); err != nil {
        log.Fatal(err)
    }
}

3.2 订单处理Handler

// internal/api/handler/order.go
package handler

import (
    "encoding/json"
    "net/http"

    "exchange/internal/domain/order"
    "exchange/internal/service"
)

// PlaceOrderRequest is the JSON body accepted by PlaceOrder. Side, Type and
// TimeInForce arrive as plain strings and are converted to the order
// package's types without validation in the handler.
type PlaceOrderRequest struct {
    Symbol      string  `json:"symbol"`
    Side        string  `json:"side"`
    Type        string  `json:"type"`
    Price       float64 `json:"price"`
    Quantity    float64 `json:"quantity"`
    TimeInForce string  `json:"time_in_force"`
}

// PlaceOrder handles POST /orders. It decodes a PlaceOrderRequest, builds a
// pending order for the authenticated user, submits it through the order
// service and writes the order back as JSON.
//
// Responses: 401 when the request context carries no int64 "user_id",
// 400 when the body does not decode or the service rejects the order.
//
// NOTE(review): the "user_id" context key is presumably set by the JWT
// middleware — confirm the key and value type there.
func PlaceOrder(w http.ResponseWriter, r *http.Request) {
    // A bare type assertion would panic if the value is missing or has
    // another type; treat that as an unauthenticated request instead.
    userID, ok := r.Context().Value("user_id").(int64)
    if !ok {
        http.Error(w, "unauthorized", http.StatusUnauthorized)
        return
    }

    var req PlaceOrderRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // Build the order.
    newOrder := &order.Order{
        OrderID:     generateOrderID(),
        UserID:      userID,
        Symbol:      req.Symbol,
        Side:        order.Side(req.Side),
        Type:        order.Type(req.Type),
        Price:       req.Price,
        Quantity:    req.Quantity,
        TimeInForce: order.TIF(req.TimeInForce),
        Status:      order.StatusPending,
    }

    // Submit the order.
    orderService := service.NewOrderService()
    if err := orderService.PlaceOrder(r.Context(), newOrder); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    w.Header().Set("Content-Type", "application/json")
    // The response is already committed at this point, so a write failure
    // cannot be reported to the client.
    _ = json.NewEncoder(w).Encode(newOrder)
}

// CancelOrder handles DELETE /orders/{orderId}. It asks the order service to
// cancel the given order on behalf of the authenticated user.
//
// Responses: 200 on success, 401 when the request context carries no int64
// "user_id", 400 when the service returns an error.
func CancelOrder(w http.ResponseWriter, r *http.Request) {
    // A bare type assertion would panic if the value is missing or has
    // another type; treat that as an unauthenticated request instead.
    userID, ok := r.Context().Value("user_id").(int64)
    if !ok {
        http.Error(w, "unauthorized", http.StatusUnauthorized)
        return
    }
    orderID := mux.Vars(r)["orderId"]

    orderService := service.NewOrderService()
    if err := orderService.CancelOrder(r.Context(), userID, orderID); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    w.WriteHeader(http.StatusOK)
}

// GetOrders handles GET /orders. It returns the authenticated user's orders
// as JSON, passing the optional "symbol" query parameter to the service.
//
// Responses: 401 when the request context carries no int64 "user_id",
// 500 when the service returns an error.
func GetOrders(w http.ResponseWriter, r *http.Request) {
    // A bare type assertion would panic if the value is missing or has
    // another type; treat that as an unauthenticated request instead.
    userID, ok := r.Context().Value("user_id").(int64)
    if !ok {
        http.Error(w, "unauthorized", http.StatusUnauthorized)
        return
    }
    symbol := r.URL.Query().Get("symbol")

    orderService := service.NewOrderService()
    orders, err := orderService.GetOrders(r.Context(), userID, symbol)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "application/json")
    // The response is already committed at this point, so a write failure
    // cannot be reported to the client.
    _ = json.NewEncoder(w).Encode(orders)
}

4. WebSocket实时推送

4.1 WebSocket服务器

// cmd/websocket/main.go
package main

import (
    "log"
    "net/http"

    "exchange/internal/websocket"
)

// main starts the hub and the trade consumer in the background, exposes the
// WebSocket endpoint at /ws on the default mux and serves it on :8081.
func main() {
    hub := websocket.NewHub()
    go hub.Run()

    // Subscribe to the trade stream.
    go consumeTrades(hub)

    wsEndpoint := func(w http.ResponseWriter, r *http.Request) {
        websocket.ServeWS(hub, w, r)
    }
    http.Handle("/ws", http.HandlerFunc(wsEndpoint))

    log.Println("WebSocket server listening on :8081")
    if err := http.ListenAndServe(":8081", nil); err != nil {
        log.Fatal(err)
    }
}

// consumeTrades reads the "trades" topic and forwards each raw message to
// the hub room named after the trade's symbol. Messages that fail to decode
// are logged and skipped rather than broadcast to the empty room "".
// The function returns when the message channel closes.
//
// NOTE(review): the broker address is hard-coded to localhost:9092; it
// likely needs to come from configuration when running in containers.
func consumeTrades(hub *websocket.Hub) {
    consumer := mq.NewKafkaConsumer([]string{"localhost:9092"}, "websocket")

    for msg := range consumer.Messages("trades") {
        var t trade.Trade
        if err := json.Unmarshal(msg.Value, &t); err != nil {
            log.Printf("Skipping malformed trade message: %v", err)
            continue
        }

        // Broadcast to every client subscribed to this trading pair.
        hub.Broadcast(t.Symbol, msg.Value)
    }
}

4.2 Hub实现

// internal/websocket/hub.go
package websocket

import "sync"

// Hub tracks connected clients and the rooms they are subscribed to, and
// delivers room messages to each member's send channel.
type Hub struct {
    clients    map[*Client]bool            // registered clients
    broadcast  chan []byte                 // not used by any method shown here
    register   chan *Client                // clients to add; consumed by Run
    unregister chan *Client                // clients to remove; consumed by Run
    rooms      map[string]map[*Client]bool // room name -> member set
    mu         sync.RWMutex                // guards clients and rooms
}

// NewHub returns a Hub with no clients and no rooms. The register and
// unregister channels are unbuffered; the broadcast channel buffers 10000
// messages.
func NewHub() *Hub {
    h := new(Hub)
    h.clients = map[*Client]bool{}
    h.broadcast = make(chan []byte, 10000)
    h.register = make(chan *Client)
    h.unregister = make(chan *Client)
    h.rooms = map[string]map[*Client]bool{}
    return h
}

// Run serves registration and unregistration requests forever. Unregistering
// a known client closes its send channel and removes it from every room;
// unregistering an unknown client does nothing.
func (h *Hub) Run() {
    admit := func(c *Client) {
        h.mu.Lock()
        defer h.mu.Unlock()
        h.clients[c] = true
    }

    evict := func(c *Client) {
        h.mu.Lock()
        defer h.mu.Unlock()

        if _, known := h.clients[c]; !known {
            return
        }
        delete(h.clients, c)
        close(c.send)

        // Remove the client from every room.
        for _, members := range h.rooms {
            delete(members, c)
        }
    }

    for {
        select {
        case c := <-h.register:
            admit(c)
        case c := <-h.unregister:
            evict(c)
        }
    }
}

// Broadcast queues message for every client subscribed to room without
// blocking. A client whose send buffer is full is treated as dead: its send
// channel is closed and it is removed from the hub and from every room.
//
// The write lock is required because dropping a client mutates the client
// and room maps. The client must leave every room, not just this one;
// otherwise a later broadcast to another room would send on the closed
// channel and panic.
func (h *Hub) Broadcast(room string, message []byte) {
    h.mu.Lock()
    defer h.mu.Unlock()

    // Ranging over a missing room is a no-op; deleting entries while ranging
    // over a map is allowed.
    for client := range h.rooms[room] {
        select {
        case client.send <- message:
        default:
            delete(h.clients, client)
            close(client.send)
            for _, members := range h.rooms {
                delete(members, client)
            }
        }
    }
}

// Subscribe adds client to room, creating the room on first use.
func (h *Hub) Subscribe(client *Client, room string) {
    h.mu.Lock()
    defer h.mu.Unlock()

    members := h.rooms[room]
    if members == nil {
        members = make(map[*Client]bool)
        h.rooms[room] = members
    }
    members[client] = true
}

5. 部署配置

5.1 Docker Compose

# docker-compose.yml
# Local development stack: MySQL, Redis, a single-broker Kafka with ZooKeeper,
# the three exchange services, and Prometheus + Grafana.
version: '3.8'

services:
  # Database
  # NOTE(review): the root password is a development placeholder; do not reuse
  # it outside local environments.
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: exchange
    ports:
      - "3306:3306"
    volumes:
      - mysql_data:/var/lib/mysql

  # Redis
  redis:
    image: redis:7.0
    ports:
      - "6379:6379"

  # Kafka (single broker)
  # NOTE(review): the broker advertises kafka:9092, which only resolves inside
  # the compose network; clients on the host connecting to localhost:9092 are
  # redirected to that name and need a separate host listener.
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      # With one broker the internal __consumer_offsets topic cannot reach the
      # default replication factor of 3, which prevents consumer groups from
      # working; pin it to 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  # API service
  api:
    build:
      context: .
      dockerfile: Dockerfile.api
    ports:
      - "8080:8080"
    depends_on:
      - mysql
      - redis
      - kafka

  # Matching engine
  matching:
    build:
      context: .
      dockerfile: Dockerfile.matching
    depends_on:
      - kafka

  # WebSocket service
  websocket:
    build:
      context: .
      dockerfile: Dockerfile.websocket
    ports:
      - "8081:8081"
    depends_on:
      - kafka

  # Prometheus
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  # Grafana
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    depends_on:
      - prometheus

volumes:
  mysql_data:

5.2 Kubernetes部署

# k8s/deployment.yaml
# Deployment for the matching engine.
#
# NOTE(review): each replica presumably keeps its own in-memory order book, so
# running 3 replicas is only safe if all orders for a symbol reach the same
# replica (e.g. via topic partitioning by symbol) — confirm before scaling.
# NOTE(review): the image uses the mutable "latest" tag; pin a version for
# reproducible rollouts.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: matching-engine
spec:
  replicas: 3
  selector:
    matchLabels:
      app: matching-engine
  template:
    metadata:
      labels:
        app: matching-engine
    spec:
      containers:
      - name: matching
        image: exchange/matching:latest
        # Guaranteed 1 CPU / 2Gi, capped at 2 CPU / 4Gi.
        resources:
          requests:
            cpu: 1000m
            memory: 2Gi
          limits:
            cpu: 2000m
            memory: 4Gi
        env:
        # Kafka bootstrap address handed to the service.
        - name: KAFKA_BROKERS
          value: "kafka:9092"

6. 测试

6.1 单元测试

// internal/service/matching/engine_test.go
package matching_test

import (
    "testing"

    "exchange/internal/domain/order"
    "exchange/internal/service/matching"
)

// TestMatchingEngine submits a buy and a sell limit order at the same price
// and quantity and expects both to end up filled.
//
// AddOrder only queues orders, so the engine must be running for anything to
// be processed, and the assertions must wait until the queue has drained.
// Stop closes the queue, Start returns once it is empty, and the done channel
// makes the engine's writes visible to the test goroutine.
//
// NOTE(review): the engine is built with a nil producer; confirm that
// mq.KafkaProducer.Send tolerates a nil receiver, or inject a test double.
func TestMatchingEngine(t *testing.T) {
    engine := matching.NewEngine("BTCUSDT", nil)

    done := make(chan struct{})
    go func() {
        defer close(done)
        engine.Start()
    }()

    // Buy order.
    buyOrder := &order.Order{
        OrderID:  "buy1",
        Side:     order.SideBuy,
        Type:     order.TypeLimit,
        Price:    50000,
        Quantity: 1.0,
    }

    // Sell order.
    sellOrder := &order.Order{
        OrderID:  "sell1",
        Side:     order.SideSell,
        Type:     order.TypeLimit,
        Price:    50000,
        Quantity: 1.0,
    }

    engine.AddOrder(buyOrder)
    engine.AddOrder(sellOrder)

    // Let the engine finish both orders before inspecting them.
    engine.Stop()
    <-done

    // Verify the fills.
    if buyOrder.Status != order.StatusFilled {
        t.Errorf("Expected buy order to be filled")
    }

    if sellOrder.Status != order.StatusFilled {
        t.Errorf("Expected sell order to be filled")
    }
}

6.2 集成测试

// TestPlaceOrderIntegration posts a limit buy order to a test server built
// from router and expects HTTP 200.
func TestPlaceOrderIntegration(t *testing.T) {
    // Start the test server.
    ts := httptest.NewServer(router)
    defer ts.Close()

    // Place an order.
    payload := `{
        "symbol": "BTCUSDT",
        "side": "buy",
        "type": "limit",
        "price": 50000,
        "quantity": 0.01
    }`

    req, err := http.NewRequest("POST", ts.URL+"/api/v1/orders", strings.NewReader(payload))
    if err != nil {
        // A nil request would otherwise panic on the next line.
        t.Fatal(err)
    }
    req.Header.Set("Authorization", "Bearer test_token")
    req.Header.Set("Content-Type", "application/json")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        t.Fatal(err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        t.Errorf("Expected status 200, got %d", resp.StatusCode)
    }
}

小结

本章通过一个完整的交易所项目实战,整合了前面所有章节的知识点:

  1. 项目架构:微服务架构、技术栈选型、目录结构
  2. 核心模块:订单模型、撮合引擎、订单簿实现
  3. API服务:RESTful API、订单处理、查询接口
  4. 实时推送:WebSocket Hub、消息广播、订阅机制
  5. 部署配置:Docker Compose、Kubernetes部署
  6. 测试:单元测试、集成测试

完整代码示例展示了如何构建一个高性能、高可用的加密货币交易所系统。通过这个项目,你可以掌握交易所开发的完整流程,从架构设计到代码实现,再到部署上线。

下一步学习建议:

  • 添加更多交易对支持
  • 实现合约交易功能
  • 集成区块链充值提现
  • 优化撮合引擎性能
  • 完善风控系统
  • 添加管理后台

祝你在交易所开发之路上取得成功!

Prev
压力测试与性能优化