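09 - Payment System Design
> Interview frequency:
| Requirement | Target |
|---|---|
| Availability | 99.99% (about 52.6 minutes of downtime allowed per year) |
| Consistency | Strong consistency (money must never be wrong) |
| Response time | P99 < 500 ms |
| Concurrency | 100,000 QPS (major sales events) |
| Security | PCI DSS compliance |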
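1.3 Likely follow-up questions from the interviewer
Q1: How does a payment system differ from an ordinary business system?
A1:
- Fund safety: never overcharge, undercharge, or charge twice
- Strong consistency: account balances must always be accurate
- Auditability: every operation must leave a complete log
- Regulatory compliance: the system must meet financial regulations
Q2: How do you guarantee that a payment is never charged twice?
A2: Through idempotent design. The core techniques are:
- A unique order number (Order ID)
- A unique database index
- Distributed locks
- State-machine control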
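State-machine control can be sketched as a transition table that rejects any move out of a terminal state, so a replayed callback cannot charge or credit twice. This is a minimal illustration: the status names follow the `status` column used later in this document, but the transition table and the `CanTransition` helper are assumptions, not part of the original design.

```go
package main

import "fmt"

// Status is a payment order state.
type Status string

const (
	Pending Status = "pending"
	Paying  Status = "paying"
	Success Status = "success"
	Failed  Status = "failed"
	Timeout Status = "timeout"
	Closed  Status = "closed"
)

// allowed lists the legal next states for each state (an assumed policy).
// Terminal states (success, failed, timeout, closed) have no outgoing edges.
var allowed = map[Status][]Status{
	Pending: {Paying, Success, Failed, Timeout, Closed},
	Paying:  {Success, Failed, Timeout},
}

// CanTransition reports whether moving from one state to another is legal.
func CanTransition(from, to Status) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(Pending, Success)) // true
	fmt.Println(CanTransition(Success, Success)) // false: a duplicate callback is ignored
	fmt.Println(CanTransition(Success, Failed))  // false: a paid order cannot be failed
}
```

In a database-backed service the same rule is usually enforced in the `UPDATE ... WHERE status = ?` clause, so the check and the write happen atomically.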
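2. Capacity estimation
2.1 Scenario assumptions
Assume we are designing the payment system for a mid-sized e-commerce platform:
- Daily active users (DAU): 10 million
- Daily orders: 1 million
- Payment success rate: 80%
- Peak multiplier: 5x (sales events)
2.2 QPS estimation
Normal traffic
Daily orders = 1 million
Payment requests = 1 million / 0.8 = 1.25 million
Average QPS = 1.25 million / 86,400 ≈ 14.5 QPS
Peak QPS = 14.5 × 5 = 72.5 QPS
Major sales event (Double 11)
Daily orders = 10 million
Peak QPS = 10 million / 86,400 × 5 ≈ 578 QPS
Flash sale (first minute, assuming 1 million requests arrive within it):
Instantaneous QPS = 1 million / 60 ≈ 16,666 QPS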
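The arithmetic above can be reproduced with a few lines of Go, which makes it easy to vary the assumptions. The helper names `averageQPS` and `peakQPS` are illustrative only.

```go
package main

import "fmt"

const secondsPerDay = 86400

// averageQPS spreads a daily request count evenly over 24 hours.
func averageQPS(requestsPerDay float64) float64 {
	return requestsPerDay / secondsPerDay
}

// peakQPS applies a peak multiplier to the daily average.
func peakQPS(requestsPerDay, peakFactor float64) float64 {
	return averageQPS(requestsPerDay) * peakFactor
}

func main() {
	const successRate = 0.8 // share of payment attempts that succeed
	const peakFactor = 5.0

	// Attempts needed to end up with 1 million paid orders.
	normalRequests := 1_000_000 / successRate

	fmt.Printf("normal: average %.1f QPS, peak %.1f QPS\n",
		averageQPS(normalRequests), peakQPS(normalRequests, peakFactor))
	fmt.Printf("sales event peak: %.0f QPS\n", peakQPS(10_000_000, peakFactor))
	fmt.Printf("flash sale, first minute: %.0f QPS\n", 1_000_000/60.0)
}
```

Small differences from the figures above come from rounding the intermediate average before multiplying.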
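2.3 Storage estimation
Orders table
Row size = 500 bytes
Daily orders = 1 million
Monthly orders = 30 million
Yearly orders = 360 million
Yearly storage = 360 million × 500 bytes ≈ 180 GB
3-year storage ≈ 540 GB
Ledger table
2 ledger entries per order (debit + credit)
Yearly entries = 360 million × 2 = 720 million
Yearly storage = 720 million × 300 bytes ≈ 216 GB
3-year storage ≈ 648 GB
Total storage: about 1.2 TB (3 years of data)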
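As a quick check of the storage figures, the sketch below multiplies row counts by row sizes. It assumes decimal gigabytes and ignores indexes, replicas, and backups, which in practice add a sizeable multiple on top of the raw volume.

```go
package main

import "fmt"

// yearlyBytes returns the raw yearly volume of a table, before indexes and replicas.
func yearlyBytes(rowsPerYear, bytesPerRow int64) int64 {
	return rowsPerYear * bytesPerRow
}

func main() {
	const gb = 1_000_000_000 // decimal GB, matching the estimate above

	orders := yearlyBytes(360_000_000, 500) // orders table
	ledger := yearlyBytes(720_000_000, 300) // two ledger entries per order

	fmt.Printf("orders: %d GB/year, ledger: %d GB/year, 3-year total: %d GB\n",
		orders/gb, ledger/gb, 3*(orders+ledger)/gb)
	// prints: orders: 180 GB/year, ledger: 216 GB/year, 3-year total: 1188 GB
}
```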
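2.4 Bandwidth estimation
Payment request size = 2 KB
Response size = 1 KB
Total per call = 3 KB
Peak QPS = 578
Peak bandwidth = 578 × 3 KB ≈ 1.7 MB/s ≈ 14 Mbps
2.5 Server estimation
Application servers
Per-server QPS = 1000 (Go application)
Peak QPS = 578
Servers needed = 578 / 1000 × 2 (redundancy) ≈ 2
Flash-sale scenario:
Peak QPS = 16,666
Servers needed = 16,666 / 1000 × 2 ≈ 34
Database
MySQL primary write capacity = 5000 QPS
Peak write QPS = 578
Primaries needed = 1 (sufficient)
Read QPS (order status queries) = 2000
Replicas needed = 2000 / 10,000 ≈ 1 (assuming 10,000 read QPS per replica)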
3. API design
3.1 Core APIs
3.1.1 Create a payment order
POST /api/v1/payment/orders
Request:
{
"order_id": "ORD20231113001", // business order ID
"user_id": 12345,
"amount": 99.99, // payment amount (yuan)
"currency": "CNY",
"payment_method": "wechat", // wechat | alipay | balance
"subject": "Purchase",
"description": "iPhone 15 Pro",
"notify_url": "https://api.example.com/callback",
"return_url": "https://example.com/success",
"timeout": 900, // timeout (seconds)
"client_ip": "192.168.1.100",
"extra": {} // extension fields
}
Response:
{
"code": 0,
"message": "success",
"data": {
"payment_order_id": "PAY20231113001",
"status": "pending",
"qr_code": "weixin://wxpay/...", // WeChat Pay QR code
"expires_at": "2023-11-13T12:15:00Z"
}
}
3.1.2 Query payment status
GET /api/v1/payment/orders/{payment_order_id}
Response:
{
"code": 0,
"message": "success",
"data": {
"payment_order_id": "PAY20231113001",
"order_id": "ORD20231113001",
"status": "success", // pending | success | failed | timeout
"amount": 99.99,
"paid_at": "2023-11-13T12:10:30Z",
"transaction_id": "4200001234567890" // transaction ID assigned by the payment channel
}
}
3.1.3 Payment callback endpoint
POST /api/v1/payment/callback/wechat
Request:
{
"appid": "wxd678efh567hg6787",
"mch_id": "1230000109",
"nonce_str": "5K8264ILTKCH16CQ2502SI8ZNMTM67VS",
"sign": "C380BEC2BFD727A4B6845133519F3AD6",
"out_trade_no": "PAY20231113001",
"transaction_id": "4200001234567890",
"total_fee": 9999, // unit: fen (1/100 yuan)
"result_code": "SUCCESS"
}
Response:
{
"return_code": "SUCCESS",
"return_msg": "OK"
}
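Note that the create-order request carries `amount` in yuan (99.99) while the WeChat callback reports `total_fee` in fen (9999). Comparing the two safely means converting to an integer number of fen without going through floating point, since 99.99 has no exact binary representation. The sketch below assumes the amount is available as a decimal string, which is a departure from the JSON number shown above; `YuanToFen` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// isDigits reports whether s is non-empty and contains only ASCII digits.
func isDigits(s string) bool {
	if s == "" {
		return false
	}
	for _, r := range s {
		if r < '0' || r > '9' {
			return false
		}
	}
	return true
}

// YuanToFen converts a decimal yuan string such as "99.99" into integer fen.
// It accepts at most two fractional digits and rejects signs and other input.
func YuanToFen(s string) (int64, error) {
	whole, frac, hasDot := strings.Cut(s, ".")
	if !isDigits(whole) || len(whole) > 15 {
		return 0, fmt.Errorf("invalid amount %q", s)
	}
	if hasDot && (!isDigits(frac) || len(frac) > 2) {
		return 0, fmt.Errorf("invalid amount %q", s)
	}
	frac += strings.Repeat("0", 2-len(frac)) // "9" -> "90", "" -> "00"
	fen, err := strconv.ParseInt(whole+frac, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid amount %q: %w", s, err)
	}
	return fen, nil
}

func main() {
	fen, err := YuanToFen("99.99")
	fmt.Println(fen, err) // 9999 <nil>
}
```

With both sides in fen, the callback handler can compare `total_fee` against the stored order amount using plain integer equality.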
3.1.4 Request a refund
POST /api/v1/payment/refunds
Request:
{
"payment_order_id": "PAY20231113001",
"refund_amount": 99.99,
"refund_reason": "7-day no-questions-asked return",
"notify_url": "https://api.example.com/refund_callback"
}
Response:
{
"code": 0,
"message": "success",
"data": {
"refund_id": "REF20231113001",
"status": "processing",
"refund_amount": 99.99
}
}
4. Data model design
4.1 Payment orders table
CREATE TABLE payment_orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    payment_order_id VARCHAR(64) NOT NULL UNIQUE COMMENT 'payment order ID',
    order_id VARCHAR(64) NOT NULL COMMENT 'business order ID',
    user_id BIGINT NOT NULL,
    amount DECIMAL(10, 2) NOT NULL COMMENT 'payment amount',
    currency VARCHAR(3) DEFAULT 'CNY',
    payment_method VARCHAR(20) NOT NULL COMMENT 'wechat|alipay|balance',
    status VARCHAR(20) NOT NULL COMMENT 'pending|paying|success|failed|timeout|closed',
    subject VARCHAR(256),
    description TEXT,
    transaction_id VARCHAR(64) COMMENT 'payment channel transaction ID',
    notify_url VARCHAR(512),
    return_url VARCHAR(512),
    client_ip VARCHAR(50),
    paid_at DATETIME COMMENT 'time the payment succeeded',
    expires_at DATETIME COMMENT 'expiry time',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_order_id (order_id),
    INDEX idx_user_id (user_id),
    INDEX idx_status_created (status, created_at),
    INDEX idx_transaction_id (transaction_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='payment orders';
4.2 Accounts table
CREATE TABLE accounts (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL UNIQUE, -- UNIQUE already creates an index, so no separate idx_user_id is needed
    balance DECIMAL(18, 2) DEFAULT 0.00 COMMENT 'available balance',
    frozen_balance DECIMAL(18, 2) DEFAULT 0.00 COMMENT 'frozen balance',
    total_balance DECIMAL(18, 2) DEFAULT 0.00 COMMENT 'total balance',
    version INT DEFAULT 0 COMMENT 'optimistic-lock version',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='accounts';
4.3 Account ledger table
CREATE TABLE account_transactions (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    transaction_id VARCHAR(64) NOT NULL UNIQUE COMMENT 'ledger entry ID',
    user_id BIGINT NOT NULL,
    account_id BIGINT NOT NULL,
    type VARCHAR(20) NOT NULL COMMENT 'debit|credit|freeze|unfreeze',
    amount DECIMAL(18, 2) NOT NULL,
    balance_before DECIMAL(18, 2) NOT NULL COMMENT 'balance before the entry',
    balance_after DECIMAL(18, 2) NOT NULL COMMENT 'balance after the entry',
    business_type VARCHAR(50) COMMENT 'payment|refund|recharge',
    business_id VARCHAR(64) COMMENT 'business ID',
    remark TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_user_id (user_id),
    INDEX idx_business (business_type, business_id),
    INDEX idx_created (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='account ledger';
4.4 Reconciliation records table
CREATE TABLE reconciliation_records (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    payment_order_id VARCHAR(64) NOT NULL,
    transaction_id VARCHAR(64) COMMENT 'payment channel transaction ID',
    internal_amount DECIMAL(10, 2) COMMENT 'amount in our records',
    external_amount DECIMAL(10, 2) COMMENT 'amount in the channel records',
    status VARCHAR(20) NOT NULL COMMENT 'matched|unmatched|missing|excess',
    reconciliation_date DATE NOT NULL COMMENT 'reconciliation date',
    difference_amount DECIMAL(10, 2) COMMENT 'difference amount',
    difference_reason TEXT,
    handled BOOLEAN DEFAULT FALSE COMMENT 'whether the difference has been handled',
    handled_at DATETIME,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_date_status (reconciliation_date, status),
    INDEX idx_payment_order (payment_order_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='reconciliation records';
4.5 Idempotency table
CREATE TABLE idempotent_records (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    idempotent_key VARCHAR(128) NOT NULL UNIQUE COMMENT 'idempotency key',
    request_id VARCHAR(64) NOT NULL COMMENT 'request ID',
    status VARCHAR(20) NOT NULL COMMENT 'processing|success|failed',
    request_data TEXT COMMENT 'request payload',
    response_data TEXT COMMENT 'response payload',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    expires_at DATETIME COMMENT 'expiry time',
    INDEX idx_expires (expires_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='idempotency records';
5. Architecture design
5.1 Overall architecture
┌─────────────────────────────────────────────────────────────┐
│                           Clients                           │
│                      (Web / App / H5)                       │
└──────────────────────────────┬──────────────────────────────┘
                               │
┌──────────────────────────────┴──────────────────────────────┐
│                         API Gateway                         │
│      (auth, rate limiting, routing, circuit breaking)       │
└─────────────┬──────────────────────────────────┬────────────┘
              │                                  │
┌─────────────┴────────────┐       ┌─────────────┴────────────┐
│    Business services     │ ◄───► │     Payment service      │
│ (orders, catalog, users) │       │      (payment core)      │
└──────────────────────────┘       └─────────────┬────────────┘
                                                 │
         ┌─────────────────────┬─────────────────┴───┐
         │                     │                     │
┌────────┴────────┐   ┌────────┴────────┐   ┌────────┴────────┐
│ Account service │   │ Reconciliation  │   │  Notification   │
│(balance, ledger)│   │   (automated)   │   │     (async)     │
└────────┬────────┘   └─────────────────┘   └─────────────────┘
         │
┌────────┴────────────────────┐
│       Database layer        │
│ (primary/replica, sharding) │
└─────────────────────────────┘

┌───────────────────────────────────────────┐
│       Third-party payment channels        │
│  (WeChat Pay, Alipay, direct bank links)  │
└───────────────────────────────────────────┘
5.2 V1: Monolithic architecture (MVP)
Best for: startups with fewer than 10,000 orders per day
package main
import (
"database/sql"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
_ "github.com/go-sql-driver/mysql"
)
// PaymentOrder 支付订单
type PaymentOrder struct {
ID int64 `json:"id"`
PaymentOrderID string `json:"payment_order_id"`
OrderID string `json:"order_id"`
UserID int64 `json:"user_id"`
Amount float64 `json:"amount"`
Currency string `json:"currency"`
PaymentMethod string `json:"payment_method"`
Status string `json:"status"`
Subject string `json:"subject"`
Description string `json:"description"`
TransactionID string `json:"transaction_id,omitempty"`
NotifyURL string `json:"notify_url"`
ReturnURL string `json:"return_url"`
ClientIP string `json:"client_ip"`
PaidAt *time.Time `json:"paid_at,omitempty"`
ExpiresAt time.Time `json:"expires_at"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// CreatePaymentRequest 创建支付请求
type CreatePaymentRequest struct {
OrderID string `json:"order_id"`
UserID int64 `json:"user_id"`
Amount float64 `json:"amount"`
Currency string `json:"currency"`
PaymentMethod string `json:"payment_method"`
Subject string `json:"subject"`
Description string `json:"description"`
NotifyURL string `json:"notify_url"`
ReturnURL string `json:"return_url"`
Timeout int `json:"timeout"`
ClientIP string `json:"client_ip"`
}
// PaymentService 支付服务
type PaymentService struct {
db *sql.DB
}
// NewPaymentService 创建支付服务
func NewPaymentService(db *sql.DB) *PaymentService {
return &PaymentService{db: db}
}
// CreatePaymentOrder creates a payment order
func (s *PaymentService) CreatePaymentOrder(req *CreatePaymentRequest) (*PaymentOrder, error) {
	// 1. Generate the payment order ID
	paymentOrderID := generatePaymentOrderID()

	// 2. Reject the request if this business order has already been paid
	var existingOrderID string
	err := s.db.QueryRow(`
		SELECT payment_order_id
		FROM payment_orders
		WHERE order_id = ? AND status = 'success'
		LIMIT 1
	`, req.OrderID).Scan(&existingOrderID)
	if err == nil {
		return nil, fmt.Errorf("order already paid: %s", existingOrderID)
	}
	if err != sql.ErrNoRows {
		// A real database error must not be mistaken for "not paid yet"
		return nil, fmt.Errorf("failed to check existing payment: %w", err)
	}

	// 3. Insert the payment order
	expiresAt := time.Now().Add(time.Duration(req.Timeout) * time.Second)
	result, err := s.db.Exec(`
		INSERT INTO payment_orders (
			payment_order_id, order_id, user_id, amount, currency,
			payment_method, status, subject, description,
			notify_url, return_url, client_ip, expires_at
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
	`, paymentOrderID, req.OrderID, req.UserID, req.Amount, req.Currency,
		req.PaymentMethod, "pending", req.Subject, req.Description,
		req.NotifyURL, req.ReturnURL, req.ClientIP, expiresAt)
	if err != nil {
		return nil, fmt.Errorf("failed to create payment order: %w", err)
	}
	orderID, _ := result.LastInsertId()

	// 4. Call the payment channel (simplified here)
	status := "pending"
	qrCode := ""
	switch req.PaymentMethod {
	case "wechat":
		qrCode = callWeChatPay(paymentOrderID, req.Amount)
	case "alipay":
		qrCode = callAlipay(paymentOrderID, req.Amount)
	case "balance":
		// Balance payments are deducted immediately. deductBalance also marks
		// the order as paid, so the returned status must reflect that.
		if err = s.deductBalance(req.UserID, req.Amount, paymentOrderID); err != nil {
			return nil, err
		}
		status = "success"
	}

	// 5. Build the response
	order := &PaymentOrder{
		ID:             orderID,
		PaymentOrderID: paymentOrderID,
		OrderID:        req.OrderID,
		UserID:         req.UserID,
		Amount:         req.Amount,
		Currency:       req.Currency,
		PaymentMethod:  req.PaymentMethod,
		Status:         status,
		Subject:        req.Subject,
		Description:    req.Description,
		NotifyURL:      req.NotifyURL,
		ReturnURL:      req.ReturnURL,
		ClientIP:       req.ClientIP,
		ExpiresAt:      expiresAt,
		CreatedAt:      time.Now(),
		UpdatedAt:      time.Now(),
	}
	// If the channel returned a QR code, include it in the response
	if qrCode != "" {
		order.Description = qrCode // simplification: a real API should use a dedicated field
	}
	return order, nil
}
// deductBalance 扣除余额
func (s *PaymentService) deductBalance(userID int64, amount float64, paymentOrderID string) error {
// 1. 开启事务
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 2. 锁定账户(使用 FOR UPDATE)
var balance float64
var version int
err = tx.QueryRow(`
SELECT balance, version
FROM accounts
WHERE user_id = ?
FOR UPDATE
`, userID).Scan(&balance, &version)
if err != nil {
return fmt.Errorf("账户不存在: %w", err)
}
// 3. 检查余额是否足够
if balance < amount {
return fmt.Errorf("余额不足: 当前余额 %.2f, 需要 %.2f", balance, amount)
}
// 4. 扣减余额(使用乐观锁)
result, err := tx.Exec(`
UPDATE accounts
SET balance = balance - ?,
total_balance = total_balance - ?,
version = version + 1,
updated_at = NOW()
WHERE user_id = ? AND version = ?
`, amount, amount, userID, version)
if err != nil {
return fmt.Errorf("扣款失败: %w", err)
}
rowsAffected, _ := result.RowsAffected()
if rowsAffected == 0 {
return fmt.Errorf("扣款失败: 版本冲突")
}
// 5. 记录流水
transactionID := generateTransactionID()
_, err = tx.Exec(`
INSERT INTO account_transactions (
transaction_id, user_id, account_id, type, amount,
balance_before, balance_after, business_type, business_id, remark
) VALUES (?, ?, (SELECT id FROM accounts WHERE user_id = ?), ?, ?, ?, ?, ?, ?, ?)
`, transactionID, userID, userID, "debit", amount,
balance, balance-amount, "payment", paymentOrderID, "支付扣款")
if err != nil {
return fmt.Errorf("记录流水失败: %w", err)
}
// 6. 更新支付订单状态
_, err = tx.Exec(`
UPDATE payment_orders
SET status = 'success', paid_at = NOW(), updated_at = NOW()
WHERE payment_order_id = ?
`, paymentOrderID)
if err != nil {
return fmt.Errorf("更新订单状态失败: %w", err)
}
// 7. 提交事务
if err = tx.Commit(); err != nil {
return fmt.Errorf("提交事务失败: %w", err)
}
return nil
}
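// QueryPaymentOrder looks up a payment order by its ID
func (s *PaymentService) QueryPaymentOrder(paymentOrderID string) (*PaymentOrder, error) {
	var order PaymentOrder
	// transaction_id stays NULL until the channel callback arrives, and the
	// other optional text columns are nullable too. Scanning NULL into a plain
	// string fails, so these columns go through sql.NullString.
	var subject, description, transactionID, notifyURL, returnURL, clientIP sql.NullString
	err := s.db.QueryRow(`
		SELECT id, payment_order_id, order_id, user_id, amount, currency,
		       payment_method, status, subject, description, transaction_id,
		       notify_url, return_url, client_ip, paid_at, expires_at,
		       created_at, updated_at
		FROM payment_orders
		WHERE payment_order_id = ?
	`, paymentOrderID).Scan(
		&order.ID, &order.PaymentOrderID, &order.OrderID, &order.UserID,
		&order.Amount, &order.Currency, &order.PaymentMethod, &order.Status,
		&subject, &description, &transactionID,
		&notifyURL, &returnURL, &clientIP,
		&order.PaidAt, &order.ExpiresAt, &order.CreatedAt, &order.UpdatedAt,
	)
	if err == sql.ErrNoRows {
		return nil, fmt.Errorf("payment order not found")
	}
	if err != nil {
		return nil, err
	}
	order.Subject = subject.String
	order.Description = description.String
	order.TransactionID = transactionID.String
	order.NotifyURL = notifyURL.String
	order.ReturnURL = returnURL.String
	order.ClientIP = clientIP.String
	return &order, nil
}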
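// HandleCallback processes a payment callback from the channel
func (s *PaymentService) HandleCallback(paymentMethod string, data map[string]interface{}) error {
	// 1. Verify the signature
	if !verifySign(paymentMethod, data) {
		return fmt.Errorf("signature verification failed")
	}
	// 2. Extract the order ID, transaction ID, and result code. Checked type
	//    assertions keep a malformed callback from panicking the handler.
	paymentOrderID, ok1 := data["out_trade_no"].(string)
	transactionID, ok2 := data["transaction_id"].(string)
	resultCode, ok3 := data["result_code"].(string)
	if !ok1 || !ok2 || !ok3 {
		return fmt.Errorf("malformed callback payload")
	}
	// 3. Load the order
	order, err := s.QueryPaymentOrder(paymentOrderID)
	if err != nil {
		return err
	}
	// 4. Idempotency check: only a pending order still needs processing
	if order.Status != "pending" {
		return nil
	}
	// 5. Update the order. The status guard in the WHERE clause makes
	//    concurrent callbacks safe, and paid_at is set only on success.
	newStatus := "failed"
	if resultCode == "SUCCESS" {
		newStatus = "success"
	}
	res, err := s.db.Exec(`
		UPDATE payment_orders
		SET status = ?, transaction_id = ?,
		    paid_at = IF(? = 'success', NOW(), NULL), updated_at = NOW()
		WHERE payment_order_id = ? AND status = 'pending'
	`, newStatus, transactionID, newStatus, paymentOrderID)
	if err != nil {
		return fmt.Errorf("failed to update order status: %w", err)
	}
	if n, _ := res.RowsAffected(); n == 0 {
		return nil // another callback already processed this order
	}
	// 6. Notify the business system asynchronously with the updated state
	order.Status = newStatus
	order.TransactionID = transactionID
	go s.notifyBusiness(order.NotifyURL, order)
	return nil
}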
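// notifyBusiness notifies the business system, retrying on failure
func (s *PaymentService) notifyBusiness(notifyURL string, order *PaymentOrder) {
	// The request body is left empty in this simplified version; a real
	// notification would carry the signed order payload.
	maxRetries := 5
	for i := 0; i < maxRetries; i++ {
		resp, err := http.Post(notifyURL, "application/json", nil)
		if err == nil {
			resp.Body.Close() // always release the connection
			if resp.StatusCode == http.StatusOK {
				return
			}
		}
		// Exponential backoff: 1s, 2s, 4s, 8s, 16s
		time.Sleep(time.Duration(1<<uint(i)) * time.Second)
	}
}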
// 辅助函数
func generatePaymentOrderID() string {
return "PAY" + time.Now().Format("20060102") + uuid.New().String()[:8]
}
func generateTransactionID() string {
return "TXN" + time.Now().Format("20060102150405") + uuid.New().String()[:6]
}
func callWeChatPay(orderID string, amount float64) string {
// 调用微信支付统一下单接口
// 返回二维码链接
return "weixin://wxpay/bizpayurl?pr=abc123"
}
func callAlipay(orderID string, amount float64) string {
// 调用支付宝支付接口
return "https://qr.alipay.com/abc123"
}
func verifySign(paymentMethod string, data map[string]interface{}) bool {
// 验证第三方签名
return true
}
// HTTP Handlers
func (s *PaymentService) CreatePaymentHandler(w http.ResponseWriter, r *http.Request) {
var req CreatePaymentRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
order, err := s.CreatePaymentOrder(&req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]interface{}{
"code": 0,
"message": "success",
"data": order,
})
}
func (s *PaymentService) QueryPaymentHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
paymentOrderID := vars["payment_order_id"]
order, err := s.QueryPaymentOrder(paymentOrderID)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(map[string]interface{}{
"code": 0,
"message": "success",
"data": order,
})
}
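func main() {
	// Open the database handle (sql.Open does not connect yet)
	db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/payment?parseTime=true")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	// Fail fast if the database is unreachable
	if err := db.Ping(); err != nil {
		panic(err)
	}
	// Create the service
	service := NewPaymentService(db)
	// Register routes
	r := mux.NewRouter()
	r.HandleFunc("/api/v1/payment/orders", service.CreatePaymentHandler).Methods("POST")
	r.HandleFunc("/api/v1/payment/orders/{payment_order_id}", service.QueryPaymentHandler).Methods("GET")
	// Start the server; ListenAndServe only returns on error
	fmt.Println("payment service listening on :8080")
	if err := http.ListenAndServe(":8080", r); err != nil {
		panic(err)
	}
}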
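V1 characteristics:
- Monolithic application, simple to deploy
- Database transactions guarantee consistency
- Optimistic locking guards against concurrent updates
- Cannot scale horizontally
- Single point of failure
5.3 V2: Microservices + distributed transactions
Improvements:
- Service decomposition (payment, account, notification)
- TCC distributed transactions
- Redis distributed locks
- Asynchronous notification through a message queue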
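// ==================== TCC distributed transaction ====================
package tcc
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/google/uuid"
"github.com/redis/go-redis/v9" // assumed Redis client; the original does not name one
)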
// TCCTransaction TCC事务
type TCCTransaction struct {
TransactionID string
Participants []*TCCParticipant
Status string // "trying" | "confirmed" | "cancelled"
CreatedAt time.Time
UpdatedAt time.Time
}
// TCCParticipant TCC参与者
type TCCParticipant struct {
ServiceName string
TryFunc func(ctx context.Context, txID string) error
ConfirmFunc func(ctx context.Context, txID string) error
CancelFunc func(ctx context.Context, txID string) error
}
// TCCCoordinator TCC协调器
type TCCCoordinator struct {
transactions map[string]*TCCTransaction
}
// NewTCCCoordinator 创建TCC协调器
func NewTCCCoordinator() *TCCCoordinator {
return &TCCCoordinator{
transactions: make(map[string]*TCCTransaction),
}
}
// ExecuteTCC 执行TCC事务
func (c *TCCCoordinator) ExecuteTCC(ctx context.Context, txID string, participants []*TCCParticipant) error {
// 1. 创建事务记录
tx := &TCCTransaction{
TransactionID: txID,
Participants: participants,
Status: "trying",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
c.transactions[txID] = tx
// 2. Try 阶段:预留资源
for _, participant := range participants {
if err := participant.TryFunc(ctx, txID); err != nil {
// Try失败,执行Cancel
c.cancelAll(ctx, txID, participants)
return fmt.Errorf("try失败 [%s]: %w", participant.ServiceName, err)
}
}
// 3. Confirm 阶段:确认提交
tx.Status = "confirming"
for _, participant := range participants {
if err := participant.ConfirmFunc(ctx, txID); err != nil {
// Confirm失败,需要人工介入或重试
return fmt.Errorf("confirm失败 [%s]: %w", participant.ServiceName, err)
}
}
tx.Status = "confirmed"
tx.UpdatedAt = time.Now()
return nil
}
// cancelAll 取消所有参与者
func (c *TCCCoordinator) cancelAll(ctx context.Context, txID string, participants []*TCCParticipant) {
tx := c.transactions[txID]
tx.Status = "cancelling"
for _, participant := range participants {
// Cancel 必须成功,失败需要重试
for i := 0; i < 3; i++ {
if err := participant.CancelFunc(ctx, txID); err == nil {
break
}
time.Sleep(time.Second * time.Duration(i+1))
}
}
tx.Status = "cancelled"
tx.UpdatedAt = time.Now()
}
// ==================== 支付服务 TCC 实现 ====================
// PaymentTCCService 支付TCC服务
type PaymentTCCService struct {
db *sql.DB
redis *redis.Client
}
// TryPayment Try阶段:冻结余额
func (s *PaymentTCCService) TryPayment(ctx context.Context, txID string, userID int64, amount float64) error {
// 1. 获取分布式锁
lockKey := fmt.Sprintf("payment:lock:user:%d", userID)
lock := s.acquireLock(lockKey, 10*time.Second)
if !lock {
return fmt.Errorf("获取锁失败")
}
defer s.releaseLock(lockKey)
// 2. 开启事务
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 3. 检查余额并冻结
var balance float64
err = tx.QueryRow(`
SELECT balance FROM accounts WHERE user_id = ? FOR UPDATE
`, userID).Scan(&balance)
if err != nil {
return err
}
if balance < amount {
return fmt.Errorf("余额不足")
}
// 4. 冻结金额
_, err = tx.Exec(`
UPDATE accounts
SET balance = balance - ?,
frozen_balance = frozen_balance + ?,
version = version + 1
WHERE user_id = ?
`, amount, amount, userID)
if err != nil {
return err
}
// 5. 记录TCC事务
_, err = tx.Exec(`
INSERT INTO tcc_transactions (tx_id, user_id, amount, status, phase)
VALUES (?, ?, ?, 'trying', 'try')
`, txID, userID, amount)
if err != nil {
return err
}
return tx.Commit()
}
// ConfirmPayment Confirm阶段:确认扣款
func (s *PaymentTCCService) ConfirmPayment(ctx context.Context, txID string) error {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 1. 查询TCC事务
var userID int64
var amount float64
err = tx.QueryRow(`
SELECT user_id, amount FROM tcc_transactions
WHERE tx_id = ? AND status = 'trying'
`, txID).Scan(&userID, &amount)
if err != nil {
return err
}
// 2. 扣减冻结金额
_, err = tx.Exec(`
UPDATE accounts
SET frozen_balance = frozen_balance - ?,
total_balance = total_balance - ?
WHERE user_id = ?
`, amount, amount, userID)
if err != nil {
return err
}
// 3. 更新TCC事务状态
_, err = tx.Exec(`
UPDATE tcc_transactions
SET status = 'confirmed', phase = 'confirm', updated_at = NOW()
WHERE tx_id = ?
`, txID)
if err != nil {
return err
}
// 4. Record the ledger entry
_, err = tx.Exec(`
INSERT INTO account_transactions (
transaction_id, user_id, account_id, type, amount,
balance_before, balance_after, business_type, business_id
) SELECT ?, user_id, id, 'debit', ?,
balance + ?, balance, 'payment', ?
FROM accounts WHERE user_id = ?
`, generateTransactionID(), amount, amount, txID, userID)
if err != nil {
return err // never commit a deduction without its ledger entry
}
return tx.Commit()
}
// CancelPayment Cancel阶段:解冻余额
func (s *PaymentTCCService) CancelPayment(ctx context.Context, txID string) error {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 1. 查询TCC事务
var userID int64
var amount float64
err = tx.QueryRow(`
SELECT user_id, amount FROM tcc_transactions
WHERE tx_id = ? AND status = 'trying'
`, txID).Scan(&userID, &amount)
if err != nil {
return err
}
// 2. 解冻金额
_, err = tx.Exec(`
UPDATE accounts
SET balance = balance + ?,
frozen_balance = frozen_balance - ?
WHERE user_id = ?
`, amount, amount, userID)
if err != nil {
return err
}
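// 3. Update the TCC transaction status
_, err = tx.Exec(`
UPDATE tcc_transactions
SET status = 'cancelled', phase = 'cancel', updated_at = NOW()
WHERE tx_id = ?
`, txID)
if err != nil {
return err // roll back the unfreeze if the status cannot be recorded
}
return tx.Commit()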
}
// ==================== 分布式锁实现 ====================
func (s *PaymentTCCService) acquireLock(key string, expiration time.Duration) bool {
// 使用 Redis SET NX EX 实现
result, err := s.redis.SetNX(context.Background(), key, "1", expiration).Result()
return err == nil && result
}
func (s *PaymentTCCService) releaseLock(key string) {
s.redis.Del(context.Background(), key)
}
// ==================== 完整支付流程 ====================
func (s *PaymentTCCService) ProcessPayment(ctx context.Context, orderID string, userID int64, amount float64) error {
txID := generateTxID()
// 创建TCC协调器
coordinator := NewTCCCoordinator()
// 定义参与者
participants := []*TCCParticipant{
// 账户服务
{
ServiceName: "AccountService",
TryFunc: func(ctx context.Context, txID string) error {
return s.TryPayment(ctx, txID, userID, amount)
},
ConfirmFunc: func(ctx context.Context, txID string) error {
return s.ConfirmPayment(ctx, txID)
},
CancelFunc: func(ctx context.Context, txID string) error {
return s.CancelPayment(ctx, txID)
},
},
// 可以添加更多参与者(如订单服务、库存服务)
}
// 执行TCC事务
return coordinator.ExecuteTCC(ctx, txID, participants)
}
func generateTxID() string {
return "TCC" + time.Now().Format("20060102150405") + uuid.New().String()[:6]
}
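V2 characteristics:
- Microservice architecture; services deploy independently
- TCC distributed transactions keep data consistent
- Redis distributed locks serialize concurrent requests
- Supports horizontal scaling
- ⚠️ TCC is complex to implement
- ⚠️ Confirm/Cancel failures must be handled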
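Handling Confirm/Cancel failures mostly comes down to making each phase safe to retry. The sketch below uses an in-memory stand-in for the `accounts` and `tcc_transactions` tables to show three properties a participant usually needs: repeated calls are no-ops (idempotency), a Cancel that arrives without a prior Try still succeeds (empty rollback), and a Try that arrives after Cancel is rejected (anti-suspension). The `Account` type and its methods are illustrative assumptions, amounts are integer fen, and the type is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

type phase string

const (
	tried     phase = "tried"
	confirmed phase = "confirmed"
	cancelled phase = "cancelled"
)

// Account is an in-memory stand-in for the account and TCC record tables.
type Account struct {
	Balance int64 // available balance, in fen
	Frozen  int64 // frozen balance, in fen
	phases  map[string]phase
	amounts map[string]int64
}

func NewAccount(balance int64) *Account {
	return &Account{Balance: balance, phases: map[string]phase{}, amounts: map[string]int64{}}
}

// Try freezes funds. A repeated Try is a no-op, and a Try that arrives after
// Cancel is rejected so it cannot freeze funds that nobody will release.
func (a *Account) Try(txID string, amount int64) error {
	switch a.phases[txID] {
	case tried:
		return nil
	case confirmed, cancelled:
		return fmt.Errorf("tx %s already %s", txID, a.phases[txID])
	}
	if a.Balance < amount {
		return errors.New("insufficient balance")
	}
	a.Balance -= amount
	a.Frozen += amount
	a.phases[txID], a.amounts[txID] = tried, amount
	return nil
}

// Confirm turns the frozen amount into a real deduction; retries are no-ops.
func (a *Account) Confirm(txID string) error {
	switch a.phases[txID] {
	case confirmed:
		return nil
	case tried:
		a.Frozen -= a.amounts[txID]
		a.phases[txID] = confirmed
		return nil
	}
	return fmt.Errorf("tx %s cannot be confirmed", txID)
}

// Cancel releases frozen funds. If Try never ran, it only records the
// cancellation (empty rollback) so that a late Try is refused.
func (a *Account) Cancel(txID string) error {
	switch a.phases[txID] {
	case cancelled:
		return nil
	case confirmed:
		return fmt.Errorf("tx %s already confirmed", txID)
	case tried:
		a.Balance += a.amounts[txID]
		a.Frozen -= a.amounts[txID]
	}
	a.phases[txID] = cancelled
	return nil
}

func main() {
	acct := NewAccount(1000)
	fmt.Println(acct.Try("tx-1", 300), acct.Cancel("tx-1"), acct.Cancel("tx-1"))
	fmt.Println(acct.Balance, acct.Frozen) // 1000 0
	fmt.Println(acct.Cancel("tx-2"))       // empty rollback succeeds
	fmt.Println(acct.Try("tx-2", 100))     // late Try is rejected
}
```

In a database-backed participant the same checks would read and update the TCC record inside the transaction that changes the balance, so the phase check and the balance change commit together.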
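5.4 V3: Eventual consistency + reliable messaging
Improvements:
- Local message table + scheduled job
- RocketMQ transactional messages
- Best-effort notification
- Reconciliation system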
// ==================== Local message table ====================
package eventual
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/google/uuid"
)
// LocalMessage 本地消息
type LocalMessage struct {
ID int64
MessageID string
BusinessType string
BusinessID string
MessageBody string
Status string // "pending" | "sent" | "consumed"
RetryCount int
MaxRetry int
NextRetryTime time.Time
CreatedAt time.Time
UpdatedAt time.Time
}
// EventualConsistencyPayment 最终一致性支付
type EventualConsistencyPayment struct {
db *sql.DB
mq MessageQueue
}
// CreatePaymentWithMessage 创建支付订单并发送消息
func (p *EventualConsistencyPayment) CreatePaymentWithMessage(
ctx context.Context,
orderID string,
userID int64,
amount float64,
) error {
// 1. 开启数据库事务
tx, err := p.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 2. 创建支付订单
paymentOrderID := generatePaymentOrderID()
_, err = tx.Exec(`
INSERT INTO payment_orders (
payment_order_id, order_id, user_id, amount, status
) VALUES (?, ?, ?, ?, 'pending')
`, paymentOrderID, orderID, userID, amount)
if err != nil {
return fmt.Errorf("创建订单失败: %w", err)
}
// 3. 插入本地消息表
messageID := generateMessageID()
messageBody, _ := json.Marshal(map[string]interface{}{
"payment_order_id": paymentOrderID,
"order_id": orderID,
"user_id": userID,
"amount": amount,
"event_type": "payment_created",
})
_, err = tx.Exec(`
INSERT INTO local_messages (
message_id, business_type, business_id, message_body,
status, retry_count, max_retry, next_retry_time
) VALUES (?, 'payment', ?, ?, 'pending', 0, 5, NOW())
`, messageID, paymentOrderID, string(messageBody))
if err != nil {
return fmt.Errorf("插入消息失败: %w", err)
}
// 4. 提交事务(原子性保证)
if err = tx.Commit(); err != nil {
return fmt.Errorf("提交事务失败: %w", err)
}
// 5. 异步发送消息(定时任务处理)
// 这里只是记录到本地消息表,由定时任务负责发送
return nil
}
// MessageSendingJob 消息发送定时任务
func (p *EventualConsistencyPayment) MessageSendingJob() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
p.sendPendingMessages()
}
}
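// sendPendingMessages claims a batch of due messages and publishes them
func (p *EventualConsistencyPayment) sendPendingMessages() {
	// FOR UPDATE SKIP LOCKED only holds row locks inside a transaction, so the
	// batch is claimed, sent, and updated within a single transaction.
	tx, err := p.db.Begin()
	if err != nil {
		return
	}
	defer tx.Rollback()

	// 1. Claim due messages; rows locked by other workers are skipped
	rows, err := tx.Query(`
		SELECT id, message_id, business_type, business_id, message_body, retry_count
		FROM local_messages
		WHERE status = 'pending'
		  AND next_retry_time <= NOW()
		  AND retry_count < max_retry
		LIMIT 100
		FOR UPDATE SKIP LOCKED
	`)
	if err != nil {
		return
	}
	// Read the whole batch first: the connection cannot run other statements
	// while the result set is still open.
	var batch []LocalMessage
	for rows.Next() {
		var msg LocalMessage
		if err := rows.Scan(&msg.ID, &msg.MessageID, &msg.BusinessType,
			&msg.BusinessID, &msg.MessageBody, &msg.RetryCount); err != nil {
			continue
		}
		batch = append(batch, msg)
	}
	rows.Close()

	// 2. Publish each message and record the outcome
	for _, msg := range batch {
		if err := p.mq.SendMessage(msg.BusinessType, msg.MessageBody); err != nil {
			// Send failed: schedule a retry with exponential backoff (1s, 2s, 4s, ...)
			next := time.Now().Add(time.Duration(1<<uint(msg.RetryCount)) * time.Second)
			tx.Exec(`
				UPDATE local_messages
				SET retry_count = retry_count + 1, next_retry_time = ?, updated_at = NOW()
				WHERE id = ?
			`, next, msg.ID)
			continue
		}
		tx.Exec(`
			UPDATE local_messages
			SET status = 'sent', updated_at = NOW()
			WHERE id = ?
		`, msg.ID)
	}
	// If the commit fails, the batch is sent again later, so delivery is
	// at-least-once and consumers must be idempotent.
	tx.Commit()
}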
// updateMessageRetry 更新消息重试信息
func (p *EventualConsistencyPayment) updateMessageRetry(msg *LocalMessage) {
// 指数退避:1s, 2s, 4s, 8s, 16s
nextRetryTime := time.Now().Add(time.Duration(1<<uint(msg.RetryCount)) * time.Second)
_, err := p.db.Exec(`
UPDATE local_messages
SET retry_count = retry_count + 1,
next_retry_time = ?,
updated_at = NOW()
WHERE id = ?
`, nextRetryTime, msg.ID)
if err != nil {
fmt.Printf("更新消息重试失败: %v\n", err)
}
}
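// ==================== RocketMQ transactional messages ====================
// The rocketmq identifiers in this part should be read as pseudocode for the
// half-message protocol; they may not match a specific client library's API.
type RocketMQTransactionProducer struct {
producer *rocketmq.TransactionProducer
db *sql.DB // used by CheckLocalTransaction to look up the order status
}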
// SendTransactionMessage 发送事务消息
func (p *RocketMQTransactionProducer) SendTransactionMessage(
topic string,
body []byte,
localTx func() error,
) error {
// 1. 发送半消息(Half Message)
msg := &rocketmq.Message{
Topic: topic,
Body: body,
}
result, err := p.producer.SendMessageInTransaction(msg)
if err != nil {
return err
}
// 2. 执行本地事务
if err := localTx(); err != nil {
// 本地事务失败,回滚消息
p.producer.Rollback(result.TransactionId)
return err
}
// 3. 提交消息
p.producer.Commit(result.TransactionId)
return nil
}
// CheckLocalTransaction 回查本地事务
func (p *RocketMQTransactionProducer) CheckLocalTransaction(msg *rocketmq.MessageExt) rocketmq.LocalTransactionState {
// 从消息中提取订单ID
var data map[string]interface{}
json.Unmarshal(msg.Body, &data)
paymentOrderID := data["payment_order_id"].(string)
// 查询支付订单状态
var status string
err := p.db.QueryRow(`
SELECT status FROM payment_orders WHERE payment_order_id = ?
`, paymentOrderID).Scan(&status)
if err != nil {
return rocketmq.UnknownTransaction
}
if status == "success" {
return rocketmq.CommitTransaction
} else if status == "failed" {
return rocketmq.RollbackTransaction
}
return rocketmq.UnknownTransaction
}
// ==================== 最大努力通知 ====================
type MaxEffortNotifier struct {
db *sql.DB
}
// NotifyWithRetry 带重试的通知
func (n *MaxEffortNotifier) NotifyWithRetry(notifyURL string, data interface{}) {
// 重试策略:1m, 5m, 10m, 30m, 1h, 2h, 6h, 24h
retryIntervals := []time.Duration{
1 * time.Minute,
5 * time.Minute,
10 * time.Minute,
30 * time.Minute,
1 * time.Hour,
2 * time.Hour,
6 * time.Hour,
24 * time.Hour,
}
for i := 0; i < len(retryIntervals); i++ {
// 发送通知
if n.sendNotification(notifyURL, data) {
return // 成功
}
// 失败,等待后重试
time.Sleep(retryIntervals[i])
}
// 所有重试都失败,记录到失败表,需要人工处理
n.recordFailedNotification(notifyURL, data)
}
func (n *MaxEffortNotifier) sendNotification(url string, data interface{}) bool {
body, _ := json.Marshal(data)
resp, err := http.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
return false
}
defer resp.Body.Close()
return resp.StatusCode == 200
}
func (n *MaxEffortNotifier) recordFailedNotification(url string, data interface{}) {
body, _ := json.Marshal(data)
n.db.Exec(`
INSERT INTO failed_notifications (url, data, created_at)
VALUES (?, ?, NOW())
`, url, string(body))
}
// ==================== 对账系统 ====================
type ReconciliationService struct {
db *sql.DB
paymentGateway PaymentGateway
}
// DailyReconciliation 每日对账
func (s *ReconciliationService) DailyReconciliation(date time.Time) error {
// 1. 获取内部订单列表
internalOrders, err := s.getInternalOrders(date)
if err != nil {
return err
}
// 2. 获取第三方订单列表
externalOrders, err := s.paymentGateway.GetOrders(date)
if err != nil {
return err
}
// 3. 对账
internalMap := make(map[string]float64)
for _, order := range internalOrders {
internalMap[order.PaymentOrderID] = order.Amount
}
externalMap := make(map[string]float64)
for _, order := range externalOrders {
externalMap[order.OutTradeNo] = order.Amount
}
// 4. 找出差异
for orderID, internalAmount := range internalMap {
externalAmount, exists := externalMap[orderID]
if !exists {
// 内部有,第三方没有(可能是退款)
s.recordDifference(orderID, internalAmount, 0, "missing", date)
} else if internalAmount != externalAmount {
// 金额不一致
s.recordDifference(orderID, internalAmount, externalAmount, "unmatched", date)
} else {
// 一致
s.recordDifference(orderID, internalAmount, externalAmount, "matched", date)
}
}
// 5. 找出多余订单(第三方有,内部没有)
for orderID, externalAmount := range externalMap {
if _, exists := internalMap[orderID]; !exists {
s.recordDifference(orderID, 0, externalAmount, "excess", date)
}
}
return nil
}
func (s *ReconciliationService) getInternalOrders(date time.Time) ([]*PaymentOrder, error) {
rows, err := s.db.Query(`
SELECT payment_order_id, amount
FROM payment_orders
WHERE DATE(paid_at) = ? AND status = 'success'
`, date.Format("2006-01-02"))
if err != nil {
return nil, err
}
defer rows.Close()
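orders := make([]*PaymentOrder, 0)
for rows.Next() {
var order PaymentOrder
if err := rows.Scan(&order.PaymentOrderID, &order.Amount); err != nil {
return nil, err
}
orders = append(orders, &order)
}
// Surface iteration errors instead of returning a silently truncated list
return orders, rows.Err()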
}
func (s *ReconciliationService) recordDifference(
orderID string,
internalAmount, externalAmount float64,
status string,
date time.Time,
) {
differenceAmount := internalAmount - externalAmount
_, err := s.db.Exec(`
INSERT INTO reconciliation_records (
payment_order_id, internal_amount, external_amount,
status, reconciliation_date, difference_amount, handled
) VALUES (?, ?, ?, ?, ?, ?, FALSE)
`, orderID, internalAmount, externalAmount, status, date, differenceAmount)
if err != nil {
fmt.Printf("记录对账差异失败: %v\n", err)
}
}
// PaymentGateway 支付网关接口
type PaymentGateway interface {
GetOrders(date time.Time) ([]*ExternalOrder, error)
}
type ExternalOrder struct {
OutTradeNo string
Amount float64
}
type MessageQueue interface {
SendMessage(topic string, body string) error
}
func generateMessageID() string {
return "MSG" + time.Now().Format("20060102150405") + uuid.New().String()[:6]
}
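V3 characteristics:
- Eventual consistency, with higher throughput
- The local message table guarantees reliable delivery
- Scheduled reconciliation catches discrepancies
- Best-effort notification
- ⚠️ Short windows of inconsistency exist
- Suited to large-scale deployments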
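The reconciliation step can be sketched as a pure function over two maps, which keeps the classification logic testable without a database. This version compares amounts as integer fen rather than `float64`, since exact equality on floating-point money values is fragile. The `Reconcile` function is a hypothetical helper; the status labels follow the `reconciliation_records.status` values.

```go
package main

import "fmt"

// Reconcile compares our records with the channel's bill. Both maps are keyed
// by payment order ID and hold amounts in fen. The result maps each order ID
// to one of: matched, unmatched, missing, excess.
func Reconcile(internal, external map[string]int64) map[string]string {
	result := make(map[string]string, len(internal))
	for id, ours := range internal {
		theirs, ok := external[id]
		switch {
		case !ok:
			result[id] = "missing" // we have it, the channel does not
		case ours != theirs:
			result[id] = "unmatched" // both have it, amounts differ
		default:
			result[id] = "matched"
		}
	}
	for id := range external {
		if _, ok := internal[id]; !ok {
			result[id] = "excess" // the channel has it, we do not
		}
	}
	return result
}

func main() {
	internal := map[string]int64{"PAY1": 9999, "PAY2": 5000, "PAY3": 100}
	external := map[string]int64{"PAY1": 9999, "PAY2": 4900, "PAY4": 700}
	res := Reconcile(internal, external)
	for _, id := range []string{"PAY1", "PAY2", "PAY3", "PAY4"} {
		fmt.Println(id, res[id])
	}
}
```

Only the non-matched entries normally need follow-up, so a caller could persist those and keep a simple count of matched orders.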
6. 核心问题与解决方案
6.1 幂等性保证
问题:用户重复点击支付按钮,或者网络重试,导致重复扣款。
解决方案:
方案1: 唯一订单号 + 数据库唯一索引
CREATE UNIQUE INDEX uk_order_id ON payment_orders(order_id);
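Option 2: Distributed lock
// releaseLockScript deletes the lock only if it still holds our token,
// so a lock that expired and was re-acquired by another request is never removed.
const releaseLockScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
end
return 0`
func (s *PaymentService) CreatePaymentOrderIdempotent(req *CreatePaymentRequest) (*PaymentOrder, error) {
    ctx := context.Background()
    lockKey := fmt.Sprintf("payment:create:order:%s", req.OrderID)
    token := uuid.New().String()
    // Acquire the distributed lock; the 10s TTL bounds how long a crashed holder can block others
    acquired, err := s.redis.SetNX(ctx, lockKey, token, 10*time.Second).Result()
    if err != nil {
        return nil, fmt.Errorf("acquire lock: %w", err)
    }
    if !acquired {
        return nil, fmt.Errorf("order is being processed")
    }
    defer s.redis.Eval(ctx, releaseLockScript, []string{lockKey}, token)
    // Return the existing order if there is one (idempotent response)
    existing, err := s.QueryPaymentOrderByOrderID(req.OrderID)
    if err == nil {
        return existing, nil
    }
    // Assumes the query helper reports "not found" as sql.ErrNoRows;
    // any other error must not fall through to creation
    if !errors.Is(err, sql.ErrNoRows) {
        return nil, err
    }
    // Create the order; the unique index from option 1 remains the final safeguard
    return s.CreatePaymentOrder(req)
}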
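Option 3: Idempotency table
// ProcessWithIdempotent runs fn at most once per key and returns the JSON-encoded response.
func (s *PaymentService) ProcessWithIdempotent(idempotentKey string, fn func() (interface{}, error)) ([]byte, error) {
    // 1. Claim the key. INSERT IGNORE affects 0 rows when the key already exists,
    //    so exactly one of several concurrent requests wins the claim.
    res, err := s.db.Exec(`
        INSERT IGNORE INTO idempotent_records (idempotent_key, request_id, status, expires_at)
        VALUES (?, ?, 'processing', DATE_ADD(NOW(), INTERVAL 1 HOUR))
    `, idempotentKey, generateRequestID())
    if err != nil {
        return nil, err
    }
    if claimed, _ := res.RowsAffected(); claimed == 0 {
        // 2. The key exists: replay a finished result, reject an in-flight one, re-claim a failed one
        var status string
        var data sql.NullString
        err := s.db.QueryRow(`
            SELECT status, response_data FROM idempotent_records WHERE idempotent_key = ?
        `, idempotentKey).Scan(&status, &data)
        if err != nil {
            return nil, err
        }
        switch status {
        case "success":
            return []byte(data.String), nil
        case "processing":
            return nil, fmt.Errorf("request is being processed")
        }
        res, err := s.db.Exec(`
            UPDATE idempotent_records SET status = 'processing'
            WHERE idempotent_key = ? AND status = 'failed'
        `, idempotentKey)
        if err != nil {
            return nil, err
        }
        if n, _ := res.RowsAffected(); n == 0 {
            return nil, fmt.Errorf("request is being processed")
        }
    }
    // 3. Run the business logic
    result, err := fn()
    if err != nil {
        s.db.Exec(`UPDATE idempotent_records SET status = 'failed' WHERE idempotent_key = ?`, idempotentKey)
        return nil, err
    }
    // 4. Store the serialized response so later duplicates can replay it
    responseData, err := json.Marshal(result)
    if err != nil {
        return nil, err
    }
    _, err = s.db.Exec(`
        UPDATE idempotent_records
        SET status = 'success', response_data = ?
        WHERE idempotent_key = ?
    `, string(responseData), idempotentKey)
    if err != nil {
        return nil, err
    }
    return responseData, nil
}
// Usage
func (s *PaymentService) CreatePayment(req *CreatePaymentRequest) (*PaymentOrder, error) {
    idempotentKey := fmt.Sprintf("payment:create:%s", req.OrderID)
    data, err := s.ProcessWithIdempotent(idempotentKey, func() (interface{}, error) {
        return s.CreatePaymentOrder(req)
    })
    if err != nil {
        return nil, err
    }
    // Fresh and replayed responses both arrive as JSON, so decode rather than type-assert
    var order PaymentOrder
    if err := json.Unmarshal(data, &order); err != nil {
        return nil, err
    }
    return &order, nil
}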
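6.2 Timeouts and Retries
Problem: a call to the third-party payment provider times out, so it is unknown whether the charge went through.
Solution:
type PaymentWithTimeout struct {
    db             *sql.DB
    paymentGateway PaymentGateway
}
// Sentinel errors let callers classify failures with errors.Is instead of comparing message strings.
var (
    ErrPaymentTimeout = errors.New("payment timed out")
    ErrNetwork        = errors.New("network error")
)
// ProcessPaymentWithRetry processes a payment with retries.
func (p *PaymentWithTimeout) ProcessPaymentWithRetry(
    ctx context.Context,
    orderID string,
    amount float64,
) error {
    // 1. Create the payment order; every retry reuses this ID so the gateway can deduplicate
    paymentOrderID := generatePaymentOrderID()
    // 2. Call the third party: up to 3 attempts, each with its own 10s timeout
    var lastErr error
    for i := 0; i < 3; i++ {
        attemptCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
        err := p.callPaymentGateway(attemptCtx, paymentOrderID, amount)
        cancel()
        if err == nil {
            return nil // success
        }
        lastErr = err
        // Non-retryable errors are returned immediately
        if !isRetryableError(err) {
            return err
        }
        // Query the order status before retrying, to avoid a duplicate charge
        status, queryErr := p.queryPaymentStatus(ctx, paymentOrderID)
        if queryErr == nil && status == "success" {
            return nil // already paid
        }
        // Exponential backoff (1s, 2s, 4s) that stops early if the caller cancels
        select {
        case <-time.After(time.Duration(1<<uint(i)) * time.Second):
        case <-ctx.Done():
            p.markOrderAsPending(paymentOrderID)
            return fmt.Errorf("payment result unknown: %w", ctx.Err())
        }
    }
    // 3. Every attempt failed: the outcome is unknown, so mark the order for later confirmation
    p.markOrderAsPending(paymentOrderID)
    return fmt.Errorf("payment failed: %w", lastErr)
}
func (p *PaymentWithTimeout) callPaymentGateway(ctx context.Context, orderID string, amount float64) error {
    // Simulates the third-party call
    select {
    case <-ctx.Done():
        return ErrPaymentTimeout
    case <-time.After(2 * time.Second):
        return nil
    }
}
func (p *PaymentWithTimeout) queryPaymentStatus(ctx context.Context, orderID string) (string, error) {
    // Queries the order status at the third party (stubbed here)
    return "unknown", nil
}
func (p *PaymentWithTimeout) markOrderAsPending(orderID string) {
    _, err := p.db.Exec(`
        UPDATE payment_orders
        SET status = 'pending_confirm'
        WHERE payment_order_id = ?
    `, orderID)
    if err != nil {
        fmt.Printf("failed to mark order %s as pending_confirm: %v\n", orderID, err)
    }
}
func isRetryableError(err error) bool {
    // Timeouts and network errors are retryable;
    // insufficient balance, unknown order and similar errors are not
    return errors.Is(err, ErrPaymentTimeout) || errors.Is(err, ErrNetwork)
}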
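A fixed 1s/2s/4s schedule makes every client that failed at the same moment retry at the same moment. A common refinement is to cap the delay and add random jitter. The sketch below is a minimal, standard-library-only illustration of that idea; `backoffDelay`, `retry`, and their parameters are hypothetical names and are not part of the payment service above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// backoffDelay returns a random delay in [0, min(maxDelay, base*2^attempt)]
// (the "full jitter" variant of exponential backoff).
func backoffDelay(attempt int, base, maxDelay time.Duration) time.Duration {
	d := base << uint(attempt)
	if d <= 0 || d > maxDelay { // d <= 0 guards against shift overflow
		d = maxDelay
	}
	return time.Duration(rand.Int63n(int64(d) + 1))
}

// retry calls fn up to maxAttempts times, sleeping a jittered backoff between
// attempts and returning early when ctx is cancelled.
func retry(ctx context.Context, maxAttempts int, base, maxDelay time.Duration, fn func() error) error {
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if lastErr = fn(); lastErr == nil {
			return nil
		}
		if attempt == maxAttempts-1 {
			break // no point sleeping after the final attempt
		}
		timer := time.NewTimer(backoffDelay(attempt, base, maxDelay))
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
	}
	return fmt.Errorf("all %d attempts failed: %w", maxAttempts, lastErr)
}

func main() {
	calls := 0
	err := retry(context.Background(), 5, time.Millisecond, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```

For payments, fn would still need to query the order status before re-sending the charge; jitter only spreads the load, it does not make the call idempotent.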
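6.3 Preventing Loss of Funds
Problem: how do we make sure an account is never undercharged or overcharged?
Solutions:
- Database transaction + row lock
func (s *PaymentService) DeductBalanceSafely(userID int64, amount float64) error {
    tx, err := s.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op once Commit has succeeded
    // Row lock: concurrent deductions on this account wait here
    var balance float64
    err = tx.QueryRow(`SELECT balance FROM accounts WHERE user_id = ? FOR UPDATE`, userID).Scan(&balance)
    if err != nil {
        return err
    }
    if balance < amount {
        return fmt.Errorf("insufficient balance")
    }
    // Deduct
    if _, err := tx.Exec(`UPDATE accounts SET balance = balance - ? WHERE user_id = ?`, amount, userID); err != nil {
        return err
    }
    return tx.Commit()
}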
- Optimistic locking
func (s *PaymentService) DeductBalanceOptimistic(userID int64, amount float64) error {
    for i := 0; i < 3; i++ {
        // Read the balance and version
        var balance float64
        var version int
        err := s.db.QueryRow(`SELECT balance, version FROM accounts WHERE user_id = ?`, userID).
            Scan(&balance, &version)
        if err != nil {
            return err
        }
        if balance < amount {
            return fmt.Errorf("insufficient balance")
        }
        // Update only if the version is unchanged since the read
        result, err := s.db.Exec(`
            UPDATE accounts
            SET balance = balance - ?, version = version + 1
            WHERE user_id = ? AND version = ?
        `, amount, userID, version)
        if err != nil {
            return err
        }
        rowsAffected, err := result.RowsAffected()
        if err != nil {
            return err
        }
        if rowsAffected > 0 {
            return nil // success
        }
        // Version conflict: another writer got in first, so retry
        time.Sleep(10 * time.Millisecond)
    }
    return fmt.Errorf("deduction failed: concurrent update conflict")
}
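- Ledger reconciliation
func (s *PaymentService) DailyBalanceCheck() {
    // Daily check: balance = total credits - total debits.
    // This assumes any opening balance is itself recorded as a credit entry.
    rows, err := s.db.Query(`
        SELECT user_id, balance FROM accounts
    `)
    if err != nil {
        fmt.Printf("balance check: querying accounts failed: %v\n", err)
        return
    }
    defer rows.Close()
    for rows.Next() {
        var userID int64
        var currentBalance float64
        if err := rows.Scan(&userID, &currentBalance); err != nil {
            fmt.Printf("balance check: scan failed: %v\n", err)
            continue
        }
        // Compute the expected balance from the ledger
        var totalCredit, totalDebit float64
        err := s.db.QueryRow(`
            SELECT
                COALESCE(SUM(CASE WHEN type = 'credit' THEN amount ELSE 0 END), 0),
                COALESCE(SUM(CASE WHEN type = 'debit' THEN amount ELSE 0 END), 0)
            FROM account_transactions
            WHERE user_id = ?
        `, userID).Scan(&totalCredit, &totalDebit)
        if err != nil {
            fmt.Printf("balance check: ledger query for account %d failed: %v\n", userID, err)
            continue
        }
        expectedBalance := totalCredit - totalDebit
        // Flag any difference larger than one cent
        if math.Abs(currentBalance-expectedBalance) > 0.01 {
            fmt.Printf("account %d balance mismatch: current=%.2f, expected=%.2f\n",
                userID, currentBalance, expectedBalance)
        }
    }
}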
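The examples in this section carry amounts as float64 for brevity, which is why the balance check needs a 0.01 tolerance. A common alternative is to store amounts as integers in the smallest currency unit. The following is a minimal sketch under that assumption; the `Cents` type and `ParseCents` function are hypothetical and do not appear elsewhere in this chapter.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// Cents is an amount in the smallest currency unit (fen for CNY).
type Cents int64

// ParseCents converts a non-negative decimal string such as "99.99" into Cents
// without passing through float64. At most two fractional digits are accepted.
func ParseCents(s string) (Cents, error) {
	whole, frac, _ := strings.Cut(s, ".")
	if len(frac) > 2 {
		return 0, errors.New("more than two fractional digits")
	}
	// ParseUint rejects signs, empty input and non-digit characters.
	w, err := strconv.ParseUint(whole, 10, 48)
	if err != nil {
		return 0, fmt.Errorf("invalid amount %q: %w", s, err)
	}
	// Pad the fraction to two digits: "" -> "00", "5" -> "50".
	f, err := strconv.ParseUint((frac + "00")[:2], 10, 8)
	if err != nil {
		return 0, fmt.Errorf("invalid amount %q: %w", s, err)
	}
	return Cents(w*100 + f), nil
}

// String formats the amount as a decimal with two fractional digits.
func (c Cents) String() string {
	return fmt.Sprintf("%d.%02d", c/100, c%100)
}

func main() {
	// float64 cannot represent most decimal fractions exactly.
	a, b := 0.1, 0.2
	fmt.Println(a+b == 0.3) // false

	// Integer cents add exactly.
	x, _ := ParseCents("0.10")
	y, _ := ParseCents("0.20")
	fmt.Println(x+y == 30, x+y) // true 0.30
}
```

With integer amounts, the reconciliation and balance checks can compare with `==` and no longer need a tolerance.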
7. Performance Optimization
7.1 Database Optimization
7.1.1 Index Optimization
-- Indexes on the payment order table
CREATE INDEX idx_user_status_created ON payment_orders(user_id, status, created_at);
CREATE INDEX idx_status_expires ON payment_orders(status, expires_at);
CREATE INDEX idx_paid_at ON payment_orders(paid_at);
-- Indexes on the ledger table
CREATE INDEX idx_user_created ON account_transactions(user_id, created_at);
CREATE INDEX idx_business ON account_transactions(business_type, business_id);
-- Indexes on the reconciliation table
CREATE INDEX idx_date_status ON reconciliation_records(reconciliation_date, status);
7.1.2 Database and Table Sharding
// Shard by user ID
func getTableName(userID int64, tableCount int) string {
    return fmt.Sprintf("account_transactions_%d", userID%int64(tableCount))
}
// Shard by date (one table per month)
func getTableNameByDate(date time.Time) string {
    return fmt.Sprintf("payment_orders_%s", date.Format("200601"))
}
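Modulo sharding works directly on numeric user IDs, but string keys such as a payment order ID have to be hashed first. A minimal sketch, assuming FNV-1a as the hash and a hypothetical `payment_orders_NN` naming scheme (neither is prescribed by this chapter):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardIndex maps a string key to a shard in [0, shardCount) using FNV-1a.
// shardCount must be greater than zero.
func shardIndex(key string, shardCount uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum32() % shardCount
}

// orderTableName returns the table that holds the given payment order.
func orderTableName(paymentOrderID string, shardCount uint32) string {
	return fmt.Sprintf("payment_orders_%02d", shardIndex(paymentOrderID, shardCount))
}

func main() {
	for _, id := range []string{"PAY20231113001", "PAY20231113002"} {
		fmt.Println(id, "->", orderTableName(id, 16))
	}
}
```

The mapping is stable only while the shard count stays fixed; changing it moves most keys, so resharding needs a migration plan or a scheme such as consistent hashing.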
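7.1.3 Read/Write Splitting
type PaymentRepository struct {
    masterDB *sql.DB
    slaveDB  *sql.DB
}
// Writes go to the primary
func (r *PaymentRepository) CreateOrder(order *PaymentOrder) error {
    _, err := r.masterDB.Exec(`INSERT INTO payment_orders ...`, order)
    return err
}
// Reads go to the replica. Replication lag means a read issued right after a write
// may be stale, so reads that must see the latest status should use the primary.
func (r *PaymentRepository) QueryOrder(orderID string) (*PaymentOrder, error) {
    var order PaymentOrder
    // Scan needs one destination per column, so the columns are listed explicitly
    err := r.slaveDB.QueryRow(`
        SELECT payment_order_id, order_id, user_id, amount, status
        FROM payment_orders WHERE payment_order_id = ?
    `, orderID).Scan(&order.PaymentOrderID, &order.OrderID, &order.UserID, &order.Amount, &order.Status)
    if err != nil {
        return nil, err
    }
    return &order, nil
}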
7.2 Cache Optimization
type PaymentCacheService struct {
    db    *sql.DB
    redis *redis.Client
}
// GetPaymentOrder looks up a payment order, reading through the cache.
func (s *PaymentCacheService) GetPaymentOrder(paymentOrderID string) (*PaymentOrder, error) {
    ctx := context.Background()
    cacheKey := fmt.Sprintf("payment:order:%s", paymentOrderID)
    // 1. Try the cache; a miss, a Redis error, or a corrupt entry all fall through to the database
    if cached, err := s.redis.Get(ctx, cacheKey).Result(); err == nil {
        var order PaymentOrder
        if json.Unmarshal([]byte(cached), &order) == nil {
            return &order, nil
        }
    }
    // 2. Query the database
    order, err := s.queryFromDB(paymentOrderID)
    if err != nil {
        return nil, err
    }
    // 3. Populate the cache; a failure here only costs a future cache miss
    if data, err := json.Marshal(order); err == nil {
        s.redis.Set(ctx, cacheKey, data, 10*time.Minute)
    }
    return order, nil
}
// UpdatePaymentOrder updates the order, then deletes its cache entry.
func (s *PaymentCacheService) UpdatePaymentOrder(order *PaymentOrder) error {
    // Update the database
    _, err := s.db.Exec(`UPDATE payment_orders SET ... WHERE payment_order_id = ?`, order.PaymentOrderID)
    if err != nil {
        return err
    }
    // Delete the cache entry so the next read reloads it
    cacheKey := fmt.Sprintf("payment:order:%s", order.PaymentOrderID)
    s.redis.Del(context.Background(), cacheKey)
    return nil
}
7.3 Asynchronous Processing
// Asynchronous notification. The goroutine lives only in this process, so the
// notification is lost if the process exits before it completes.
func (s *PaymentService) NotifyAsync(notifyURL string, data interface{}) {
    go func() {
        defer func() {
            if r := recover(); r != nil {
                fmt.Printf("async notification panicked: %v\n", r)
            }
        }()
        s.notifyBusiness(notifyURL, data)
    }()
}
// Publishing through a message queue
func (s *PaymentService) PublishPaymentEvent(order *PaymentOrder) error {
    event := map[string]interface{}{
        "event_type":       "payment_success",
        "payment_order_id": order.PaymentOrderID,
        "order_id":         order.OrderID,
        "amount":           order.Amount,
        "timestamp":        time.Now().Unix(),
    }
    body, err := json.Marshal(event)
    if err != nil {
        return err
    }
    return s.mq.Publish("payment.events", body)
}
8. Monitoring and Alerting
8.1 Key Metrics
// Prometheus metrics
var (
paymentTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "payment_total",
Help: "Total payment requests",
},
[]string{"method", "status"},
)
paymentDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "payment_duration_seconds",
Help: "Payment request duration",
Buckets: []float64{0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10}, // finer buckets around the 500ms P99 target
},
[]string{"method"},
)
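// Caution: a user_id label creates one time series per account, which does not
// scale to millions of users; export aggregates or a small set of monitored accounts instead.
accountBalance = prometheus.NewGaugeVec(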
prometheus.GaugeOpts{
Name: "account_balance",
Help: "User account balance",
},
[]string{"user_id"},
)
reconciliationErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "reconciliation_errors_total",
Help: "Total reconciliation errors",
},
)
)
// RecordMetrics records the request count and latency.
func (s *PaymentService) RecordMetrics(method string, status string, duration time.Duration) {
paymentTotal.WithLabelValues(method, status).Inc()
paymentDuration.WithLabelValues(method).Observe(duration.Seconds())
}
8.2 Alerting Rules
# prometheus-alerts.yml
groups:
  - name: payment_alerts
    rules:
      # Payment success rate below 95%
      - alert: PaymentSuccessRateLow
        expr: |
          sum(rate(payment_total{status="success"}[5m])) /
          sum(rate(payment_total[5m])) < 0.95
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Payment success rate too low"
          description: "Payment success rate below 95% over the last 5 minutes"
      # Payment response time too long
      - alert: PaymentSlowResponse
        expr: |
          histogram_quantile(0.99,
            rate(payment_duration_seconds_bucket[5m])
          ) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Payment response time too long"
          description: "P99 response time exceeds 2 seconds"
      # Reconciliation errors
      - alert: ReconciliationError
        expr: increase(reconciliation_errors_total[1h]) > 10
        labels:
          severity: critical
        annotations:
          summary: "Reconciliation found discrepancies"
          description: "More than 10 reconciliation errors within 1 hour"
      # Abnormal balance movement
      - alert: BalanceAbnormal
        expr: abs(delta(account_balance[5m])) > 10000
        labels:
          severity: warning
        annotations:
          summary: "Abnormal account balance movement"
          description: "Balance changed by more than 10000 within 5 minutes"
8.3 Logging Conventions
type PaymentLogger struct {
    logger *zap.Logger
}
// Structured log entry for a payment operation
func (l *PaymentLogger) LogPayment(order *PaymentOrder, action string, err error) {
    fields := []zap.Field{
        zap.String("payment_order_id", order.PaymentOrderID),
        zap.String("order_id", order.OrderID),
        zap.Int64("user_id", order.UserID),
        zap.Float64("amount", order.Amount),
        zap.String("payment_method", order.PaymentMethod),
        zap.String("action", action),
        zap.String("status", order.Status),
    }
    if err != nil {
        fields = append(fields, zap.Error(err))
        l.logger.Error("payment operation failed", fields...)
    } else {
        l.logger.Info("payment operation succeeded", fields...)
    }
}
// Audit log
func (l *PaymentLogger) AuditLog(userID int64, action string, details map[string]interface{}) {
    l.logger.Info("audit log",
        zap.Int64("user_id", userID),
        zap.String("action", action),
        zap.Any("details", details),
        zap.Time("timestamp", time.Now()),
        zap.String("ip", getClientIP()),
    )
}
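9. Interview Q&A (10 High-Frequency Questions)
How do you make sure a payment is never charged twice?
Answer:
- Unique order number + database unique index: prevents the same order from being paid more than once
- Idempotency table: records every request, so an identical request gets the identical result
- Distributed lock: only one request for a given order is processed at a time
- State machine: an order can move from pending to success only once, so the transition cannot be repeated
- Version number (optimistic lock): prevents lost updates under concurrency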
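The state-machine point can be made concrete with a transition table. This is a minimal sketch; the chapter names `pending` and `success`, while the `failed` and `closed` states and the `CanTransition` helper are assumptions added for illustration.

```go
package main

import "fmt"

// Status is the lifecycle state of a payment order.
type Status string

const (
	Pending Status = "pending"
	Success Status = "success"
	Failed  Status = "failed" // assumed terminal state
	Closed  Status = "closed" // assumed terminal state (for example, expired)
)

// allowed lists the legal transitions. Terminal states have no entry,
// so nothing can leave them.
var allowed = map[Status][]Status{
	Pending: {Success, Failed, Closed},
}

// CanTransition reports whether an order may move from one state to another.
func CanTransition(from, to Status) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(Pending, Success)) // true
	fmt.Println(CanTransition(Success, Success)) // false: a duplicate callback is rejected
	fmt.Println(CanTransition(Failed, Success))  // false
}
```

In the database the same rule is usually enforced atomically with a conditional update such as `UPDATE payment_orders SET status = 'success' WHERE payment_order_id = ? AND status = 'pending'`, treating zero affected rows as "already handled".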
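What do you do when a payment times out?
Answer:
- Active query: call the third party's query API to confirm the payment status
- Retry: retry errors that are safe to retry (such as network timeouts)
- Compensation: a scheduled job scans timed-out orders and queries their status
- User messaging: tell the user the result is pending confirmation and to check back later
- Reconciliation: the daily reconciliation catches any remaining differences
Example:
// Timeout handling flow
if err := callPayment(); err != nil {
    if isTimeout(err) {
        // Query the status at the third party
        status := queryThirdParty(orderID)
        if status == "success" {
            updateOrderSuccess()
        } else if status == "failed" {
            updateOrderFailed()
        } else {
            // Unknown: mark the order as pending confirmation
            markAsPending()
        }
    }
}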
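How would you design a reconciliation system?
Answer:
Reconciliation flow:
- Fetch internal orders: query yesterday's successful orders
- Fetch the third-party statement: download the statement through the third party's API
- Compare:
  - Same order number, same amount → matched
  - Same order number, different amount → discrepancy
  - Present internally, absent at the third party → possibly refunded or reversed; needs investigation
  - Present at the third party, absent internally → possibly a dropped order (for example, a lost callback)
- Resolve differences:
  - Automatic order repair
  - Manual verification
  - Issue a refund
Reconciliation timing:
- Real-time: reconcile immediately after each payment completes
- Batch: reconcile in bulk on T+1
- Periodic: weekly and monthly reconciliation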
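The comparison step can be sketched as a pure function over two maps of order ID to amount. The sketch below assumes amounts in integer cents; the status names `matched`, `unmatched` and `excess` follow the reconciliation code earlier in this chapter, while `missing` and the `Diff` type are assumptions made for this example.

```go
package main

import (
	"fmt"
	"sort"
)

// Diff is the reconciliation result for one order. Amounts are in cents;
// a side that has no record of the order is reported as 0.
type Diff struct {
	OrderID  string
	Internal int64
	External int64
	Kind     string // "matched", "unmatched", "missing" or "excess"
}

// Reconcile compares internal orders against the third-party statement.
func Reconcile(internal, external map[string]int64) []Diff {
	var diffs []Diff
	for id, in := range internal {
		ex, ok := external[id]
		switch {
		case !ok:
			diffs = append(diffs, Diff{id, in, 0, "missing"}) // internal only
		case in != ex:
			diffs = append(diffs, Diff{id, in, ex, "unmatched"})
		default:
			diffs = append(diffs, Diff{id, in, ex, "matched"})
		}
	}
	for id, ex := range external {
		if _, ok := internal[id]; !ok {
			diffs = append(diffs, Diff{id, 0, ex, "excess"}) // third party only
		}
	}
	// Map iteration order is random, so sort for a stable report.
	sort.Slice(diffs, func(i, j int) bool { return diffs[i].OrderID < diffs[j].OrderID })
	return diffs
}

func main() {
	internal := map[string]int64{"A": 9999, "B": 500, "C": 100}
	external := map[string]int64{"A": 9999, "B": 499, "D": 700}
	for _, d := range Reconcile(internal, external) {
		fmt.Printf("%s %s internal=%d external=%d\n", d.OrderID, d.Kind, d.Internal, d.External)
	}
	// A matched internal=9999 external=9999
	// B unmatched internal=500 external=499
	// C missing internal=100 external=0
	// D excess internal=0 external=700
}
```

Everything except the `matched` rows would then feed the difference-handling step (repair, manual review, or refund).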
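What is the difference between TCC and a local message table?
Answer:
| Aspect | TCC | Local message table |
|---|---|---|
| Consistency | Resources are reserved as soon as Try succeeds, so the result is close to strong consistency (strictly, still eventual until Confirm or Cancel completes) | Eventual consistency |
| Performance | Slower (synchronous Try, followed by Confirm or Cancel) | Faster (asynchronous) |
| Implementation complexity | High (Try/Confirm/Cancel must all be implemented) | Medium (needs a scheduled job) |
| Typical use | Funds, inventory and other scenarios that need tight consistency | Notifications, loyalty points and other eventually consistent scenarios |
| Compensation | Requires Cancel logic | Requires a retry mechanism |
Recommendations:
- Payment deduction: TCC (tight consistency)
- Order notification: local message table (eventual consistency)
- Adding loyalty points: local message table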
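To make the Try/Confirm/Cancel vocabulary concrete, here is a minimal in-memory sketch of the participant side for a balance account. The `Account` type and its methods are hypothetical; a real participant would persist the frozen amounts and would also have to handle a Try that arrives after its Cancel, which this sketch does not.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Account keeps available and frozen funds in cents.
type Account struct {
	mu        sync.Mutex
	available int64
	frozen    map[string]int64 // transaction ID -> reserved amount
}

func NewAccount(balance int64) *Account {
	return &Account{available: balance, frozen: make(map[string]int64)}
}

// Try reserves the amount. Repeating it for the same transaction is a no-op.
func (a *Account) Try(txID string, amount int64) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	if _, ok := a.frozen[txID]; ok {
		return nil
	}
	if a.available < amount {
		return errors.New("insufficient balance")
	}
	a.available -= amount
	a.frozen[txID] = amount
	return nil
}

// Confirm consumes the reservation. Repeating it is a no-op.
func (a *Account) Confirm(txID string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.frozen, txID)
}

// Cancel returns the reservation to the available balance. Repeating it,
// or calling it after Confirm, adds nothing because the entry is gone.
func (a *Account) Cancel(txID string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.available += a.frozen[txID]
	delete(a.frozen, txID)
}

// Available returns the spendable balance.
func (a *Account) Available() int64 {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.available
}

func main() {
	acct := NewAccount(10000)
	_ = acct.Try("tx1", 3000)
	acct.Confirm("tx1") // money leaves the account
	_ = acct.Try("tx2", 5000)
	acct.Cancel("tx2") // reservation released
	acct.Cancel("tx2") // duplicate Cancel is harmless
	fmt.Println(acct.Available()) // 7000
}
```

The point of the pattern is that Confirm and Cancel are idempotent, so the coordinator can retry them until they succeed.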
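How do you prevent loss of funds?
Answer:
Technical safeguards:
- Database transactions: the deduction and its ledger entry are written atomically
- Row locks / optimistic locks: prevent concurrent deductions from interfering
- Idempotency: prevents duplicate charges
- Ledger entries: every operation leaves a complete record
- Reconciliation: balances are verified periodically
Business safeguards:
- Limits: per-transaction and per-day limits
- Risk control: alerts on abnormal transactions
- Manual review: for large transactions
- Insurance: coverage against loss of funds
Code example:
tx, err := db.Begin()
if err != nil {
    return err
}
defer tx.Rollback()
// 1. Lock the account row and read the balance
var balance float64
if err := tx.QueryRow(`SELECT balance FROM accounts WHERE user_id = ? FOR UPDATE`, userID).Scan(&balance); err != nil {
    return err
}
// 2. Check the balance
if balance < amount {
    return errors.New("insufficient balance")
}
// 3. Deduct
if _, err := tx.Exec(`UPDATE accounts SET balance = balance - ? WHERE user_id = ?`, amount, userID); err != nil {
    return err
}
// 4. Record the ledger entry
if _, err := tx.Exec(`INSERT INTO account_transactions ...`); err != nil {
    return err
}
// 5. Commit
return tx.Commit()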
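What are the differences between WeChat Pay and Alipay?
Answer:
| Aspect | WeChat Pay | Alipay |
|---|---|---|
| Payment methods | QR code, H5, app, mini program | QR code, web, app |
| Result delivery | Asynchronous callback + active query | Asynchronous callback + active query |
| Signature algorithm | MD5 / HMAC-SHA256 (API v2); RSA with SHA-256 (API v3) | RSA / RSA2 |
| Credentials | API certificate (.p12) | Public/private key pair (.pem) |
| Refunds | Returned via the original payment route | Returned via the original payment route |
| Statements | CSV download | CSV download |
What they have in common:
- Both support asynchronous callbacks
- Both require signature verification
- Both support refunds
- Both provide downloadable statements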
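Signature verification on a callback follows the same shape for any provider: rebuild a canonical string from the parameters, compute the signature, and compare in constant time. The sketch below uses HMAC-SHA256 over sorted `key=value` pairs purely as an illustration; it is not the exact WeChat Pay or Alipay scheme, and `canonicalize`, `Sign` and `Verify` are hypothetical helpers. A real integration must follow the provider's specification.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// canonicalize joins the parameters as key=value pairs sorted by key,
// skipping the "sign" field itself and empty values.
func canonicalize(params map[string]string) string {
	keys := make([]string, 0, len(params))
	for k, v := range params {
		if k != "sign" && v != "" {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+"="+params[k])
	}
	return strings.Join(pairs, "&")
}

// Sign returns the hex-encoded HMAC-SHA256 of the canonical string.
func Sign(params map[string]string, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(canonicalize(params)))
	return hex.EncodeToString(mac.Sum(nil))
}

// Verify recomputes the signature and compares it in constant time.
func Verify(params map[string]string, secret string) bool {
	return hmac.Equal([]byte(Sign(params, secret)), []byte(params["sign"]))
}

func main() {
	secret := "demo-secret" // illustrative only; real keys come from secure config
	params := map[string]string{
		"out_trade_no": "PAY20231113001",
		"total_fee":    "9999",
		"status":       "success",
	}
	params["sign"] = Sign(params, secret)
	fmt.Println(Verify(params, secret)) // true

	params["total_fee"] = "1" // tampered amount
	fmt.Println(Verify(params, secret)) // false
}
```

A callback handler should verify the signature before it touches the order, and only then apply the idempotent status update.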
How would you design payment routing?
Answer:
Routing strategies:
- User choice: the user picks the payment method
- Smart routing:
  - Lowest cost (fees)
  - Highest success rate
  - Fastest response
- Dynamic routing:
  - Switch channels on holidays
  - Automatic failover on outages
  - Automatic switch when a channel limit is reached
Implementation:
type PaymentRouter struct {
    gateways map[string]PaymentGateway
    selector GatewaySelector
}
func (r *PaymentRouter) Route(amount float64, userID int64) PaymentGateway {
    // 1. Choose by amount
    if amount < 100 {
        return r.gateways["wechat"]
    }
    // 2. Choose by success rate
    wechatSuccessRate := r.selector.GetSuccessRate("wechat")
    alipaySuccessRate := r.selector.GetSuccessRate("alipay")
    if wechatSuccessRate > alipaySuccessRate {
        return r.gateways["wechat"]
    }
    return r.gateways["alipay"]
}
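The router above compares two hard-coded channels and does not cover the failover case. A more general selection step might look like the sketch below, which skips unhealthy channels, prefers the highest success rate, and breaks ties on fee. The `Channel` type, its fields and the sample numbers are all illustrative assumptions.

```go
package main

import "fmt"

// Channel describes one payment channel as seen by the router.
type Channel struct {
	Name        string
	Healthy     bool    // false while the channel is failing health checks
	SuccessRate float64 // recent success ratio in [0, 1]
	FeeRate     float64 // fee as a fraction of the amount
}

// PickChannel returns the healthy channel with the highest success rate,
// breaking ties by the lower fee. ok is false when no channel is healthy.
func PickChannel(channels []Channel) (best Channel, ok bool) {
	for _, c := range channels {
		if !c.Healthy {
			continue
		}
		better := c.SuccessRate > best.SuccessRate ||
			(c.SuccessRate == best.SuccessRate && c.FeeRate < best.FeeRate)
		if !ok || better {
			best, ok = c, true
		}
	}
	return best, ok
}

func main() {
	channels := []Channel{
		{Name: "wechat", Healthy: false, SuccessRate: 0.99, FeeRate: 0.006}, // currently down
		{Name: "alipay", Healthy: true, SuccessRate: 0.97, FeeRate: 0.006},
		{Name: "unionpay", Healthy: true, SuccessRate: 0.95, FeeRate: 0.005},
	}
	c, ok := PickChannel(channels)
	fmt.Println(c.Name, ok) // alipay true
}
```

When ok is false the caller has to decide between queueing the payment and failing fast; that policy is outside this sketch.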
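How do you keep the payment system stable during a major sales promotion?
Answer:
Capacity planning:
- Scale out in advance: estimate traffic from historical data
- Rate limiting: protect the system from being overwhelmed
- Degradation: switch off non-core features
Architecture optimization:
- Read/write splitting: queries go to replicas
- Caching: cache hot orders
- Asynchrony: notifications and reconciliation run asynchronously
- Service isolation: separate the payment path from the query path
Example:
// Rate limiting
var limiter = rate.NewLimiter(1000, 2000) // 1000 requests per second, burst (bucket size) 2000
func (s *PaymentService) CreatePaymentWithLimit(req *CreatePaymentRequest) (*PaymentOrder, error) {
    if !limiter.Allow() {
        return nil, errors.New("system busy, please retry later")
    }
    return s.CreatePaymentOrder(req)
}
// Degradation
func (s *PaymentService) QueryOrderWithDegraded(orderID string) (*PaymentOrder, error) {
    if isDegraded() {
        // Degraded mode: return basic information only
        return s.QueryBasicInfo(orderID)
    }
    return s.QueryFullInfo(orderID)
}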
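How would you design the refund feature?
Answer:
Refund flow:
- Request: the user submits a refund request
- Validate:
  - Has the order been paid?
  - Has it already been refunded?
  - Is the refund amount valid?
- Call the third party: submit the refund
- Update state: record the refund order
- Handle the callback: process the refund result
- Notify the user: refund succeeded or failed
Special cases:
- Partial refunds: support multiple partial refunds per order
- Refund timeout: poll the refund status on a schedule
- Refund failure: if the original-route refund fails, fall back to refunding to the account balance
func (s *PaymentService) Refund(paymentOrderID string, amount float64, reason string) (*RefundOrder, error) {
    // 1. Look up the original payment order
    payment, err := s.QueryPaymentOrder(paymentOrderID)
    if err != nil {
        return nil, err
    }
    if payment.Status != "success" {
        return nil, errors.New("order has not been paid successfully")
    }
    // 2. Validate the amount against what has already been refunded.
    //    Steps 2 and 3 are not atomic here; concurrent refunds on one order
    //    need a row lock on the payment order (or an equivalent guard).
    if amount <= 0 {
        return nil, errors.New("refund amount must be positive")
    }
    refunded, err := s.GetRefundedAmount(paymentOrderID)
    if err != nil {
        return nil, err
    }
    if refunded+amount > payment.Amount {
        return nil, errors.New("refund amount exceeds the order amount")
    }
    // 3. Create the refund order
    refundID := generateRefundID()
    _, err = s.db.Exec(`
        INSERT INTO refund_orders (refund_id, payment_order_id, amount, reason, status)
        VALUES (?, ?, ?, ?, 'processing')
    `, refundID, paymentOrderID, amount, reason)
    if err != nil {
        return nil, err
    }
    // 4. Call the third-party refund API
    err = s.callRefundAPI(payment.PaymentMethod, paymentOrderID, amount)
    if err != nil {
        s.db.Exec(`UPDATE refund_orders SET status = 'failed' WHERE refund_id = ?`, refundID)
        return nil, err
    }
    return &RefundOrder{RefundID: refundID, Status: "processing"}, nil
}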
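With partial refunds, the amount check and the recording of the refund must happen as one atomic step, otherwise two concurrent requests can each pass the check and together exceed the paid amount. The in-memory sketch below shows the invariant with a mutex; `RefundLedger` is a hypothetical type, and in the database the same effect would come from a row lock on the payment order or a conditional update.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var ErrRefundExceedsPayment = errors.New("refund exceeds remaining refundable amount")

// RefundLedger tracks how much of one payment has been refunded, in cents.
type RefundLedger struct {
	mu       sync.Mutex
	paid     int64
	refunded int64
}

// Reserve checks and records a refund in one atomic step, so concurrent
// partial refunds can never add up to more than the paid amount.
func (l *RefundLedger) Reserve(amount int64) error {
	if amount <= 0 {
		return errors.New("refund amount must be positive")
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.refunded+amount > l.paid {
		return ErrRefundExceedsPayment
	}
	l.refunded += amount
	return nil
}

// Release undoes a reservation, for example after the gateway rejects the refund.
func (l *RefundLedger) Release(amount int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.refunded -= amount
}

func main() {
	ledger := &RefundLedger{paid: 10000}
	results := make([]error, 3)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = ledger.Reserve(4000) // three concurrent refunds of 40.00
		}(i)
	}
	wg.Wait()
	succeeded := 0
	for _, err := range results {
		if err == nil {
			succeeded++
		}
	}
	fmt.Println(succeeded, ledger.refunded) // 2 8000
}
```

Exactly two of the three requests fit into the 100.00 payment regardless of scheduling, which is the property the SQL version has to preserve.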
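How would you design a risk control system?
Answer:
Risk dimensions:
- User:
  - Limits for newly registered users
  - Historical credit score
  - Real-name verification status
- Transaction:
  - Per-transaction limit
  - Cumulative daily limit
  - Transaction frequency
- Behavior:
  - IP address (proxy or not)
  - Device fingerprint
  - Transaction time (late-night anomalies)
- Relationship:
  - Payee blacklist
  - Risk of linked accounts
Risk control policy:
type RiskControlService struct {
    db    *sql.DB
    redis *redis.Client
}
// CheckRisk runs the risk checks and returns whether the transaction passes, with a reason.
func (s *RiskControlService) CheckRisk(userID int64, amount float64, ip string) (bool, string) {
    // 1. Check the user blacklist
    if s.isBlacklist(userID) {
        return false, "user is restricted"
    }
    // 2. Check the per-transaction limit
    if amount > 50000 {
        return false, "per-transaction limit exceeded"
    }
    // 3. Check the cumulative daily amount
    todayAmount := s.getTodayAmount(userID)
    if todayAmount+amount > 100000 {
        return false, "daily limit exceeded"
    }
    // 4. Check the transaction frequency
    count := s.getRecentCount(userID, 1*time.Minute)
    if count > 10 {
        return false, "transaction frequency too high"
    }
    // 5. Check the IP address
    if s.isSuspiciousIP(ip) {
        return false, "suspicious IP address"
    }
    // 6. Risk score. Suspicious IPs are already rejected in step 5, so with the
    //    weights below the highest reachable score is 60; the threshold of 80
    //    only takes effect once more signals are added.
    score := s.calculateRiskScore(userID, amount, ip)
    if score > 80 {
        return false, "risk score too high"
    }
    return true, "passed"
}
func (s *RiskControlService) calculateRiskScore(userID int64, amount float64, ip string) int {
    score := 0
    // New user: +20
    if s.isNewUser(userID) {
        score += 20
    }
    // Large amount: +30
    if amount > 10000 {
        score += 30
    }
    // Suspicious IP: +40
    if s.isSuspiciousIP(ip) {
        score += 40
    }
    // Late-night transaction (00:00 to 05:59): +10
    hour := time.Now().Hour()
    if hour >= 0 && hour < 6 {
        score += 10
    }
    return score
}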
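The frequency check in step 4 depends on counting a user's transactions inside a time window. Below is a minimal in-memory sketch of a sliding-window counter; `FrequencyLimiter` and the `at` helper are hypothetical, and a production system would more likely keep this state in Redis so that all instances share it.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// FrequencyLimiter allows at most limit events per user within a sliding window.
type FrequencyLimiter struct {
	mu     sync.Mutex
	window time.Duration
	limit  int
	events map[int64][]time.Time // user ID -> timestamps of accepted events
}

func NewFrequencyLimiter(window time.Duration, limit int) *FrequencyLimiter {
	return &FrequencyLimiter{window: window, limit: limit, events: make(map[int64][]time.Time)}
}

// Allow reports whether the user may transact at time now, and records the
// event when it is allowed. Passing now explicitly keeps the logic testable.
func (f *FrequencyLimiter) Allow(userID int64, now time.Time) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	cutoff := now.Add(-f.window)
	// Drop timestamps that have left the window, reusing the backing array.
	kept := f.events[userID][:0]
	for _, t := range f.events[userID] {
		if t.After(cutoff) {
			kept = append(kept, t)
		}
	}
	if len(kept) >= f.limit {
		f.events[userID] = kept
		return false
	}
	f.events[userID] = append(kept, now)
	return true
}

// at builds a timestamp the given number of seconds after the Unix epoch.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	f := NewFrequencyLimiter(time.Minute, 3)
	for i := int64(0); i < 4; i++ {
		fmt.Print(f.Allow(42, at(i)), " ")
	}
	fmt.Println(f.Allow(42, at(61)))
	// true true true false true
}
```

Rejected attempts are not recorded here, so a user who keeps hammering the endpoint is unblocked as soon as older events age out; recording rejections as well would make the limiter stricter.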
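10. Summary
Key Takeaways
Fund safety first
- Database transactions guarantee atomicity
- Row locks / optimistic locks guard against concurrency
- Idempotency guards against duplicates
- Every operation is recorded in the ledger
Distributed transactions
- TCC: tight consistency (Try-Confirm-Cancel)
- Local message table: eventual consistency
- RocketMQ transactional messages
- Reconciliation + compensation
High availability
- Timeouts and retries
- Circuit breaking and degradation
- Service isolation
- Multi-active deployment
Performance optimization
- Read/write splitting
- Caching
- Asynchronous processing
- Database and table sharding
Risk control
- Limit controls
- Risk scoring
- Real-time monitoring
- Manual review
Architecture Evolution
V1: Monolith + database transactions
↓ (order volume grows)
V2: Microservices + TCC distributed transactions
↓ (massive order volume)
V3: Eventual consistency + reconciliation system
Interview Tips
- Emphasize fund safety: no optimization may come at the expense of safety
- Explain the trade-offs: strong consistency vs. performance, complexity vs. reliability
- Hands-on experience: if you have worked on payments, make that the focus
- Monitoring and alerting: show that you understand production environments
- Reconciliation: this is the differentiator interviewers care about most
End of chapter. Good luck with your interviews!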