Stress Testing and Performance Optimization

1. Load Testing Tools

1.1 Apache Bench (ab)

A simple and fast HTTP load testing tool.

# Basic test: 1000 requests with a concurrency of 10
ab -n 1000 -c 10 http://api.exchange.com/api/ticker/BTCUSDT

# Test with an authorization header
ab -n 1000 -c 10 -H "Authorization: Bearer token" http://api.exchange.com/api/orders

# POST request test (request body from order.json)
ab -n 1000 -c 10 -p order.json -T application/json http://api.exchange.com/api/orders

# Sample output:
# Requests per second: 1000 [#/sec]
# Time per request: 10ms [mean]
# Transfer rate: 500KB/sec
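
The order.json file passed with -p is not shown above; a minimal example is sketched below, with fields assumed to match the order payloads used elsewhere in this chapter rather than taken from the original text.

order.json (illustrative contents):

{
    "symbol": "BTCUSDT",
    "side": "buy",
    "type": "limit",
    "price": 50000,
    "quantity": 0.01
}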

1.2 wrk

A more powerful HTTP load testing tool with Lua scripting support.

# Basic test: 12 threads, 400 connections, running for 30 seconds
wrk -t12 -c400 -d30s http://api.exchange.com/api/ticker/BTCUSDT

# Use a Lua script to simulate a realistic order flow
wrk -t12 -c400 -d30s -s order.lua http://api.exchange.com/api/orders

The order.lua script:

-- order.lua
math.randomseed(os.time())

request = function()
    -- Randomly generate order parameters
    local symbols = {"BTCUSDT", "ETHUSDT", "BNBUSDT"}
    local sides = {"buy", "sell"}

    local symbol = symbols[math.random(#symbols)]
    local side = sides[math.random(#sides)]
    local price = math.random(40000, 50000)
    local quantity = math.random(1, 10) / 10

    local body = string.format([[{
        "symbol": "%s",
        "side": "%s",
        "type": "limit",
        "price": %d,
        "quantity": %.1f
    }]], symbol, side, price, quantity)

    return wrk.format("POST", nil, {
        ["Content-Type"] = "application/json",
        ["Authorization"] = "Bearer test_token"
    }, body)
end

1.3 Locust

A distributed load testing tool written in Python.

# locustfile.py
from locust import HttpUser, task, between
import random

class ExchangeUser(HttpUser):
    wait_time = between(1, 3)  # wait 1-3 seconds between user actions

    def on_start(self):
        # Log in
        response = self.client.post("/api/auth/login", json={
            "email": "test@example.com",
            "password": "password123"
        })
        self.token = response.json()["access_token"]

    @task(3)  # weight 3
    def get_ticker(self):
        symbols = ["BTCUSDT", "ETHUSDT", "BNBUSDT"]
        symbol = random.choice(symbols)

        self.client.get(f"/api/ticker/{symbol}", headers={
            "Authorization": f"Bearer {self.token}"
        })

    @task(2)  # weight 2
    def get_depth(self):
        symbols = ["BTCUSDT", "ETHUSDT"]
        symbol = random.choice(symbols)

        self.client.get(f"/api/depth/{symbol}", headers={
            "Authorization": f"Bearer {self.token}"
        })

    @task(1)  # weight 1
    def place_order(self):
        self.client.post("/api/orders", json={
            "symbol": "BTCUSDT",
            "side": random.choice(["buy", "sell"]),
            "type": "limit",
            "price": random.randint(40000, 50000),
            "quantity": round(random.uniform(0.001, 0.01), 3)
        }, headers={
            "Authorization": f"Bearer {self.token}"
        })

    @task(1)
    def get_balance(self):
        self.client.get("/api/account/balance", headers={
            "Authorization": f"Bearer {self.token}"
        })

Run the test:

# Start the web UI
locust -f locustfile.py --host=http://api.exchange.com

# Headless mode: 100 users, spawning 10 per second, running for 5 minutes
locust -f locustfile.py --host=http://api.exchange.com \
    --headless -u 100 -r 10 -t 5m

1.4 k6

A modern load testing tool whose test scripts are written in JavaScript.

// k6-test.js
import http from 'k6/http';
import { check, sleep } from 'k6';

export let options = {
    stages: [
        { duration: '2m', target: 100 },  // ramp up to 100 users over 2 minutes
        { duration: '5m', target: 100 },  // hold at 100 users for 5 minutes
        { duration: '2m', target: 200 },  // ramp up to 200 users
        { duration: '5m', target: 200 },  // hold at 200 users
        { duration: '2m', target: 0 },    // ramp down to 0
    ],
    thresholds: {
        http_req_duration: ['p(95)<500'], // 95% of requests under 500ms
        http_req_failed: ['rate<0.01'],   // error rate below 1%
    },
};

export default function () {
    // Fetch the ticker
    let tickerRes = http.get('http://api.exchange.com/api/ticker/BTCUSDT');
    check(tickerRes, {
        'ticker status is 200': (r) => r.status === 200,
        'ticker response time < 200ms': (r) => r.timings.duration < 200,
    });

    sleep(1);

    // Place an order
    let orderPayload = JSON.stringify({
        symbol: 'BTCUSDT',
        side: 'buy',
        type: 'limit',
        price: 50000,
        quantity: 0.01,
    });

    let params = {
        headers: {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer test_token',
        },
    };

    let orderRes = http.post('http://api.exchange.com/api/orders', orderPayload, params);
    check(orderRes, {
        'order status is 200': (r) => r.status === 200,
        'order created': (r) => r.json('order_id') != null,
    });

    sleep(2);
}

Run the test:

k6 run k6-test.js

2. Matching Engine Load Testing

2.1 Matching Engine Performance Benchmark

package benchmark

import (
    "fmt"
    "math/rand"
    "testing"
)

func BenchmarkMatchingEngine(b *testing.B) {
    engine := NewMatchingEngine("BTCUSDT")

    // Pre-populate the order book
    for i := 0; i < 1000; i++ {
        engine.AddOrder(&Order{
            OrderID:  fmt.Sprintf("buy_%d", i),
            Side:     "buy",
            Price:    49000 + rand.Float64()*1000,
            Quantity: 0.01,
        })

        engine.AddOrder(&Order{
            OrderID:  fmt.Sprintf("sell_%d", i),
            Side:     "sell",
            Price:    50000 + rand.Float64()*1000,
            Quantity: 0.01,
        })
    }

    b.ResetTimer()

    // Measure AddOrder throughput
    for i := 0; i < b.N; i++ {
        order := &Order{
            OrderID:  fmt.Sprintf("test_%d", i),
            Side:     []string{"buy", "sell"}[i%2],
            Price:    49500 + rand.Float64()*1000,
            Quantity: 0.01,
        }

        engine.AddOrder(order)
    }
}

// Run: go test -bench=. -benchtime=10s
// Sample result:
// BenchmarkMatchingEngine-8    1000000    1234 ns/op
// i.e. roughly 810,000 orders per second (1s / 1234ns)

2.2 Concurrent Load Test

func TestMatchingEngineConcurrency(t *testing.T) {
    engine := NewMatchingEngine("BTCUSDT")

    // Number of concurrent goroutines
    concurrency := 100

    // Orders sent per goroutine
    ordersPerRoutine := 10000

    var wg sync.WaitGroup
    start := time.Now()

    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            for j := 0; j < ordersPerRoutine; j++ {
                order := &Order{
                    OrderID:  fmt.Sprintf("%d_%d", id, j),
                    Side:     []string{"buy", "sell"}[j%2],
                    Price:    49500 + rand.Float64()*1000,
                    Quantity: 0.01,
                }

                engine.AddOrder(order)
            }
        }(i)
    }

    wg.Wait()
    elapsed := time.Since(start)

    totalOrders := concurrency * ordersPerRoutine
    tps := float64(totalOrders) / elapsed.Seconds()

    fmt.Printf("Total orders: %d\n", totalOrders)
    fmt.Printf("Time elapsed: %v\n", elapsed)
    fmt.Printf("TPS: %.2f\n", tps)

    // Sample output:
    // Total orders: 1000000
    // Time elapsed: 5.2s
    // TPS: 192307.69
}
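
Throughput alone hides tail latency. Below is a minimal sketch that records per-order latency and reports percentiles; it assumes the same Order and MatchingEngine types used in the tests above, and the order count and percentiles chosen are illustrative.

func TestMatchingEngineLatency(t *testing.T) {
    engine := NewMatchingEngine("BTCUSDT")

    const total = 100000
    latencies := make([]time.Duration, 0, total)

    for i := 0; i < total; i++ {
        order := &Order{
            OrderID:  fmt.Sprintf("lat_%d", i),
            Side:     []string{"buy", "sell"}[i%2],
            Price:    49500 + rand.Float64()*1000,
            Quantity: 0.01,
        }

        start := time.Now()
        engine.AddOrder(order)
        latencies = append(latencies, time.Since(start))
    }

    // Sort latencies and report p50 / p99 (uses the standard "sort" package)
    sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] })
    fmt.Printf("p50: %v\n", latencies[total*50/100])
    fmt.Printf("p99: %v\n", latencies[total*99/100])
}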

3. Database Performance Optimization

3.1 Slow Query Analysis

-- Enable the slow query log
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 0.5;  -- log queries slower than 0.5 seconds
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';

-- Analyze the slow log with the pt-query-digest tool (run from the shell)
pt-query-digest /var/log/mysql/slow.log > slow_query_report.txt

Optimization example:

-- Slow query: fetch a user's orders (full table scan)
SELECT * FROM orders WHERE user_id = 1001 ORDER BY created_at DESC LIMIT 20;

-- Analyze with EXPLAIN
EXPLAIN SELECT * FROM orders WHERE user_id = 1001 ORDER BY created_at DESC LIMIT 20;
-- type: ALL (full table scan)
-- rows: 1000000 (scans 1,000,000 rows)

-- Optimization: add a composite index
CREATE INDEX idx_user_created ON orders(user_id, created_at DESC);

-- After adding the index
EXPLAIN SELECT * FROM orders WHERE user_id = 1001 ORDER BY created_at DESC LIMIT 20;
-- type: ref (index lookup)
-- rows: 100 (scans only 100 rows)

-- Result: query time drops from 1.2s to 0.01s

3.2 Connection Pool Tuning

import "database/sql"

func InitDB(dsn string) (*sql.DB, error) {
    db, err := sql.Open("mysql", dsn)
    if err != nil {
        return nil, err
    }

    // Maximum number of open connections
    db.SetMaxOpenConns(100)

    // Maximum number of idle connections
    db.SetMaxIdleConns(20)

    // Maximum connection lifetime
    db.SetConnMaxLifetime(time.Hour)

    // Maximum connection idle time
    db.SetConnMaxIdleTime(10 * time.Minute)

    return db, nil
}

// Monitor connection pool statistics
func MonitorDBStats(db *sql.DB) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        stats := db.Stats()

        log.Printf("DB Stats: Open=%d, InUse=%d, Idle=%d, Waiting=%d",
            stats.OpenConnections,
            stats.InUse,
            stats.Idle,
            stats.WaitCount,
        )

        // If too many requests are waiting for a connection, increase MaxOpenConns
        if stats.WaitCount > 100 {
            log.Println("Warning: Too many connections waiting")
        }
    }
}

3.3 Batch Operation Optimization

// Slow: insert rows one at a time
func InsertTradesSlow(trades []*Trade) error {
    for _, trade := range trades {
        _, err := db.Exec(`
            INSERT INTO trades (trade_id, symbol, price, quantity, created_at)
            VALUES (?, ?, ?, ?, ?)
        `, trade.TradeID, trade.Symbol, trade.Price, trade.Quantity, trade.CreatedAt)

        if err != nil {
            return err
        }
    }
    return nil
}

// Fast: multi-row batch insert
func InsertTradesFast(trades []*Trade) error {
    if len(trades) == 0 {
        return nil
    }

    // Build a single multi-row INSERT statement
    valueStrings := make([]string, 0, len(trades))
    valueArgs := make([]interface{}, 0, len(trades)*5)

    for _, trade := range trades {
        valueStrings = append(valueStrings, "(?, ?, ?, ?, ?)")
        valueArgs = append(valueArgs, trade.TradeID, trade.Symbol, trade.Price,
            trade.Quantity, trade.CreatedAt)
    }

    query := fmt.Sprintf(`
        INSERT INTO trades (trade_id, symbol, price, quantity, created_at)
        VALUES %s
    `, strings.Join(valueStrings, ","))

    _, err := db.Exec(query, valueArgs...)
    return err
}

// Performance comparison (inserting 10,000 rows):
// InsertTradesSlow: 8.2s
// InsertTradesFast: 0.3s  (about 27x faster)
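
For very large batches, a single statement can exceed MySQL's max_allowed_packet or the prepared-statement placeholder limit. A minimal sketch that reuses InsertTradesFast above in fixed-size chunks (the chunk size is an arbitrary assumption, not a value from the original text):

// Split a large batch into chunks and insert each chunk with InsertTradesFast.
func InsertTradesChunked(trades []*Trade, chunkSize int) error {
    for start := 0; start < len(trades); start += chunkSize {
        end := start + chunkSize
        if end > len(trades) {
            end = len(trades)
        }
        if err := InsertTradesFast(trades[start:end]); err != nil {
            return err
        }
    }
    return nil
}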

4. Redis Optimization

4.1 Pipelined Batch Operations

// Slow: one SET per round trip
func SetTickersSlow(tickers map[string]*Ticker) error {
    for symbol, ticker := range tickers {
        key := fmt.Sprintf("ticker:%s", symbol)
        data, _ := json.Marshal(ticker)
        err := redisClient.Set(ctx, key, data, 5*time.Second).Err()
        if err != nil {
            return err
        }
    }
    return nil
}

// Fast: batch all SETs in a single pipeline
func SetTickersFast(tickers map[string]*Ticker) error {
    pipe := redisClient.Pipeline()

    for symbol, ticker := range tickers {
        key := fmt.Sprintf("ticker:%s", symbol)
        data, _ := json.Marshal(ticker)
        pipe.Set(ctx, key, data, 5*time.Second)
    }

    _, err := pipe.Exec(ctx)
    return err
}

// Performance comparison (updating 1000 tickers):
// SetTickersSlow: 1.2s (1000 round trips)
// SetTickersFast: 0.05s (1 round trip, about 24x faster)

4.2 Atomic Updates with Lua Scripts

// Atomically update a ticker with a Redis Lua script
var updateTickerScript = redis.NewScript(`
    local key = KEYS[1]
    local lastPrice = tonumber(ARGV[1])
    local volume = tonumber(ARGV[2])

    redis.call('HSET', key, 'lastPrice', lastPrice)
    redis.call('HINCRBYFLOAT', key, 'volume', volume)  -- volume is fractional, so HINCRBYFLOAT

    local high = redis.call('HGET', key, 'highPrice')
    if not high or lastPrice > tonumber(high) then
        redis.call('HSET', key, 'highPrice', lastPrice)
    end

    local low = redis.call('HGET', key, 'lowPrice')
    if not low or lastPrice < tonumber(low) then
        redis.call('HSET', key, 'lowPrice', lastPrice)
    end

    return redis.call('HGETALL', key)
`)

func UpdateTicker(symbol string, lastPrice, volume float64) error {
    key := fmt.Sprintf("ticker:%s", symbol)

    _, err := updateTickerScript.Run(ctx, redisClient,
        []string{key},
        lastPrice, volume,
    ).Result()

    return err
}

4.3 Cache Warm-up

type CacheWarmer struct {
    db    *sql.DB
    redis *redis.Client
}

func (cw *CacheWarmer) WarmUpTickers() error {
    // Load all tickers from the database
    rows, err := cw.db.Query(`
        SELECT symbol, last_price, open_price, high_price, low_price, volume
        FROM tickers
    `)
    if err != nil {
        return err
    }
    defer rows.Close()

    pipe := cw.redis.Pipeline()

    for rows.Next() {
        var ticker Ticker
        if err := rows.Scan(&ticker.Symbol, &ticker.LastPrice, &ticker.OpenPrice,
            &ticker.HighPrice, &ticker.LowPrice, &ticker.Volume); err != nil {
            return err
        }

        key := fmt.Sprintf("ticker:%s", ticker.Symbol)
        data, _ := json.Marshal(ticker)

        pipe.Set(context.Background(), key, data, 24*time.Hour)
    }

    _, err = pipe.Exec(context.Background())
    return err
}

// Warm the cache at service startup
func main() {
    warmer := &CacheWarmer{db: db, redis: redisClient}

    log.Println("Warming up cache...")
    err := warmer.WarmUpTickers()
    if err != nil {
        log.Fatalf("Cache warmup failed: %v", err)
    }
    log.Println("Cache warmup completed")

    // Start the service
    // ...
}

5. API Performance Optimization

5.1 Response Compression

import "github.com/gorilla/handlers"

func main() {
    router := mux.NewRouter()

    // Enable gzip compression for all responses
    handler := handlers.CompressHandler(router)

    http.ListenAndServe(":8080", handler)
}

// Effect:
// Uncompressed response: 50KB
// Gzip-compressed: 8KB (84% smaller)

5.2 Connection Reuse

// HTTP client configuration with connection pooling and keep-alive
var httpClient = &http.Client{
    Transport: &http.Transport{
        MaxIdleConns:        100,
        MaxIdleConnsPerHost: 10,
        IdleConnTimeout:     90 * time.Second,
        DisableKeepAlives:   false,
    },
    Timeout: 10 * time.Second,
}

// Reuse a single shared client so each request does not open a new connection
func FetchExternalData(url string) ([]byte, error) {
    resp, err := httpClient.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    return ioutil.ReadAll(resp.Body)
}

5.3 Rate Limiting

import "golang.org/x/time/rate"

type RateLimitedHandler struct {
    limiter *rate.Limiter
    handler http.Handler
}

func (rl *RateLimitedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if !rl.limiter.Allow() {
        http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
        return
    }

    rl.handler.ServeHTTP(w, r)
}

func main() {
    router := mux.NewRouter()

    // Limit: 1000 requests per second, with a burst of 2000
    limiter := rate.NewLimiter(rate.Limit(1000), 2000)

    handler := &RateLimitedHandler{
        limiter: limiter,
        handler: router,
    }

    http.ListenAndServe(":8080", handler)
}
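
The limiter above is global. In practice an exchange usually also enforces per-user or per-IP limits; below is a rough sketch that keeps one token bucket per client. The type and method names are illustrative additions, not part of the original code.

// PerClientLimiter keeps one token bucket per API key or IP address (illustrative sketch).
type PerClientLimiter struct {
    mu       sync.Mutex
    limiters map[string]*rate.Limiter
    r        rate.Limit
    burst    int
}

func NewPerClientLimiter(r rate.Limit, burst int) *PerClientLimiter {
    return &PerClientLimiter{
        limiters: make(map[string]*rate.Limiter),
        r:        r,
        burst:    burst,
    }
}

// Allow reports whether the given client may proceed, creating its bucket on first use.
func (p *PerClientLimiter) Allow(clientID string) bool {
    p.mu.Lock()
    lim, ok := p.limiters[clientID]
    if !ok {
        lim = rate.NewLimiter(p.r, p.burst)
        p.limiters[clientID] = lim
    }
    p.mu.Unlock()
    return lim.Allow()
}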

6. Performance Monitoring and Profiling

6.1 Profiling with pprof

import (
    "log"
    "net/http"
    _ "net/http/pprof"
)

func main() {
    // Start the pprof HTTP server on a local port
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()

    // Start the application
    // ...
}

Using pprof:

# CPU profiling (30-second sample)
go tool pprof "http://localhost:6060/debug/pprof/profile?seconds=30"

# Heap (memory) profiling
go tool pprof http://localhost:6060/debug/pprof/heap

# Inspect goroutines
go tool pprof http://localhost:6060/debug/pprof/goroutine

# Open the web UI with a flame graph
go tool pprof -http=:8081 "http://localhost:6060/debug/pprof/profile?seconds=30"

6.2 Execution Tracing with runtime/trace

import "runtime/trace"

func main() {
    f, _ := os.Create("trace.out")
    defer f.Close()

    trace.Start(f)
    defer trace.Stop()

    // Run the code to be traced
    benchmarkMatchingEngine()
}

// Analyze the trace:
// go tool trace trace.out

6.3 Benchmarking Best Practices

func BenchmarkOrderBook(b *testing.B) {
    ob := NewOrderBook()

    // Warm up: pre-populate the order book
    for i := 0; i < 1000; i++ {
        ob.AddOrder(&Order{
            OrderID:  fmt.Sprintf("warmup_%d", i),
            Side:     "buy",
            Price:    50000,
            Quantity: 0.01,
        })
    }

    // Reset the timer so setup cost is excluded
    b.ResetTimer()

    // Report memory allocations per operation
    b.ReportAllocs()

    // Run in parallel; an atomic counter (sync/atomic) keeps order IDs unique across goroutines
    var counter int64
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            i := atomic.AddInt64(&counter, 1)
            ob.AddOrder(&Order{
                OrderID:  fmt.Sprintf("bench_%d", i),
                Side:     "buy",
                Price:    50000 + float64(i%1000),
                Quantity: 0.01,
            })
        }
    })
}

// Run:
// go test -bench=. -benchmem -cpuprofile=cpu.prof -memprofile=mem.prof

// Sample result:
// BenchmarkOrderBook-8    5000000    287 ns/op    128 B/op    2 allocs/op

Summary

This chapter covered stress testing and performance optimization:

  1. Load testing tools: using ab, wrk, Locust, and k6
  2. Matching engine load testing: performance benchmarks and concurrency tests
  3. Database optimization: slow query analysis, connection pool tuning, batch operations
  4. Redis optimization: pipelining, Lua scripts, cache warm-up
  5. API optimization: response compression, connection reuse, rate limiting
  6. Performance analysis: pprof, trace, benchmarking

The next chapter is the hands-on project, which brings all of these topics together to implement a complete exchange.
