
[Programming Difficulty No. 4] Complex Transaction Systems: With Money, Not a Single Cent Can Be Wrong

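Articles in this series

➤ [No. 1: Schedulers]

➤ [No. 2: Consensus Protocols] (Paxos / Raft)

➤ [No. 3: High-Performance Asynchronous Systems] (message queues, callbacks, retries)

➤ No. 4: Transaction Systems (money must never be wrong)

➤ [No. 5: Ordinary Business Systems] (what most people build)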

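1. Why Are Transaction Systems So Hard?

1.1 Core Challenges

Representative systems:

  • Taobao order system: tens of millions of orders per day, with a Double 11 peak of 580,000 orders per second
  • Alipay payment system: one of the world's largest payment platforms, with zero tolerance for errors
  • Meituan food-delivery orders: real-time inventory, courier dispatch, timeout cancellation
  • Brokerage trading systems: millisecond-level order matching with strong consistency requirements
  • Core banking systems: ledger processing that must guarantee strong consistency

Why it is hard:

  • Money must never be wrong: not one cent too many or too few
  • High-concurrency conflicts: two people buy the last item at the same moment
  • Distributed transactions: orders, inventory, accounts, and points live in different systems
  • Complex state machines: an order has a dozen or more states and transitions
  • Compensation: a failed operation must be rolled back, but a single database transaction cannot span the services involved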
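
The state-machine point can be made concrete with a transition table. The sketch below is a minimal illustration, not the article's implementation: only Created, Paid, and Canceled appear in the article's code, while the other states and the exact set of allowed transitions are assumptions chosen for the example.

```go
package main

import "fmt"

// OrderStatus is a hypothetical set of order states; real systems have more.
type OrderStatus string

const (
	Created   OrderStatus = "Created"
	Paid      OrderStatus = "Paid"
	Shipped   OrderStatus = "Shipped"   // assumed state
	Completed OrderStatus = "Completed" // assumed state
	Canceled  OrderStatus = "Canceled"
	Refunded  OrderStatus = "Refunded" // assumed state
)

// allowed lists every legal transition; anything absent is rejected.
var allowed = map[OrderStatus][]OrderStatus{
	Created: {Paid, Canceled},
	Paid:    {Shipped, Refunded},
	Shipped: {Completed, Refunded},
}

// CanTransition reports whether moving from one state to another is legal.
func CanTransition(from, to OrderStatus) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(Created, Paid))  // true
	fmt.Println(CanTransition(Canceled, Paid)) // false
}
```

Keeping the table in one place means every status update can be checked against it, instead of scattering ad hoc status comparisons through the handlers.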

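How this differs from ordinary systems:

Ordinary systems:
- A lost comment: the user may not even notice
- An inaccurate recommendation: the user can refresh and try again
- Delayed data: a few seconds do not matter

Transaction systems:
- Wrong charge: the user complains and the company pays compensation
- Overselling: the merchant loses money and the user complains
- Lost order: the user placed an order but no record exists
- Duplicate charge: the user pays twice

Any single bug can cost millions!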
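
One common source of wrong charges is binary floating point. The sketch below is an illustration under assumptions (the `Cents` type and helper names are hypothetical): it shows that float64 addition of 0.1 and 0.2 does not equal 0.3, and that integer arithmetic on the smallest currency unit stays exact. The article's own code takes the equivalent route of a decimal type.

```go
package main

import "fmt"

// Cents stores an amount as a whole number of the smallest currency unit.
type Cents int64

// FloatSum adds prices as float64 values, which cannot represent 0.1 exactly.
func FloatSum(a, b float64) float64 { return a + b }

// Total multiplies a unit price in cents by a quantity using integer math only.
func Total(unit Cents, quantity int64) Cents { return unit * Cents(quantity) }

// String formats a non-negative amount as "units.cents".
func (c Cents) String() string {
	return fmt.Sprintf("%d.%02d", int64(c)/100, int64(c)%100)
}

func main() {
	a, b := 0.1, 0.2
	fmt.Println(FloatSum(a, b) == 0.3) // false
	fmt.Println(Total(9999, 3))        // 299.97
}
```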

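2. Core Difficulties in Depth

2.1 Difficulty 1: Concurrent Stock Deduction

The classic oversell scenario

Stock: 1 item
User A: places an order (at the same time)
User B: places an order (at the same time)

Timeline:
T1: A reads stock = 1
T2: B reads stock = 1 (the problem: B reads before A has written)
T3: A deducts stock, remaining = 0
T4: B deducts stock, remaining = -1 ✗ (oversold!)
T5: Both A and B see their orders succeed
T6: But there is only 1 item
T7: The merchant loses money and a user complains

This is one of the most common bugs in e-commerce systems!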
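
The timeline can be replayed deterministically in a few lines. This is a sketch with a hypothetical in-memory `Inventory` standing in for the products table: `RacyPurchase` performs both reads before either write (T1 to T4), while `TryDeduct` folds the check and the write into one critical section, which is the property every option below provides in some form.

```go
package main

import (
	"fmt"
	"sync"
)

// Inventory is a hypothetical in-memory stand-in for the products table.
type Inventory struct {
	mu    sync.Mutex
	stock int
}

// Read returns the current stock (the "check" half of check-then-act).
func (inv *Inventory) Read() int {
	inv.mu.Lock()
	defer inv.mu.Unlock()
	return inv.stock
}

// BlindDeduct subtracts without re-checking, like an unconditional UPDATE.
func (inv *Inventory) BlindDeduct(n int) {
	inv.mu.Lock()
	defer inv.mu.Unlock()
	inv.stock -= n
}

// TryDeduct checks and subtracts inside one critical section.
func (inv *Inventory) TryDeduct(n int) bool {
	inv.mu.Lock()
	defer inv.mu.Unlock()
	if inv.stock < n {
		return false
	}
	inv.stock -= n
	return true
}

// RacyPurchase replays T1..T4 in a fixed order: both buyers read before either writes.
func RacyPurchase(inv *Inventory) int {
	a := inv.Read() // T1: A sees 1
	b := inv.Read() // T2: B sees 1
	if a >= 1 {
		inv.BlindDeduct(1) // T3: remaining 0
	}
	if b >= 1 {
		inv.BlindDeduct(1) // T4: remaining -1
	}
	return inv.Read()
}

func main() {
	fmt.Println(RacyPurchase(&Inventory{stock: 1})) // -1

	safe := &Inventory{stock: 1}
	fmt.Println(safe.TryDeduct(1), safe.TryDeduct(1), safe.Read()) // true false 0
}
```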

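Option 1: Pessimistic Lock (SELECT FOR UPDATE)

Principle: lock the stock row first, then deduct

-- Pseudocode: the IF/ELSE below would live in application code or a stored procedure
-- Begin the transaction
BEGIN;

-- Lock the stock row (other transactions must wait)
SELECT stock, price
FROM products
WHERE id = 123
FOR UPDATE;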

-- Check stock
IF stock >= 1 THEN
    -- Deduct stock
    UPDATE products
    SET stock = stock - 1,
        updated_at = NOW()
    WHERE id = 123;

    -- Create the order
    INSERT INTO orders (
        user_id,
        product_id,
        quantity,
        total_amount,
        status
    ) VALUES (
        1001,
        123,
        1,
        99.99,
        'Created'
    );

    COMMIT;
ELSE
    ROLLBACK;
    RETURN 'Stock not enough';
END IF;

Go implementation:

type OrderService struct {
    db *sql.DB
}

func (s *OrderService) CreateOrderWithLock(
    ctx context.Context,
    userID int64,
    productID int64,
    quantity int,
) (*Order, error) {
    // Begin the transaction
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()  // Rolls back on any early return; a no-op after a successful Commit

    // Lock the stock row
    var stock int
    var price decimal.Decimal
    err = tx.QueryRowContext(ctx, `
        SELECT stock, price
        FROM products
        WHERE id = ?
        FOR UPDATE
    `, productID).Scan(&stock, &price)

    if err != nil {
        return nil, fmt.Errorf("query product failed: %w", err)
    }

    log.Info("locked product",
        "productID", productID,
        "stock", stock)

    // Check stock
    if stock < quantity {
        return nil, errors.New("stock not enough")
    }

    // Deduct stock
    result, err := tx.ExecContext(ctx, `
        UPDATE products
        SET stock = stock - ?,
            updated_at = NOW()
        WHERE id = ?
    `, quantity, productID)

    if err != nil {
        return nil, fmt.Errorf("deduct stock failed: %w", err)
    }

    affected, _ := result.RowsAffected()
    log.Info("stock deducted",
        "productID", productID,
        "quantity", quantity,
        "affected", affected)

    // Create the order
    totalAmount := price.Mul(decimal.NewFromInt(int64(quantity)))

    order := &Order{
        UserID:      userID,
        ProductID:   productID,
        Quantity:    quantity,
        TotalAmount: totalAmount,
        Status:      OrderStatusCreated,
        CreatedAt:   time.Now(),
    }

    result, err = tx.ExecContext(ctx, `
        INSERT INTO orders (
            user_id, product_id, quantity,
            total_amount, status, created_at
        ) VALUES (?, ?, ?, ?, ?, ?)
    `, order.UserID, order.ProductID, order.Quantity,
       order.TotalAmount, order.Status, order.CreatedAt)

    if err != nil {
        return nil, fmt.Errorf("create order failed: %w", err)
    }

    orderID, _ := result.LastInsertId()
    order.ID = orderID

    // Commit the transaction
    if err := tx.Commit(); err != nil {
        return nil, fmt.Errorf("commit failed: %w", err)
    }

    log.Info("order created",
        "orderID", orderID,
        "userID", userID,
        "productID", productID)

    return order, nil
}

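Pros:

  • Simple and direct
  • No overselling
  • The database guarantees consistency

Cons:

  • Poor performance: under high concurrency many requests wait for the lock
  • Deadlocks are possible when one order locks several products
  • Row locks are held for the whole transaction, which blocks other writes to the same rows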
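
The deadlock risk with several products usually comes from two transactions locking the same rows in opposite order. A common mitigation is to lock rows in one global order. The sketch below is a minimal illustration with a hypothetical helper, `LockOrder`, that sorts and de-duplicates product IDs; the caller would then issue its SELECT ... FOR UPDATE statements in that order.

```go
package main

import (
	"fmt"
	"sort"
)

// LockOrder returns the product IDs sorted ascending with duplicates removed,
// so that every transaction acquires row locks in the same order.
func LockOrder(ids []int64) []int64 {
	sorted := append([]int64(nil), ids...) // copy; leave the caller's slice untouched
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

	out := make([]int64, 0, len(sorted))
	for i, id := range sorted {
		if i == 0 || id != sorted[i-1] {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	fmt.Println(LockOrder([]int64{42, 7, 42, 19})) // [7 19 42]
}
```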

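Performance test:

Concurrency: 1,000 simultaneous requests for the same product
Results:
- Throughput: 500 QPS
- Average latency: 2 s
- P99 latency: 5 s
- Many requests time out

Not suitable for high-concurrency scenarios!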

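Option 2: Optimistic Lock (CAS, Compare And Swap)

Principle: keep a version number on the row and check at update time that it has not changed

-- 1. Read stock and version
SELECT stock, version, price
FROM products
WHERE id = 123;
-- Result: stock=10, version=5, price=99.99

-- 2. Attempt the deduction (CAS)
UPDATE products
SET stock = stock - 1,
    version = version + 1,
    updated_at = NOW()
WHERE id = 123
  AND version = 5  -- Key point: the version must still match
  AND stock >= 1;  -- Stock must be sufficient

-- 3. Check the affected row count (pseudocode)
affected_rows = ?

IF affected_rows = 0 THEN
    -- Failed: the version changed (someone else got there first) or stock ran out
    -- Retry or return failure
    RETRY;
ELSE
    -- Succeeded: create the order
    INSERT INTO orders (...) VALUES (...);
END IF;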

Go implementation:

func (s *OrderService) CreateOrderWithOptimisticLock(
    ctx context.Context,
    userID int64,
    productID int64,
    quantity int,
) (*Order, error) {
    // Retry at most 3 times
    maxRetries := 3

    for attempt := 0; attempt < maxRetries; attempt++ {
        // 1. Read the product
        var stock int
        var version int64
        var price decimal.Decimal

        err := s.db.QueryRowContext(ctx, `
            SELECT stock, version, price
            FROM products
            WHERE id = ?
        `, productID).Scan(&stock, &version, &price)

        if err != nil {
            return nil, fmt.Errorf("query product failed: %w", err)
        }

        log.Debug("read product",
            "productID", productID,
            "stock", stock,
            "version", version,
            "attempt", attempt)

        // 2. Check stock
        if stock < quantity {
            return nil, errors.New("stock not enough")
        }

        // 3. Attempt the stock deduction (CAS)
        result, err := s.db.ExecContext(ctx, `
            UPDATE products
            SET stock = stock - ?,
                version = version + 1,
                updated_at = NOW()
            WHERE id = ?
              AND version = ?
              AND stock >= ?
        `, quantity, productID, version, quantity)

        if err != nil {
            return nil, fmt.Errorf("deduct stock failed: %w", err)
        }

        affected, _ := result.RowsAffected()

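        if affected == 0 {
            // CAS failed: another writer changed the row first, so retry
            log.Warn("CAS conflict, retrying",
                "productID", productID,
                "attempt", attempt)

            // Exponential backoff (10ms, 20ms, 40ms) that also honors cancellation
            backoff := time.Duration(1<<uint(attempt)) * 10 * time.Millisecond
            select {
            case <-time.After(backoff):
            case <-ctx.Done():
                return nil, ctx.Err()
            }

            continue
        }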

        log.Info("stock deducted",
            "productID", productID,
            "quantity", quantity,
            "newVersion", version+1)

        // 4. Create the order
        totalAmount := price.Mul(decimal.NewFromInt(int64(quantity)))

        order := &Order{
            UserID:      userID,
            ProductID:   productID,
            Quantity:    quantity,
            TotalAmount: totalAmount,
            Status:      OrderStatusCreated,
            CreatedAt:   time.Now(),
        }

        result, err = s.db.ExecContext(ctx, `
            INSERT INTO orders (
                user_id, product_id, quantity,
                total_amount, status, created_at
            ) VALUES (?, ?, ?, ?, ?, ?)
        `, order.UserID, order.ProductID, order.Quantity,
           order.TotalAmount, order.Status, order.CreatedAt)

        if err != nil {
            // Order creation failed, but the stock has already been deducted!
            // This needs a compensation mechanism (covered later)
            log.Error("create order failed, stock already deducted",
                "productID", productID,
                "err", err)
            return nil, err
        }

        orderID, _ := result.LastInsertId()
        order.ID = orderID

        log.Info("order created",
            "orderID", orderID,
            "userID", userID,
            "attempt", attempt)

        return order, nil
    }

    // Retries exhausted
    return nil, errors.New("create order failed after retries")
}

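Pros:

  • Good performance: no lock is held between the read and the write
  • No overselling
  • Handles high concurrency

Cons:

  • Many retries when contention on one row is high
  • Slightly more complex to implement
  • Retry logic has to be handled by the caller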
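
The retry handling can be factored into two small helpers. This is a sketch under assumptions (the names `BackoffDelay` and `SleepCtx` are hypothetical): one computes a capped exponential delay, the other waits for that delay but returns early if the request context is canceled. Production code often adds random jitter to the delay so that competing clients do not retry in lockstep; it is left out here to keep the output deterministic.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// BackoffDelay returns base * 2^attempt, capped at max.
func BackoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// SleepCtx waits for d, or returns ctx.Err() early if the context ends first.
func SleepCtx(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	for attempt := 0; attempt < 4; attempt++ {
		fmt.Println(BackoffDelay(attempt, 10*time.Millisecond, 50*time.Millisecond))
	}
	// Prints 10ms, 20ms, 40ms, 50ms on separate lines.
}
```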

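Performance test:

Concurrency: 1,000 simultaneous requests for the same product
Stock: 100 items

Results:
- Throughput: 5,000 QPS
- Average latency: 50 ms
- P99 latency: 200 ms
- 100 succeeded, 900 failed
- Average retries: 1.5

Suitable for high-concurrency scenarios!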

Option 3: Atomic Deduction in Redis (the ultimate option)

Principle: Redis executes commands on a single thread and runs a Lua script as one atomic unit

type RedisStockService struct {
    redis *redis.Client
    db    *sql.DB
}

func (s *RedisStockService) DeductStock(
    ctx context.Context,
    productID int64,
    quantity int,
) (bool, error) {
    key := fmt.Sprintf("stock:%d", productID)

    // The Lua script runs atomically
    script := `
        local stock = redis.call('GET', KEYS[1])

        if not stock then
            return -1  -- product does not exist
        end

        stock = tonumber(stock)
        local quantity = tonumber(ARGV[1])

        if stock >= quantity then
            -- enough stock: deduct
            local remaining = redis.call('DECRBY', KEYS[1], quantity)
            return remaining  -- remaining stock
        else
            return -2  -- not enough stock
        end
    `

    result, err := s.redis.Eval(ctx, script, []string{key}, quantity).Result()
    if err != nil {
        return false, fmt.Errorf("redis eval failed: %w", err)
    }

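    // Guard the type assertion so an unexpected reply cannot panic
    remaining, ok := result.(int64)
    if !ok {
        return false, fmt.Errorf("unexpected script result: %v", result)
    }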

    switch {
    case remaining == -1:
        return false, errors.New("product not found")
    case remaining == -2:
        return false, errors.New("stock not enough")
    case remaining >= 0:
        log.Info("stock deducted",
            "productID", productID,
            "quantity", quantity,
            "remaining", remaining)

        // Update the database asynchronously
        go s.updateDatabaseStock(productID, quantity)

        return true, nil
    default:
        return false, errors.New("unknown error")
    }
}

func (s *RedisStockService) updateDatabaseStock(productID int64, quantity int) {
    ctx := context.Background()

    _, err := s.db.ExecContext(ctx, `
        UPDATE products
        SET stock = stock - ?,
            updated_at = NOW()
        WHERE id = ?
    `, quantity, productID)

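    if err != nil {
        // The caller has already been told the deduction succeeded, so adding
        // the stock back to Redis here could sell the same unit twice.
        // Log the drift and leave it to a retry or reconciliation job.
        log.Error("update database stock failed, needs reconciliation",
            "productID", productID,
            "quantity", quantity,
            "err", err)
    }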
}

// Initialize Redis stock (synced from the database)
func (s *RedisStockService) SyncStockToRedis(productID int64) error {
    var stock int

    err := s.db.QueryRow(`
        SELECT stock FROM products WHERE id = ?
    `, productID).Scan(&stock)

    if err != nil {
        return err
    }

    key := fmt.Sprintf("stock:%d", productID)
    return s.redis.Set(context.Background(), key, stock, 24*time.Hour).Err()
}

Complete order flow:

func (s *OrderService) CreateOrderWithRedis(
    ctx context.Context,
    userID int64,
    productID int64,
    quantity int,
) (*Order, error) {
    // 1. Deduct stock in Redis (very fast)
    success, err := s.stockService.DeductStock(ctx, productID, quantity)
    if err != nil {
        return nil, fmt.Errorf("deduct stock failed: %w", err)
    }
    if !success {
        return nil, errors.New("deduct stock failed")
    }

    log.Info("redis stock deducted",
        "productID", productID,
        "quantity", quantity)

    // 2. Look up the product price
    var price decimal.Decimal
    err = s.db.QueryRowContext(ctx, `
        SELECT price FROM products WHERE id = ?
    `, productID).Scan(&price)

    if err != nil {
        // Price lookup failed, so return the deducted stock to Redis
        s.stockService.RestoreStock(ctx, productID, quantity)
        return nil, err
    }

    // 3. Create the order
    totalAmount := price.Mul(decimal.NewFromInt(int64(quantity)))

    order := &Order{
        UserID:      userID,
        ProductID:   productID,
        Quantity:    quantity,
        TotalAmount: totalAmount,
        Status:      OrderStatusCreated,
        CreatedAt:   time.Now(),
    }

    result, err := s.db.ExecContext(ctx, `
        INSERT INTO orders (
            user_id, product_id, quantity,
            total_amount, status, created_at
        ) VALUES (?, ?, ?, ?, ?, ?)
    `, order.UserID, order.ProductID, order.Quantity,
       order.TotalAmount, order.Status, order.CreatedAt)

    if err != nil {
        // Order creation failed, so restore the stock
        log.Error("create order failed, restoring stock",
            "productID", productID,
            "err", err)

        s.stockService.RestoreStock(ctx, productID, quantity)
        return nil, err
    }

    orderID, _ := result.LastInsertId()
    order.ID = orderID

    log.Info("order created",
        "orderID", orderID,
        "userID", userID)

    return order, nil
}

func (s *RedisStockService) RestoreStock(
    ctx context.Context,
    productID int64,
    quantity int,
) error {
    key := fmt.Sprintf("stock:%d", productID)
    return s.redis.IncrBy(ctx, key, int64(quantity)).Err()
}

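Pros:

  • Very high performance, supporting 100,000+ QPS
  • No overselling
  • No locks, no retries
  • Very low latency (<1 ms)

Cons:

  • Redis and the database must be kept consistent
  • A fallback is needed when Redis is down
  • Data synchronization has to be handled

Performance test:

Concurrency: 10,000 simultaneous purchase requests
Stock: 1,000 items

Results:
- Throughput: 100,000 QPS
- Average latency: 1 ms
- P99 latency: 5 ms
- 1,000 succeeded, 9,000 failed
- No retries

The Redis option is the performance champion!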

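Option 4: Stock Reservation (best user experience)

Scenario: after placing an order, the user has 15 minutes to pay

Problem:

  • Deducting stock immediately ties it up if the user never pays
  • Not deducting stock risks overselling

Solution: reserve the stock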

type ReservationService struct {
    redis *redis.Client
    db    *sql.DB
}

// Reserve stock
func (s *ReservationService) ReserveStock(
    ctx context.Context,
    productID int64,
    quantity int,
    orderID int64,
    ttl time.Duration,
) (bool, error) {
    stockKey := fmt.Sprintf("stock:%d", productID)
    reservedKey := fmt.Sprintf("stock_reserved:%d:%d", productID, orderID)

    // The Lua script runs atomically
    script := `
        local stockKey = KEYS[1]
        local reservedKey = KEYS[2]
        local quantity = tonumber(ARGV[1])
        local ttl = tonumber(ARGV[2])

        local stock = redis.call('GET', stockKey)
        if not stock then
            return -1  -- product does not exist
        end

        stock = tonumber(stock)

        if stock >= quantity then
            -- enough stock: deduct and record the reservation
            redis.call('DECRBY', stockKey, quantity)
            redis.call('SETEX', reservedKey, ttl, quantity)
            return 1  -- success
        else
            return 0  -- not enough stock
        end
    `

    result, err := s.redis.Eval(
        ctx,
        script,
        []string{stockKey, reservedKey},
        quantity,
        int(ttl.Seconds()),
    ).Result()

    if err != nil {
        return false, err
    }

    code := result.(int64)

    switch code {
    case -1:
        return false, errors.New("product not found")
    case 0:
        return false, errors.New("stock not enough")
    case 1:
        log.Info("stock reserved",
            "productID", productID,
            "orderID", orderID,
            "quantity", quantity,
            "ttl", ttl)
        return true, nil
    default:
        return false, errors.New("unknown error")
    }
}

// Confirm the reservation (the user paid successfully)
func (s *ReservationService) ConfirmReservation(
    ctx context.Context,
    productID int64,
    orderID int64,
) error {
    reservedKey := fmt.Sprintf("stock_reserved:%d:%d", productID, orderID)

    // Read the reserved quantity
    quantity, err := s.redis.Get(ctx, reservedKey).Int()
    if err == redis.Nil {
        return errors.New("reservation not found or expired")
    }
    if err != nil {
        return err
    }

    // Delete the reservation record (it is now confirmed)
    s.redis.Del(ctx, reservedKey)

    // Update the database
    _, err = s.db.ExecContext(ctx, `
        UPDATE products
        SET stock = stock - ?,
            sold = sold + ?,
            updated_at = NOW()
        WHERE id = ?
    `, quantity, quantity, productID)

    if err != nil {
        log.Error("confirm reservation failed",
            "productID", productID,
            "orderID", orderID,
            "err", err)
        return err
    }

    log.Info("reservation confirmed",
        "productID", productID,
        "orderID", orderID,
        "quantity", quantity)

    return nil
}

// Release the reservation (order canceled or timed out)
func (s *ReservationService) ReleaseReservation(
    ctx context.Context,
    productID int64,
    orderID int64,
) error {
    stockKey := fmt.Sprintf("stock:%d", productID)
    reservedKey := fmt.Sprintf("stock_reserved:%d:%d", productID, orderID)

    // The Lua script runs atomically
    script := `
        local stockKey = KEYS[1]
        local reservedKey = KEYS[2]

        local quantity = redis.call('GET', reservedKey)
        if quantity then
            -- restore the stock
            redis.call('INCRBY', stockKey, quantity)
            redis.call('DEL', reservedKey)
            return tonumber(quantity)
        end
        return 0
    `

    result, err := s.redis.Eval(
        ctx,
        script,
        []string{stockKey, reservedKey},
    ).Result()

    if err != nil {
        return err
    }

    releasedQty := result.(int64)

    log.Info("reservation released",
        "productID", productID,
        "orderID", orderID,
        "quantity", releasedQty)

    return nil
}

// Periodically clean up expired reservations
func (s *ReservationService) StartCleanupWorker(ctx context.Context) {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            s.cleanupExpiredReservations(ctx)
        case <-ctx.Done():
            return
        }
    }
}

func (s *ReservationService) cleanupExpiredReservations(ctx context.Context) {
    // Find orders that timed out without payment
    rows, err := s.db.QueryContext(ctx, `
        SELECT id, product_id, quantity
        FROM orders
        WHERE status = 'Created'
          AND created_at < NOW() - INTERVAL 15 MINUTE
        LIMIT 100
    `)

    if err != nil {
        log.Error("query expired orders failed", "err", err)
        return
    }
    defer rows.Close()

    for rows.Next() {
        var orderID, productID int64
        var quantity int

        if err := rows.Scan(&orderID, &productID, &quantity); err != nil {
            continue
        }

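        // Release the reserved stock; on failure skip the order so the next tick retries it
        if err := s.ReleaseReservation(ctx, productID, orderID); err != nil {
            log.Error("release reservation failed",
                "orderID", orderID,
                "err", err)
            continue
        }

        // Mark the order as canceled, but only if it is still unpaid
        _, err := s.db.ExecContext(ctx, `
            UPDATE orders
            SET status = 'Canceled',
                cancel_reason = 'Payment timeout',
                updated_at = NOW()
            WHERE id = ?
              AND status = 'Created'
        `, orderID)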

        if err != nil {
            log.Error("cancel order failed",
                "orderID", orderID,
                "err", err)
        } else {
            log.Info("order auto canceled",
                "orderID", orderID,
                "productID", productID)
        }
    }
}

Complete order flow:

func (s *OrderService) CreateOrderWithReservation(
    ctx context.Context,
    userID int64,
    productID int64,
    quantity int,
) (*Order, error) {
    // 1. Create the order (status: Created)
    var price decimal.Decimal
    err := s.db.QueryRowContext(ctx, `
        SELECT price FROM products WHERE id = ?
    `, productID).Scan(&price)

    if err != nil {
        return nil, err
    }

    totalAmount := price.Mul(decimal.NewFromInt(int64(quantity)))

    order := &Order{
        UserID:      userID,
        ProductID:   productID,
        Quantity:    quantity,
        TotalAmount: totalAmount,
        Status:      OrderStatusCreated,
        CreatedAt:   time.Now(),
    }

    result, err := s.db.ExecContext(ctx, `
        INSERT INTO orders (
            user_id, product_id, quantity,
            total_amount, status, created_at
        ) VALUES (?, ?, ?, ?, ?, ?)
    `, order.UserID, order.ProductID, order.Quantity,
       order.TotalAmount, order.Status, order.CreatedAt)

    if err != nil {
        return nil, err
    }

    orderID, _ := result.LastInsertId()
    order.ID = orderID

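    // 2. Reserve stock. The reservation key gets a TTL longer than the
    // 15-minute payment window so the cleanup worker can still read it and
    // return the stock; a key that expires first would leave the stock deducted.
    success, err := s.reservationService.ReserveStock(
        ctx,
        productID,
        quantity,
        orderID,
        30*time.Minute,
    )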

    if err != nil || !success {
        // Reservation failed, so cancel the order
        s.db.ExecContext(ctx, `
            UPDATE orders
            SET status = 'Canceled',
                cancel_reason = 'Stock not enough'
            WHERE id = ?
        `, orderID)

        return nil, fmt.Errorf("reserve stock failed: %w", err)
    }

    log.Info("order created with reservation",
        "orderID", orderID,
        "userID", userID,
        "productID", productID)

    // 3. Return the order and wait for the user to pay

    return order, nil
}

// Payment success callback
func (s *OrderService) HandlePaymentSuccess(
    ctx context.Context,
    orderID int64,
) error {
    // 1. Load the order
    var order Order
    err := s.db.QueryRowContext(ctx, `
        SELECT id, product_id, quantity, status
        FROM orders
        WHERE id = ?
    `, orderID).Scan(&order.ID, &order.ProductID, &order.Quantity, &order.Status)

    if err != nil {
        return err
    }

    if order.Status != OrderStatusCreated {
        return errors.New("order status invalid")
    }

    // 2. Confirm the reservation
    err = s.reservationService.ConfirmReservation(
        ctx,
        order.ProductID,
        orderID,
    )

    if err != nil {
        log.Error("confirm reservation failed",
            "orderID", orderID,
            "err", err)
        return err
    }

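    // 3. Mark the order as paid; the status guard keeps a concurrent
    // timeout cancellation from being overwritten
    _, err = s.db.ExecContext(ctx, `
        UPDATE orders
        SET status = 'Paid',
            paid_at = NOW(),
            updated_at = NOW()
        WHERE id = ?
          AND status = 'Created'
    `, orderID)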

    if err != nil {
        return err
    }

    log.Info("order paid",
        "orderID", orderID)

    return nil
}

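Pros:

  • Good user experience (placing the order locks in the stock)
  • No overselling
  • Prevents malicious hoarding of stock
  • Timed-out orders are cleaned up automatically

Cons:

  • Complex to implement
  • Requires a periodic cleanup job
  • Keeping Redis and the database in sync is complicated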

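2.2 Difficulty 2: Distributed Transactions

Problem scenario

Placing an order involves several services:
1. Order service: create the order
2. Stock service: deduct stock
3. Account service: deduct balance
4. Points service: add points
5. Notification service: send a notification

Problems:
- What happens if step 3 fails?
- If step 2 succeeds and step 3 fails, the stock has already been deducted!
- A database transaction cannot be used (it would span services and databases)
- The transaction mechanism has to be implemented by hand!

This is one of the hardest problems in distributed systems!

Option 1: Saga Pattern (compensating transactions)

Principle: every step has a matching compensating action; on failure, the compensations run in reverse order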

type SagaStep struct {
    Name       string
    Action     func(ctx context.Context, data interface{}) error
    Compensate func(ctx context.Context, data interface{}) error
}

type Saga struct {
    steps    []SagaStep
    executed []int  // indexes of the steps that have run
    data     interface{}
}

type OrderSagaData struct {
    OrderID    int64
    UserID     int64
    ProductID  int64
    Quantity   int
    Amount     decimal.Decimal
}

func NewOrderSaga() *Saga {
    return &Saga{
        steps: []SagaStep{
            {
                Name: "CreateOrder",
                Action: func(ctx context.Context, data interface{}) error {
                    d := data.(*OrderSagaData)
                    orderID, err := orderService.Create(ctx, d)
                    if err != nil {
                        return err
                    }
                    d.OrderID = orderID
                    return nil
                },
                Compensate: func(ctx context.Context, data interface{}) error {
                    d := data.(*OrderSagaData)
                    return orderService.Cancel(ctx, d.OrderID)
                },
            },
            {
                Name: "DeductStock",
                Action: func(ctx context.Context, data interface{}) error {
                    d := data.(*OrderSagaData)
                    return stockService.Deduct(ctx, d.ProductID, d.Quantity)
                },
                Compensate: func(ctx context.Context, data interface{}) error {
                    d := data.(*OrderSagaData)
                    return stockService.Restore(ctx, d.ProductID, d.Quantity)
                },
            },
            {
                Name: "DeductBalance",
                Action: func(ctx context.Context, data interface{}) error {
                    d := data.(*OrderSagaData)
                    return accountService.Deduct(ctx, d.UserID, d.Amount)
                },
                Compensate: func(ctx context.Context, data interface{}) error {
                    d := data.(*OrderSagaData)
                    return accountService.Refund(ctx, d.UserID, d.Amount)
                },
            },
            {
                Name: "AddPoints",
                Action: func(ctx context.Context, data interface{}) error {
                    d := data.(*OrderSagaData)
                    points := int(d.Amount.IntPart())  // 1 yuan = 1 point
                    return pointsService.Add(ctx, d.UserID, points)
                },
                Compensate: func(ctx context.Context, data interface{}) error {
                    d := data.(*OrderSagaData)
                    points := int(d.Amount.IntPart())
                    return pointsService.Deduct(ctx, d.UserID, points)
                },
            },
            {
                Name: "SendNotification",
                Action: func(ctx context.Context, data interface{}) error {
                    d := data.(*OrderSagaData)
                    return notificationService.Send(
                        ctx,
                        d.UserID,
                        "Order placed successfully",
                    )
                },
                Compensate: func(ctx context.Context, data interface{}) error {
                    // Notifications need no compensation
                    return nil
                },
            },
        },
    }
}

func (s *Saga) Execute(ctx context.Context, data interface{}) error {
    s.data = data

    log.Info("saga started")

    // 1. Run every step in forward order
    for i, step := range s.steps {
        log.Info("executing step",
            "index", i,
            "name", step.Name)

        err := step.Action(ctx, data)
        if err != nil {
            log.Error("step failed",
                "index", i,
                "name", step.Name,
                "err", err)

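            // 2. On failure, compensate in reverse order and keep the original cause
            _ = s.compensate(ctx)
            return fmt.Errorf("saga step %q failed, compensation attempted: %w", step.Name, err)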
        }

        s.executed = append(s.executed, i)

        log.Info("step succeeded",
            "index", i,
            "name", step.Name)
    }

    log.Info("saga completed successfully")

    return nil
}

func (s *Saga) compensate(ctx context.Context) error {
    log.Warn("starting compensation",
        "executedSteps", len(s.executed))

    // Run the compensations in reverse order
    for i := len(s.executed) - 1; i >= 0; i-- {
        stepIndex := s.executed[i]
        step := s.steps[stepIndex]

        log.Info("compensating step",
            "index", stepIndex,
            "name", step.Name)

        err := step.Compensate(ctx, s.data)
        if err != nil {
            // Compensation failed: log it and escalate for manual intervention
            log.Error("compensation failed",
                "index", stepIndex,
                "name", step.Name,
                "err", err)

            // Send an alert
            alertService.Send("Saga compensation failed", map[string]interface{}{
                "step":  step.Name,
                "error": err.Error(),
                "data":  s.data,
            })

            // Keep compensating the remaining steps
        } else {
            log.Info("compensation succeeded",
                "index", stepIndex,
                "name", step.Name)
        }
    }

    return errors.New("saga failed and compensated")
}

// Usage example
func PlaceOrder(
    ctx context.Context,
    userID int64,
    productID int64,
    quantity int,
) (*Order, error) {
    // Look up the product price
    price, err := productService.GetPrice(ctx, productID)
    if err != nil {
        return nil, err
    }

    amount := price.Mul(decimal.NewFromInt(int64(quantity)))

    // Create the Saga
    saga := NewOrderSaga()

    data := &OrderSagaData{
        UserID:    userID,
        ProductID: productID,
        Quantity:  quantity,
        Amount:    amount,
    }

    // Run the Saga
    err = saga.Execute(ctx, data)
    if err != nil {
        return nil, err
    }

    // Load the order
    order, err := orderService.Get(ctx, data.OrderID)
    if err != nil {
        return nil, err
    }

    return order, nil
}

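Key points:

  1. Every step needs a compensating action:

    • CreateOrder -> CancelOrder
    • DeductStock -> RestoreStock
    • DeductBalance -> RefundBalance
  2. Compensating actions must be idempotent:

    • They may be called more than once
    • Every call must leave the same result
  3. A failed compensation needs manual intervention:

    • Log the details
    • Send an alert
    • Preserve the data for investigation
  4. Consistency is eventual:

    • The system is not consistent at every instant
    • Compensation brings it to a consistent state eventually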
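
Idempotency is usually achieved by recording a key for each request and ignoring repeats. The sketch below is an in-memory illustration, not the article's account service: the `Ledger` type and the key format are hypothetical, and a real service would store the key in the same database transaction as the balance change.

```go
package main

import (
	"fmt"
	"sync"
)

// Ledger is a hypothetical in-memory account store that remembers which
// refund requests it has already applied.
type Ledger struct {
	mu      sync.Mutex
	balance map[int64]int64 // user ID -> balance in cents
	applied map[string]bool // idempotency key -> already processed
}

// NewLedger returns an empty ledger.
func NewLedger() *Ledger {
	return &Ledger{balance: map[int64]int64{}, applied: map[string]bool{}}
}

// Refund credits the user once per key; repeated calls with the same key
// change nothing and report false.
func (l *Ledger) Refund(key string, userID, cents int64) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.applied[key] {
		return false
	}
	l.applied[key] = true
	l.balance[userID] += cents
	return true
}

// Balance returns the user's balance in cents.
func (l *Ledger) Balance(userID int64) int64 {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.balance[userID]
}

func main() {
	l := NewLedger()
	fmt.Println(l.Refund("order-1001:refund", 1001, 9999)) // true
	fmt.Println(l.Refund("order-1001:refund", 1001, 9999)) // false
	fmt.Println(l.Balance(1001))                           // 9999
}
```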

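Option 2: TCC (Try-Confirm-Cancel)

Principle: every participant exposes three operations. The coordinator calls Try on all participants, then Confirm on all of them if every Try succeeded, or Cancel if any Try failed. Despite the three names, this is a two-phase pattern and is not the same thing as three-phase commit.

Phase 1: Try
- Reserve resources without actually consuming them
- Freeze stock and freeze balance

Phase 2, on success: Confirm
- Commit the reservation
- Actually deduct the stock and the balance

Phase 2, on failure: Cancel
- Abort
- Release the frozen resources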
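
The three operations can be seen end to end on a single in-memory account. This is a simplified sketch under assumptions: the `Account` type is hypothetical, it is not safe for concurrent use, and holds are kept in a map rather than a database table. It records each hold under its transaction ID, so a repeated Confirm or Cancel, or a Cancel for a transaction whose Try never ran, changes nothing.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrInsufficient is returned when Try cannot freeze the requested amount.
var ErrInsufficient = errors.New("balance not enough")

// Account is a hypothetical single-user account with amounts in cents.
type Account struct {
	Available int64
	Frozen    int64
	holds     map[string]int64 // transaction ID -> frozen amount
}

// NewAccount returns an account with the given available balance.
func NewAccount(available int64) *Account {
	return &Account{Available: available, holds: map[string]int64{}}
}

// Try moves the amount from Available to Frozen and records the hold.
func (a *Account) Try(txID string, amount int64) error {
	if _, ok := a.holds[txID]; ok {
		return nil // repeated Try for the same transaction
	}
	if a.Available < amount {
		return ErrInsufficient
	}
	a.Available -= amount
	a.Frozen += amount
	a.holds[txID] = amount
	return nil
}

// Confirm spends the frozen amount; without a hold it does nothing.
func (a *Account) Confirm(txID string) {
	if amount, ok := a.holds[txID]; ok {
		a.Frozen -= amount
		delete(a.holds, txID)
	}
}

// Cancel returns the frozen amount to Available; without a hold it does nothing.
func (a *Account) Cancel(txID string) {
	if amount, ok := a.holds[txID]; ok {
		a.Frozen -= amount
		a.Available += amount
		delete(a.holds, txID)
	}
}

func main() {
	acct := NewAccount(10000)
	_ = acct.Try("tx-1", 3000)
	fmt.Println(acct.Available, acct.Frozen) // 7000 3000
	acct.Cancel("tx-1")
	acct.Cancel("tx-1") // repeated Cancel: no effect
	acct.Cancel("tx-2") // Try never ran: no effect
	fmt.Println(acct.Available, acct.Frozen) // 10000 0
}
```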

Interface definition:

type TCCService interface {
    Try(ctx context.Context, tx *Transaction) error
    Confirm(ctx context.Context, tx *Transaction) error
    Cancel(ctx context.Context, tx *Transaction) error
}

type Transaction struct {
    ID        string
    UserID    int64
    ProductID int64
    Quantity  int
    Amount    decimal.Decimal
    CreatedAt time.Time
}

TCC implementation for the stock service:

type StockTCCService struct {
    db    *sql.DB
    redis *redis.Client
}

func (s *StockTCCService) Try(
    ctx context.Context,
    tx *Transaction,
) error {
    // 1. Freeze stock (reserve it without committing the sale)
    stockKey := fmt.Sprintf("stock:%d", tx.ProductID)
    frozenKey := fmt.Sprintf("stock_frozen:%d:%s", tx.ProductID, tx.ID)

    // The Lua script runs atomically. Both key names are passed in through
    // KEYS instead of being built inside the script, so Redis knows every
    // key the script touches.
    script := `
        local stockKey = KEYS[1]
        local frozenKey = KEYS[2]
        local quantity = tonumber(ARGV[1])

        local stock = redis.call('GET', stockKey)
        if not stock then
            return -1  -- product does not exist
        end

        stock = tonumber(stock)

        if stock >= quantity then
            -- enough stock: freeze it
            redis.call('DECRBY', stockKey, quantity)
            redis.call('SETEX', frozenKey, 600, quantity)  -- expires after 10 minutes
            return 1
        else
            return 0  -- not enough stock
        end
    `

    result, err := s.redis.Eval(
        ctx,
        script,
        []string{stockKey, frozenKey},
        tx.Quantity,
    ).Result()

    if err != nil {
        return err
    }

    code := result.(int64)

    switch code {
    case -1:
        return errors.New("product not found")
    case 0:
        return errors.New("stock not enough")
    case 1:
        log.Info("stock frozen",
            "txID", tx.ID,
            "productID", tx.ProductID,
            "quantity", tx.Quantity)
        return nil
    default:
        return errors.New("unknown error")
    }
}

func (s *StockTCCService) Confirm(
    ctx context.Context,
    tx *Transaction,
) error {
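    // 2. Confirm the deduction (Redis stock was already reduced in Try; only the frozen record needs cleanup)
    key := fmt.Sprintf("stock_frozen:%d:%s", tx.ProductID, tx.ID)

    // Delete the frozen record
    s.redis.Del(ctx, key)

    // Update the database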
    _, err := s.db.ExecContext(ctx, `
        UPDATE products
        SET stock = stock - ?,
            sold = sold + ?,
            updated_at = NOW()
        WHERE id = ?
    `, tx.Quantity, tx.Quantity, tx.ProductID)

    if err != nil {
        log.Error("confirm stock failed",
            "txID", tx.ID,
            "err", err)
        return err
    }

    log.Info("stock confirmed",
        "txID", tx.ID,
        "productID", tx.ProductID)

    return nil
}

func (s *StockTCCService) Cancel(
    ctx context.Context,
    tx *Transaction,
) error {
    // 3. Cancel (release the frozen stock)
    stockKey := fmt.Sprintf("stock:%d", tx.ProductID)
    frozenKey := fmt.Sprintf("stock_frozen:%d:%s", tx.ProductID, tx.ID)

    script := `
        local stockKey = KEYS[1]
        local frozenKey = KEYS[2]

        local quantity = redis.call('GET', frozenKey)
        if quantity then
            -- restore the stock
            redis.call('INCRBY', stockKey, quantity)
            redis.call('DEL', frozenKey)
            return tonumber(quantity)
        end
        return 0
    `

    result, err := s.redis.Eval(
        ctx,
        script,
        []string{stockKey, frozenKey},
    ).Result()

    if err != nil {
        return err
    }

    releasedQty := result.(int64)

    log.Info("stock canceled",
        "txID", tx.ID,
        "productID", tx.ProductID,
        "quantity", releasedQty)

    return nil
}

TCC implementation for the account service:

type AccountTCCService struct {
    db *sql.DB
}

func (s *AccountTCCService) Try(
    ctx context.Context,
    tx *Transaction,
) error {
    // 1. Freeze the balance
    result, err := s.db.ExecContext(ctx, `
        UPDATE accounts
        SET balance = balance - ?,
            frozen = frozen + ?
        WHERE user_id = ?
          AND balance >= ?
    `, tx.Amount, tx.Amount, tx.UserID, tx.Amount)

    if err != nil {
        return err
    }

    affected, _ := result.RowsAffected()
    if affected == 0 {
        return errors.New("balance not enough")
    }

    log.Info("balance frozen",
        "txID", tx.ID,
        "userID", tx.UserID,
        "amount", tx.Amount)

    return nil
}

func (s *AccountTCCService) Confirm(
    ctx context.Context,
    tx *Transaction,
) error {
    // 2. Confirm the deduction (clear the frozen amount)
    _, err := s.db.ExecContext(ctx, `
        UPDATE accounts
        SET frozen = frozen - ?
        WHERE user_id = ?
    `, tx.Amount, tx.UserID)

    if err != nil {
        log.Error("confirm balance failed",
            "txID", tx.ID,
            "err", err)
        return err
    }

    log.Info("balance confirmed",
        "txID", tx.ID,
        "userID", tx.UserID)

    return nil
}

func (s *AccountTCCService) Cancel(
    ctx context.Context,
    tx *Transaction,
) error {
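    // 3. Cancel: move the frozen amount back to the available balance.
    // NOTE: this must run at most once per tx.ID and only after a
    // successful Try; otherwise it credits money that was never frozen.
    // The "frozen >= ?" guard only prevents a negative frozen value.
    _, err := s.db.ExecContext(ctx, `
        UPDATE accounts
        SET balance = balance + ?,
            frozen = frozen - ?
        WHERE user_id = ?
          AND frozen >= ?
    `, tx.Amount, tx.Amount, tx.UserID, tx.Amount)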

    if err != nil {
        log.Error("cancel balance failed",
            "txID", tx.ID,
            "err", err)
        return err
    }

    log.Info("balance canceled",
        "txID", tx.ID,
        "userID", tx.UserID)

    return nil
}
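
The Confirm and Cancel statements above change the row every time they run, yet the coordinator may retry Confirm, and Cancel may arrive for a transaction whose Try never ran. One common guard is a per-transaction phase record. The following is a minimal in-memory sketch of that idea, not the author's implementation: `accountLedger`, `phase` and the integer-cent amounts are hypothetical names chosen for illustration, and in a real service the phase record would be written in the same database transaction as the balance update.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// phase records how far one TCC transaction has progressed.
type phase int

const (
	phaseTried phase = iota + 1
	phaseConfirmed
	phaseCanceled
)

// accountLedger is a hypothetical in-memory stand-in for the accounts table
// plus a per-transaction phase record. Amounts are in cents.
type accountLedger struct {
	mu      sync.Mutex
	balance int64
	frozen  int64
	phases  map[string]phase // txID -> last applied phase
	amounts map[string]int64 // txID -> amount frozen by Try
}

func newAccountLedger(balance int64) *accountLedger {
	return &accountLedger{
		balance: balance,
		phases:  map[string]phase{},
		amounts: map[string]int64{},
	}
}

// Try freezes the amount once per txID and refuses to run after a Cancel.
func (l *accountLedger) Try(txID string, amount int64) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	switch l.phases[txID] {
	case phaseTried, phaseConfirmed:
		return nil // duplicate Try: already applied
	case phaseCanceled:
		return errors.New("transaction already canceled")
	}
	if l.balance < amount {
		return errors.New("insufficient balance")
	}
	l.balance -= amount
	l.frozen += amount
	l.phases[txID] = phaseTried
	l.amounts[txID] = amount
	return nil
}

// Confirm releases the frozen amount exactly once.
func (l *accountLedger) Confirm(txID string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	switch l.phases[txID] {
	case phaseConfirmed:
		return nil // retried Confirm: nothing left to do
	case phaseTried:
		l.frozen -= l.amounts[txID]
		l.phases[txID] = phaseConfirmed
		return nil
	}
	return errors.New("confirm without a successful try")
}

// Cancel restores the balance once. A Cancel without a prior Try changes no
// money and only records the canceled phase (an "empty rollback").
func (l *accountLedger) Cancel(txID string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	switch l.phases[txID] {
	case phaseConfirmed:
		return errors.New("cannot cancel a confirmed transaction")
	case phaseTried:
		amount := l.amounts[txID]
		l.balance += amount
		l.frozen -= amount
	}
	l.phases[txID] = phaseCanceled
	return nil
}

func main() {
	l := newAccountLedger(1000)
	_ = l.Try("tx-1", 300)
	_ = l.Confirm("tx-1")
	_ = l.Confirm("tx-1") // retried Confirm is a no-op
	_ = l.Cancel("tx-2")  // empty rollback: Try never ran
	fmt.Println(l.balance, l.frozen)
}
```

Recording the canceled phase even for an empty rollback is what lets a late Try for the same transaction be rejected instead of freezing money that nobody will release.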

TCC coordinator:

type TCCCoordinator struct {
    services []TCCService
}

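func (tc *TCCCoordinator) Execute(
    ctx context.Context,
    tx *Transaction,
) error {
    log.Info("TCC transaction started", "txID", tx.ID)

    // Phase 1: Try
    for i, service := range tc.services {
        err := service.Try(ctx, tx)
        if err != nil {
            log.Error("try failed",
                "txID", tx.ID,
                "service", i,
                "err", err)

            // Try failed: cancel only the services whose Try already
            // succeeded (indexes 0..i-1). Cancelling a service that never
            // ran Try would, for example, credit a balance that was
            // never frozen.
            tc.cancelTried(ctx, tx, i)
            return err
        }
    }

    log.Info("TCC try phase completed", "txID", tx.ID)

    // Phase 2: Confirm
    for i, service := range tc.services {
        err := service.Confirm(ctx, tx)
        if err != nil {
            // Confirm failed: log it and retry asynchronously
            log.Error("confirm failed",
                "txID", tx.ID,
                "service", i,
                "err", err)

            // Retry asynchronously (up to 3 times). The request context
            // may be cancelled as soon as Execute returns, so detach the
            // retry from it (context.WithoutCancel needs Go 1.21+).
            go tc.retryConfirm(context.WithoutCancel(ctx), tx, service, 3)
        }
    }

    // NOTE: nil is returned even if a Confirm is still being retried.
    // Every resource was reserved in Try, so the transaction is expected
    // to complete eventually.
    log.Info("TCC transaction completed", "txID", tx.ID)

    return nil
}

// cancelTried cancels the first n services in reverse order.
func (tc *TCCCoordinator) cancelTried(
    ctx context.Context,
    tx *Transaction,
    n int,
) {
    log.Warn("canceling tried TCC services", "txID", tx.ID)

    for i := n - 1; i >= 0; i-- {
        err := tc.services[i].Cancel(ctx, tx)
        if err != nil {
            log.Error("cancel failed",
                "txID", tx.ID,
                "service", i,
                "err", err)
        }
    }
}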

func (tc *TCCCoordinator) retryConfirm(
    ctx context.Context,
    tx *Transaction,
    service TCCService,
    maxRetries int,
) {
    for attempt := 0; attempt < maxRetries; attempt++ {
        // Exponential backoff: 1s, 2s, 4s, ...
        backoff := time.Duration(1<<uint(attempt)) * time.Second
        time.Sleep(backoff)

        err := service.Confirm(ctx, tx)
        if err == nil {
            log.Info("confirm retry succeeded",
                "txID", tx.ID,
                "attempt", attempt)
            return
        }

        log.Warn("confirm retry failed",
            "txID", tx.ID,
            "attempt", attempt,
            "err", err)
    }

    // Retries exhausted: raise an alert for manual intervention
    log.Error("confirm retry exhausted", "txID", tx.ID)
    alertService.Send("TCC confirm failed", map[string]interface{}{
        "txID": tx.ID,
        "tx":   tx,
    })
}
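
To make the failure path of the Try phase concrete, here is a minimal self-contained sketch with in-memory participants. It is an illustration under stated assumptions, not the coordinator above: `participant`, `fakeService` and `execute` are hypothetical names, the interface drops the `ctx` and `tx` parameters, and a failed Try is assumed to leave no state behind, so only participants whose Try succeeded are cancelled, newest first.

```go
package main

import (
	"errors"
	"fmt"
)

// participant is a reduced stand-in for the TCCService interface.
type participant interface {
	Try() error
	Confirm() error
	Cancel() error
}

// fakeService records every call it receives; failTry forces Try to fail.
type fakeService struct {
	name    string
	failTry bool
	calls   *[]string
}

func (f *fakeService) Try() error {
	*f.calls = append(*f.calls, f.name+".Try")
	if f.failTry {
		return errors.New(f.name + ": try failed")
	}
	return nil
}

func (f *fakeService) Confirm() error {
	*f.calls = append(*f.calls, f.name+".Confirm")
	return nil
}

func (f *fakeService) Cancel() error {
	*f.calls = append(*f.calls, f.name+".Cancel")
	return nil
}

// execute runs Try on every participant. On the first failure it cancels
// the participants that already succeeded, in reverse order, and returns
// the error. If every Try succeeds it confirms all participants.
func execute(services []participant) error {
	for i, s := range services {
		if err := s.Try(); err != nil {
			for j := i - 1; j >= 0; j-- {
				_ = services[j].Cancel() // a real coordinator would log and retry
			}
			return err
		}
	}
	for _, s := range services {
		if err := s.Confirm(); err != nil {
			return err // a real coordinator would persist and retry instead
		}
	}
	return nil
}

func main() {
	var calls []string
	services := []participant{
		&fakeService{name: "stock", calls: &calls},
		&fakeService{name: "account", failTry: true, calls: &calls},
		&fakeService{name: "points", calls: &calls},
	}
	fmt.Println(execute(services))
	fmt.Println(calls)
}
```

In this run the points participant is never touched, and stock is the only one cancelled, because it is the only one that reserved anything.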

Usage example:

func PlaceOrderWithTCC(
    ctx context.Context,
    userID int64,
    productID int64,
    quantity int,
) (*Order, error) {
    // Look up the product price
    price, err := productService.GetPrice(ctx, productID)
    if err != nil {
        return nil, err
    }

    amount := price.Mul(decimal.NewFromInt(int64(quantity)))

    // Build the transaction
    tx := &Transaction{
        ID:        uuid.New().String(),
        UserID:    userID,
        ProductID: productID,
        Quantity:  quantity,
        Amount:    amount,
        CreatedAt: time.Now(),
    }

    // Build the coordinator
    coordinator := &TCCCoordinator{
        services: []TCCService{
            stockTCCService,
            accountTCCService,
            pointsTCCService,
        },
    }

    // Run the TCC transaction
    err = coordinator.Execute(ctx, tx)
    if err != nil {
        return nil, err
    }

    // Create the order
    order := &Order{
        ID:          tx.ID,
        UserID:      userID,
        ProductID:   productID,
        Quantity:    quantity,
        TotalAmount: amount,
        Status:      OrderStatusPaid,
        CreatedAt:   time.Now(),
    }

    _, err = db.ExecContext(ctx, `
        INSERT INTO orders (
            id, user_id, product_id, quantity,
            total_amount, status, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
    `, order.ID, order.UserID, order.ProductID, order.Quantity,
       order.TotalAmount, order.Status, order.CreatedAt)

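    if err != nil {
        // NOTE: the TCC transaction is already confirmed at this point, so
        // a failed insert leaves stock and balance deducted with no order
        // row. Production code would persist the order before Execute (for
        // example in a pending state) or reconcile by tx.ID afterwards.
        return nil, err
    }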

    return order, nil
}
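
The amount in the example above is computed with a decimal type rather than `float64`. As a small illustration of why that matters when not a single cent may be off, the following sketch compares a floating-point sum with integer arithmetic in cents. `totalCents` and `formatCents` are hypothetical helpers written only for this example; they are not part of the order service.

```go
package main

import "fmt"

// totalCents multiplies a unit price in cents by a quantity using integer
// arithmetic only, so no rounding can occur.
func totalCents(unitPriceCents, quantity int64) int64 {
	return unitPriceCents * quantity
}

// formatCents renders an amount in cents as a decimal string,
// for example 29997 becomes "299.97".
func formatCents(cents int64) string {
	sign := ""
	if cents < 0 {
		sign = "-"
		cents = -cents
	}
	return fmt.Sprintf("%s%d.%02d", sign, cents/100, cents%100)
}

func main() {
	// With float64 variables, 0.1 + 0.2 is not exactly 0.3.
	a, b := 0.1, 0.2
	fmt.Println(a+b == 0.3)

	// Three items at 99.99 each, computed in cents.
	fmt.Println(formatCents(totalCents(9999, 3)))
}
```

Integer cents and decimal types both avoid binary floating-point rounding; which one a service uses is a design choice, and the examples in this article use a decimal type.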

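TCC vs Saga comparison:

| Aspect | Saga | TCC |
| --- | --- | --- |
| Isolation | Weak (intermediate states are visible) | Strong (Try reserves the resources) |
| Performance | Good | Lower |
| Complexity | Low | High |
| Consistency | Eventual | Stronger (resources are reserved in Try; Confirm may still complete later) |
| Implementation difficulty | Moderate | Hard |
| Best suited for | Workloads that do not need strong isolation | Workloads that need strong isolation |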

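2.3 Challenge 3: Idempotency Design

Problem scenarios

Scenario: the user clicks the "Submit Order" button
1. The network stalls, so the page appears unresponsive
2. The user clicks the button 3 times
3. Every request reaches the server
4. 3 orders are created!
5. The user is charged 3 times!
6. The user complains!

Or:
1. Payment succeeds and the payment platform sends a callback
2. The callback times out, so the platform retries it
3. The server receives the callback twice
4. The order status becomes inconsistent
5. The goods may be shipped twice

Idempotency token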

type IdempotentService struct {
    redis *redis.Client
    ttl   time.Duration
}

// GenerateToken issues an idempotency token (requested by the front end)
func (s *IdempotentService) GenerateToken(userID int64) (string, error) {
    token := fmt.Sprintf("%d_%s", userID, uuid.New().String())

    // Store it in Redis so that forged tokens are rejected
    key := fmt.Sprintf("idempotent_token:%s", token)
    err := s.redis.Set(
        context.Background(),
        key,
        "unused",
        24*time.Hour,
    ).Err()

    return token, err
}

// ConsumeToken checks the token and marks it as in use
func (s *IdempotentService) ConsumeToken(
    ctx context.Context,
    token string,
) (bool, error) {
    key := fmt.Sprintf("idempotent_token:%s", token)

    // Run the check-and-mark step in a Lua script so that it is atomic
    script := `
        local key = KEYS[1]
        local value = redis.call('GET', key)

        if not value then
            return -1  -- token missing or expired
        end

        if value == 'unused' then
            -- token unused: mark it as processing (expires after 300 s)
            redis.call('SET', key, 'processing', 'EX', 300)
            return 1  -- acquired
        elseif value == 'processing' then
            return 0  -- another request is processing
        else
            return 2  -- already used; a result is stored
        end
    `

    result, err := s.redis.Eval(ctx, script, []string{key}).Result()
    if err != nil {
        return false, err
    }

    code := result.(int64)

    switch code {
    case -1:
        return false, errors.New("token invalid or expired")
    case 0:
        return false, errors.New("request is being processed")
    case 1:
        return true, nil
    case 2:
        return false, nil  // already processed; the caller returns the cached result
    default:
        return false, errors.New("unknown error")
    }
}

// SaveResult stores the result under the token
func (s *IdempotentService) SaveResult(
    ctx context.Context,
    token string,
    result interface{},
) error {
    key := fmt.Sprintf("idempotent_token:%s", token)

    data, err := json.Marshal(result)
    if err != nil {
        return err
    }

    // Keep the result for 24 hours
    return s.redis.Set(ctx, key, data, 24*time.Hour).Err()
}

// GetResult loads the result stored under the token
func (s *IdempotentService) GetResult(
    ctx context.Context,
    token string,
) (interface{}, error) {
    key := fmt.Sprintf("idempotent_token:%s", token)

    data, err := s.redis.Get(ctx, key).Bytes()
    if err != nil {
        return nil, err
    }

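    // Decode into the concrete type the caller asserts on. Unmarshalling
    // into a bare interface{} yields map[string]interface{}, which would
    // make result.(*Order) in the caller panic.
    var order Order
    if err := json.Unmarshal(data, &order); err != nil {
        return nil, err
    }
    return &order, nil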
}

Using it in the order endpoint:

type CreateOrderRequest struct {
    UserID          int64           `json:"user_id"`
    ProductID       int64           `json:"product_id"`
    Quantity        int             `json:"quantity"`
    IdempotentToken string          `json:"idempotent_token"`
}

func (s *OrderService) CreateOrderIdempotent(
    ctx context.Context,
    req *CreateOrderRequest,
) (*Order, error) {
    // 1. Check the idempotency token
    if req.IdempotentToken == "" {
        return nil, errors.New("idempotent token required")
    }

    canProcess, err := s.idempotentService.ConsumeToken(
        ctx,
        req.IdempotentToken,
    )

    if err != nil {
        return nil, err
    }

    if !canProcess {
        // Token already used: return the earlier result
        result, err := s.idempotentService.GetResult(
            ctx,
            req.IdempotentToken,
        )

        if err != nil {
            return nil, err
        }

        order := result.(*Order)

        log.Info("returning cached order",
            "token", req.IdempotentToken,
            "orderID", order.ID)

        return order, nil
    }

    log.Info("processing new order request",
        "token", req.IdempotentToken)

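    // 2. Create the order
    order, err := s.createOrder(ctx, req)
    if err != nil {
        // Failure: reset the token to "unused" so the client can retry
        // with it. Deleting the key would make the retry fail with
        // "token invalid or expired".
        key := fmt.Sprintf("idempotent_token:%s", req.IdempotentToken)
        s.redis.Set(ctx, key, "unused", 24*time.Hour)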

        return nil, err
    }

    // 3. Save the result
    err = s.idempotentService.SaveResult(
        ctx,
        req.IdempotentToken,
        order,
    )

    if err != nil {
        log.Error("save idempotent result failed",
            "token", req.IdempotentToken,
            "err", err)
    }

    return order, nil
}

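Front-end usage:

// 1. Fetch an idempotency token once, when the checkout page loads
//    (not on every click, otherwise each click gets a fresh token).
//    Assumes the endpoint responds with JSON of the form { "token": "..." }.
const tokenResp = await fetch('/api/token/generate', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ user_id: userId })
});
const { token } = await tokenResp.json();

// 2. Submit the order with the token
const orderResp = await fetch('/api/orders', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
        user_id: userId,
        product_id: productId,
        quantity: 1,
        idempotent_token: token
    })
});
const order = await orderResp.json();

// As long as every click reuses the same token, only one order is created.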
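
The token covers repeated clicks. The second scenario, a payment callback delivered twice, is usually handled on the server with a dedup record plus a status check. The following is a minimal in-memory sketch of that combination, assuming the payment number identifies a callback: `callbackHandler` and `handlePaid` are hypothetical names, and the `seen` map plays the role that a table with a unique key on the payment number would play in a database.

```go
package main

import (
	"fmt"
	"sync"
)

// order holds only the field this sketch needs.
type order struct {
	status string // "Created" or "Paid"
}

// callbackHandler is a hypothetical payment-callback handler. The seen map
// stands in for a dedup table keyed by payment number.
type callbackHandler struct {
	mu        sync.Mutex
	seen      map[string]bool
	orders    map[string]*order
	shipments int // counts how often the shipping side effect was triggered
}

// handlePaid reports whether this delivery changed the order. Duplicate
// deliveries, and deliveries for orders that are not in Created, return
// false and trigger no side effect.
func (h *callbackHandler) handlePaid(paymentNo, orderID string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()

	if h.seen[paymentNo] {
		return false // dedup check: this payment number was already handled
	}
	o, ok := h.orders[orderID]
	if !ok || o.status != "Created" {
		return false // status check: only Created -> Paid is allowed
	}

	h.seen[paymentNo] = true
	o.status = "Paid"
	h.shipments++
	return true
}

func main() {
	h := &callbackHandler{
		seen:   map[string]bool{},
		orders: map[string]*order{"o-1": {status: "Created"}},
	}
	fmt.Println(h.handlePaid("pay-1", "o-1")) // first delivery
	fmt.Println(h.handlePaid("pay-1", "o-1")) // retried delivery
	fmt.Println(h.shipments)
}
```

Either check alone would stop the retried delivery in this example; keeping both also covers a second payment number arriving for an order that is already paid.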

2.4 Challenge 4: The Order State Machine

The full set of order states

type OrderStatus string

const (
    // Basic states
    OrderStatusCreated   OrderStatus = "Created"    // created, awaiting payment
    OrderStatusPaid      OrderStatus = "Paid"       // paid
    OrderStatusShipped   OrderStatus = "Shipped"    // shipped
    OrderStatusDelivered OrderStatus = "Delivered"  // delivered
    OrderStatusCompleted OrderStatus = "Completed"  // completed
    OrderStatusCanceled  OrderStatus = "Canceled"   // canceled

    // Exceptional states
    OrderStatusRefunding OrderStatus = "Refunding"  // refund in progress
    OrderStatusRefunded  OrderStatus = "Refunded"   // refunded
    OrderStatusDisputed  OrderStatus = "Disputed"   // under dispute

    // Intermediate states
    OrderStatusPacking   OrderStatus = "Packing"    // being packed
    OrderStatusShipping  OrderStatus = "Shipping"   // out for delivery
)

State transition rules

type OrderTransition struct {
    From   []OrderStatus
    To     OrderStatus
    Action func(*Order) error
}

var orderTransitions = []OrderTransition{
    {
        From:   []OrderStatus{OrderStatusCreated},
        To:     OrderStatusPaid,
        Action: onOrderPaid,
    },
    {
        From:   []OrderStatus{OrderStatusCreated},
        To:     OrderStatusCanceled,
        Action: onOrderCanceled,
    },
    {
        From:   []OrderStatus{OrderStatusPaid},
        To:     OrderStatusPacking,
        Action: onOrderPacking,
    },
    {
        From:   []OrderStatus{OrderStatusPacking},
        To:     OrderStatusShipped,
        Action: onOrderShipped,
    },
    {
        From:   []OrderStatus{OrderStatusShipped},
        To:     OrderStatusDelivered,
        Action: onOrderDelivered,
    },
    {
        From:   []OrderStatus{OrderStatusDelivered},
        To:     OrderStatusCompleted,
        Action: onOrderCompleted,
    },
    {
        From:   []OrderStatus{OrderStatusPaid, OrderStatusShipped},
        To:     OrderStatusRefunding,
        Action: onOrderRefunding,
    },
    {
        From:   []OrderStatus{OrderStatusRefunding},
        To:     OrderStatusRefunded,
        Action: onOrderRefunded,
    },
}

func ApplyOrderTransition(
    ctx context.Context,
    db *sql.DB,
    orderID int64,
    to OrderStatus,
) error {
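    // 1. Load the order, including the fields the side effects read
    // (with only id, user_id and status loaded, a refund would see a
    // zero amount)
    var order Order
    var version int64

    err := db.QueryRowContext(ctx, `
        SELECT id, user_id, product_id, quantity, total_amount,
               COALESCE(tracking_number, ''), status, version
        FROM orders
        WHERE id = ?
    `, orderID).Scan(&order.ID, &order.UserID, &order.ProductID,
        &order.Quantity, &order.TotalAmount, &order.TrackingNumber,
        &order.Status, &version)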

    if err != nil {
        return err
    }

    // 2. Find the matching transition
    var found *OrderTransition
    for i := range orderTransitions {
        t := &orderTransitions[i]
        if t.To != to {
            continue
        }
        for _, from := range t.From {
            if from == order.Status {
                found = t
                break
            }
        }
        if found != nil {
            break
        }
    }

    if found == nil {
        return fmt.Errorf(
            "invalid transition %s -> %s",
            order.Status,
            to,
        )
    }

    log.Info("applying order transition",
        "orderID", orderID,
        "from", order.Status,
        "to", to)

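    // 3. Update the status first (CAS on version), so that only the caller
    // that wins the update goes on to run the side effects
    result, err := db.ExecContext(ctx, `
        UPDATE orders
        SET status = ?,
            version = version + 1,
            updated_at = NOW()
        WHERE id = ?
          AND version = ?
    `, to, orderID, version)

    if err != nil {
        return err
    }

    affected, err := result.RowsAffected()
    if err != nil {
        return err
    }
    if affected == 0 {
        return errors.New("order version conflict")
    }
    order.Status = to

    // 4. Run the side effects. Running them before the CAS would let two
    // concurrent callers both notify or refund although only one update
    // succeeds. If the action fails here the status has already changed,
    // so production code should make actions retryable (for example
    // through a persisted task).
    if err := found.Action(&order); err != nil {
        return fmt.Errorf("action failed: %w", err)
    }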

    log.Info("order transition completed",
        "orderID", orderID,
        "newStatus", to)

    return nil
}
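
The lookup in step 2 scans the whole transition list on every call. As a possible alternative, the same rules can be precomputed into a map so that a validity check is a single lookup. This is a minimal sketch assuming exactly the eight rules listed in the table above; `allowed` and `canTransition` are hypothetical names, and the status constants are shortened copies for the sake of a self-contained example.

```go
package main

import "fmt"

type OrderStatus string

const (
	StatusCreated   OrderStatus = "Created"
	StatusPaid      OrderStatus = "Paid"
	StatusPacking   OrderStatus = "Packing"
	StatusShipped   OrderStatus = "Shipped"
	StatusDelivered OrderStatus = "Delivered"
	StatusCompleted OrderStatus = "Completed"
	StatusCanceled  OrderStatus = "Canceled"
	StatusRefunding OrderStatus = "Refunding"
	StatusRefunded  OrderStatus = "Refunded"
)

// allowed maps each source status to the statuses it may move to.
// Statuses without an entry (Completed, Canceled, Refunded) are terminal.
var allowed = map[OrderStatus]map[OrderStatus]bool{
	StatusCreated:   {StatusPaid: true, StatusCanceled: true},
	StatusPaid:      {StatusPacking: true, StatusRefunding: true},
	StatusPacking:   {StatusShipped: true},
	StatusShipped:   {StatusDelivered: true, StatusRefunding: true},
	StatusDelivered: {StatusCompleted: true},
	StatusRefunding: {StatusRefunded: true},
}

// canTransition reports whether from -> to is a legal move. Reading a
// missing key from a nil inner map yields false, so unknown or terminal
// statuses need no special case.
func canTransition(from, to OrderStatus) bool {
	return allowed[from][to]
}

func main() {
	fmt.Println(canTransition(StatusCreated, StatusPaid))
	fmt.Println(canTransition(StatusCreated, StatusShipped))
	fmt.Println(canTransition(StatusRefunded, StatusPaid))
}
```

The map only answers whether a move is legal; the version check in the UPDATE statement is still what decides which of two concurrent callers actually performs it.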

Side-effect implementations

func onOrderPaid(order *Order) error {
    log.Info("order paid", "orderID", order.ID)

    // 1. Notify the user
    notificationService.Send(
        order.UserID,
        "Payment successful",
    )

    // 2. Create the warehouse packing task
    warehouseService.CreatePackingTask(order.ID)

    return nil
}

func onOrderShipped(order *Order) error {
    log.Info("order shipped", "orderID", order.ID)

    // 1. Send the shipping notification
    notificationService.Send(
        order.UserID,
        fmt.Sprintf("Order shipped, tracking: %s", order.TrackingNumber),
    )

    // 2. Schedule automatic confirmation of receipt (after 7 days)
    deliveryService.ScheduleAutoConfirm(order.ID, 7*24*time.Hour)

    return nil
}

func onOrderCompleted(order *Order) error {
    log.Info("order completed", "orderID", order.ID)

    // 1. Grant points (one point per whole currency unit)
    points := int(order.TotalAmount.IntPart())
    pointsService.Add(order.UserID, points)

    // 2. Send the review invitation
    reviewService.SendReviewInvitation(order.ID)

    return nil
}

func onOrderRefunded(order *Order) error {
    log.Info("order refunded", "orderID", order.ID)

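    // 1. Refund to the account
    accountService.Refund(order.UserID, order.TotalAmount)

    // 2. Restore the stock
    stockService.Restore(order.ProductID, order.Quantity)

    // 3. Deduct the points granted for this order.
    // NOTE: in this state machine points are granted in onOrderCompleted,
    // while refunds start from Paid or Shipped, so this deduction should
    // run only if points were actually granted for the order.
    points := int(order.TotalAmount.IntPart())
    pointsService.Deduct(order.UserID, points)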

    return nil
}

3. Database Table Design

3.1 Orders Table

CREATE TABLE orders (
    id              BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_no        VARCHAR(32) UNIQUE NOT NULL,
    user_id         BIGINT NOT NULL,

    -- Product information
    product_id      BIGINT NOT NULL,
    product_name    VARCHAR(256) NOT NULL,
    product_price   DECIMAL(10,2) NOT NULL,
    quantity        INT NOT NULL,
    total_amount    DECIMAL(10,2) NOT NULL,

    -- Status
    status          VARCHAR(32) NOT NULL,
    version         BIGINT NOT NULL DEFAULT 0,

    -- Payment information
    -- (optional timestamps are declared NULL explicitly, because older
    -- MySQL defaults treat a bare TIMESTAMP column as NOT NULL)
    payment_method  VARCHAR(32),
    paid_at         TIMESTAMP NULL,
    payment_no      VARCHAR(64),

    -- Shipping information
    shipping_address VARCHAR(512),
    shipping_method  VARCHAR(32),
    tracking_number  VARCHAR(64),
    shipped_at       TIMESTAMP NULL,
    delivered_at     TIMESTAMP NULL,

    -- Refund information
    refund_reason    TEXT,
    refund_amount    DECIMAL(10,2),
    refunded_at      TIMESTAMP NULL,

    -- Cancellation information
    cancel_reason    TEXT,
    canceled_at      TIMESTAMP NULL,

    -- Timestamps
    created_at       TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at       TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    completed_at     TIMESTAMP NULL,

    -- order_no already has an index through its UNIQUE constraint
    INDEX idx_user_status (user_id, status),
    INDEX idx_status_created (status, created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

3.2 Order Items Table (One-to-Many)

CREATE TABLE order_items (
    id              BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id        BIGINT NOT NULL,
    product_id      BIGINT NOT NULL,
    product_name    VARCHAR(256) NOT NULL,
    product_price   DECIMAL(10,2) NOT NULL,
    quantity        INT NOT NULL,
    subtotal        DECIMAL(10,2) NOT NULL,

    created_at      TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_order (order_id),
    INDEX idx_product (product_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

3.3 Order Log Table

CREATE TABLE order_logs (
    id              BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id        BIGINT NOT NULL,
    old_status      VARCHAR(32),
    new_status      VARCHAR(32) NOT NULL,
    operator        VARCHAR(128),
    remark          TEXT,
    created_at      TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_order (order_id),
    INDEX idx_created (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

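4. Summary

4.1 Why Do Transaction Systems Rank Fourth?

Much harder than ordinary business systems:

  1. Money must be exact: not one cent more, not one cent less
  2. Concurrency conflicts are complex: overselling, duplicate charges, inconsistent status
  3. Distributed transactions: Saga and TCC are complex to implement
  4. Strict idempotency requirements: duplicate submissions must be prevented

But simpler than the top three:

  1. Relatively few states: an order has only a dozen or so
  2. No resource scheduling: nowhere near as complex as a scheduler
  3. Mostly single-node logic: no majority quorum as in consensus protocols
  4. Mostly synchronous: no backpressure handling as in asynchronous systems

4.2 Key Techniques

Stock deduction:

  • Pessimistic locking (simple but slow)
  • Optimistic locking (fast but needs retries)
  • Atomic deduction in Redis (fastest)
  • Stock reservation (best user experience)

Distributed transactions:

  • Saga (eventual consistency)
  • TCC (stronger consistency through resources reserved in Try)
  • Compensation mechanisms

Idempotency:

  • Idempotency tokens
  • State-machine checks
  • Dedup table

State machine:

  • Table-driven transitions
  • CAS updates
  • Side effects kept separate

4.3 Learning Path

  1. Understand the basics: database transactions, ACID
  2. Learn the patterns: Saga, TCC, idempotency
  3. Read source code: open-source e-commerce projects
  4. Stress test: simulate high-concurrency scenarios
  5. Use it in production: apply it in real projects

Transaction systems are the core of e-commerce and are well worth studying in depth!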
