HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 交易所技术完整体系

    • 交易所技术完整体系
    • 交易所技术架构总览
    • 交易基础概念
    • 撮合引擎原理
    • 撮合引擎实现-内存撮合
    • 撮合引擎优化 - 延迟与吞吐
    • 撮合引擎高可用
    • 清算系统设计
    • 风控系统设计
    • 资金管理系统
    • 行情系统设计
    • 去中心化交易所(DEX)设计
    • 合约交易系统
    • 数据库设计与优化
    • 缓存与消息队列
    • 用户系统与KYC
    • 交易所API设计
    • 监控与告警系统
    • 安全防护与攻防
    • 高可用架构设计
    • 压力测试与性能优化
    • 项目实战-完整交易所实现

风控系统设计

交易所的风控系统是保障平台安全运行的核心防线。攻击者可能利用API漏洞在短时间内下大量小额订单,试图操纵市场价格;或通过其他手段进行洗钱、刷量等恶意行为。一个完善的风控系统需要能够实时检测并拦截这些异常行为,冻结可疑账户,最大程度降低平台损失。

本章参考Binance、Coinbase等头部交易所的风控架构,讲解如何设计一套包含实时风险评分、行为分析、反洗钱等多个模块的完整风控系统。

1. 风控体系架构

1.1 多层防御

第一层:API层防护
├── 频率限制 (Rate Limiting)
├── IP黑名单
└── 签名验证

第二层:订单层防护
├── 订单合法性检查
├── 自成交检测
└── 价格偏离检测

第三层:账户层防护
├── 余额检查
├── 风险限额
└── 异常行为检测

第四层:市场层防护
├── 市场操纵检测
├── 异常交易模式
└── 大额交易监控

第五层:合规层防护
├── KYC/AML
├── 黑名单监控
└── 大额提现审核

1.2 系统架构

package riskcontrol

import (
	"context"
	"sync"
)

// 风控引擎
type RiskControlEngine struct {
	// 各模块
	rateLimiter      *RateLimiter
	orderValidator   *OrderValidator
	accountMonitor   *AccountMonitor
	marketMonitor    *MarketMonitor
	complianceCheck  *ComplianceCheck

	// 规则引擎
	ruleEngine *RuleEngine

	// 风险评分
	riskScorer *RiskScorer

	// 黑名单
	blacklist *Blacklist
	mu        sync.RWMutex

	// 告警
	alerter *Alerter
}

func NewRiskControlEngine() *RiskControlEngine {
	return &RiskControlEngine{
		rateLimiter:     NewRateLimiter(),
		orderValidator:  NewOrderValidator(),
		accountMonitor:  NewAccountMonitor(),
		marketMonitor:   NewMarketMonitor(),
		complianceCheck: NewComplianceCheck(),
		ruleEngine:      NewRuleEngine(),
		riskScorer:      NewRiskScorer(),
		blacklist:       NewBlacklist(),
		alerter:         NewAlerter(),
	}
}

// 风控检查结果
type RiskCheckResult struct {
	Passed       bool
	RiskScore    float64 // 0-100,越高越危险
	RejectReason string
	Action       string // "allow", "reject", "review"
	Warnings     []string
}

// 统一风控检查入口
func (e *RiskControlEngine) CheckOrder(ctx context.Context, order *Order, user *User) *RiskCheckResult {
	result := &RiskCheckResult{
		Passed:    true,
		RiskScore: 0,
		Warnings:  make([]string, 0),
	}

	// 1. 频率限制检查
	if !e.rateLimiter.Allow(user.UserID) {
		result.Passed = false
		result.RejectReason = "Rate limit exceeded"
		result.Action = "reject"
		return result
	}

	// 2. 黑名单检查
	if e.blacklist.Contains(user.UserID) {
		result.Passed = false
		result.RejectReason = "User in blacklist"
		result.Action = "reject"
		e.alerter.Alert("Blacklisted user attempt", user.UserID)
		return result
	}

	// 3. 订单合法性检查
	if err := e.orderValidator.Validate(order); err != nil {
		result.Passed = false
		result.RejectReason = err.Error()
		result.Action = "reject"
		return result
	}

	// 4. 账户风险检查
	accountRisk := e.accountMonitor.CheckRisk(user, order)
	result.RiskScore += accountRisk.Score
	result.Warnings = append(result.Warnings, accountRisk.Warnings...)

	// 5. 市场风险检查
	marketRisk := e.marketMonitor.CheckRisk(order)
	result.RiskScore += marketRisk.Score
	result.Warnings = append(result.Warnings, marketRisk.Warnings...)

	// 6. 合规检查
	complianceRisk := e.complianceCheck.Check(user, order)
	result.RiskScore += complianceRisk.Score

	// 7. 规则引擎评估
	ruleResult := e.ruleEngine.Evaluate(user, order, result.RiskScore)
	if !ruleResult.Passed {
		result.Passed = false
		result.RejectReason = ruleResult.Reason
		result.Action = ruleResult.Action
		return result
	}

	// 8. 风险评分决策
	if result.RiskScore > 80 {
		result.Passed = false
		result.RejectReason = "High risk score"
		result.Action = "reject"
	} else if result.RiskScore > 50 {
		result.Passed = true
		result.Action = "review" // 允许但标记为待审核
	} else {
		result.Action = "allow"
	}

	return result
}

2. 频率限制 (Rate Limiting)

2.1 令牌桶算法

package ratelimit

import (
	"sync"
	"time"
)

// 令牌桶
type TokenBucket struct {
	capacity      int64     // 桶容量
	tokens        int64     // 当前令牌数
	refillRate    int64     // 每秒填充速率
	lastRefillTime time.Time
	mu            sync.Mutex
}

func NewTokenBucket(capacity, refillRate int64) *TokenBucket {
	return &TokenBucket{
		capacity:       capacity,
		tokens:         capacity,
		refillRate:     refillRate,
		lastRefillTime: time.Now(),
	}
}

func (tb *TokenBucket) Allow() bool {
	tb.mu.Lock()
	defer tb.mu.Unlock()

	// 填充令牌
	tb.refill()

	// 检查是否有令牌
	if tb.tokens > 0 {
		tb.tokens--
		return true
	}

	return false
}

func (tb *TokenBucket) refill() {
	now := time.Now()
	elapsed := now.Sub(tb.lastRefillTime)

	// 计算应该填充的令牌数
	tokensToAdd := int64(elapsed.Seconds()) * tb.refillRate

	if tokensToAdd > 0 {
		tb.tokens += tokensToAdd
		if tb.tokens > tb.capacity {
			tb.tokens = tb.capacity
		}
		tb.lastRefillTime = now
	}
}

// 多级限流
type RateLimiter struct {
	// 按用户限流
	userLimiters map[string]*TokenBucket
	userMu       sync.RWMutex

	// 按IP限流
	ipLimiters map[string]*TokenBucket
	ipMu       sync.RWMutex

	// 全局限流
	globalLimiter *TokenBucket

	// 限流配置
	config *RateLimitConfig
}

type RateLimitConfig struct {
	// 用户级限制
	UserCapacity    int64 // 桶容量
	UserRefillRate  int64 // 每秒填充

	// IP级限制
	IPCapacity      int64
	IPRefillRate    int64

	// 全局限制
	GlobalCapacity  int64
	GlobalRefillRate int64
}

func NewRateLimiter(config *RateLimitConfig) *RateLimiter {
	return &RateLimiter{
		userLimiters:  make(map[string]*TokenBucket),
		ipLimiters:    make(map[string]*TokenBucket),
		globalLimiter: NewTokenBucket(config.GlobalCapacity, config.GlobalRefillRate),
		config:        config,
	}
}

func (rl *RateLimiter) Allow(userID string, ip string) bool {
	// 1. 全局限流检查
	if !rl.globalLimiter.Allow() {
		return false
	}

	// 2. IP限流检查
	ipLimiter := rl.getIPLimiter(ip)
	if !ipLimiter.Allow() {
		return false
	}

	// 3. 用户限流检查
	userLimiter := rl.getUserLimiter(userID)
	if !userLimiter.Allow() {
		return false
	}

	return true
}

func (rl *RateLimiter) getUserLimiter(userID string) *TokenBucket {
	rl.userMu.RLock()
	limiter, exists := rl.userLimiters[userID]
	rl.userMu.RUnlock()

	if !exists {
		rl.userMu.Lock()
		// Double-check
		limiter, exists = rl.userLimiters[userID]
		if !exists {
			limiter = NewTokenBucket(rl.config.UserCapacity, rl.config.UserRefillRate)
			rl.userLimiters[userID] = limiter
		}
		rl.userMu.Unlock()
	}

	return limiter
}

func (rl *RateLimiter) getIPLimiter(ip string) *TokenBucket {
	rl.ipMu.RLock()
	limiter, exists := rl.ipLimiters[ip]
	rl.ipMu.RUnlock()

	if !exists {
		rl.ipMu.Lock()
		limiter, exists = rl.ipLimiters[ip]
		if !exists {
			limiter = NewTokenBucket(rl.config.IPCapacity, rl.config.IPRefillRate)
			rl.ipLimiters[ip] = limiter
		}
		rl.ipMu.Unlock()
	}

	return limiter
}

// 动态限流:根据系统负载调整
func (rl *RateLimiter) AdjustLimits(cpuUsage float64, qps int64) {
	if cpuUsage > 0.8 {
		// CPU使用率高,降低限流阈值
		rl.globalLimiter.refillRate = rl.globalLimiter.refillRate * 80 / 100
	} else if cpuUsage < 0.5 && qps < rl.config.GlobalCapacity/2 {
		// CPU使用率低且QPS低,提高限流阈值
		rl.globalLimiter.refillRate = rl.config.GlobalRefillRate
	}
}

2.2 实际配置

// 不同用户等级的限流配置
var rateLimitConfigs = map[string]*RateLimitConfig{
	"normal": {
		UserCapacity:     100,  // 普通用户每秒100请求
		UserRefillRate:   10,
		IPCapacity:       500,
		IPRefillRate:     50,
		GlobalCapacity:   100000,
		GlobalRefillRate: 10000,
	},
	"vip": {
		UserCapacity:     500,  // VIP用户每秒500请求
		UserRefillRate:   50,
		IPCapacity:       2000,
		IPRefillRate:     200,
		GlobalCapacity:   100000,
		GlobalRefillRate: 10000,
	},
	"api": {
		UserCapacity:     2000, // API用户每秒2000请求
		UserRefillRate:   200,
		IPCapacity:       5000,
		IPRefillRate:     500,
		GlobalCapacity:   100000,
		GlobalRefillRate: 10000,
	},
}

3. 订单风险检测

3.1 价格偏离检测

type PriceDeviationChecker struct {
	maxDeviation float64 // 最大偏离度,如0.05表示5%
}

func (c *PriceDeviationChecker) Check(order *Order, marketPrice float64) error {
	if order.Type != "limit" {
		return nil // 市价单不检查
	}

	var deviation float64
	if order.Side == "buy" {
		// 买单价格不应远高于市场价
		deviation = (order.Price - marketPrice) / marketPrice
	} else {
		// 卖单价格不应远低于市场价
		deviation = (marketPrice - order.Price) / marketPrice
	}

	if deviation > c.maxDeviation {
		return fmt.Errorf("price deviates too much from market: %.2f%%", deviation*100)
	}

	return nil
}

3.2 自成交检测

type SelfTradeChecker struct {
	// 记录用户在订单簿中的订单
	userOrders map[string]map[string]*Order // symbol -> userID -> orders
	mu         sync.RWMutex
}

func (c *SelfTradeChecker) Check(order *Order) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	symbol := order.Symbol
	userID := order.UserID

	existingOrders, exists := c.userOrders[symbol]
	if !exists {
		return nil
	}

	userExistingOrders, exists := existingOrders[userID]
	if !exists {
		return nil
	}

	// 检查是否会与自己的订单成交
	for _, existingOrder := range userExistingOrders {
		if existingOrder.Side != order.Side {
			// 有反向订单,可能自成交
			if wouldMatch(order, existingOrder) {
				return fmt.Errorf("self-trade detected")
			}
		}
	}

	return nil
}

func wouldMatch(order1, order2 *Order) bool {
	if order1.Type == "market" || order2.Type == "market" {
		return true
	}

	// 限价单匹配条件
	if order1.Side == "buy" && order2.Side == "sell" {
		return order1.Price >= order2.Price
	} else if order1.Side == "sell" && order2.Side == "buy" {
		return order2.Price >= order1.Price
	}

	return false
}

3.3 异常订单模式检测

type AnomalyDetector struct {
	// 记录用户近期订单
	recentOrders map[string]*OrderHistory // userID -> history
	mu           sync.RWMutex
}

type OrderHistory struct {
	Orders    []*Order
	MaxSize   int // 保留最近N个订单
	CreatedAt time.Time
}

func (ad *AnomalyDetector) Detect(user *User, order *Order) *Risk {
	ad.mu.Lock()
	history := ad.getOrCreateHistory(user.UserID)
	history.Orders = append(history.Orders, order)

	// 保持固定大小
	if len(history.Orders) > history.MaxSize {
		history.Orders = history.Orders[len(history.Orders)-history.MaxSize:]
	}
	ad.mu.Unlock()

	risk := &Risk{Score: 0, Warnings: make([]string, 0)}

	// 检测1:大量小额订单
	if ad.detectMicroOrders(history) {
		risk.Score += 30
		risk.Warnings = append(risk.Warnings, "Large number of micro orders")
	}

	// 检测2:快速撤单
	if ad.detectRapidCancels(history) {
		risk.Score += 25
		risk.Warnings = append(risk.Warnings, "Rapid order cancellations")
	}

	// 检测3:来回刷单
	if ad.detectWashTrading(history) {
		risk.Score += 40
		risk.Warnings = append(risk.Warnings, "Potential wash trading")
	}

	// 检测4:分层订单(Layering)
	if ad.detectLayering(history) {
		risk.Score += 35
		risk.Warnings = append(risk.Warnings, "Layering pattern detected")
	}

	return risk
}

// 检测大量小额订单
func (ad *AnomalyDetector) detectMicroOrders(history *OrderHistory) bool {
	if len(history.Orders) < 20 {
		return false
	}

	// 最近20个订单中,有超过15个是小额订单(<10 USDT)
	microCount := 0
	for i := len(history.Orders) - 20; i < len(history.Orders); i++ {
		orderValue := history.Orders[i].Price * history.Orders[i].Quantity
		if orderValue < 10 {
			microCount++
		}
	}

	return microCount > 15
}

// 检测快速撤单
func (ad *AnomalyDetector) detectRapidCancels(history *OrderHistory) bool {
	if len(history.Orders) < 10 {
		return false
	}

	// 最近10个订单中,有超过7个在1秒内被撤销
	rapidCancelCount := 0
	for i := len(history.Orders) - 10; i < len(history.Orders); i++ {
		order := history.Orders[i]
		if order.Status == "cancelled" {
			lifeTime := order.CancelledAt.Sub(order.CreatedAt)
			if lifeTime < time.Second {
				rapidCancelCount++
			}
		}
	}

	return rapidCancelCount > 7
}

// 检测刷单(Wash Trading)
func (ad *AnomalyDetector) detectWashTrading(history *OrderHistory) bool {
	// 短时间内频繁买卖同一交易对,且价格和数量相近
	if len(history.Orders) < 10 {
		return false
	}

	recent := history.Orders[len(history.Orders)-10:]

	buyOrders := make([]*Order, 0)
	sellOrders := make([]*Order, 0)

	for _, order := range recent {
		if order.Side == "buy" {
			buyOrders = append(buyOrders, order)
		} else {
			sellOrders = append(sellOrders, order)
		}
	}

	// 买单和卖单数量相当,且价格和数量接近
	if len(buyOrders) > 3 && len(sellOrders) > 3 {
		for _, buy := range buyOrders {
			for _, sell := range sellOrders {
				priceDiff := math.Abs(buy.Price-sell.Price) / buy.Price
				qtyDiff := math.Abs(buy.Quantity-sell.Quantity) / buy.Quantity
				timeDiff := sell.CreatedAt.Sub(buy.CreatedAt)

				// 价格差<1%,数量差<5%,时间差<5分钟
				if priceDiff < 0.01 && qtyDiff < 0.05 && timeDiff < 5*time.Minute {
					return true
				}
			}
		}
	}

	return false
}

// 检测分层订单(Layering/Spoofing)
func (ad *AnomalyDetector) detectLayering(history *OrderHistory) bool {
	// 在一侧挂大量订单制造虚假流动性,然后在另一侧小单成交
	// 简化检测:同一时间在买方挂了>5个订单,但在卖方成交
	if len(history.Orders) < 10 {
		return false
	}

	recent := history.Orders[len(history.Orders)-10:]

	pendingBuys := 0
	filledSells := 0

	for _, order := range recent {
		age := time.Since(order.CreatedAt)
		if age < 30*time.Second { // 近30秒内的订单
			if order.Side == "buy" && order.Status == "pending" {
				pendingBuys++
			} else if order.Side == "sell" && order.Status == "filled" {
				filledSells++
			}
		}
	}

	return pendingBuys > 5 && filledSells > 2
}

4. 账户风险监控

4.1 异常提现检测

type WithdrawalMonitor struct {
	// 用户提现历史
	history map[string]*WithdrawalHistory
	mu      sync.RWMutex
}

type WithdrawalHistory struct {
	Recent24h   []*Withdrawal
	Recent7d    []*Withdrawal
	TotalAmount float64
}

type Withdrawal struct {
	UserID    string
	Address   string
	Amount    float64
	Currency  string
	CreatedAt time.Time
	Status    string
}

func (wm *WithdrawalMonitor) CheckRisk(user *User, withdrawal *Withdrawal) *Risk {
	risk := &Risk{Score: 0, Warnings: make([]string, 0)}

	history := wm.getHistory(user.UserID)

	// 检测1:大额提现
	if withdrawal.Amount > user.DailyWithdrawLimit*0.8 {
		risk.Score += 20
		risk.Warnings = append(risk.Warnings, "Large withdrawal amount")
	}

	// 检测2:频繁提现
	if len(history.Recent24h) > 5 {
		risk.Score += 15
		risk.Warnings = append(risk.Warnings, "Frequent withdrawals in 24h")
	}

	// 检测3:新地址提现
	if wm.isNewAddress(user.UserID, withdrawal.Address) {
		risk.Score += 25
		risk.Warnings = append(risk.Warnings, "Withdrawal to new address")
	}

	// 检测4:账户刚充值后立即提现
	if wm.isQuickWithdrawAfterDeposit(user.UserID) {
		risk.Score += 30
		risk.Warnings = append(risk.Warnings, "Quick withdrawal after deposit")
	}

	// 检测5:异常登录后提现
	if wm.isAbnormalLogin(user) {
		risk.Score += 40
		risk.Warnings = append(risk.Warnings, "Withdrawal after abnormal login")
	}

	return risk
}

func (wm *WithdrawalMonitor) isNewAddress(userID, address string) bool {
	// 检查用户是否曾经向该地址提现过
	history := wm.getHistory(userID)
	for _, w := range history.Recent7d {
		if w.Address == address {
			return false
		}
	}
	return true
}

func (wm *WithdrawalMonitor) isQuickWithdrawAfterDeposit(userID string) bool {
	// 检查用户是否在充值后30分钟内提现
	// 这可能是洗钱行为
	// 实现略
	return false
}

func (wm *WithdrawalMonitor) isAbnormalLogin(user *User) bool {
	// 检查最近是否有异常登录
	// - 新设备登录
	// - 新IP登录
	// - 异地登录
	// 实现略
	return false
}

4.2 账户行为画像

type UserProfiler struct {
	profiles map[string]*UserProfile
	mu       sync.RWMutex
}

type UserProfile struct {
	UserID string

	// 交易行为特征
	AvgOrderSize      float64
	AvgOrderInterval  time.Duration
	PreferredSymbols  []string
	TradingHours      []int // 0-23,偏好的交易时间

	// 设备特征
	CommonIPs         []string
	CommonDevices     []string

	// 资金特征
	AvgBalance        float64
	DepositFrequency  int // 每月充值次数
	WithdrawFrequency int // 每月提现次数

	// 风险特征
	HistoricalRiskScore float64
	ViolationCount      int

	LastUpdated time.Time
}

func (up *UserProfiler) BuildProfile(userID string) *UserProfile {
	// 从历史数据构建用户画像
	orders := db.GetUserOrders(userID, 30*24*time.Hour) // 近30天订单
	deposits := db.GetUserDeposits(userID, 30*24*time.Hour)
	withdrawals := db.GetUserWithdrawals(userID, 30*24*time.Hour)

	profile := &UserProfile{UserID: userID}

	// 计算平均订单大小
	totalValue := 0.0
	for _, order := range orders {
		totalValue += order.Price * order.Quantity
	}
	if len(orders) > 0 {
		profile.AvgOrderSize = totalValue / float64(len(orders))
	}

	// 计算平均订单间隔
	if len(orders) > 1 {
		totalInterval := time.Duration(0)
		for i := 1; i < len(orders); i++ {
			interval := orders[i].CreatedAt.Sub(orders[i-1].CreatedAt)
			totalInterval += interval
		}
		profile.AvgOrderInterval = totalInterval / time.Duration(len(orders)-1)
	}

	// 统计偏好交易对
	symbolCount := make(map[string]int)
	for _, order := range orders {
		symbolCount[order.Symbol]++
	}
	// 取Top3
	profile.PreferredSymbols = getTop3Symbols(symbolCount)

	// 统计交易时间偏好
	hourCount := make([]int, 24)
	for _, order := range orders {
		hour := order.CreatedAt.Hour()
		hourCount[hour]++
	}
	// 找出高峰时段(交易量>平均值)
	avgCount := len(orders) / 24
	for hour, count := range hourCount {
		if count > avgCount {
			profile.TradingHours = append(profile.TradingHours, hour)
		}
	}

	// 其他特征
	profile.DepositFrequency = len(deposits)
	profile.WithdrawFrequency = len(withdrawals)
	profile.LastUpdated = time.Now()

	return profile
}

// 检测行为偏离
func (up *UserProfiler) DetectDeviation(userID string, order *Order) float64 {
	profile := up.getProfile(userID)
	if profile == nil {
		return 0 // 新用户,无画像
	}

	deviationScore := 0.0

	// 订单大小偏离
	orderValue := order.Price * order.Quantity
	if profile.AvgOrderSize > 0 {
		sizeDeviation := math.Abs(orderValue-profile.AvgOrderSize) / profile.AvgOrderSize
		if sizeDeviation > 2.0 { // 偏离2倍
			deviationScore += 20
		}
	}

	// 交易对偏离
	isPreferred := false
	for _, symbol := range profile.PreferredSymbols {
		if symbol == order.Symbol {
			isPreferred = true
			break
		}
	}
	if !isPreferred {
		deviationScore += 10 // 交易不常见的交易对
	}

	// 交易时间偏离
	hour := order.CreatedAt.Hour()
	isPreferredHour := false
	for _, h := range profile.TradingHours {
		if h == hour {
			isPreferredHour = true
			break
		}
	}
	if !isPreferredHour {
		deviationScore += 15 // 在非常规时间交易
	}

	return deviationScore
}

5. 市场操纵检测

5.1 拉升砸盘检测

type MarketManipulationDetector struct {
	// 市场价格历史
	priceHistory map[string]*PriceHistory
	mu           sync.RWMutex
}

type PriceHistory struct {
	Symbol string
	Prices []PricePoint
}

type PricePoint struct {
	Price     float64
	Volume    float64
	Timestamp time.Time
}

func (mmd *MarketManipulationDetector) DetectPumpAndDump(symbol string) bool {
	history := mmd.getPriceHistory(symbol)
	if len(history.Prices) < 100 {
		return false
	}

	// 取最近100个数据点
	recent := history.Prices[len(history.Prices)-100:]

	// 检测特征:
	// 1. 短时间内价格急剧上涨(>20%)
	// 2. 伴随大量交易量
	// 3. 然后价格快速回落

	// 计算价格变化
	startPrice := recent[0].Price
	maxPrice := startPrice
	maxIndex := 0

	for i, point := range recent {
		if point.Price > maxPrice {
			maxPrice = point.Price
			maxIndex = i
		}
	}

	priceIncrease := (maxPrice - startPrice) / startPrice

	// 检查是否急剧上涨
	if priceIncrease > 0.2 && maxIndex < 50 { // 前50个点内涨幅>20%
		// 检查后续是否回落
		if maxIndex < len(recent)-10 {
			endPrice := recent[len(recent)-1].Price
			priceDecrease := (maxPrice - endPrice) / maxPrice

			if priceDecrease > 0.15 { // 回落>15%
				return true
			}
		}
	}

	return false
}

5.2 虚假交易量检测

func (mmd *MarketManipulationDetector) DetectFakeVolume(symbol string) bool {
	// 检测特征:
	// 1. 交易量突然暴增,但价格几乎不变
	// 2. 大量小额交易
	// 3. 买卖双方可能是同一实体

	history := mmd.getPriceHistory(symbol)
	if len(history.Prices) < 20 {
		return false
	}

	recent := history.Prices[len(history.Prices)-20:]

	// 计算平均成交量
	avgVolume := 0.0
	for _, point := range recent[:10] {
		avgVolume += point.Volume
	}
	avgVolume /= 10

	// 计算最近成交量
	recentVolume := 0.0
	for _, point := range recent[10:] {
		recentVolume += point.Volume
	}
	recentVolume /= 10

	// 成交量暴增
	volumeIncrease := (recentVolume - avgVolume) / avgVolume

	// 价格变化
	priceChange := math.Abs(recent[19].Price-recent[10].Price) / recent[10].Price

	// 量增但价格几乎不变
	if volumeIncrease > 2.0 && priceChange < 0.02 {
		return true
	}

	return false
}

6. 合规检查

6.1 KYC/AML检查

type ComplianceChecker struct {
	// 黑名单数据库
	sanctionList *SanctionList

	// KYC数据库
	kycDB *KYCDB

	// AML规则引擎
	amlEngine *AMLEngine
}

func (cc *ComplianceChecker) CheckUser(user *User) *ComplianceRisk {
	risk := &ComplianceRisk{Score: 0, Issues: make([]string, 0)}

	// 1. 检查制裁名单
	if cc.sanctionList.Contains(user.Name, user.Country) {
		risk.Score = 100
		risk.Issues = append(risk.Issues, "User in sanction list")
		return risk
	}

	// 2. 检查KYC状态
	kycStatus := cc.kycDB.GetStatus(user.UserID)
	if kycStatus != "verified" {
		risk.Score += 50
		risk.Issues = append(risk.Issues, "KYC not verified")
	}

	// 3. 检查高风险国家
	if cc.isHighRiskCountry(user.Country) {
		risk.Score += 30
		risk.Issues = append(risk.Issues, "High risk country")
	}

	// 4. 检查PEP(政治公众人物)
	if cc.isPEP(user) {
		risk.Score += 40
		risk.Issues = append(risk.Issues, "Politically Exposed Person")
	}

	return risk
}

func (cc *ComplianceChecker) CheckTransaction(tx *Transaction) *ComplianceRisk {
	risk := &ComplianceRisk{Score: 0, Issues: make([]string, 0)}

	// 1. 大额交易报告(CTR - Currency Transaction Report)
	if tx.Amount > 10000 { // 超过1万美元
		risk.Score += 20
		risk.Issues = append(risk.Issues, "Large transaction - requires CTR")
	}

	// 2. 可疑交易报告(SAR - Suspicious Activity Report)
	if cc.amlEngine.IsSuspicious(tx) {
		risk.Score += 60
		risk.Issues = append(risk.Issues, "Suspicious activity detected")
	}

	// 3. 结构化交易(Structuring)检测
	// 故意拆分大额交易以规避监管
	if cc.detectStructuring(tx.UserID, tx.Amount) {
		risk.Score += 70
		risk.Issues = append(risk.Issues, "Potential structuring detected")
	}

	return risk
}

func (cc *ComplianceChecker) detectStructuring(userID string, amount float64) bool {
	// 检查用户是否在短时间内进行多笔略低于报告阈值的交易
	recent := db.GetUserTransactions(userID, 24*time.Hour)

	// 计算近24小时交易总额
	totalAmount := amount
	txCount := 1
	for _, tx := range recent {
		if tx.Amount > 8000 && tx.Amount < 10000 {
			// 单笔接近但未超过1万美元
			totalAmount += tx.Amount
			txCount++
		}
	}

	// 多笔交易总额超过3万,但每笔都未达到报告阈值
	if txCount >= 3 && totalAmount > 30000 {
		return true
	}

	return false
}

7. 实时风险评分

type RiskScorer struct {
	// 权重配置
	weights *RiskWeights
}

type RiskWeights struct {
	OrderRisk       float64
	AccountRisk     float64
	MarketRisk      float64
	ComplianceRisk  float64
	BehaviorRisk    float64
}

func (rs *RiskScorer) CalculateScore(ctx *RiskContext) float64 {
	score := 0.0

	// 加权求和
	score += ctx.OrderRisk * rs.weights.OrderRisk
	score += ctx.AccountRisk * rs.weights.AccountRisk
	score += ctx.MarketRisk * rs.weights.MarketRisk
	score += ctx.ComplianceRisk * rs.weights.ComplianceRisk
	score += ctx.BehaviorRisk * rs.weights.BehaviorRisk

	// 归一化到0-100
	return math.Min(score, 100)
}

// 风险评分可视化
func (rs *RiskScorer) GetScoreBreakdown(ctx *RiskContext) string {
	return fmt.Sprintf(`
Risk Score Breakdown:
├── Order Risk:      %.1f (weight: %.2f)
├── Account Risk:    %.1f (weight: %.2f)
├── Market Risk:     %.1f (weight: %.2f)
├── Compliance Risk: %.1f (weight: %.2f)
└── Behavior Risk:   %.1f (weight: %.2f)

Total Score: %.1f / 100
`,
		ctx.OrderRisk, rs.weights.OrderRisk,
		ctx.AccountRisk, rs.weights.AccountRisk,
		ctx.MarketRisk, rs.weights.MarketRisk,
		ctx.ComplianceRisk, rs.weights.ComplianceRisk,
		ctx.BehaviorRisk, rs.weights.BehaviorRisk,
		rs.CalculateScore(ctx),
	)
}

8. 告警与响应

type Alerter struct {
	// 告警通道
	channels []AlertChannel

	// 告警规则
	rules []*AlertRule
}

type AlertChannel interface {
	Send(alert *Alert) error
}

type Alert struct {
	Level    string // "low", "medium", "high", "critical"
	Type     string
	UserID   string
	Message  string
	Metadata map[string]interface{}
	Time     time.Time
}

type AlertRule struct {
	Name      string
	Condition func(*RiskContext) bool
	Level     string
	Action    func(*RiskContext)
}

func (a *Alerter) Process(ctx *RiskContext) {
	for _, rule := range a.rules {
		if rule.Condition(ctx) {
			// 触发告警
			alert := &Alert{
				Level:   rule.Level,
				Type:    rule.Name,
				UserID:  ctx.UserID,
				Message: fmt.Sprintf("Risk rule '%s' triggered", rule.Name),
				Metadata: map[string]interface{}{
					"risk_score": ctx.TotalScore,
				},
				Time: time.Now(),
			}

			// 发送告警
			for _, channel := range a.channels {
				channel.Send(alert)
			}

			// 执行响应动作
			rule.Action(ctx)
		}
	}
}

// 告警规则示例
var alertRules = []*AlertRule{
	{
		Name:  "HighRiskTransaction",
		Condition: func(ctx *RiskContext) bool {
			return ctx.TotalScore > 80
		},
		Level: "critical",
		Action: func(ctx *RiskContext) {
			// 冻结账户
			db.FreezeAccount(ctx.UserID)
			// 通知风控团队
			notifyRiskTeam(ctx)
		},
	},
	{
		Name:  "SuspiciousActivity",
		Condition: func(ctx *RiskContext) bool {
			return ctx.ComplianceRisk > 60
		},
		Level: "high",
		Action: func(ctx *RiskContext) {
			// 标记为待审核
			db.MarkForReview(ctx.UserID)
		},
	},
}

小结

风控系统是交易所安全的核心,涵盖:

  1. 多层防御体系
  2. 频率限制和黑名单
  3. 订单风险检测(价格偏离、自成交、异常模式)
  4. 账户风险监控(异常提现、行为画像)
  5. 市场操纵检测(拉升砸盘、虚假交易量)
  6. 合规检查(KYC/AML、大额交易报告)
  7. 实时风险评分与告警

下一章将讨论交易所的资金管理系统,包括钱包架构、充值提现流程、热冷钱包分离等内容。

Prev
清算系统设计
Next
资金管理系统