压力测试与性能优化
1. 压力测试工具
1.1 Apache Bench (ab)
简单快速的HTTP压测工具。
# 基础测试:1000个请求,并发10
ab -n 1000 -c 10 http://api.exchange.com/api/ticker/BTCUSDT
# 带认证头的测试
ab -n 1000 -c 10 -H "Authorization: Bearer token" http://api.exchange.com/api/orders
# POST请求测试
ab -n 1000 -c 10 -p order.json -T application/json http://api.exchange.com/api/orders
# 查看结果
# Requests per second: 1000 [#/sec]
# Time per request: 10ms [mean]
# Transfer rate: 500KB/sec
1.2 wrk
更强大的HTTP压测工具,支持Lua脚本。
# 基础测试:12个线程,400个连接,持续30秒
wrk -t12 -c400 -d30s http://api.exchange.com/api/ticker/BTCUSDT
# 使用Lua脚本模拟真实场景
wrk -t12 -c400 -d30s -s order.lua http://api.exchange.com/api/orders
order.lua脚本:
-- order.lua: wrk request generator.
-- Builds a fresh randomized limit order on every call so the server
-- sees a varied mix of symbols, sides, prices and quantities.
math.randomseed(os.time())

local SYMBOLS = {"BTCUSDT", "ETHUSDT", "BNBUSDT"}
local SIDES   = {"buy", "sell"}

request = function()
    -- Pick random order parameters for this request.
    local symbol   = SYMBOLS[math.random(#SYMBOLS)]
    local side     = SIDES[math.random(#SIDES)]
    local price    = math.random(40000, 50000)
    local quantity = math.random(1, 10) / 10
    local body = string.format([[{
"symbol": "%s",
"side": "%s",
"type": "limit",
"price": %d,
"quantity": %.1f
}]], symbol, side, price, quantity)
    return wrk.format("POST", nil, {
        ["Content-Type"] = "application/json",
        ["Authorization"] = "Bearer test_token"
    }, body)
end
1.3 Locust
Python编写的分布式压测工具。
# locustfile.py
from locust import HttpUser, task, between
import random


class ExchangeUser(HttpUser):
    """Simulated exchange user.

    Logs in once on start, then alternates between market-data reads
    and order placement according to the @task weights (ticker reads
    dominate, orders and balance checks are rarer).
    """

    # Each simulated user pauses 1-3 seconds between tasks.
    wait_time = between(1, 3)

    def on_start(self):
        # Log in once per simulated user and cache the JWT for all
        # subsequent authenticated requests.
        response = self.client.post("/api/auth/login", json={
            "email": "test@example.com",
            "password": "password123"
        })
        self.token = response.json()["access_token"]

    def _auth_headers(self):
        # Shared Authorization header; extracted so the token usage is
        # defined in exactly one place.
        return {"Authorization": f"Bearer {self.token}"}

    @task(3)  # weight 3: ticker polling dominates the traffic mix
    def get_ticker(self):
        symbol = random.choice(["BTCUSDT", "ETHUSDT", "BNBUSDT"])
        self.client.get(f"/api/ticker/{symbol}", headers=self._auth_headers())

    @task(2)  # weight 2
    def get_depth(self):
        symbol = random.choice(["BTCUSDT", "ETHUSDT"])
        self.client.get(f"/api/depth/{symbol}", headers=self._auth_headers())

    @task(1)  # weight 1
    def place_order(self):
        # Randomized limit order within a plausible BTC price band.
        self.client.post("/api/orders", json={
            "symbol": "BTCUSDT",
            "side": random.choice(["buy", "sell"]),
            "type": "limit",
            "price": random.randint(40000, 50000),
            "quantity": round(random.uniform(0.001, 0.01), 3)
        }, headers=self._auth_headers())

    @task(1)
    def get_balance(self):
        self.client.get("/api/account/balance", headers=self._auth_headers())
运行测试:
# 启动Web UI
locust -f locustfile.py --host=http://api.exchange.com
# 命令行模式:100用户,每秒增加10个,持续5分钟
locust -f locustfile.py --host=http://api.exchange.com \
--headless -u 100 -r 10 -t 5m
1.4 k6
现代化的负载测试工具,使用JavaScript编写测试脚本。
// k6-test.js — staged load test: ramp to 100 VUs, hold, ramp to 200,
// hold, then ramp down. Fails the run if latency/error thresholds break.
import http from 'k6/http';
import { check, sleep } from 'k6';

// `const`: the options object is never reassigned after export.
export const options = {
  stages: [
    { duration: '2m', target: 100 }, // ramp up to 100 VUs over 2 minutes
    { duration: '5m', target: 100 }, // hold at 100 VUs
    { duration: '2m', target: 200 }, // ramp up to 200 VUs
    { duration: '5m', target: 200 }, // hold at 200 VUs
    { duration: '2m', target: 0 },   // ramp down
  ],
  thresholds: {
    http_req_duration: ['p(95)<500'], // 95% of requests under 500ms
    http_req_failed: ['rate<0.01'],   // error rate below 1%
  },
};

export default function () {
  // Market-data read (unauthenticated).
  const tickerRes = http.get('http://api.exchange.com/api/ticker/BTCUSDT');
  check(tickerRes, {
    'ticker status is 200': (r) => r.status === 200,
    'ticker response time < 200ms': (r) => r.timings.duration < 200,
  });
  sleep(1);

  // Place a limit order with a static test token.
  const orderPayload = JSON.stringify({
    symbol: 'BTCUSDT',
    side: 'buy',
    type: 'limit',
    price: 50000,
    quantity: 0.01,
  });
  const params = {
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer test_token',
    },
  };
  const orderRes = http.post('http://api.exchange.com/api/orders', orderPayload, params);
  check(orderRes, {
    'order status is 200': (r) => r.status === 200,
    // Loose != is deliberate: rejects both null and undefined.
    'order created': (r) => r.json('order_id') != null,
  });
  sleep(2);
}
运行测试:
k6 run k6-test.js
2. 撮合引擎压测
2.1 撮合引擎性能基准
package benchmark

import (
	"fmt" // was missing: the benchmark uses fmt.Sprintf throughout
	"math/rand"
	"testing"
)

// BenchmarkMatchingEngine measures single-goroutine order-submission
// throughput against a pre-populated order book.
func BenchmarkMatchingEngine(b *testing.B) {
	engine := NewMatchingEngine("BTCUSDT")

	// Pre-fill the book with 1000 resting orders per side so benchmarked
	// orders exercise real matching, not an empty book.
	for i := 0; i < 1000; i++ {
		engine.AddOrder(&Order{
			OrderID:  fmt.Sprintf("buy_%d", i),
			Side:     "buy",
			Price:    49000 + rand.Float64()*1000,
			Quantity: 0.01,
		})
		engine.AddOrder(&Order{
			OrderID:  fmt.Sprintf("sell_%d", i),
			Side:     "sell",
			Price:    50000 + rand.Float64()*1000,
			Quantity: 0.01,
		})
	}

	// Exclude the setup cost from the measurement.
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		order := &Order{
			OrderID:  fmt.Sprintf("test_%d", i),
			Side:     []string{"buy", "sell"}[i%2], // alternate sides
			Price:    49500 + rand.Float64()*1000,
			Quantity: 0.01,
		}
		engine.AddOrder(order)
	}
}

// Run: go test -bench=. -benchtime=10s
// Example result:
// BenchmarkMatchingEngine-8   1000000   1234 ns/op
// i.e. roughly 810,000 orders per second.
2.2 并发压测
// TestMatchingEngineConcurrency submits orders from many goroutines at
// once and reports the aggregate throughput (TPS).
func TestMatchingEngineConcurrency(t *testing.T) {
	engine := NewMatchingEngine("BTCUSDT")

	const workers = 100           // concurrent goroutines
	const ordersPerWorker = 10000 // orders submitted by each goroutine

	var wg sync.WaitGroup
	start := time.Now()
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for n := 0; n < ordersPerWorker; n++ {
				engine.AddOrder(&Order{
					OrderID:  fmt.Sprintf("%d_%d", id, n), // unique per goroutine
					Side:     []string{"buy", "sell"}[n%2],
					Price:    49500 + rand.Float64()*1000,
					Quantity: 0.01,
				})
			}
		}(w)
	}
	wg.Wait()

	elapsed := time.Since(start)
	total := workers * ordersPerWorker
	fmt.Printf("Total orders: %d\n", total)
	fmt.Printf("Time elapsed: %v\n", elapsed)
	fmt.Printf("TPS: %.2f\n", float64(total)/elapsed.Seconds())
	// Example output:
	// Total orders: 1000000
	// Time elapsed: 5.2s
	// TPS: 192307.69
}
3. 数据库性能优化
3.1 慢查询分析
-- 开启慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 0.5; -- 记录超过0.5秒的查询
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
-- 分析慢查询
-- 使用 pt-query-digest 工具
pt-query-digest /var/log/mysql/slow.log > slow_query_report.txt
优化示例:
-- 慢查询:查询用户订单(全表扫描)
SELECT * FROM orders WHERE user_id = 1001 ORDER BY created_at DESC LIMIT 20;
-- EXPLAIN分析
EXPLAIN SELECT * FROM orders WHERE user_id = 1001 ORDER BY created_at DESC LIMIT 20;
-- type: ALL (全表扫描)
-- rows: 1000000 (扫描100万行)
-- 优化:添加复合索引
CREATE INDEX idx_user_created ON orders(user_id, created_at DESC);
-- 优化后
EXPLAIN SELECT * FROM orders WHERE user_id = 1001 ORDER BY created_at DESC LIMIT 20;
-- type: ref (索引查找)
-- rows: 100 (只扫描100行)
-- 性能提升:从1.2s降到0.01s
3.2 连接池调优
import (
	"database/sql"
	"log"  // was missing: MonitorDBStats logs pool statistics
	"time" // was missing: pool lifetimes and the monitor ticker use it
)

// InitDB opens a MySQL connection pool and applies production limits.
func InitDB(dsn string) (*sql.DB, error) {
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		return nil, err
	}
	// Cap total connections so the database server is not overwhelmed.
	db.SetMaxOpenConns(100)
	// Keep some idle connections warm to avoid reconnect latency.
	db.SetMaxIdleConns(20)
	// Recycle connections periodically (survives DB-side timeouts/failovers).
	db.SetConnMaxLifetime(time.Hour)
	// Drop connections that have sat idle for 10 minutes.
	db.SetConnMaxIdleTime(10 * time.Minute)
	return db, nil
}

// MonitorDBStats logs pool statistics every 10 seconds. A growing
// WaitCount means callers are blocking on the pool — raise MaxOpenConns.
func MonitorDBStats(db *sql.DB) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		stats := db.Stats()
		log.Printf("DB Stats: Open=%d, InUse=%d, Idle=%d, Waiting=%d",
			stats.OpenConnections,
			stats.InUse,
			stats.Idle,
			stats.WaitCount,
		)
		if stats.WaitCount > 100 {
			log.Println("Warning: Too many connections waiting")
		}
	}
}
3.3 批量操作优化
// 慢:逐条插入
func InsertTradesSlow(trades []*Trade) error {
for _, trade := range trades {
_, err := db.Exec(`
INSERT INTO trades (trade_id, symbol, price, quantity, created_at)
VALUES (?, ?, ?, ?, ?)
`, trade.TradeID, trade.Symbol, trade.Price, trade.Quantity, trade.CreatedAt)
if err != nil {
return err
}
}
return nil
}
// 快:批量插入
func InsertTradesFast(trades []*Trade) error {
if len(trades) == 0 {
return nil
}
// 构建批量插入SQL
valueStrings := make([]string, 0, len(trades))
valueArgs := make([]interface{}, 0, len(trades)*5)
for _, trade := range trades {
valueStrings = append(valueStrings, "(?, ?, ?, ?, ?)")
valueArgs = append(valueArgs, trade.TradeID, trade.Symbol, trade.Price,
trade.Quantity, trade.CreatedAt)
}
query := fmt.Sprintf(`
INSERT INTO trades (trade_id, symbol, price, quantity, created_at)
VALUES %s
`, strings.Join(valueStrings, ","))
_, err := db.Exec(query, valueArgs...)
return err
}
// 性能对比:
// 插入10000条记录
// InsertTradesSlow: 8.2s
// InsertTradesFast: 0.3s (快27倍)
4. Redis优化
4.1 Pipeline批量操作
// 慢:逐个SET
func SetTickersSlow(tickers map[string]*Ticker) error {
for symbol, ticker := range tickers {
key := fmt.Sprintf("ticker:%s", symbol)
data, _ := json.Marshal(ticker)
err := redisClient.Set(ctx, key, data, 5*time.Second).Err()
if err != nil {
return err
}
}
return nil
}
// 快:Pipeline批量操作
func SetTickersFast(tickers map[string]*Ticker) error {
pipe := redisClient.Pipeline()
for symbol, ticker := range tickers {
key := fmt.Sprintf("ticker:%s", symbol)
data, _ := json.Marshal(ticker)
pipe.Set(ctx, key, data, 5*time.Second)
}
_, err := pipe.Exec(ctx)
return err
}
// 性能对比:
// 更新1000个ticker
// SetTickersSlow: 1.2s (往返1000次)
// SetTickersFast: 0.05s (往返1次,快24倍)
4.2 Lua脚本原子操作
// 使用Lua脚本实现原子操作
var updateTickerScript = redis.NewScript(`
local key = KEYS[1]
local lastPrice = tonumber(ARGV[1])
local volume = tonumber(ARGV[2])
redis.call('HSET', key, 'lastPrice', lastPrice)
redis.call('HINCRBY', key, 'volume', volume)
local high = redis.call('HGET', key, 'highPrice')
if not high or lastPrice > tonumber(high) then
redis.call('HSET', key, 'highPrice', lastPrice)
end
local low = redis.call('HGET', key, 'lowPrice')
if not low or lastPrice < tonumber(low) then
redis.call('HSET', key, 'lowPrice', lastPrice)
end
return redis.call('HGETALL', key)
`)
func UpdateTicker(symbol string, lastPrice, volume float64) error {
key := fmt.Sprintf("ticker:%s", symbol)
_, err := updateTickerScript.Run(ctx, redisClient,
[]string{key},
lastPrice, volume,
).Result()
return err
}
4.3 缓存预热
// CacheWarmer preloads Redis from the database so a cold start does not
// send every first request to the DB.
type CacheWarmer struct {
	db    *sql.DB
	redis *redis.Client
}

// WarmUpTickers loads every ticker row from the database and writes
// them all to Redis in a single pipeline.
func (cw *CacheWarmer) WarmUpTickers() error {
	rows, err := cw.db.Query(`
SELECT symbol, last_price, open_price, high_price, low_price, volume
FROM tickers
`)
	if err != nil {
		return err
	}
	defer rows.Close()

	pipe := cw.redis.Pipeline()
	for rows.Next() {
		var ticker Ticker
		// Scan errors were previously discarded, which could cache
		// zero-valued tickers; fail fast instead.
		if err := rows.Scan(&ticker.Symbol, &ticker.LastPrice, &ticker.OpenPrice,
			&ticker.HighPrice, &ticker.LowPrice, &ticker.Volume); err != nil {
			return err
		}
		key := fmt.Sprintf("ticker:%s", ticker.Symbol)
		data, err := json.Marshal(ticker)
		if err != nil {
			return err
		}
		pipe.Set(context.Background(), key, data, 24*time.Hour)
	}
	// Surface iteration errors (e.g. connection dropped mid-query).
	if err := rows.Err(); err != nil {
		return err
	}
	_, err = pipe.Exec(context.Background())
	return err
}
// 在服务启动时预热缓存
func main() {
warmer := &CacheWarmer{db: db, redis: redisClient}
log.Println("Warming up cache...")
err := warmer.WarmUpTickers()
if err != nil {
log.Fatalf("Cache warmup failed: %v", err)
}
log.Println("Cache warmup completed")
// 启动服务
// ...
}
5. API性能优化
5.1 响应压缩
import "github.com/gorilla/handlers"
func main() {
router := mux.NewRouter()
// 启用Gzip压缩
handler := handlers.CompressHandler(router)
http.ListenAndServe(":8080", handler)
}
// 效果:
// 未压缩: 50KB
// Gzip压缩: 8KB (减少84%)
5.2 连接复用
// HTTP客户端配置
var httpClient = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
DisableKeepAlives: false,
},
Timeout: 10 * time.Second,
}
// 使用单例client,避免每次请求创建新连接
func FetchExternalData(url string) ([]byte, error) {
resp, err := httpClient.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
5.3 并发限制
import "golang.org/x/time/rate"
type RateLimitedHandler struct {
limiter *rate.Limiter
handler http.Handler
}
func (rl *RateLimitedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !rl.limiter.Allow() {
http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
return
}
rl.handler.ServeHTTP(w, r)
}
func main() {
router := mux.NewRouter()
// 限制:每秒1000个请求
limiter := rate.NewLimiter(rate.Limit(1000), 2000)
handler := &RateLimitedHandler{
limiter: limiter,
handler: router,
}
http.ListenAndServe(":8080", handler)
}
6. 性能监控与分析
6.1 pprof性能分析
// Blank import registers the /debug/pprof/* handlers on the default mux.
import _ "net/http/pprof"

func main() {
	// Serve pprof on a localhost-only port, separate from the public
	// API listener, so profiling endpoints are never exposed externally.
	go func() {
		// ListenAndServe only returns on failure; log the error.
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()
	// start the application
	// ...
}
使用pprof:
# CPU profiling
go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
# 内存分析
go tool pprof http://localhost:6060/debug/pprof/heap
# 查看goroutine
go tool pprof http://localhost:6060/debug/pprof/goroutine
# 生成火焰图
go tool pprof -http=:8081 http://localhost:6060/debug/pprof/profile?seconds=30
6.2 trace追踪
import "runtime/trace"

func main() {
	// Capture a runtime execution trace of the benchmark for analysis
	// with `go tool trace`. Errors were previously ignored — a failed
	// Create/Start would silently produce an empty or missing trace.
	f, err := os.Create("trace.out")
	if err != nil {
		log.Fatalf("create trace file: %v", err)
	}
	defer f.Close()
	if err := trace.Start(f); err != nil {
		log.Fatalf("start trace: %v", err)
	}
	// Deferred LIFO: Stop flushes the trace before Close runs.
	defer trace.Stop()

	// Workload to trace.
	benchmarkMatchingEngine()
}

// Analyze the trace:
// go tool trace trace.out
6.3 基准测试最佳实践
// BenchmarkOrderBook measures parallel AddOrder throughput and reports
// per-op memory allocations.
func BenchmarkOrderBook(b *testing.B) {
	ob := NewOrderBook()

	// Warm up: seed the book so we benchmark steady state, not an
	// empty structure.
	for i := 0; i < 1000; i++ {
		ob.AddOrder(&Order{
			OrderID:  fmt.Sprintf("warmup_%d", i),
			Side:     "buy",
			Price:    50000,
			Quantity: 0.01,
		})
	}

	// Exclude warm-up from timing; include allocation stats in output.
	b.ResetTimer()
	b.ReportAllocs()

	// NOTE(review): each parallel worker starts its counter at 0, so
	// OrderIDs repeat across goroutines ("bench_0" from every worker).
	// Harmless if AddOrder doesn't enforce ID uniqueness — confirm.
	b.RunParallel(func(pb *testing.PB) {
		n := 0
		for pb.Next() {
			ob.AddOrder(&Order{
				OrderID:  fmt.Sprintf("bench_%d", n),
				Side:     "buy",
				Price:    50000 + float64(n%1000), // spread across price levels
				Quantity: 0.01,
			})
			n++
		}
	})
}

// Run:
// go test -bench=. -benchmem -cpuprofile=cpu.prof -memprofile=mem.prof
// Example result:
// BenchmarkOrderBook-8   5000000   287 ns/op   128 B/op   2 allocs/op
小结
本章介绍了压力测试与性能优化:
- 压力测试工具:ab、wrk、Locust、k6的使用
- 撮合引擎压测:性能基准、并发测试
- 数据库优化:慢查询分析、连接池调优、批量操作
- Redis优化:Pipeline、Lua脚本、缓存预热
- API优化:响应压缩、连接复用、限流
- 性能分析:pprof、trace、基准测试
下一章是项目实战,将综合所有知识点实现一个完整的交易所。