项目实战-完整交易所实现
1. 项目架构总览
1.1 技术栈
后端:
- Go 1.21+
- MySQL 8.0
- Redis 7.0
- Kafka 3.5
- WebSocket
前端:
- Vue 3
- TypeScript
- TradingView (K线图)
基础设施:
- Docker / Kubernetes
- Prometheus + Grafana
- ELK Stack
- Nginx
1.2 目录结构
exchange/
├── cmd/
│ ├── api/ # API服务
│ ├── matching/ # 撮合引擎
│ ├── market/ # 行情服务
│ ├── settlement/ # 清算服务
│ └── websocket/ # WebSocket服务
├── internal/
│ ├── domain/ # 领域模型
│ │ ├── order/
│ │ ├── trade/
│ │ ├── account/
│ │ └── position/
│ ├── repository/ # 数据访问层
│ │ ├── mysql/
│ │ └── redis/
│ ├── service/ # 业务逻辑层
│ │ ├── matching/
│ │ ├── risk/
│ │ └── wallet/
│ └── infrastructure/ # 基础设施
│ ├── cache/
│ ├── mq/
│ └── config/
├── api/ # API定义
│ └── proto/ # gRPC proto
├── web/ # 前端代码
│ ├── src/
│ ├── public/
│ └── package.json
├── scripts/ # 部署脚本
│ ├── deploy.sh
│ └── migrate.sh
├── docker-compose.yml
├── go.mod
└── README.md
2. 核心模块实现
2.1 订单模型
// internal/domain/order/order.go
package order
import (
"time"
)
type Order struct {
ID int64 `json:"id"`
OrderID string `json:"order_id"`
UserID int64 `json:"user_id"`
Symbol string `json:"symbol"`
Side Side `json:"side"`
Type Type `json:"type"`
Price float64 `json:"price"`
Quantity float64 `json:"quantity"`
FilledQty float64 `json:"filled_qty"`
FilledAmount float64 `json:"filled_amount"`
AvgPrice float64 `json:"avg_price"`
Status Status `json:"status"`
TimeInForce TIF `json:"time_in_force"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type Side string
const (
SideBuy Side = "buy"
SideSell Side = "sell"
)
type Type string
const (
TypeLimit Type = "limit"
TypeMarket Type = "market"
)
type Status string
const (
StatusPending Status = "pending"
StatusPartial Status = "partial"
StatusFilled Status = "filled"
StatusCancelled Status = "cancelled"
StatusRejected Status = "rejected"
)
type TIF string
const (
TIFGTC TIF = "GTC" // Good Till Cancel
TIFIOC TIF = "IOC" // Immediate or Cancel
TIFFOK TIF = "FOK" // Fill or Kill
)
2.2 撮合引擎服务
// cmd/matching/main.go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"exchange/internal/service/matching"
"exchange/internal/infrastructure/mq"
)
func main() {
// 初始化配置
cfg := loadConfig()
// 初始化Kafka
consumer := mq.NewKafkaConsumer(cfg.Kafka.Brokers, "matching-engine")
producer := mq.NewKafkaProducer(cfg.Kafka.Brokers)
// 创建撮合引擎
engines := make(map[string]*matching.Engine)
symbols := []string{"BTCUSDT", "ETHUSDT", "BNBUSDT"}
for _, symbol := range symbols {
engine := matching.NewEngine(symbol, producer)
engines[symbol] = engine
go engine.Start()
}
// 消费订单
go consumeOrders(consumer, engines)
// 优雅关闭
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
log.Println("Shutting down matching engine...")
for _, engine := range engines {
engine.Stop()
}
}
func consumeOrders(consumer *mq.KafkaConsumer, engines map[string]*matching.Engine) {
for msg := range consumer.Messages("orders") {
var order order.Order
json.Unmarshal(msg.Value, &order)
engine, exists := engines[order.Symbol]
if !exists {
log.Printf("Unknown symbol: %s", order.Symbol)
continue
}
engine.AddOrder(&order)
}
}
2.3 撮合引擎核心
// internal/service/matching/engine.go
package matching
import (
"encoding/json"
"sync"
"time"
"exchange/internal/domain/order"
"exchange/internal/domain/trade"
"exchange/internal/infrastructure/mq"
)
type Engine struct {
symbol string
orderBook *OrderBook
orderChan chan *order.Order
producer *mq.KafkaProducer
mu sync.RWMutex
stopped bool
}
func NewEngine(symbol string, producer *mq.KafkaProducer) *Engine {
return &Engine{
symbol: symbol,
orderBook: NewOrderBook(),
orderChan: make(chan *order.Order, 10000),
producer: producer,
}
}
func (e *Engine) Start() {
for order := range e.orderChan {
e.processOrder(order)
}
}
func (e *Engine) Stop() {
e.mu.Lock()
e.stopped = true
e.mu.Unlock()
close(e.orderChan)
}
func (e *Engine) AddOrder(order *order.Order) {
e.mu.RLock()
if e.stopped {
e.mu.RUnlock()
return
}
e.mu.RUnlock()
e.orderChan <- order
}
func (e *Engine) processOrder(order *order.Order) {
// 1. 验证订单
if err := e.validateOrder(order); err != nil {
e.rejectOrder(order, err)
return
}
// 2. 尝试匹配
trades := e.match(order)
// 3. 发布成交消息
for _, trade := range trades {
e.publishTrade(trade)
}
// 4. 如果未完全成交,加入订单簿
if order.FilledQty < order.Quantity {
if order.TimeInForce == order.TIFIOC || order.TimeInForce == order.TIFFOK {
// IOC/FOK订单,取消未成交部分
order.Status = order.StatusCancelled
} else {
// 加入订单簿
e.orderBook.Add(order)
order.Status = order.StatusPartial
}
} else {
order.Status = order.StatusFilled
}
// 5. 发布订单更新
e.publishOrderUpdate(order)
}
func (e *Engine) match(order *order.Order) []*trade.Trade {
var trades []*trade.Trade
if order.Side == order.SideBuy {
// 买单:匹配卖盘
for {
bestAsk := e.orderBook.BestAsk()
if bestAsk == nil {
break
}
// 限价单:价格不满足
if order.Type == order.TypeLimit && order.Price < bestAsk.Price {
break
}
// 成交
matchQty := min(order.Quantity-order.FilledQty, bestAsk.Quantity-bestAsk.FilledQty)
matchPrice := bestAsk.Price
trade := &trade.Trade{
TradeID: generateTradeID(),
Symbol: order.Symbol,
BuyOrderID: order.OrderID,
SellOrderID: bestAsk.OrderID,
BuyUserID: order.UserID,
SellUserID: bestAsk.UserID,
Price: matchPrice,
Quantity: matchQty,
Amount: matchPrice * matchQty,
BuyerIsMaker: false,
Timestamp: time.Now(),
}
trades = append(trades, trade)
// 更新订单
order.FilledQty += matchQty
order.FilledAmount += trade.Amount
order.AvgPrice = order.FilledAmount / order.FilledQty
bestAsk.FilledQty += matchQty
bestAsk.FilledAmount += trade.Amount
bestAsk.AvgPrice = bestAsk.FilledAmount / bestAsk.FilledQty
// 完全成交,移除订单簿
if bestAsk.FilledQty >= bestAsk.Quantity {
e.orderBook.Remove(bestAsk)
bestAsk.Status = order.StatusFilled
e.publishOrderUpdate(bestAsk)
}
// 买单完全成交,结束
if order.FilledQty >= order.Quantity {
break
}
}
} else {
// 卖单:匹配买盘(类似逻辑)
// ...省略
}
return trades
}
func (e *Engine) publishTrade(trade *trade.Trade) {
data, _ := json.Marshal(trade)
e.producer.Send("trades", data)
}
func (e *Engine) publishOrderUpdate(order *order.Order) {
data, _ := json.Marshal(order)
e.producer.Send("order_updates", data)
}
func (e *Engine) validateOrder(order *order.Order) error {
if order.Quantity <= 0 {
return errors.New("invalid quantity")
}
if order.Type == order.TypeLimit && order.Price <= 0 {
return errors.New("invalid price")
}
return nil
}
func (e *Engine) rejectOrder(order *order.Order, err error) {
order.Status = order.StatusRejected
e.publishOrderUpdate(order)
}
2.4 订单簿实现
// internal/service/matching/orderbook.go
package matching
import (
"container/list"
"sync"
"exchange/internal/domain/order"
)
type OrderBook struct {
bids *PriceLevel // 买盘(价格从高到低)
asks *PriceLevel // 卖盘(价格从低到高)
mu sync.RWMutex
}
type PriceLevel struct {
levels map[float64]*list.List
}
func NewOrderBook() *OrderBook {
return &OrderBook{
bids: &PriceLevel{levels: make(map[float64]*list.List)},
asks: &PriceLevel{levels: make(map[float64]*list.List)},
}
}
func (ob *OrderBook) Add(order *order.Order) {
ob.mu.Lock()
defer ob.mu.Unlock()
var pl *PriceLevel
if order.Side == order.SideBuy {
pl = ob.bids
} else {
pl = ob.asks
}
if pl.levels[order.Price] == nil {
pl.levels[order.Price] = list.New()
}
pl.levels[order.Price].PushBack(order)
}
func (ob *OrderBook) Remove(order *order.Order) {
ob.mu.Lock()
defer ob.mu.Unlock()
var pl *PriceLevel
if order.Side == order.SideBuy {
pl = ob.bids
} else {
pl = ob.asks
}
if l := pl.levels[order.Price]; l != nil {
for e := l.Front(); e != nil; e = e.Next() {
if e.Value.(*order.Order).OrderID == order.OrderID {
l.Remove(e)
if l.Len() == 0 {
delete(pl.levels, order.Price)
}
break
}
}
}
}
func (ob *OrderBook) BestBid() *order.Order {
ob.mu.RLock()
defer ob.mu.RUnlock()
var bestPrice float64
for price := range ob.bids.levels {
if price > bestPrice {
bestPrice = price
}
}
if bestPrice == 0 {
return nil
}
l := ob.bids.levels[bestPrice]
if l.Len() == 0 {
return nil
}
return l.Front().Value.(*order.Order)
}
func (ob *OrderBook) BestAsk() *order.Order {
ob.mu.RLock()
defer ob.mu.RUnlock()
var bestPrice float64 = -1
for price := range ob.asks.levels {
if bestPrice < 0 || price < bestPrice {
bestPrice = price
}
}
if bestPrice < 0 {
return nil
}
l := ob.asks.levels[bestPrice]
if l.Len() == 0 {
return nil
}
return l.Front().Value.(*order.Order)
}
3. API服务实现
3.1 HTTP API
// cmd/api/main.go
package main
import (
"log"
"net/http"
"github.com/gorilla/mux"
"exchange/internal/api/handler"
"exchange/internal/api/middleware"
)
func main() {
r := mux.NewRouter()
// 中间件
r.Use(middleware.CORS)
r.Use(middleware.Logging)
r.Use(middleware.RateLimiting)
// 公开API
r.HandleFunc("/api/v1/ticker/{symbol}", handler.GetTicker).Methods("GET")
r.HandleFunc("/api/v1/depth/{symbol}", handler.GetDepth).Methods("GET")
r.HandleFunc("/api/v1/trades/{symbol}", handler.GetTrades).Methods("GET")
r.HandleFunc("/api/v1/klines/{symbol}", handler.GetKlines).Methods("GET")
// 认证API
auth := r.PathPrefix("/api/v1").Subrouter()
auth.Use(middleware.JWTAuth)
auth.HandleFunc("/account/balance", handler.GetBalance).Methods("GET")
auth.HandleFunc("/orders", handler.PlaceOrder).Methods("POST")
auth.HandleFunc("/orders/{orderId}", handler.CancelOrder).Methods("DELETE")
auth.HandleFunc("/orders", handler.GetOrders).Methods("GET")
auth.HandleFunc("/trades", handler.GetMyTrades).Methods("GET")
log.Println("API server listening on :8080")
log.Fatal(http.ListenAndServe(":8080", r))
}
3.2 订单处理Handler
// internal/api/handler/order.go
package handler
import (
"encoding/json"
"net/http"
"exchange/internal/domain/order"
"exchange/internal/service"
)
type PlaceOrderRequest struct {
Symbol string `json:"symbol"`
Side string `json:"side"`
Type string `json:"type"`
Price float64 `json:"price"`
Quantity float64 `json:"quantity"`
TimeInForce string `json:"time_in_force"`
}
func PlaceOrder(w http.ResponseWriter, r *http.Request) {
userID := r.Context().Value("user_id").(int64)
var req PlaceOrderRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// 创建订单
order := &order.Order{
OrderID: generateOrderID(),
UserID: userID,
Symbol: req.Symbol,
Side: order.Side(req.Side),
Type: order.Type(req.Type),
Price: req.Price,
Quantity: req.Quantity,
TimeInForce: order.TIF(req.TimeInForce),
Status: order.StatusPending,
}
// 提交订单
orderService := service.NewOrderService()
err := orderService.PlaceOrder(r.Context(), order)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(order)
}
func CancelOrder(w http.ResponseWriter, r *http.Request) {
userID := r.Context().Value("user_id").(int64)
orderID := mux.Vars(r)["orderId"]
orderService := service.NewOrderService()
err := orderService.CancelOrder(r.Context(), userID, orderID)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusOK)
}
func GetOrders(w http.ResponseWriter, r *http.Request) {
userID := r.Context().Value("user_id").(int64)
symbol := r.URL.Query().Get("symbol")
orderService := service.NewOrderService()
orders, err := orderService.GetOrders(r.Context(), userID, symbol)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(orders)
}
4. WebSocket实时推送
4.1 WebSocket服务器
// cmd/websocket/main.go
package main
import (
"log"
"net/http"
"exchange/internal/websocket"
)
func main() {
hub := websocket.NewHub()
go hub.Run()
// 订阅成交流
go consumeTrades(hub)
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
websocket.ServeWS(hub, w, r)
})
log.Println("WebSocket server listening on :8081")
log.Fatal(http.ListenAndServe(":8081", nil))
}
func consumeTrades(hub *websocket.Hub) {
consumer := mq.NewKafkaConsumer([]string{"localhost:9092"}, "websocket")
for msg := range consumer.Messages("trades") {
var trade trade.Trade
json.Unmarshal(msg.Value, &trade)
// 广播给订阅该交易对的所有客户端
hub.Broadcast(trade.Symbol, msg.Value)
}
}
4.2 Hub实现
// internal/websocket/hub.go
package websocket
import "sync"
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
rooms map[string]map[*Client]bool
mu sync.RWMutex
}
func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
broadcast: make(chan []byte, 10000),
register: make(chan *Client),
unregister: make(chan *Client),
rooms: make(map[string]map[*Client]bool),
}
}
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client] = true
h.mu.Unlock()
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
// 从所有房间移除
for room := range h.rooms {
delete(h.rooms[room], client)
}
}
h.mu.Unlock()
}
}
}
func (h *Hub) Broadcast(room string, message []byte) {
h.mu.RLock()
defer h.mu.RUnlock()
if clients, ok := h.rooms[room]; ok {
for client := range clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
delete(h.rooms[room], client)
}
}
}
}
func (h *Hub) Subscribe(client *Client, room string) {
h.mu.Lock()
defer h.mu.Unlock()
if h.rooms[room] == nil {
h.rooms[room] = make(map[*Client]bool)
}
h.rooms[room][client] = true
}
5. 部署配置
5.1 Docker Compose
# docker-compose.yml
version: '3.8'
services:
# 数据库
mysql:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: exchange
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
# Redis
redis:
image: redis:7.0
ports:
- "6379:6379"
# Kafka
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
ports:
- "9092:9092"
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
# API服务
api:
build:
context: .
dockerfile: Dockerfile.api
ports:
- "8080:8080"
depends_on:
- mysql
- redis
- kafka
# 撮合引擎
matching:
build:
context: .
dockerfile: Dockerfile.matching
depends_on:
- kafka
# WebSocket服务
websocket:
build:
context: .
dockerfile: Dockerfile.websocket
ports:
- "8081:8081"
depends_on:
- kafka
# Prometheus
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
# Grafana
grafana:
image: grafana/grafana
ports:
- "3000:3000"
depends_on:
- prometheus
volumes:
mysql_data:
5.2 Kubernetes部署
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: matching-engine
spec:
replicas: 3
selector:
matchLabels:
app: matching-engine
template:
metadata:
labels:
app: matching-engine
spec:
containers:
- name: matching
image: exchange/matching:latest
resources:
requests:
cpu: 1000m
memory: 2Gi
limits:
cpu: 2000m
memory: 4Gi
env:
- name: KAFKA_BROKERS
value: "kafka:9092"
6. 测试
6.1 单元测试
// internal/service/matching/engine_test.go
package matching_test
import (
"testing"
"exchange/internal/domain/order"
"exchange/internal/service/matching"
)
func TestMatchingEngine(t *testing.T) {
engine := matching.NewEngine("BTCUSDT", nil)
// 添加买单
buyOrder := &order.Order{
OrderID: "buy1",
Side: order.SideBuy,
Type: order.TypeLimit,
Price: 50000,
Quantity: 1.0,
}
// 添加卖单
sellOrder := &order.Order{
OrderID: "sell1",
Side: order.SideSell,
Type: order.TypeLimit,
Price: 50000,
Quantity: 1.0,
}
engine.AddOrder(buyOrder)
engine.AddOrder(sellOrder)
// 验证成交
if buyOrder.Status != order.StatusFilled {
t.Errorf("Expected buy order to be filled")
}
if sellOrder.Status != order.StatusFilled {
t.Errorf("Expected sell order to be filled")
}
}
6.2 集成测试
func TestPlaceOrderIntegration(t *testing.T) {
// 启动测试服务器
ts := httptest.NewServer(router)
defer ts.Close()
// 下单
payload := `{
"symbol": "BTCUSDT",
"side": "buy",
"type": "limit",
"price": 50000,
"quantity": 0.01
}`
req, _ := http.NewRequest("POST", ts.URL+"/api/v1/orders", strings.NewReader(payload))
req.Header.Set("Authorization", "Bearer test_token")
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("Expected status 200, got %d", resp.StatusCode)
}
}
小结
本章通过一个完整的交易所项目实战,整合了前面所有章节的知识点:
- 项目架构:微服务架构、技术栈选型、目录结构
- 核心模块:订单模型、撮合引擎、订单簿实现
- API服务:RESTful API、订单处理、查询接口
- 实时推送:WebSocket Hub、消息广播、订阅机制
- 部署配置:Docker Compose、Kubernetes部署
- 测试:单元测试、集成测试
完整代码示例展示了如何构建一个高性能、高可用的加密货币交易所系统。通过这个项目,你可以掌握交易所开发的完整流程,从架构设计到代码实现,再到部署上线。
下一步学习建议:
- 添加更多交易对支持
- 实现合约交易功能
- 集成区块链充值提现
- 优化撮合引擎性能
- 完善风控系统
- 添加管理后台
祝你在交易所开发之路上取得成功!