HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 交易所技术完整体系

    • 交易所技术完整体系
    • 交易所技术架构总览
    • 交易基础概念
    • 撮合引擎原理
    • 撮合引擎实现-内存撮合
    • 撮合引擎优化 - 延迟与吞吐
    • 撮合引擎高可用
    • 清算系统设计
    • 风控系统设计
    • 资金管理系统
    • 行情系统设计
    • 去中心化交易所(DEX)设计
    • 合约交易系统
    • 数据库设计与优化
    • 缓存与消息队列
    • 用户系统与KYC
    • 交易所API设计
    • 监控与告警系统
    • 安全防护与攻防
    • 高可用架构设计
    • 压力测试与性能优化
    • 项目实战-完整交易所实现

资金管理系统

交易所的资金管理系统是平台安全的核心。黑客可能通过钓鱼攻击获取运维人员权限,试图转走热钱包中的资金。即使攻击者获得了部分权限,完善的多重签名机制和冷热钱包分离策略也能确保资金安全,使攻击者只能看到资金但无法转出。

本章参考Coinbase、Gemini等头部交易所的钱包架构,详细讲解如何设计一套采用多重签名、硬件安全模块(HSM)、MPC(多方计算)等多项安全措施的完整资金管理系统,包括钱包架构、充值提现流程、安全防护、对账系统等核心内容。

1. 钱包架构设计

1.1 冷热钱包分离

用户资金分布策略:
├── 热钱包 (5-10%)
│   ├── 在线存储
│   ├── 处理充值提现
│   └── 快速响应
│
├── 温钱包 (20-30%)
│   ├── 半离线存储
│   ├── 定期补充热钱包
│   └── 需要人工审批
│
└── 冷钱包 (60-75%)
    ├── 完全离线存储
    ├── 物理隔离
    ├── 多重签名
    └── 定期审计

1.2 钱包类型

package wallet

import (
	"crypto/ecdsa"
	"crypto/rand"
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// 钱包类型
const (
	WalletTypeHot    = "hot"    // 热钱包
	WalletTypeWarm   = "warm"   // 温钱包
	WalletTypeCold   = "cold"   // 冷钱包
	WalletTypeUser   = "user"   // 用户托管钱包
)

// 钱包接口
type Wallet interface {
	GetAddress() string
	GetBalance() (*big.Int, error)
	Transfer(to string, amount *big.Int) (string, error)
	Sign(message []byte) ([]byte, error)
}

// 热钱包实现
type HotWallet struct {
	privateKey *ecdsa.PrivateKey
	address    common.Address
	chain      string // "ETH", "BTC", etc.
}

func NewHotWallet(chain string) (*HotWallet, error) {
	// 生成私钥
	privateKey, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader)
	if err != nil {
		return nil, err
	}

	// 生成地址
	address := crypto.PubkeyToAddress(privateKey.PublicKey)

	return &HotWallet{
		privateKey: privateKey,
		address:    address,
		chain:      chain,
	}, nil
}

func (w *HotWallet) GetAddress() string {
	return w.address.Hex()
}

func (w *HotWallet) GetBalance() (*big.Int, error) {
	// 查询区块链获取余额
	// 实现略
	return big.NewInt(0), nil
}

func (w *HotWallet) Transfer(to string, amount *big.Int) (string, error) {
	// 构造交易
	// 签名
	// 广播到区块链
	// 实现略
	return "", nil
}

func (w *HotWallet) Sign(message []byte) ([]byte, error) {
	hash := crypto.Keccak256Hash(message)
	signature, err := crypto.Sign(hash.Bytes(), w.privateKey)
	return signature, err
}

// 冷钱包(多重签名)
type ColdWallet struct {
	addresses []string // 多个签名地址
	threshold int      // 最小签名数
	chain     string
}

func NewColdWallet(addresses []string, threshold int, chain string) *ColdWallet {
	return &ColdWallet{
		addresses: addresses,
		threshold: threshold,
		chain:     chain,
	}
}

func (w *ColdWallet) GetAddress() string {
	// 生成多签地址
	// 实现略
	return ""
}

func (w *ColdWallet) Transfer(to string, amount *big.Int) (string, error) {
	// 冷钱包转账需要多方签名
	// 1. 创建交易提案
	// 2. 收集签名(至少threshold个)
	// 3. 组合签名并广播
	return "", fmt.Errorf("cold wallet transfer requires multi-signature approval")
}

// 温钱包(时间锁)
type WarmWallet struct {
	*HotWallet
	unlockTime int64 // 解锁时间戳
}

func NewWarmWallet(chain string, unlockDelay int64) (*WarmWallet, error) {
	hotWallet, err := NewHotWallet(chain)
	if err != nil {
		return nil, err
	}

	return &WarmWallet{
		HotWallet:  hotWallet,
		unlockTime: time.Now().Unix() + unlockDelay,
	}, nil
}

func (w *WarmWallet) Transfer(to string, amount *big.Int) (string, error) {
	// 检查是否已解锁
	if time.Now().Unix() < w.unlockTime {
		return "", fmt.Errorf("wallet is still locked until %d", w.unlockTime)
	}

	return w.HotWallet.Transfer(to, amount)
}

1.3 钱包管理系统

type WalletManager struct {
	hotWallets  map[string]*HotWallet  // currency -> wallet
	warmWallets map[string]*WarmWallet
	coldWallets map[string]*ColdWallet

	// 资金分配策略
	hotRatio  float64 // 热钱包比例
	warmRatio float64 // 温钱包比例
	coldRatio float64 // 冷钱包比例

	// 阈值配置
	hotWalletThreshold  *big.Int // 热钱包阈值
	warmWalletThreshold *big.Int // 温钱包阈值

	mu sync.RWMutex
}

func NewWalletManager() *WalletManager {
	return &WalletManager{
		hotWallets:  make(map[string]*HotWallet),
		warmWallets: make(map[string]*WarmWallet),
		coldWallets: make(map[string]*ColdWallet),
		hotRatio:    0.05, // 5%
		warmRatio:   0.25, // 25%
		coldRatio:   0.70, // 70%
	}
}

// 自动平衡钱包资金
func (wm *WalletManager) Rebalance(currency string) error {
	wm.mu.Lock()
	defer wm.mu.Unlock()

	// 1. 获取总余额
	hotBalance, _ := wm.hotWallets[currency].GetBalance()
	warmBalance, _ := wm.warmWallets[currency].GetBalance()
	coldBalance, _ := wm.coldWallets[currency].GetBalance()

	totalBalance := new(big.Int).Add(hotBalance, warmBalance)
	totalBalance.Add(totalBalance, coldBalance)

	// 2. 计算目标余额
	hotTarget := new(big.Int).Mul(totalBalance, big.NewInt(int64(wm.hotRatio*100)))
	hotTarget.Div(hotTarget, big.NewInt(100))

	warmTarget := new(big.Int).Mul(totalBalance, big.NewInt(int64(wm.warmRatio*100)))
	warmTarget.Div(warmTarget, big.NewInt(100))

	// 3. 调整
	if hotBalance.Cmp(hotTarget) < 0 {
		// 热钱包不足,从温钱包补充
		deficit := new(big.Int).Sub(hotTarget, hotBalance)
		if warmBalance.Cmp(deficit) >= 0 {
			wm.warmWallets[currency].Transfer(wm.hotWallets[currency].GetAddress(), deficit)
		}
	} else if hotBalance.Cmp(new(big.Int).Mul(hotTarget, big.NewInt(2))) > 0 {
		// 热钱包过多,转移到温钱包
		surplus := new(big.Int).Sub(hotBalance, hotTarget)
		wm.hotWallets[currency].Transfer(wm.warmWallets[currency].GetAddress(), surplus)
	}

	return nil
}

// 定期平衡任务
func (wm *WalletManager) AutoRebalance(ctx context.Context) {
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			for currency := range wm.hotWallets {
				if err := wm.Rebalance(currency); err != nil {
					log.Printf("Rebalance failed for %s: %v", currency, err)
				}
			}
		}
	}
}

2. 充值系统

2.1 充值流程

用户发起充值
    ↓
区块链监听服务检测到交易
    ↓
等待确认块数
    ↓
验证交易有效性
    ↓
更新用户余额
    ↓
发送充值成功通知

2.2 区块链监听器

package blockchain

import (
	"context"
	"log"
	"math/big"
	"time"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

type BlockchainListener struct {
	client          *ethclient.Client
	depositHandler  DepositHandler
	lastBlockNumber uint64
	confirmations   uint64 // 需要的确认数
}

func NewBlockchainListener(rpcURL string, confirmations uint64) (*BlockchainListener, error) {
	client, err := ethclient.Dial(rpcURL)
	if err != nil {
		return nil, err
	}

	return &BlockchainListener{
		client:        client,
		confirmations: confirmations,
	}, nil
}

// 开始监听区块
func (bl *BlockchainListener) Start(ctx context.Context) {
	ticker := time.NewTicker(3 * time.Second) // 每3秒检查一次新块
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			bl.processNewBlocks()
		}
	}
}

func (bl *BlockchainListener) processNewBlocks() {
	// 获取最新区块号
	header, err := bl.client.HeaderByNumber(context.Background(), nil)
	if err != nil {
		log.Printf("Failed to get latest block: %v", err)
		return
	}

	latestBlockNumber := header.Number.Uint64()

	// 处理未处理的区块
	for blockNum := bl.lastBlockNumber + 1; blockNum <= latestBlockNumber; blockNum++ {
		bl.processBlock(blockNum)
	}

	bl.lastBlockNumber = latestBlockNumber
}

func (bl *BlockchainListener) processBlock(blockNumber uint64) {
	// 获取区块
	block, err := bl.client.BlockByNumber(context.Background(), big.NewInt(int64(blockNumber)))
	if err != nil {
		log.Printf("Failed to get block %d: %v", blockNumber, err)
		return
	}

	// 处理区块中的所有交易
	for _, tx := range block.Transactions() {
		bl.processTransaction(tx, blockNumber)
	}
}

func (bl *BlockchainListener) processTransaction(tx *types.Transaction, blockNumber uint64) {
	// 检查交易接收地址是否是我们的充值地址
	to := tx.To()
	if to == nil {
		return
	}

	// 查询该地址是否属于某个用户
	userID, err := db.GetUserIDByDepositAddress(to.Hex())
	if err != nil {
		return // 不是我们的地址
	}

	// 获取当前区块号,计算确认数
	currentBlock, _ := bl.client.BlockNumber(context.Background())
	confirmations := currentBlock - blockNumber

	// 创建充值记录
	deposit := &Deposit{
		TxHash:        tx.Hash().Hex(),
		UserID:        userID,
		Currency:      "ETH",
		Amount:        tx.Value(),
		FromAddress:   getSender(tx),
		ToAddress:     to.Hex(),
		BlockNumber:   blockNumber,
		Confirmations: confirmations,
		Status:        "pending",
		CreatedAt:     time.Now(),
	}

	// 检查是否达到确认要求
	if confirmations >= bl.confirmations {
		deposit.Status = "confirmed"
		// 更新用户余额
		bl.depositHandler.ProcessDeposit(deposit)
	} else {
		// 保存待确认的充值记录
		db.SavePendingDeposit(deposit)
	}
}

// 获取交易发送者地址
func getSender(tx *types.Transaction) string {
	from, _ := types.Sender(types.NewEIP155Signer(tx.ChainId()), tx)
	return from.Hex()
}

2.3 充值处理器

type DepositHandler struct {
	accountSvc AccountService
	notifier   Notifier
}

func (dh *DepositHandler) ProcessDeposit(deposit *Deposit) error {
	// 1. 防止重复处理
	exists, err := db.DepositExists(deposit.TxHash)
	if err != nil {
		return err
	}
	if exists {
		return nil // 已处理过
	}

	// 2. 验证交易有效性
	if !dh.validateDeposit(deposit) {
		deposit.Status = "invalid"
		db.UpdateDeposit(deposit)
		return fmt.Errorf("invalid deposit")
	}

	// 3. 开启事务
	tx := db.BeginTransaction()
	defer tx.Rollback()

	// 4. 增加用户余额
	err = dh.accountSvc.AddBalance(tx, deposit.UserID, deposit.Currency, deposit.Amount)
	if err != nil {
		return err
	}

	// 5. 记录充值
	deposit.Status = "completed"
	deposit.CompletedAt = time.Now()
	err = db.SaveDeposit(tx, deposit)
	if err != nil {
		return err
	}

	// 6. 提交事务
	if err := tx.Commit(); err != nil {
		return err
	}

	// 7. 发送通知
	dh.notifier.NotifyDeposit(deposit.UserID, deposit)

	log.Printf("Deposit processed: user=%s, amount=%s, tx=%s",
		deposit.UserID, deposit.Amount.String(), deposit.TxHash)

	return nil
}

func (dh *DepositHandler) validateDeposit(deposit *Deposit) bool {
	// 1. 检查金额是否大于最小充值额
	minDeposit := big.NewInt(10000000000000000) // 0.01 ETH
	if deposit.Amount.Cmp(minDeposit) < 0 {
		return false
	}

	// 2. 检查交易是否成功
	receipt, err := ethClient.TransactionReceipt(context.Background(), common.HexToHash(deposit.TxHash))
	if err != nil || receipt.Status != 1 {
		return false
	}

	// 3. 其他验证...

	return true
}

3. 提现系统

3.1 提现流程

用户提交提现申请
    ↓
风控检查
    ↓
扣除用户余额(冻结)
    ↓
创建待处理提现记录
    ↓
[人工审核(可选)]
    ↓
从热钱包发起转账
    ↓
等待交易确认
    ↓
更新提现状态
    ↓
发送提现完成通知

3.2 提现处理器

package withdrawal

import (
	"context"
	"fmt"
	"math/big"
	"time"
)

type WithdrawalProcessor struct {
	accountSvc   AccountService
	walletMgr    *WalletManager
	riskControl  RiskControl
	notifier     Notifier
}

// 提现申请
func (wp *WithdrawalProcessor) RequestWithdrawal(userID, currency, address string, amount *big.Int) (*Withdrawal, error) {
	// 1. 风控检查
	riskResult := wp.riskControl.CheckWithdrawal(userID, currency, address, amount)
	if !riskResult.Passed {
		return nil, fmt.Errorf("risk check failed: %s", riskResult.Reason)
	}

	// 2. 检查余额
	balance, err := wp.accountSvc.GetAvailableBalance(userID, currency)
	if err != nil {
		return nil, err
	}

	// 计算手续费
	fee := wp.calculateFee(currency, amount)
	totalAmount := new(big.Int).Add(amount, fee)

	if balance.Cmp(totalAmount) < 0 {
		return nil, fmt.Errorf("insufficient balance")
	}

	// 3. 开启事务
	tx := db.BeginTransaction()
	defer tx.Rollback()

	// 4. 冻结余额
	err = wp.accountSvc.FreezeBalance(tx, userID, currency, totalAmount)
	if err != nil {
		return nil, err
	}

	// 5. 创建提现记录
	withdrawal := &Withdrawal{
		WithdrawalID: generateWithdrawalID(),
		UserID:       userID,
		Currency:     currency,
		Amount:       amount,
		Fee:          fee,
		Address:      address,
		Status:       "pending",
		RiskScore:    riskResult.RiskScore,
		CreatedAt:    time.Now(),
	}

	// 如果风险分数高,需要人工审核
	if riskResult.RiskScore > 50 {
		withdrawal.Status = "reviewing"
	}

	err = db.SaveWithdrawal(tx, withdrawal)
	if err != nil {
		return nil, err
	}

	// 6. 提交事务
	if err := tx.Commit(); err != nil {
		return nil, err
	}

	// 7. 如果不需要审核,加入处理队列
	if withdrawal.Status == "pending" {
		wp.enqueueWithdrawal(withdrawal)
	}

	return withdrawal, nil
}

// 处理提现
func (wp *WithdrawalProcessor) ProcessWithdrawal(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			wp.processPendingWithdrawals()
		}
	}
}

func (wp *WithdrawalProcessor) processPendingWithdrawals() {
	// 获取待处理提现
	withdrawals, err := db.GetPendingWithdrawals(100)
	if err != nil {
		log.Printf("Failed to get pending withdrawals: %v", err)
		return
	}

	for _, withdrawal := range withdrawals {
		go wp.executeWithdrawal(withdrawal)
	}
}

func (wp *WithdrawalProcessor) executeWithdrawal(withdrawal *Withdrawal) {
	// 1. 获取热钱包
	hotWallet := wp.walletMgr.GetHotWallet(withdrawal.Currency)
	if hotWallet == nil {
		log.Printf("Hot wallet not found for %s", withdrawal.Currency)
		return
	}

	// 2. 检查热钱包余额
	hotBalance, err := hotWallet.GetBalance()
	if err != nil || hotBalance.Cmp(withdrawal.Amount) < 0 {
		// 热钱包余额不足,触发充值
		wp.walletMgr.Rebalance(withdrawal.Currency)
		return
	}

	// 3. 发起转账
	txHash, err := hotWallet.Transfer(withdrawal.Address, withdrawal.Amount)
	if err != nil {
		log.Printf("Transfer failed: %v", err)
		withdrawal.Status = "failed"
		withdrawal.FailReason = err.Error()
		db.UpdateWithdrawal(withdrawal)

		// 解冻余额
		wp.accountSvc.UnfreezeBalance(withdrawal.UserID, withdrawal.Currency,
			new(big.Int).Add(withdrawal.Amount, withdrawal.Fee))
		return
	}

	// 4. 更新提现状态
	withdrawal.TxHash = txHash
	withdrawal.Status = "processing"
	withdrawal.SentAt = time.Now()
	db.UpdateWithdrawal(withdrawal)

	log.Printf("Withdrawal sent: id=%s, tx=%s", withdrawal.WithdrawalID, txHash)
}

// 确认提现完成
func (wp *WithdrawalProcessor) ConfirmWithdrawal(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			wp.checkProcessingWithdrawals()
		}
	}
}

func (wp *WithdrawalProcessor) checkProcessingWithdrawals() {
	withdrawals, err := db.GetProcessingWithdrawals(100)
	if err != nil {
		return
	}

	for _, withdrawal := range withdrawals {
		// 查询交易确认数
		confirmations, err := getTransactionConfirmations(withdrawal.TxHash)
		if err != nil {
			continue
		}

		requiredConfirmations := uint64(12) // ETH需要12个确认
		if confirmations >= requiredConfirmations {
			// 完成提现
			wp.completeWithdrawal(withdrawal)
		}
	}
}

func (wp *WithdrawalProcessor) completeWithdrawal(withdrawal *Withdrawal) {
	tx := db.BeginTransaction()
	defer tx.Rollback()

	// 1. 扣除冻结余额
	totalAmount := new(big.Int).Add(withdrawal.Amount, withdrawal.Fee)
	err := wp.accountSvc.DeductFrozenBalance(tx, withdrawal.UserID, withdrawal.Currency, totalAmount)
	if err != nil {
		log.Printf("Failed to deduct balance: %v", err)
		return
	}

	// 2. 更新提现状态
	withdrawal.Status = "completed"
	withdrawal.CompletedAt = time.Now()
	db.UpdateWithdrawal(tx, withdrawal)

	// 3. 提交
	if err := tx.Commit(); err != nil {
		log.Printf("Failed to commit: %v", err)
		return
	}

	// 4. 通知用户
	wp.notifier.NotifyWithdrawal(withdrawal.UserID, withdrawal)

	log.Printf("Withdrawal completed: id=%s, tx=%s", withdrawal.WithdrawalID, withdrawal.TxHash)
}

// 计算手续费
func (wp *WithdrawalProcessor) calculateFee(currency string, amount *big.Int) *big.Int {
	// 固定手续费 + 比例手续费
	fixedFee := big.NewInt(1000000000000000) // 0.001 ETH
	rateFee := new(big.Int).Mul(amount, big.NewInt(5))
	rateFee.Div(rateFee, big.NewInt(1000)) // 0.5%

	totalFee := new(big.Int).Add(fixedFee, rateFee)

	// 设置最大手续费
	maxFee := big.NewInt(10000000000000000) // 0.01 ETH
	if totalFee.Cmp(maxFee) > 0 {
		return maxFee
	}

	return totalFee
}

3.3 批量提现优化

对于小额提现,可以批量处理以节省Gas费用。

type BatchWithdrawalProcessor struct {
	*WithdrawalProcessor
	batchSize     int
	batchInterval time.Duration
}

func (bwp *BatchWithdrawalProcessor) ProcessInBatches(ctx context.Context) {
	ticker := time.NewTicker(bwp.batchInterval)
	defer ticker.Stop()

	buffer := make([]*Withdrawal, 0, bwp.batchSize)

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// 获取待处理提现
			withdrawals, _ := db.GetPendingWithdrawals(bwp.batchSize)

			buffer = append(buffer, withdrawals...)

			if len(buffer) >= bwp.batchSize {
				bwp.executeBatch(buffer[:bwp.batchSize])
				buffer = buffer[bwp.batchSize:]
			}
		}
	}
}

func (bwp *BatchWithdrawalProcessor) executeBatch(withdrawals []*Withdrawal) {
	// 按币种分组
	groups := make(map[string][]*Withdrawal)
	for _, w := range withdrawals {
		groups[w.Currency] = append(groups[w.Currency], w)
	}

	// 每种币批量发送
	for currency, batch := range groups {
		bwp.sendBatchTransfer(currency, batch)
	}
}

func (bwp *BatchWithdrawalProcessor) sendBatchTransfer(currency string, batch []*Withdrawal) {
	// 使用智能合约批量转账
	addresses := make([]string, len(batch))
	amounts := make([]*big.Int, len(batch))

	for i, w := range batch {
		addresses[i] = w.Address
		amounts[i] = w.Amount
	}

	// 调用批量转账合约
	txHash, err := bwp.walletMgr.BatchTransfer(currency, addresses, amounts)
	if err != nil {
		log.Printf("Batch transfer failed: %v", err)
		return
	}

	// 更新所有提现记录
	for _, w := range batch {
		w.TxHash = txHash
		w.Status = "processing"
		db.UpdateWithdrawal(w)
	}

	log.Printf("Batch transfer sent: %d withdrawals, tx=%s", len(batch), txHash)
}

4. 对账系统

4.1 实时对账

type ReconciliationService struct {
	accountSvc AccountService
	walletMgr  *WalletManager
}

func (rs *ReconciliationService) Reconcile(currency string) (*ReconciliationReport, error) {
	report := &ReconciliationReport{
		Currency:  currency,
		Timestamp: time.Now(),
	}

	// 1. 获取所有用户余额总和
	totalUserBalance, err := rs.accountSvc.GetTotalBalance(currency)
	if err != nil {
		return nil, err
	}
	report.TotalUserBalance = totalUserBalance

	// 2. 获取钱包实际余额
	hotBalance, _ := rs.walletMgr.GetHotWallet(currency).GetBalance()
	warmBalance, _ := rs.walletMgr.GetWarmWallet(currency).GetBalance()
	coldBalance, _ := rs.walletMgr.GetColdWallet(currency).GetBalance()

	totalWalletBalance := new(big.Int).Add(hotBalance, warmBalance)
	totalWalletBalance.Add(totalWalletBalance, coldBalance)
	report.TotalWalletBalance = totalWalletBalance

	// 3. 获取待处理提现总额
	pendingWithdrawals, err := db.GetPendingWithdrawalAmount(currency)
	if err != nil {
		return nil, err
	}
	report.PendingWithdrawals = pendingWithdrawals

	// 4. 对账
	// 公式:钱包余额 + 待处理提现 = 用户余额
	expected := new(big.Int).Add(totalWalletBalance, pendingWithdrawals)
	report.Difference = new(big.Int).Sub(expected, totalUserBalance)

	// 5. 检查差异
	threshold := big.NewInt(1000000) // 允许的误差范围
	if report.Difference.Cmp(threshold) > 0 || report.Difference.Cmp(new(big.Int).Neg(threshold)) < 0 {
		report.Status = "mismatch"
		report.AlertLevel = "critical"
	} else {
		report.Status = "matched"
	}

	return report, nil
}

type ReconciliationReport struct {
	Currency           string
	TotalUserBalance   *big.Int
	TotalWalletBalance *big.Int
	PendingWithdrawals *big.Int
	Difference         *big.Int
	Status             string
	AlertLevel         string
	Timestamp          time.Time
}

// 定期对账
func (rs *ReconciliationService) AutoReconcile(ctx context.Context) {
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			currencies := []string{"BTC", "ETH", "USDT"}
			for _, currency := range currencies {
				report, err := rs.Reconcile(currency)
				if err != nil {
					log.Printf("Reconciliation failed for %s: %v", currency, err)
					continue
				}

				// 保存对账报告
				db.SaveReconciliationReport(report)

				// 如果有差异,发送告警
				if report.Status == "mismatch" {
					alerter.Alert("Reconciliation Mismatch", report)
				}
			}
		}
	}
}

5. 安全防护

5.1 多重签名

// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;

contract MultiSigWallet {
    event Deposit(address indexed sender, uint amount);
    event Submit(uint indexed txId);
    event Approve(address indexed owner, uint indexed txId);
    event Revoke(address indexed owner, uint indexed txId);
    event Execute(uint indexed txId);

    struct Transaction {
        address to;
        uint value;
        bytes data;
        bool executed;
    }

    address[] public owners;
    mapping(address => bool) public isOwner;
    uint public required; // 需要的签名数

    Transaction[] public transactions;
    mapping(uint => mapping(address => bool)) public approved;

    modifier onlyOwner() {
        require(isOwner[msg.sender], "not owner");
        _;
    }

    modifier txExists(uint _txId) {
        require(_txId < transactions.length, "tx does not exist");
        _;
    }

    modifier notExecuted(uint _txId) {
        require(!transactions[_txId].executed, "tx already executed");
        _;
    }

    modifier notApproved(uint _txId) {
        require(!approved[_txId][msg.sender], "tx already approved");
        _;
    }

    constructor(address[] memory _owners, uint _required) {
        require(_owners.length > 0, "owners required");
        require(_required > 0 && _required <= _owners.length, "invalid required number");

        for (uint i = 0; i < _owners.length; i++) {
            address owner = _owners[i];
            require(owner != address(0), "invalid owner");
            require(!isOwner[owner], "owner is not unique");

            isOwner[owner] = true;
            owners.push(owner);
        }

        required = _required;
    }

    receive() external payable {
        emit Deposit(msg.sender, msg.value);
    }

    function submit(address _to, uint _value, bytes calldata _data) external onlyOwner {
        transactions.push(Transaction({
            to: _to,
            value: _value,
            data: _data,
            executed: false
        }));

        emit Submit(transactions.length - 1);
    }

    function approve(uint _txId)
        external
        onlyOwner
        txExists(_txId)
        notExecuted(_txId)
        notApproved(_txId)
    {
        approved[_txId][msg.sender] = true;
        emit Approve(msg.sender, _txId);
    }

    function execute(uint _txId) external txExists(_txId) notExecuted(_txId) {
        require(_getApprovalCount(_txId) >= required, "approvals < required");

        Transaction storage transaction = transactions[_txId];
        transaction.executed = true;

        (bool success, ) = transaction.to.call{value: transaction.value}(transaction.data);
        require(success, "tx failed");

        emit Execute(_txId);
    }

    function revoke(uint _txId) external onlyOwner txExists(_txId) notExecuted(_txId) {
        require(approved[_txId][msg.sender], "tx not approved");

        approved[_txId][msg.sender] = false;
        emit Revoke(msg.sender, _txId);
    }

    function _getApprovalCount(uint _txId) private view returns (uint count) {
        for (uint i = 0; i < owners.length; i++) {
            if (approved[_txId][owners[i]]) {
                count += 1;
            }
        }
    }
}

5.2 硬件安全模块(HSM)集成

// 使用HSM签名交易
type HSMSigner struct {
	hsmClient *hsm.Client
	keyID     string
}

func (hs *HSMSigner) Sign(message []byte) ([]byte, error) {
	// 将消息发送到HSM设备签名
	signature, err := hs.hsmClient.Sign(hs.keyID, message)
	if err != nil {
		return nil, err
	}

	return signature, nil
}

小结

资金管理系统是交易所的核心,关键要点:

  1. 冷热钱包分离(5% hot, 25% warm, 70% cold)
  2. 自动平衡机制
  3. 完善的充值提现流程
  4. 实时对账系统
  5. 多重签名保护
  6. HSM硬件安全

下一章将讨论交易所的性能优化和高可用架构设计。

Prev
风控系统设计
Next
行情系统设计