风控系统设计
交易所的风控系统是保障平台安全运行的核心防线。攻击者可能利用API漏洞在短时间内下大量小额订单,试图操纵市场价格;或通过其他手段进行洗钱、刷量等恶意行为。一个完善的风控系统需要能够实时检测并拦截这些异常行为,冻结可疑账户,最大程度降低平台损失。
本章参考Binance、Coinbase等头部交易所的风控架构,讲解如何设计一套包含实时风险评分、行为分析、反洗钱等多个模块的完整风控系统。
1. 风控体系架构
1.1 多层防御
第一层:API层防护
├── 频率限制 (Rate Limiting)
├── IP黑名单
└── 签名验证
第二层:订单层防护
├── 订单合法性检查
├── 自成交检测
└── 价格偏离检测
第三层:账户层防护
├── 余额检查
├── 风险限额
└── 异常行为检测
第四层:市场层防护
├── 市场操纵检测
├── 异常交易模式
└── 大额交易监控
第五层:合规层防护
├── KYC/AML
├── 黑名单监控
└── 大额提现审核
1.2 系统架构
package riskcontrol
import (
"context"
"sync"
)
// 风控引擎
type RiskControlEngine struct {
// 各模块
rateLimiter *RateLimiter
orderValidator *OrderValidator
accountMonitor *AccountMonitor
marketMonitor *MarketMonitor
complianceCheck *ComplianceCheck
// 规则引擎
ruleEngine *RuleEngine
// 风险评分
riskScorer *RiskScorer
// 黑名单
blacklist *Blacklist
mu sync.RWMutex
// 告警
alerter *Alerter
}
func NewRiskControlEngine() *RiskControlEngine {
return &RiskControlEngine{
rateLimiter: NewRateLimiter(),
orderValidator: NewOrderValidator(),
accountMonitor: NewAccountMonitor(),
marketMonitor: NewMarketMonitor(),
complianceCheck: NewComplianceCheck(),
ruleEngine: NewRuleEngine(),
riskScorer: NewRiskScorer(),
blacklist: NewBlacklist(),
alerter: NewAlerter(),
}
}
// 风控检查结果
type RiskCheckResult struct {
Passed bool
RiskScore float64 // 0-100,越高越危险
RejectReason string
Action string // "allow", "reject", "review"
Warnings []string
}
// 统一风控检查入口
func (e *RiskControlEngine) CheckOrder(ctx context.Context, order *Order, user *User) *RiskCheckResult {
result := &RiskCheckResult{
Passed: true,
RiskScore: 0,
Warnings: make([]string, 0),
}
// 1. 频率限制检查
if !e.rateLimiter.Allow(user.UserID) {
result.Passed = false
result.RejectReason = "Rate limit exceeded"
result.Action = "reject"
return result
}
// 2. 黑名单检查
if e.blacklist.Contains(user.UserID) {
result.Passed = false
result.RejectReason = "User in blacklist"
result.Action = "reject"
e.alerter.Alert("Blacklisted user attempt", user.UserID)
return result
}
// 3. 订单合法性检查
if err := e.orderValidator.Validate(order); err != nil {
result.Passed = false
result.RejectReason = err.Error()
result.Action = "reject"
return result
}
// 4. 账户风险检查
accountRisk := e.accountMonitor.CheckRisk(user, order)
result.RiskScore += accountRisk.Score
result.Warnings = append(result.Warnings, accountRisk.Warnings...)
// 5. 市场风险检查
marketRisk := e.marketMonitor.CheckRisk(order)
result.RiskScore += marketRisk.Score
result.Warnings = append(result.Warnings, marketRisk.Warnings...)
// 6. 合规检查
complianceRisk := e.complianceCheck.Check(user, order)
result.RiskScore += complianceRisk.Score
// 7. 规则引擎评估
ruleResult := e.ruleEngine.Evaluate(user, order, result.RiskScore)
if !ruleResult.Passed {
result.Passed = false
result.RejectReason = ruleResult.Reason
result.Action = ruleResult.Action
return result
}
// 8. 风险评分决策
if result.RiskScore > 80 {
result.Passed = false
result.RejectReason = "High risk score"
result.Action = "reject"
} else if result.RiskScore > 50 {
result.Passed = true
result.Action = "review" // 允许但标记为待审核
} else {
result.Action = "allow"
}
return result
}
2. 频率限制 (Rate Limiting)
2.1 令牌桶算法
package ratelimit
import (
"sync"
"time"
)
// 令牌桶
type TokenBucket struct {
capacity int64 // 桶容量
tokens int64 // 当前令牌数
refillRate int64 // 每秒填充速率
lastRefillTime time.Time
mu sync.Mutex
}
func NewTokenBucket(capacity, refillRate int64) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity,
refillRate: refillRate,
lastRefillTime: time.Now(),
}
}
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
// 填充令牌
tb.refill()
// 检查是否有令牌
if tb.tokens > 0 {
tb.tokens--
return true
}
return false
}
func (tb *TokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefillTime)
// 计算应该填充的令牌数
tokensToAdd := int64(elapsed.Seconds()) * tb.refillRate
if tokensToAdd > 0 {
tb.tokens += tokensToAdd
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity
}
tb.lastRefillTime = now
}
}
// 多级限流
type RateLimiter struct {
// 按用户限流
userLimiters map[string]*TokenBucket
userMu sync.RWMutex
// 按IP限流
ipLimiters map[string]*TokenBucket
ipMu sync.RWMutex
// 全局限流
globalLimiter *TokenBucket
// 限流配置
config *RateLimitConfig
}
type RateLimitConfig struct {
// 用户级限制
UserCapacity int64 // 桶容量
UserRefillRate int64 // 每秒填充
// IP级限制
IPCapacity int64
IPRefillRate int64
// 全局限制
GlobalCapacity int64
GlobalRefillRate int64
}
func NewRateLimiter(config *RateLimitConfig) *RateLimiter {
return &RateLimiter{
userLimiters: make(map[string]*TokenBucket),
ipLimiters: make(map[string]*TokenBucket),
globalLimiter: NewTokenBucket(config.GlobalCapacity, config.GlobalRefillRate),
config: config,
}
}
func (rl *RateLimiter) Allow(userID string, ip string) bool {
// 1. 全局限流检查
if !rl.globalLimiter.Allow() {
return false
}
// 2. IP限流检查
ipLimiter := rl.getIPLimiter(ip)
if !ipLimiter.Allow() {
return false
}
// 3. 用户限流检查
userLimiter := rl.getUserLimiter(userID)
if !userLimiter.Allow() {
return false
}
return true
}
func (rl *RateLimiter) getUserLimiter(userID string) *TokenBucket {
rl.userMu.RLock()
limiter, exists := rl.userLimiters[userID]
rl.userMu.RUnlock()
if !exists {
rl.userMu.Lock()
// Double-check
limiter, exists = rl.userLimiters[userID]
if !exists {
limiter = NewTokenBucket(rl.config.UserCapacity, rl.config.UserRefillRate)
rl.userLimiters[userID] = limiter
}
rl.userMu.Unlock()
}
return limiter
}
func (rl *RateLimiter) getIPLimiter(ip string) *TokenBucket {
rl.ipMu.RLock()
limiter, exists := rl.ipLimiters[ip]
rl.ipMu.RUnlock()
if !exists {
rl.ipMu.Lock()
limiter, exists = rl.ipLimiters[ip]
if !exists {
limiter = NewTokenBucket(rl.config.IPCapacity, rl.config.IPRefillRate)
rl.ipLimiters[ip] = limiter
}
rl.ipMu.Unlock()
}
return limiter
}
// 动态限流:根据系统负载调整
func (rl *RateLimiter) AdjustLimits(cpuUsage float64, qps int64) {
if cpuUsage > 0.8 {
// CPU使用率高,降低限流阈值
rl.globalLimiter.refillRate = rl.globalLimiter.refillRate * 80 / 100
} else if cpuUsage < 0.5 && qps < rl.config.GlobalCapacity/2 {
// CPU使用率低且QPS低,提高限流阈值
rl.globalLimiter.refillRate = rl.config.GlobalRefillRate
}
}
2.2 实际配置
// 不同用户等级的限流配置
var rateLimitConfigs = map[string]*RateLimitConfig{
"normal": {
UserCapacity: 100, // 普通用户每秒100请求
UserRefillRate: 10,
IPCapacity: 500,
IPRefillRate: 50,
GlobalCapacity: 100000,
GlobalRefillRate: 10000,
},
"vip": {
UserCapacity: 500, // VIP用户每秒500请求
UserRefillRate: 50,
IPCapacity: 2000,
IPRefillRate: 200,
GlobalCapacity: 100000,
GlobalRefillRate: 10000,
},
"api": {
UserCapacity: 2000, // API用户每秒2000请求
UserRefillRate: 200,
IPCapacity: 5000,
IPRefillRate: 500,
GlobalCapacity: 100000,
GlobalRefillRate: 10000,
},
}
3. 订单风险检测
3.1 价格偏离检测
type PriceDeviationChecker struct {
maxDeviation float64 // 最大偏离度,如0.05表示5%
}
func (c *PriceDeviationChecker) Check(order *Order, marketPrice float64) error {
if order.Type != "limit" {
return nil // 市价单不检查
}
var deviation float64
if order.Side == "buy" {
// 买单价格不应远高于市场价
deviation = (order.Price - marketPrice) / marketPrice
} else {
// 卖单价格不应远低于市场价
deviation = (marketPrice - order.Price) / marketPrice
}
if deviation > c.maxDeviation {
return fmt.Errorf("price deviates too much from market: %.2f%%", deviation*100)
}
return nil
}
3.2 自成交检测
type SelfTradeChecker struct {
// 记录用户在订单簿中的订单
userOrders map[string]map[string]*Order // symbol -> userID -> orders
mu sync.RWMutex
}
func (c *SelfTradeChecker) Check(order *Order) error {
c.mu.RLock()
defer c.mu.RUnlock()
symbol := order.Symbol
userID := order.UserID
existingOrders, exists := c.userOrders[symbol]
if !exists {
return nil
}
userExistingOrders, exists := existingOrders[userID]
if !exists {
return nil
}
// 检查是否会与自己的订单成交
for _, existingOrder := range userExistingOrders {
if existingOrder.Side != order.Side {
// 有反向订单,可能自成交
if wouldMatch(order, existingOrder) {
return fmt.Errorf("self-trade detected")
}
}
}
return nil
}
func wouldMatch(order1, order2 *Order) bool {
if order1.Type == "market" || order2.Type == "market" {
return true
}
// 限价单匹配条件
if order1.Side == "buy" && order2.Side == "sell" {
return order1.Price >= order2.Price
} else if order1.Side == "sell" && order2.Side == "buy" {
return order2.Price >= order1.Price
}
return false
}
3.3 异常订单模式检测
type AnomalyDetector struct {
// 记录用户近期订单
recentOrders map[string]*OrderHistory // userID -> history
mu sync.RWMutex
}
type OrderHistory struct {
Orders []*Order
MaxSize int // 保留最近N个订单
CreatedAt time.Time
}
func (ad *AnomalyDetector) Detect(user *User, order *Order) *Risk {
ad.mu.Lock()
history := ad.getOrCreateHistory(user.UserID)
history.Orders = append(history.Orders, order)
// 保持固定大小
if len(history.Orders) > history.MaxSize {
history.Orders = history.Orders[len(history.Orders)-history.MaxSize:]
}
ad.mu.Unlock()
risk := &Risk{Score: 0, Warnings: make([]string, 0)}
// 检测1:大量小额订单
if ad.detectMicroOrders(history) {
risk.Score += 30
risk.Warnings = append(risk.Warnings, "Large number of micro orders")
}
// 检测2:快速撤单
if ad.detectRapidCancels(history) {
risk.Score += 25
risk.Warnings = append(risk.Warnings, "Rapid order cancellations")
}
// 检测3:来回刷单
if ad.detectWashTrading(history) {
risk.Score += 40
risk.Warnings = append(risk.Warnings, "Potential wash trading")
}
// 检测4:分层订单(Layering)
if ad.detectLayering(history) {
risk.Score += 35
risk.Warnings = append(risk.Warnings, "Layering pattern detected")
}
return risk
}
// 检测大量小额订单
func (ad *AnomalyDetector) detectMicroOrders(history *OrderHistory) bool {
if len(history.Orders) < 20 {
return false
}
// 最近20个订单中,有超过15个是小额订单(<10 USDT)
microCount := 0
for i := len(history.Orders) - 20; i < len(history.Orders); i++ {
orderValue := history.Orders[i].Price * history.Orders[i].Quantity
if orderValue < 10 {
microCount++
}
}
return microCount > 15
}
// 检测快速撤单
func (ad *AnomalyDetector) detectRapidCancels(history *OrderHistory) bool {
if len(history.Orders) < 10 {
return false
}
// 最近10个订单中,有超过7个在1秒内被撤销
rapidCancelCount := 0
for i := len(history.Orders) - 10; i < len(history.Orders); i++ {
order := history.Orders[i]
if order.Status == "cancelled" {
lifeTime := order.CancelledAt.Sub(order.CreatedAt)
if lifeTime < time.Second {
rapidCancelCount++
}
}
}
return rapidCancelCount > 7
}
// 检测刷单(Wash Trading)
func (ad *AnomalyDetector) detectWashTrading(history *OrderHistory) bool {
// 短时间内频繁买卖同一交易对,且价格和数量相近
if len(history.Orders) < 10 {
return false
}
recent := history.Orders[len(history.Orders)-10:]
buyOrders := make([]*Order, 0)
sellOrders := make([]*Order, 0)
for _, order := range recent {
if order.Side == "buy" {
buyOrders = append(buyOrders, order)
} else {
sellOrders = append(sellOrders, order)
}
}
// 买单和卖单数量相当,且价格和数量接近
if len(buyOrders) > 3 && len(sellOrders) > 3 {
for _, buy := range buyOrders {
for _, sell := range sellOrders {
priceDiff := math.Abs(buy.Price-sell.Price) / buy.Price
qtyDiff := math.Abs(buy.Quantity-sell.Quantity) / buy.Quantity
timeDiff := sell.CreatedAt.Sub(buy.CreatedAt)
// 价格差<1%,数量差<5%,时间差<5分钟
if priceDiff < 0.01 && qtyDiff < 0.05 && timeDiff < 5*time.Minute {
return true
}
}
}
}
return false
}
// 检测分层订单(Layering/Spoofing)
func (ad *AnomalyDetector) detectLayering(history *OrderHistory) bool {
// 在一侧挂大量订单制造虚假流动性,然后在另一侧小单成交
// 简化检测:同一时间在买方挂了>5个订单,但在卖方成交
if len(history.Orders) < 10 {
return false
}
recent := history.Orders[len(history.Orders)-10:]
pendingBuys := 0
filledSells := 0
for _, order := range recent {
age := time.Since(order.CreatedAt)
if age < 30*time.Second { // 近30秒内的订单
if order.Side == "buy" && order.Status == "pending" {
pendingBuys++
} else if order.Side == "sell" && order.Status == "filled" {
filledSells++
}
}
}
return pendingBuys > 5 && filledSells > 2
}
4. 账户风险监控
4.1 异常提现检测
type WithdrawalMonitor struct {
// 用户提现历史
history map[string]*WithdrawalHistory
mu sync.RWMutex
}
type WithdrawalHistory struct {
Recent24h []*Withdrawal
Recent7d []*Withdrawal
TotalAmount float64
}
type Withdrawal struct {
UserID string
Address string
Amount float64
Currency string
CreatedAt time.Time
Status string
}
func (wm *WithdrawalMonitor) CheckRisk(user *User, withdrawal *Withdrawal) *Risk {
risk := &Risk{Score: 0, Warnings: make([]string, 0)}
history := wm.getHistory(user.UserID)
// 检测1:大额提现
if withdrawal.Amount > user.DailyWithdrawLimit*0.8 {
risk.Score += 20
risk.Warnings = append(risk.Warnings, "Large withdrawal amount")
}
// 检测2:频繁提现
if len(history.Recent24h) > 5 {
risk.Score += 15
risk.Warnings = append(risk.Warnings, "Frequent withdrawals in 24h")
}
// 检测3:新地址提现
if wm.isNewAddress(user.UserID, withdrawal.Address) {
risk.Score += 25
risk.Warnings = append(risk.Warnings, "Withdrawal to new address")
}
// 检测4:账户刚充值后立即提现
if wm.isQuickWithdrawAfterDeposit(user.UserID) {
risk.Score += 30
risk.Warnings = append(risk.Warnings, "Quick withdrawal after deposit")
}
// 检测5:异常登录后提现
if wm.isAbnormalLogin(user) {
risk.Score += 40
risk.Warnings = append(risk.Warnings, "Withdrawal after abnormal login")
}
return risk
}
func (wm *WithdrawalMonitor) isNewAddress(userID, address string) bool {
// 检查用户是否曾经向该地址提现过
history := wm.getHistory(userID)
for _, w := range history.Recent7d {
if w.Address == address {
return false
}
}
return true
}
func (wm *WithdrawalMonitor) isQuickWithdrawAfterDeposit(userID string) bool {
// 检查用户是否在充值后30分钟内提现
// 这可能是洗钱行为
// 实现略
return false
}
func (wm *WithdrawalMonitor) isAbnormalLogin(user *User) bool {
// 检查最近是否有异常登录
// - 新设备登录
// - 新IP登录
// - 异地登录
// 实现略
return false
}
4.2 账户行为画像
type UserProfiler struct {
profiles map[string]*UserProfile
mu sync.RWMutex
}
type UserProfile struct {
UserID string
// 交易行为特征
AvgOrderSize float64
AvgOrderInterval time.Duration
PreferredSymbols []string
TradingHours []int // 0-23,偏好的交易时间
// 设备特征
CommonIPs []string
CommonDevices []string
// 资金特征
AvgBalance float64
DepositFrequency int // 每月充值次数
WithdrawFrequency int // 每月提现次数
// 风险特征
HistoricalRiskScore float64
ViolationCount int
LastUpdated time.Time
}
func (up *UserProfiler) BuildProfile(userID string) *UserProfile {
// 从历史数据构建用户画像
orders := db.GetUserOrders(userID, 30*24*time.Hour) // 近30天订单
deposits := db.GetUserDeposits(userID, 30*24*time.Hour)
withdrawals := db.GetUserWithdrawals(userID, 30*24*time.Hour)
profile := &UserProfile{UserID: userID}
// 计算平均订单大小
totalValue := 0.0
for _, order := range orders {
totalValue += order.Price * order.Quantity
}
if len(orders) > 0 {
profile.AvgOrderSize = totalValue / float64(len(orders))
}
// 计算平均订单间隔
if len(orders) > 1 {
totalInterval := time.Duration(0)
for i := 1; i < len(orders); i++ {
interval := orders[i].CreatedAt.Sub(orders[i-1].CreatedAt)
totalInterval += interval
}
profile.AvgOrderInterval = totalInterval / time.Duration(len(orders)-1)
}
// 统计偏好交易对
symbolCount := make(map[string]int)
for _, order := range orders {
symbolCount[order.Symbol]++
}
// 取Top3
profile.PreferredSymbols = getTop3Symbols(symbolCount)
// 统计交易时间偏好
hourCount := make([]int, 24)
for _, order := range orders {
hour := order.CreatedAt.Hour()
hourCount[hour]++
}
// 找出高峰时段(交易量>平均值)
avgCount := len(orders) / 24
for hour, count := range hourCount {
if count > avgCount {
profile.TradingHours = append(profile.TradingHours, hour)
}
}
// 其他特征
profile.DepositFrequency = len(deposits)
profile.WithdrawFrequency = len(withdrawals)
profile.LastUpdated = time.Now()
return profile
}
// 检测行为偏离
func (up *UserProfiler) DetectDeviation(userID string, order *Order) float64 {
profile := up.getProfile(userID)
if profile == nil {
return 0 // 新用户,无画像
}
deviationScore := 0.0
// 订单大小偏离
orderValue := order.Price * order.Quantity
if profile.AvgOrderSize > 0 {
sizeDeviation := math.Abs(orderValue-profile.AvgOrderSize) / profile.AvgOrderSize
if sizeDeviation > 2.0 { // 偏离2倍
deviationScore += 20
}
}
// 交易对偏离
isPreferred := false
for _, symbol := range profile.PreferredSymbols {
if symbol == order.Symbol {
isPreferred = true
break
}
}
if !isPreferred {
deviationScore += 10 // 交易不常见的交易对
}
// 交易时间偏离
hour := order.CreatedAt.Hour()
isPreferredHour := false
for _, h := range profile.TradingHours {
if h == hour {
isPreferredHour = true
break
}
}
if !isPreferredHour {
deviationScore += 15 // 在非常规时间交易
}
return deviationScore
}
5. 市场操纵检测
5.1 拉升砸盘检测
type MarketManipulationDetector struct {
// 市场价格历史
priceHistory map[string]*PriceHistory
mu sync.RWMutex
}
type PriceHistory struct {
Symbol string
Prices []PricePoint
}
type PricePoint struct {
Price float64
Volume float64
Timestamp time.Time
}
func (mmd *MarketManipulationDetector) DetectPumpAndDump(symbol string) bool {
history := mmd.getPriceHistory(symbol)
if len(history.Prices) < 100 {
return false
}
// 取最近100个数据点
recent := history.Prices[len(history.Prices)-100:]
// 检测特征:
// 1. 短时间内价格急剧上涨(>20%)
// 2. 伴随大量交易量
// 3. 然后价格快速回落
// 计算价格变化
startPrice := recent[0].Price
maxPrice := startPrice
maxIndex := 0
for i, point := range recent {
if point.Price > maxPrice {
maxPrice = point.Price
maxIndex = i
}
}
priceIncrease := (maxPrice - startPrice) / startPrice
// 检查是否急剧上涨
if priceIncrease > 0.2 && maxIndex < 50 { // 前50个点内涨幅>20%
// 检查后续是否回落
if maxIndex < len(recent)-10 {
endPrice := recent[len(recent)-1].Price
priceDecrease := (maxPrice - endPrice) / maxPrice
if priceDecrease > 0.15 { // 回落>15%
return true
}
}
}
return false
}
5.2 虚假交易量检测
func (mmd *MarketManipulationDetector) DetectFakeVolume(symbol string) bool {
// 检测特征:
// 1. 交易量突然暴增,但价格几乎不变
// 2. 大量小额交易
// 3. 买卖双方可能是同一实体
history := mmd.getPriceHistory(symbol)
if len(history.Prices) < 20 {
return false
}
recent := history.Prices[len(history.Prices)-20:]
// 计算平均成交量
avgVolume := 0.0
for _, point := range recent[:10] {
avgVolume += point.Volume
}
avgVolume /= 10
// 计算最近成交量
recentVolume := 0.0
for _, point := range recent[10:] {
recentVolume += point.Volume
}
recentVolume /= 10
// 成交量暴增
volumeIncrease := (recentVolume - avgVolume) / avgVolume
// 价格变化
priceChange := math.Abs(recent[19].Price-recent[10].Price) / recent[10].Price
// 量增但价格几乎不变
if volumeIncrease > 2.0 && priceChange < 0.02 {
return true
}
return false
}
6. 合规检查
6.1 KYC/AML检查
type ComplianceChecker struct {
// 黑名单数据库
sanctionList *SanctionList
// KYC数据库
kycDB *KYCDB
// AML规则引擎
amlEngine *AMLEngine
}
func (cc *ComplianceChecker) CheckUser(user *User) *ComplianceRisk {
risk := &ComplianceRisk{Score: 0, Issues: make([]string, 0)}
// 1. 检查制裁名单
if cc.sanctionList.Contains(user.Name, user.Country) {
risk.Score = 100
risk.Issues = append(risk.Issues, "User in sanction list")
return risk
}
// 2. 检查KYC状态
kycStatus := cc.kycDB.GetStatus(user.UserID)
if kycStatus != "verified" {
risk.Score += 50
risk.Issues = append(risk.Issues, "KYC not verified")
}
// 3. 检查高风险国家
if cc.isHighRiskCountry(user.Country) {
risk.Score += 30
risk.Issues = append(risk.Issues, "High risk country")
}
// 4. 检查PEP(政治公众人物)
if cc.isPEP(user) {
risk.Score += 40
risk.Issues = append(risk.Issues, "Politically Exposed Person")
}
return risk
}
func (cc *ComplianceChecker) CheckTransaction(tx *Transaction) *ComplianceRisk {
risk := &ComplianceRisk{Score: 0, Issues: make([]string, 0)}
// 1. 大额交易报告(CTR - Currency Transaction Report)
if tx.Amount > 10000 { // 超过1万美元
risk.Score += 20
risk.Issues = append(risk.Issues, "Large transaction - requires CTR")
}
// 2. 可疑交易报告(SAR - Suspicious Activity Report)
if cc.amlEngine.IsSuspicious(tx) {
risk.Score += 60
risk.Issues = append(risk.Issues, "Suspicious activity detected")
}
// 3. 结构化交易(Structuring)检测
// 故意拆分大额交易以规避监管
if cc.detectStructuring(tx.UserID, tx.Amount) {
risk.Score += 70
risk.Issues = append(risk.Issues, "Potential structuring detected")
}
return risk
}
func (cc *ComplianceChecker) detectStructuring(userID string, amount float64) bool {
// 检查用户是否在短时间内进行多笔略低于报告阈值的交易
recent := db.GetUserTransactions(userID, 24*time.Hour)
// 计算近24小时交易总额
totalAmount := amount
txCount := 1
for _, tx := range recent {
if tx.Amount > 8000 && tx.Amount < 10000 {
// 单笔接近但未超过1万美元
totalAmount += tx.Amount
txCount++
}
}
// 多笔交易总额超过3万,但每笔都未达到报告阈值
if txCount >= 3 && totalAmount > 30000 {
return true
}
return false
}
7. 实时风险评分
type RiskScorer struct {
// 权重配置
weights *RiskWeights
}
type RiskWeights struct {
OrderRisk float64
AccountRisk float64
MarketRisk float64
ComplianceRisk float64
BehaviorRisk float64
}
func (rs *RiskScorer) CalculateScore(ctx *RiskContext) float64 {
score := 0.0
// 加权求和
score += ctx.OrderRisk * rs.weights.OrderRisk
score += ctx.AccountRisk * rs.weights.AccountRisk
score += ctx.MarketRisk * rs.weights.MarketRisk
score += ctx.ComplianceRisk * rs.weights.ComplianceRisk
score += ctx.BehaviorRisk * rs.weights.BehaviorRisk
// 归一化到0-100
return math.Min(score, 100)
}
// 风险评分可视化
func (rs *RiskScorer) GetScoreBreakdown(ctx *RiskContext) string {
return fmt.Sprintf(`
Risk Score Breakdown:
├── Order Risk: %.1f (weight: %.2f)
├── Account Risk: %.1f (weight: %.2f)
├── Market Risk: %.1f (weight: %.2f)
├── Compliance Risk: %.1f (weight: %.2f)
└── Behavior Risk: %.1f (weight: %.2f)
Total Score: %.1f / 100
`,
ctx.OrderRisk, rs.weights.OrderRisk,
ctx.AccountRisk, rs.weights.AccountRisk,
ctx.MarketRisk, rs.weights.MarketRisk,
ctx.ComplianceRisk, rs.weights.ComplianceRisk,
ctx.BehaviorRisk, rs.weights.BehaviorRisk,
rs.CalculateScore(ctx),
)
}
8. 告警与响应
type Alerter struct {
// 告警通道
channels []AlertChannel
// 告警规则
rules []*AlertRule
}
type AlertChannel interface {
Send(alert *Alert) error
}
type Alert struct {
Level string // "low", "medium", "high", "critical"
Type string
UserID string
Message string
Metadata map[string]interface{}
Time time.Time
}
type AlertRule struct {
Name string
Condition func(*RiskContext) bool
Level string
Action func(*RiskContext)
}
func (a *Alerter) Process(ctx *RiskContext) {
for _, rule := range a.rules {
if rule.Condition(ctx) {
// 触发告警
alert := &Alert{
Level: rule.Level,
Type: rule.Name,
UserID: ctx.UserID,
Message: fmt.Sprintf("Risk rule '%s' triggered", rule.Name),
Metadata: map[string]interface{}{
"risk_score": ctx.TotalScore,
},
Time: time.Now(),
}
// 发送告警
for _, channel := range a.channels {
channel.Send(alert)
}
// 执行响应动作
rule.Action(ctx)
}
}
}
// 告警规则示例
var alertRules = []*AlertRule{
{
Name: "HighRiskTransaction",
Condition: func(ctx *RiskContext) bool {
return ctx.TotalScore > 80
},
Level: "critical",
Action: func(ctx *RiskContext) {
// 冻结账户
db.FreezeAccount(ctx.UserID)
// 通知风控团队
notifyRiskTeam(ctx)
},
},
{
Name: "SuspiciousActivity",
Condition: func(ctx *RiskContext) bool {
return ctx.ComplianceRisk > 60
},
Level: "high",
Action: func(ctx *RiskContext) {
// 标记为待审核
db.MarkForReview(ctx.UserID)
},
},
}
小结
风控系统是交易所安全的核心,涵盖:
- 多层防御体系
- 频率限制和黑名单
- 订单风险检测(价格偏离、自成交、异常模式)
- 账户风险监控(异常提现、行为画像)
- 市场操纵检测(拉升砸盘、虚假交易量)
- 合规检查(KYC/AML、大额交易报告)
- 实时风险评分与告警
下一章将讨论交易所的资金管理系统,包括钱包架构、充值提现流程、热冷钱包分离等内容。