【编程难度第四名】复杂交易系统 - 钱的事,一分都不能错
本系列文章
➤ [NO.1 调度器]
➤ [NO.2 一致性协议](Paxos / Raft)
➤ [NO.3 高性能异步系统](消息队列、回调、重试)
➤ NO.4 交易系统(钱的事不能错)
➤ [NO.5 普通业务系统](绝大多数人做的)
一、为什么交易系统如此困难?
1.1 核心挑战
代表系统:
- 淘宝订单系统: 日均千万级订单,双 11 峰值 58 万笔/秒
- 支付宝支付系统: 全球最大的支付平台,零容错
- 美团外卖订单: 实时库存、骑手调度、超时取消
- 券商交易系统: 毫秒级撮合,强一致性要求
- 银行核心系统: 账务处理,必须保证强一致性
为什么难:
- 钱的事不能错: 一分钱都不能多也不能少
- 高并发冲突: 两个人同时买最后一件商品
- 分布式事务: 订单、库存、账户、积分在不同系统
- 状态机复杂: 订单有十几种状态和转换
- 补偿机制: 失败了要能回滚,但不能用数据库事务
与普通系统的差异:
普通系统:
- 丢一条评论: 用户可能不在意
- 推荐不准: 用户可以刷新重试
- 数据延迟: 几秒钟无所谓
交易系统:
- 扣错钱: 用户投诉,公司赔钱
- 超卖: 商家亏损,用户投诉
- 订单丢失: 用户下单了但没记录
- 重复扣款: 用户支付了两次
每一个 Bug 都可能是几百万的损失!
二、核心难点深度解析
2.1 难点一:库存扣减的并发问题
经典超卖场景
商品库存: 1 件
用户 A: 下单(同时)
用户 B: 下单(同时)
时序图:
时刻 T1: A 查询库存 = 1
时刻 T2: B 查询库存 = 1 (问题!)
时刻 T3: A 扣减库存,剩余 = 0
时刻 T4: B 扣减库存,剩余 = -1 ✗ (超卖!)
时刻 T5: A 和 B 都下单成功
时刻 T6: 但是只有 1 件商品!
时刻 T7: 商家亏损,用户投诉
这是电商系统最常见的 Bug!
方案一:悲观锁(SELECT FOR UPDATE)
原理: 先锁住库存行,再扣减
-- Pessimistic-lock order flow (pseudo-SQL: the IF / ELSE / RETURN control flow is illustrative).
-- Start the transaction
BEGIN;
-- Lock the stock row (other transactions that want it must wait)
SELECT stock, version
FROM products
WHERE id = 123
FOR UPDATE;
-- Check the stock
IF stock >= 1 THEN
-- Deduct the stock
UPDATE products
SET stock = stock - 1,
updated_at = NOW()
WHERE id = 123;
-- Create the order
INSERT INTO orders (
user_id,
product_id,
quantity,
total_amount,
status
) VALUES (
1001,
123,
1,
99.99,
'Created'
);
COMMIT;
ELSE
-- Not enough stock: undo the transaction and report the failure
ROLLBACK;
RETURN 'Stock not enough';
END IF;
Go 代码实现:
// OrderService groups the order-creation flows shown in this article.
// NOTE(review): later methods also use s.stockService, s.reservationService,
// s.idempotentService and s.redis, which are not declared here — presumably
// omitted for brevity; confirm before compiling.
type OrderService struct {
db *sql.DB // database handle used for product and order statements
}
// CreateOrderWithLock creates an order for quantity units of productID on
// behalf of userID, guarding the stock check with a pessimistic row lock.
//
// The product row is locked with SELECT ... FOR UPDATE, the stock is checked
// and decremented, and the order row is inserted, all inside one transaction.
// Any failure rolls the whole transaction back.
//
// It returns an error when quantity is not positive, when the product cannot
// be read, when stock is insufficient ("stock not enough"), or when any
// statement or the commit fails.
func (s *OrderService) CreateOrderWithLock(
	ctx context.Context,
	userID int64,
	productID int64,
	quantity int,
) (*Order, error) {
	// A zero or negative quantity would pass the "stock < quantity" check and
	// the UPDATE below would then leave stock unchanged or even increase it.
	if quantity <= 0 {
		return nil, errors.New("quantity must be positive")
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Rollback after a successful Commit is a no-op, so this only undoes
	// work on the error paths.
	defer tx.Rollback()

	// Lock the stock row; concurrent buyers of the same product wait here.
	var stock int
	var price decimal.Decimal
	err = tx.QueryRowContext(ctx, `
		SELECT stock, price
		FROM products
		WHERE id = ?
		FOR UPDATE
	`, productID).Scan(&stock, &price)
	if err != nil {
		return nil, fmt.Errorf("query product failed: %w", err)
	}
	log.Info("locked product",
		"productID", productID,
		"stock", stock)

	if stock < quantity {
		return nil, errors.New("stock not enough")
	}

	// Deduct the stock while still holding the row lock.
	result, err := tx.ExecContext(ctx, `
		UPDATE products
		SET stock = stock - ?,
		updated_at = NOW()
		WHERE id = ?
	`, quantity, productID)
	if err != nil {
		return nil, fmt.Errorf("deduct stock failed: %w", err)
	}
	affected, err := result.RowsAffected()
	if err != nil {
		return nil, fmt.Errorf("deduct stock failed: %w", err)
	}
	log.Info("stock deducted",
		"productID", productID,
		"quantity", quantity,
		"affected", affected)

	// Insert the order in the same transaction as the stock deduction.
	totalAmount := price.Mul(decimal.NewFromInt(int64(quantity)))
	order := &Order{
		UserID:      userID,
		ProductID:   productID,
		Quantity:    quantity,
		TotalAmount: totalAmount,
		Status:      OrderStatusCreated,
		CreatedAt:   time.Now(),
	}
	result, err = tx.ExecContext(ctx, `
		INSERT INTO orders (
		user_id, product_id, quantity,
		total_amount, status, created_at
		) VALUES (?, ?, ?, ?, ?, ?)
	`, order.UserID, order.ProductID, order.Quantity,
		order.TotalAmount, order.Status, order.CreatedAt)
	if err != nil {
		return nil, fmt.Errorf("create order failed: %w", err)
	}
	orderID, err := result.LastInsertId()
	if err != nil {
		return nil, fmt.Errorf("read order id failed: %w", err)
	}
	order.ID = orderID

	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf("commit failed: %w", err)
	}
	log.Info("order created",
		"orderID", orderID,
		"userID", userID,
		"productID", productID)
	return order, nil
}
优点:
- 简单直接
- 不会超卖
- 数据库保证一致性
缺点:
- 性能差,高并发时大量请求等待锁
- 可能死锁(如果有多个商品)
- 锁范围大,影响其他操作
性能测试:
并发: 1000 个请求同时购买同一商品
结果:
- 吞吐量: 500 QPS
- 平均延迟: 2 秒
- P99 延迟: 5 秒
- 大量请求超时
不适合高并发场景!
方案二:乐观锁(CAS - Compare And Swap)
原理: 使用版本号,更新时检查版本是否变化
-- Optimistic-lock (CAS) order flow (pseudo-SQL: the IF / RETRY control flow is illustrative).
-- 1. Read the stock and the version number
SELECT stock, version, price
FROM products
WHERE id = 123;
-- Example result: stock=10, version=5, price=99.99
-- 2. Try to deduct (compare-and-swap)
UPDATE products
SET stock = stock - 1,
version = version + 1,
updated_at = NOW()
WHERE id = 123
AND version = 5 -- Key point: the version must still match what was read
AND stock >= 1; -- There must be enough stock
-- 3. Check the number of affected rows
affected_rows = ?
IF affected_rows = 0 THEN
-- Failure: the version changed (someone else won) or stock ran out
-- Retry or report the failure
RETRY;
ELSE
-- Success: create the order
INSERT INTO orders (...) VALUES (...);
END IF;
Go 代码实现:
// CreateOrderWithOptimisticLock creates an order using a version-checked
// (compare-and-swap) stock update instead of a row lock.
//
// Each attempt reads stock/version/price, then runs the CAS update and the
// order insert in one transaction, so a failed insert can no longer leave
// stock deducted without an order. A CAS conflict is retried up to three times
// with exponential backoff (10ms, 20ms); the backoff honours ctx cancellation
// and is skipped after the final attempt.
//
// It returns an error when quantity is not positive, when stock is
// insufficient, when a statement fails, when ctx is done while backing off, or
// when every attempt hit a conflict.
func (s *OrderService) CreateOrderWithOptimisticLock(
	ctx context.Context,
	userID int64,
	productID int64,
	quantity int,
) (*Order, error) {
	// A non-positive quantity would pass both stock checks and add stock.
	if quantity <= 0 {
		return nil, errors.New("quantity must be positive")
	}

	const maxRetries = 3
	for attempt := 0; attempt < maxRetries; attempt++ {
		if attempt > 0 {
			// Exponential backoff before a retry, abandoned if the caller gives up.
			backoff := time.Duration(1<<uint(attempt-1)) * 10 * time.Millisecond
			timer := time.NewTimer(backoff)
			select {
			case <-ctx.Done():
				timer.Stop()
				return nil, ctx.Err()
			case <-timer.C:
			}
		}

		order, conflict, err := s.tryCreateOrderOptimistic(ctx, userID, productID, quantity, attempt)
		if err != nil {
			return nil, err
		}
		if conflict {
			continue
		}
		return order, nil
	}
	return nil, errors.New("create order failed after retries")
}

// tryCreateOrderOptimistic performs one read + CAS-update + insert attempt.
// conflict is true (with a nil error) when the CAS update matched no row and
// the caller should retry.
func (s *OrderService) tryCreateOrderOptimistic(
	ctx context.Context,
	userID int64,
	productID int64,
	quantity int,
	attempt int,
) (*Order, bool, error) {
	// 1. Read the current product state without locking.
	var stock int
	var version int64
	var price decimal.Decimal
	err := s.db.QueryRowContext(ctx, `
		SELECT stock, version, price
		FROM products
		WHERE id = ?
	`, productID).Scan(&stock, &version, &price)
	if err != nil {
		return nil, false, fmt.Errorf("query product failed: %w", err)
	}
	log.Debug("read product",
		"productID", productID,
		"stock", stock,
		"version", version,
		"attempt", attempt)

	// 2. Fail fast when there is clearly not enough stock.
	if stock < quantity {
		return nil, false, errors.New("stock not enough")
	}

	// 3. CAS update and order insert share one transaction so they succeed
	// or fail together.
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, false, err
	}
	defer tx.Rollback() // no-op after Commit

	result, err := tx.ExecContext(ctx, `
		UPDATE products
		SET stock = stock - ?,
		version = version + 1,
		updated_at = NOW()
		WHERE id = ?
		AND version = ?
		AND stock >= ?
	`, quantity, productID, version, quantity)
	if err != nil {
		return nil, false, fmt.Errorf("deduct stock failed: %w", err)
	}
	affected, err := result.RowsAffected()
	if err != nil {
		return nil, false, fmt.Errorf("deduct stock failed: %w", err)
	}
	if affected == 0 {
		// Someone else changed the row since the read; let the caller retry.
		log.Warn("CAS conflict, retrying",
			"productID", productID,
			"attempt", attempt)
		return nil, true, nil
	}
	log.Info("stock deducted",
		"productID", productID,
		"quantity", quantity,
		"newVersion", version+1)

	// 4. Insert the order.
	totalAmount := price.Mul(decimal.NewFromInt(int64(quantity)))
	order := &Order{
		UserID:      userID,
		ProductID:   productID,
		Quantity:    quantity,
		TotalAmount: totalAmount,
		Status:      OrderStatusCreated,
		CreatedAt:   time.Now(),
	}
	result, err = tx.ExecContext(ctx, `
		INSERT INTO orders (
		user_id, product_id, quantity,
		total_amount, status, created_at
		) VALUES (?, ?, ?, ?, ?, ?)
	`, order.UserID, order.ProductID, order.Quantity,
		order.TotalAmount, order.Status, order.CreatedAt)
	if err != nil {
		// The deferred rollback also undoes the stock deduction.
		log.Error("create order failed, stock deduction rolled back",
			"productID", productID,
			"err", err)
		return nil, false, err
	}
	orderID, err := result.LastInsertId()
	if err != nil {
		return nil, false, fmt.Errorf("read order id failed: %w", err)
	}
	order.ID = orderID

	if err := tx.Commit(); err != nil {
		return nil, false, fmt.Errorf("commit failed: %w", err)
	}
	log.Info("order created",
		"orderID", orderID,
		"userID", userID,
		"attempt", attempt)
	return order, false, nil
}
优点:
- 性能好,无锁
- 不会超卖
- 支持高并发
缺点:
- 高并发时重试次数多
- 实现稍复杂
- 需要处理重试逻辑
性能测试:
并发: 1000 个请求同时购买同一商品
库存: 100 件
结果:
- 吞吐量: 5000 QPS
- 平均延迟: 50ms
- P99 延迟: 200ms
- 成功 100 个,失败 900 个
- 平均重试次数: 1.5 次
适合高并发场景!
方案三:Redis 原子扣减(终极方案)
原理: 利用 Redis 的单线程特性和 Lua 脚本保证原子性
// RedisStockService keeps the sellable stock counter in Redis (key "stock:<productID>")
// and mirrors deductions into the products table.
type RedisStockService struct {
redis *redis.Client // holds the live stock counters
db *sql.DB // products table that the counters are synced from and to
}
// DeductStock atomically subtracts quantity from the Redis stock counter of
// productID using a Lua script, then mirrors the deduction into the database
// in a background goroutine.
//
// It returns (true, nil) on success. It returns (false, err) when quantity is
// not positive, when the stock key is missing ("product not found"), when
// stock is insufficient ("stock not enough"), or when Redis fails or replies
// with an unexpected value.
func (s *RedisStockService) DeductStock(
	ctx context.Context,
	productID int64,
	quantity int,
) (bool, error) {
	// DECRBY with a negative amount would add stock, and zero is meaningless.
	if quantity <= 0 {
		return false, errors.New("quantity must be positive")
	}

	key := fmt.Sprintf("stock:%d", productID)
	// The check and the decrement run as one script so no other command can
	// interleave between them.
	script := `
		local stock = redis.call('GET', KEYS[1])
		if not stock then
			return -1 -- product does not exist
		end
		stock = tonumber(stock)
		local quantity = tonumber(ARGV[1])
		if stock >= quantity then
			-- enough stock: deduct and return what is left
			local remaining = redis.call('DECRBY', KEYS[1], quantity)
			return remaining
		else
			return -2 -- not enough stock
		end
	`
	result, err := s.redis.Eval(ctx, script, []string{key}, quantity).Result()
	if err != nil {
		return false, fmt.Errorf("redis eval failed: %w", err)
	}
	// Guard the assertion: an unexpected reply type must not panic the request.
	remaining, ok := result.(int64)
	if !ok {
		return false, fmt.Errorf("redis eval returned unexpected type %T", result)
	}

	switch {
	case remaining == -1:
		return false, errors.New("product not found")
	case remaining == -2:
		return false, errors.New("stock not enough")
	case remaining >= 0:
		log.Info("stock deducted",
			"productID", productID,
			"quantity", quantity,
			"remaining", remaining)
		// Mirror the deduction into the database without blocking the caller.
		go s.updateDatabaseStock(productID, quantity)
		return true, nil
	default:
		return false, errors.New("unknown error")
	}
}
// updateDatabaseStock mirrors a Redis stock deduction into the products table.
// It is started in a goroutine by DeductStock, so it uses its own background
// context and reports problems only through the log.
//
// When the database update fails it adds the quantity back to the Redis
// counter; a failure of that compensation is now logged instead of being
// silently dropped.
//
// NOTE(review): by the time this runs DeductStock has already reported
// success, so the caller may have created an order. Restoring the Redis
// counter in that case can let the same unit be sold again — confirm whether a
// durable retry queue should replace this compensation.
func (s *RedisStockService) updateDatabaseStock(productID int64, quantity int) {
	ctx := context.Background()
	_, err := s.db.ExecContext(ctx, `
		UPDATE products
		SET stock = stock - ?,
		updated_at = NOW()
		WHERE id = ?
	`, quantity, productID)
	if err == nil {
		return
	}
	log.Error("update database stock failed",
		"productID", productID,
		"err", err)

	// Compensate: give the units back to the Redis counter.
	key := fmt.Sprintf("stock:%d", productID)
	if restoreErr := s.redis.IncrBy(ctx, key, int64(quantity)).Err(); restoreErr != nil {
		// Redis and the database now disagree; leave a trace for reconciliation.
		log.Error("restore redis stock failed",
			"productID", productID,
			"quantity", quantity,
			"err", restoreErr)
	}
}
// SyncStockToRedis seeds the Redis stock counter of productID from the stock
// column of the products table. The key is written with a 24-hour expiry.
// It returns the scan error or the Redis write error, if any.
func (s *RedisStockService) SyncStockToRedis(productID int64) error {
	row := s.db.QueryRow(`
SELECT stock FROM products WHERE id = ?
`, productID)

	var dbStock int
	if scanErr := row.Scan(&dbStock); scanErr != nil {
		return scanErr
	}

	cmd := s.redis.Set(
		context.Background(),
		fmt.Sprintf("stock:%d", productID),
		dbStock,
		24*time.Hour,
	)
	return cmd.Err()
}
完整的下单流程:
// CreateOrderWithRedis creates an order after deducting stock in Redis.
//
// Flow: deduct stock through the stock service, read the product price,
// insert the order. When the price lookup or the insert fails, the Redis
// stock is restored; a failure of that restore is logged so the drift can be
// reconciled.
//
// It returns an error when quantity is not positive, when the deduction is
// refused or fails, or when a database statement fails.
func (s *OrderService) CreateOrderWithRedis(
	ctx context.Context,
	userID int64,
	productID int64,
	quantity int,
) (*Order, error) {
	if quantity <= 0 {
		return nil, errors.New("quantity must be positive")
	}

	// 1. Deduct stock in Redis.
	success, err := s.stockService.DeductStock(ctx, productID, quantity)
	if err != nil {
		return nil, fmt.Errorf("deduct stock failed: %w", err)
	}
	if !success {
		// Do not wrap a nil error with %w; report the refusal directly.
		return nil, errors.New("deduct stock failed")
	}
	log.Info("redis stock deducted",
		"productID", productID,
		"quantity", quantity)

	// restore gives the deducted units back and records a failed restore.
	restore := func() {
		if restoreErr := s.stockService.RestoreStock(ctx, productID, quantity); restoreErr != nil {
			log.Error("restore stock failed",
				"productID", productID,
				"quantity", quantity,
				"err", restoreErr)
		}
	}

	// 2. Read the product price.
	var price decimal.Decimal
	err = s.db.QueryRowContext(ctx, `
		SELECT price FROM products WHERE id = ?
	`, productID).Scan(&price)
	if err != nil {
		restore()
		return nil, err
	}

	// 3. Insert the order.
	totalAmount := price.Mul(decimal.NewFromInt(int64(quantity)))
	order := &Order{
		UserID:      userID,
		ProductID:   productID,
		Quantity:    quantity,
		TotalAmount: totalAmount,
		Status:      OrderStatusCreated,
		CreatedAt:   time.Now(),
	}
	result, err := s.db.ExecContext(ctx, `
		INSERT INTO orders (
		user_id, product_id, quantity,
		total_amount, status, created_at
		) VALUES (?, ?, ?, ?, ?, ?)
	`, order.UserID, order.ProductID, order.Quantity,
		order.TotalAmount, order.Status, order.CreatedAt)
	if err != nil {
		log.Error("create order failed, restoring stock",
			"productID", productID,
			"err", err)
		restore()
		return nil, err
	}

	orderID, err := result.LastInsertId()
	if err != nil {
		// The row is already inserted, so the order stands; only report it.
		log.Error("read order id failed",
			"userID", userID,
			"productID", productID,
			"err", err)
	}
	order.ID = orderID
	log.Info("order created",
		"orderID", orderID,
		"userID", userID)
	return order, nil
}
// RestoreStock adds quantity back to the Redis stock counter of productID
// and returns the Redis error, if any.
func (s *RedisStockService) RestoreStock(
	ctx context.Context,
	productID int64,
	quantity int,
) error {
	cmd := s.redis.IncrBy(ctx, fmt.Sprintf("stock:%d", productID), int64(quantity))
	return cmd.Err()
}
优点:
- 极高性能,支持 10 万+ QPS
- 不会超卖
- 无锁,无重试
- 延迟极低(<1ms)
缺点:
- Redis 和数据库需要保持一致
- Redis 挂了需要降级
- 需要处理数据同步
性能测试:
并发: 10000 个请求同时购买
库存: 1000 件
结果:
- 吞吐量: 100000 QPS
- 平均延迟: 1ms
- P99 延迟: 5ms
- 成功 1000 个,失败 9000 个
- 无重试
Redis 方案是性能之王!
方案四:预占库存(用户体验最佳)
场景: 用户下单后有 15 分钟支付时间
问题:
- 如果立即扣减库存,用户不支付,库存被占用
- 如果不扣减库存,可能超卖
解决方案: 预占库存
// ReservationService implements stock reservation: stock is taken out of the
// Redis counter at order time and recorded per order under
// "stock_reserved:<productID>:<orderID>" until it is confirmed or released.
type ReservationService struct {
redis *redis.Client // stock counters and per-order reservation records
db *sql.DB // products and orders tables
}
// ReserveStock atomically moves quantity units of productID from the Redis
// stock counter into a per-order reservation record that expires after ttl.
//
// It returns (true, nil) on success. It returns (false, err) when quantity is
// not positive, when ttl is shorter than one second, when the stock key is
// missing ("product not found"), when stock is insufficient ("stock not
// enough"), or when Redis fails or replies with an unexpected value.
func (s *ReservationService) ReserveStock(
	ctx context.Context,
	productID int64,
	quantity int,
	orderID int64,
	ttl time.Duration,
) (bool, error) {
	// DECRBY with a negative amount would add stock.
	if quantity <= 0 {
		return false, errors.New("quantity must be positive")
	}
	// SETEX rejects a zero expiry. Inside the script that error would occur
	// after DECRBY has already run, and Redis does not roll scripts back, so
	// the stock would be lost. Reject such a ttl up front.
	ttlSeconds := int(ttl.Seconds())
	if ttlSeconds < 1 {
		return false, errors.New("ttl must be at least one second")
	}

	stockKey := fmt.Sprintf("stock:%d", productID)
	reservedKey := fmt.Sprintf("stock_reserved:%d:%d", productID, orderID)
	// Check, deduct and record in one script so the steps cannot interleave
	// with other commands.
	script := `
		local stockKey = KEYS[1]
		local reservedKey = KEYS[2]
		local quantity = tonumber(ARGV[1])
		local ttl = tonumber(ARGV[2])
		local stock = redis.call('GET', stockKey)
		if not stock then
			return -1 -- product does not exist
		end
		stock = tonumber(stock)
		if stock >= quantity then
			-- enough stock: deduct it and record the reservation
			redis.call('DECRBY', stockKey, quantity)
			redis.call('SETEX', reservedKey, ttl, quantity)
			return 1 -- success
		else
			return 0 -- not enough stock
		end
	`
	result, err := s.redis.Eval(
		ctx,
		script,
		[]string{stockKey, reservedKey},
		quantity,
		ttlSeconds,
	).Result()
	if err != nil {
		return false, err
	}
	code, ok := result.(int64)
	if !ok {
		return false, fmt.Errorf("redis eval returned unexpected type %T", result)
	}

	switch code {
	case -1:
		return false, errors.New("product not found")
	case 0:
		return false, errors.New("stock not enough")
	case 1:
		log.Info("stock reserved",
			"productID", productID,
			"orderID", orderID,
			"quantity", quantity,
			"ttl", ttl)
		return true, nil
	default:
		return false, errors.New("unknown error")
	}
}
// ConfirmReservation turns the reservation of orderID into a real sale after
// the user has paid: it reads the reserved quantity, applies it to the
// products table (stock down, sold up) and then removes the reservation record.
//
// The database is updated before the record is deleted. If the update fails,
// the reservation is still present and the confirmation can be retried or the
// stock released; previously the record was deleted first and a database
// failure lost it.
//
// It returns "reservation not found or expired" when the record is missing,
// or the Redis/database error that stopped the confirmation.
func (s *ReservationService) ConfirmReservation(
	ctx context.Context,
	productID int64,
	orderID int64,
) error {
	reservedKey := fmt.Sprintf("stock_reserved:%d:%d", productID, orderID)

	// Read how many units this order reserved.
	quantity, err := s.redis.Get(ctx, reservedKey).Int()
	if errors.Is(err, redis.Nil) {
		return errors.New("reservation not found or expired")
	}
	if err != nil {
		return err
	}

	// Make the sale durable first.
	_, err = s.db.ExecContext(ctx, `
		UPDATE products
		SET stock = stock - ?,
		sold = sold + ?,
		updated_at = NOW()
		WHERE id = ?
	`, quantity, quantity, productID)
	if err != nil {
		log.Error("confirm reservation failed",
			"productID", productID,
			"orderID", orderID,
			"err", err)
		return err
	}

	// The sale is recorded; dropping the record is now best-effort because
	// the key expires on its own.
	if delErr := s.redis.Del(ctx, reservedKey).Err(); delErr != nil {
		log.Error("delete reservation record failed",
			"productID", productID,
			"orderID", orderID,
			"err", delErr)
	}

	log.Info("reservation confirmed",
		"productID", productID,
		"orderID", orderID,
		"quantity", quantity)
	return nil
}
// ReleaseReservation gives the stock reserved by orderID back to the Redis
// stock counter and removes the reservation record, in a single Lua script.
// When no record exists the script changes nothing and reports 0 released
// units. Only Redis errors are returned.
func (s *ReservationService) ReleaseReservation(
	ctx context.Context,
	productID int64,
	orderID int64,
) error {
	// KEYS[1] is the stock counter, KEYS[2] the per-order reservation record.
	keys := []string{
		fmt.Sprintf("stock:%d", productID),
		fmt.Sprintf("stock_reserved:%d:%d", productID, orderID),
	}
	const releaseScript = `
local stockKey = KEYS[1]
local reservedKey = KEYS[2]
local quantity = redis.call('GET', reservedKey)
if quantity then
-- 恢复库存
redis.call('INCRBY', stockKey, quantity)
redis.call('DEL', reservedKey)
return tonumber(quantity)
end
return 0
`
	raw, evalErr := s.redis.Eval(ctx, releaseScript, keys).Result()
	if evalErr != nil {
		return evalErr
	}

	log.Info("reservation released",
		"productID", productID,
		"orderID", orderID,
		"quantity", raw.(int64))
	return nil
}
// StartCleanupWorker blocks and runs cleanupExpiredReservations once per
// minute until ctx is done. Callers normally start it in its own goroutine.
func (s *ReservationService) StartCleanupWorker(ctx context.Context) {
	tick := time.NewTicker(1 * time.Minute)
	defer tick.Stop()

	stop := ctx.Done()
	for {
		select {
		case <-stop:
			return
		case <-tick.C:
			s.cleanupExpiredReservations(ctx)
		}
	}
}
// cleanupExpiredReservations cancels up to 100 orders that have been in
// status 'Created' for more than 15 minutes and releases their reserved stock.
//
// The cancel UPDATE is guarded with "AND status = 'Created'" and runs before
// the stock is released. An order paid between the SELECT and the UPDATE is
// left untouched and keeps its stock; previously it could be overwritten to
// 'Canceled' and its stock handed back while the customer had already paid.
// All failures are logged; the function never returns an error.
func (s *ReservationService) cleanupExpiredReservations(ctx context.Context) {
	type expiredOrder struct {
		orderID   int64
		productID int64
	}

	// Find unpaid orders that are past the payment window.
	rows, err := s.db.QueryContext(ctx, `
		SELECT id, product_id, quantity
		FROM orders
		WHERE status = 'Created'
		AND created_at < NOW() - INTERVAL 15 MINUTE
		LIMIT 100
	`)
	if err != nil {
		log.Error("query expired orders failed", "err", err)
		return
	}
	defer rows.Close()

	// Read the whole batch first so no write runs while the cursor is open.
	var expired []expiredOrder
	for rows.Next() {
		var o expiredOrder
		var quantity int // selected by the query; the release reads the quantity from Redis
		if err := rows.Scan(&o.orderID, &o.productID, &quantity); err != nil {
			log.Error("scan expired order failed", "err", err)
			continue
		}
		expired = append(expired, o)
	}
	if err := rows.Err(); err != nil {
		log.Error("iterate expired orders failed", "err", err)
	}
	rows.Close()

	for _, o := range expired {
		// Cancel only if the order is still unpaid.
		result, err := s.db.ExecContext(ctx, `
			UPDATE orders
			SET status = 'Canceled',
			cancel_reason = 'Payment timeout',
			updated_at = NOW()
			WHERE id = ?
			AND status = 'Created'
		`, o.orderID)
		if err != nil {
			log.Error("cancel order failed",
				"orderID", o.orderID,
				"err", err)
			continue
		}
		affected, err := result.RowsAffected()
		if err != nil {
			log.Error("cancel order failed",
				"orderID", o.orderID,
				"err", err)
			continue
		}
		if affected == 0 {
			// Paid or canceled by someone else in the meantime: keep its stock.
			continue
		}

		// The order is canceled for sure; now hand the stock back.
		if err := s.ReleaseReservation(ctx, o.productID, o.orderID); err != nil {
			log.Error("release reservation failed",
				"orderID", o.orderID,
				"productID", o.productID,
				"err", err)
			continue
		}
		log.Info("order auto canceled",
			"orderID", o.orderID,
			"productID", o.productID)
	}
}
完整的下单流程:
// CreateOrderWithReservation inserts an order in status Created and reserves
// its stock so the user has a payment window.
//
// The reservation record now lives for the 15-minute payment window plus a
// 5-minute grace period. The cleanup worker only looks at orders older than
// 15 minutes; with a TTL of exactly 15 minutes the record had already expired
// by then, nothing was released, and the stock stayed deducted forever.
//
// It returns an error when quantity is not positive, when a database
// statement fails, or when the reservation is refused or fails. In the last
// case the order is marked Canceled.
func (s *OrderService) CreateOrderWithReservation(
	ctx context.Context,
	userID int64,
	productID int64,
	quantity int,
) (*Order, error) {
	const (
		paymentWindow    = 15 * time.Minute // matches the threshold used by the cleanup query
		reservationGrace = 5 * time.Minute  // keeps the record alive until the cleanup has run
	)

	if quantity <= 0 {
		return nil, errors.New("quantity must be positive")
	}

	// 1. Create the order (status: Created).
	var price decimal.Decimal
	err := s.db.QueryRowContext(ctx, `
		SELECT price FROM products WHERE id = ?
	`, productID).Scan(&price)
	if err != nil {
		return nil, err
	}
	totalAmount := price.Mul(decimal.NewFromInt(int64(quantity)))
	order := &Order{
		UserID:      userID,
		ProductID:   productID,
		Quantity:    quantity,
		TotalAmount: totalAmount,
		Status:      OrderStatusCreated,
		CreatedAt:   time.Now(),
	}
	result, err := s.db.ExecContext(ctx, `
		INSERT INTO orders (
		user_id, product_id, quantity,
		total_amount, status, created_at
		) VALUES (?, ?, ?, ?, ?, ?)
	`, order.UserID, order.ProductID, order.Quantity,
		order.TotalAmount, order.Status, order.CreatedAt)
	if err != nil {
		return nil, err
	}
	orderID, err := result.LastInsertId()
	if err != nil {
		// Without the ID the reservation cannot be keyed to this order.
		return nil, fmt.Errorf("read order id failed: %w", err)
	}
	order.ID = orderID

	// 2. Reserve the stock.
	success, err := s.reservationService.ReserveStock(
		ctx,
		productID,
		quantity,
		orderID,
		paymentWindow+reservationGrace,
	)
	if err != nil || !success {
		// Reservation failed: cancel the order that was just created.
		_, cancelErr := s.db.ExecContext(ctx, `
			UPDATE orders
			SET status = 'Canceled',
			cancel_reason = 'Stock not enough'
			WHERE id = ?
		`, orderID)
		if cancelErr != nil {
			log.Error("cancel order failed",
				"orderID", orderID,
				"err", cancelErr)
		}
		if err == nil {
			// Do not wrap a nil error with %w.
			return nil, errors.New("reserve stock failed")
		}
		return nil, fmt.Errorf("reserve stock failed: %w", err)
	}
	log.Info("order created with reservation",
		"orderID", orderID,
		"userID", userID,
		"productID", productID)

	// 3. Return the order; the user now has to pay.
	return order, nil
}
// HandlePaymentSuccess processes the "payment succeeded" callback for orderID:
// it confirms the stock reservation and moves the order from Created to Paid.
//
// The callback is idempotent: a repeated delivery for an order that is already
// Paid returns nil instead of an error, so the payment platform stops
// retrying. The status UPDATE is guarded with "AND status = 'Created'" so it
// cannot overwrite an order that changed state in the meantime.
//
// It returns an error when the order cannot be read, when it is in any state
// other than Created or Paid, when the reservation cannot be confirmed, or
// when the guarded update fails or matches no row.
func (s *OrderService) HandlePaymentSuccess(
	ctx context.Context,
	orderID int64,
) error {
	// 1. Load the order.
	var order Order
	err := s.db.QueryRowContext(ctx, `
		SELECT id, product_id, quantity, status
		FROM orders
		WHERE id = ?
	`, orderID).Scan(&order.ID, &order.ProductID, &order.Quantity, &order.Status)
	if err != nil {
		return err
	}
	if order.Status == OrderStatusPaid {
		// Duplicate callback: the work is already done.
		log.Info("duplicate payment callback ignored",
			"orderID", orderID)
		return nil
	}
	if order.Status != OrderStatusCreated {
		return errors.New("order status invalid")
	}

	// 2. Confirm the reservation.
	err = s.reservationService.ConfirmReservation(
		ctx,
		order.ProductID,
		orderID,
	)
	if err != nil {
		log.Error("confirm reservation failed",
			"orderID", orderID,
			"err", err)
		return err
	}

	// 3. Move the order to Paid, but only if it is still Created.
	result, err := s.db.ExecContext(ctx, `
		UPDATE orders
		SET status = 'Paid',
		paid_at = NOW(),
		updated_at = NOW()
		WHERE id = ?
		AND status = 'Created'
	`, orderID)
	if err != nil {
		return err
	}
	affected, err := result.RowsAffected()
	if err != nil {
		return err
	}
	if affected == 0 {
		// Another writer changed the order after step 1.
		return errors.New("order status changed concurrently")
	}
	log.Info("order paid",
		"orderID", orderID)
	return nil
}
优点:
- 用户体验好(下单即锁定库存)
- 不会超卖
- 防止恶意占用库存
- 自动清理超时订单
缺点:
- 实现复杂
- 需要定时清理
- Redis 和数据库同步复杂
2.2 难点二:分布式事务
问题场景
下单流程涉及多个服务:
1. 订单服务: 创建订单
2. 库存服务: 扣减库存
3. 账户服务: 扣减余额
4. 积分服务: 增加积分
5. 通知服务: 发送通知
问题:
- 第 3 步失败了,怎么办?
- 第 2 步成功,第 3 步失败,库存已扣减!
- 不能使用数据库事务(跨服务、跨数据库)
- 需要自己实现事务机制!
这是分布式系统最难的问题之一!
方案一:Saga 模式(补偿事务)
原理: 每个步骤都有对应的补偿操作,失败时逆序执行补偿
type (
	// SagaStep is one unit of a saga: a forward Action and the Compensate
	// function that undoes it. Both receive the saga's shared data value.
	SagaStep struct {
		Name       string
		Action     func(ctx context.Context, data interface{}) error
		Compensate func(ctx context.Context, data interface{}) error
	}

	// Saga is an ordered list of steps plus the bookkeeping needed to
	// compensate the ones that already ran.
	Saga struct {
		steps    []SagaStep
		executed []int // indexes into steps of the steps that have run
		data     interface{}
	}
)
// OrderSagaData is the shared value handed to every step of the order saga.
type OrderSagaData struct {
OrderID int64 // filled in by the CreateOrder step once the order exists
UserID int64
ProductID int64
Quantity int
Amount decimal.Decimal // total to charge; also the basis for the points calculation
}
// NewOrderSaga builds the order-placement saga. Its steps run in this order,
// each paired with the compensation that undoes it:
//
//	CreateOrder      -> orderService.Cancel
//	DeductStock      -> stockService.Restore
//	DeductBalance    -> accountService.Refund
//	AddPoints        -> pointsService.Deduct
//	SendNotification -> (nothing to undo)
//
// Every step expects the saga data to be a *OrderSagaData.
func NewOrderSaga() *Saga {
	// One point per whole currency unit of the order amount.
	pointsFor := func(d *OrderSagaData) int {
		return int(d.Amount.IntPart())
	}

	createOrder := SagaStep{
		Name: "CreateOrder",
		Action: func(ctx context.Context, data interface{}) error {
			d := data.(*OrderSagaData)
			id, createErr := orderService.Create(ctx, d)
			if createErr != nil {
				return createErr
			}
			// Later steps and the compensation need the new order ID.
			d.OrderID = id
			return nil
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			return orderService.Cancel(ctx, data.(*OrderSagaData).OrderID)
		},
	}

	deductStock := SagaStep{
		Name: "DeductStock",
		Action: func(ctx context.Context, data interface{}) error {
			d := data.(*OrderSagaData)
			return stockService.Deduct(ctx, d.ProductID, d.Quantity)
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			d := data.(*OrderSagaData)
			return stockService.Restore(ctx, d.ProductID, d.Quantity)
		},
	}

	deductBalance := SagaStep{
		Name: "DeductBalance",
		Action: func(ctx context.Context, data interface{}) error {
			d := data.(*OrderSagaData)
			return accountService.Deduct(ctx, d.UserID, d.Amount)
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			d := data.(*OrderSagaData)
			return accountService.Refund(ctx, d.UserID, d.Amount)
		},
	}

	addPoints := SagaStep{
		Name: "AddPoints",
		Action: func(ctx context.Context, data interface{}) error {
			d := data.(*OrderSagaData)
			return pointsService.Add(ctx, d.UserID, pointsFor(d))
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			d := data.(*OrderSagaData)
			return pointsService.Deduct(ctx, d.UserID, pointsFor(d))
		},
	}

	sendNotification := SagaStep{
		Name: "SendNotification",
		Action: func(ctx context.Context, data interface{}) error {
			d := data.(*OrderSagaData)
			return notificationService.Send(
				ctx,
				d.UserID,
				"Order placed successfully",
			)
		},
		// A sent notification cannot be taken back, so there is nothing to do.
		Compensate: func(ctx context.Context, data interface{}) error {
			return nil
		},
	}

	return &Saga{
		steps: []SagaStep{
			createOrder,
			deductStock,
			deductBalance,
			addPoints,
			sendNotification,
		},
	}
}
// Execute runs the saga's steps in order with data as the shared state.
//
// When a step fails, the steps that already succeeded are compensated in
// reverse order and an error is returned. The returned error now wraps the
// failing step's error, so callers can see why the saga failed (for example
// with errors.Is / errors.As) instead of only a generic message. Its text
// still begins with the message returned by compensate.
//
// The executed-step list is reset at the start, so a Saga value can be
// executed again without compensating steps from an earlier run.
//
// NOTE(review): compensation runs with the same ctx; if the step failed
// because ctx was canceled, the compensations will likely fail too — confirm
// whether a detached context is wanted.
func (s *Saga) Execute(ctx context.Context, data interface{}) error {
	s.data = data
	s.executed = s.executed[:0]
	log.Info("saga started")

	// 1. Run every step forward.
	for i, step := range s.steps {
		log.Info("executing step",
			"index", i,
			"name", step.Name)
		if err := step.Action(ctx, data); err != nil {
			log.Error("step failed",
				"index", i,
				"name", step.Name,
				"err", err)
			// 2. Undo what already ran, newest first, and keep the cause.
			compErr := s.compensate(ctx)
			return fmt.Errorf("%v: step %q: %w", compErr, step.Name, err)
		}
		s.executed = append(s.executed, i)
		log.Info("step succeeded",
			"index", i,
			"name", step.Name)
	}
	log.Info("saga completed successfully")
	return nil
}
// compensate undoes the executed steps in reverse order. A failing
// compensation is logged and reported through alertService, and the remaining
// steps are still compensated. It always returns a non-nil error, because it
// only runs after the saga has failed.
func (s *Saga) compensate(ctx context.Context) error {
	log.Warn("starting compensation",
		"executedSteps", len(s.executed))

	// Walk the executed list from its tail to its head.
	for pos := len(s.executed); pos > 0; pos-- {
		idx := s.executed[pos-1]
		current := s.steps[idx]
		log.Info("compensating step",
			"index", idx,
			"name", current.Name)

		compErr := current.Compensate(ctx, s.data)
		if compErr == nil {
			log.Info("compensation succeeded",
				"index", idx,
				"name", current.Name)
			continue
		}

		// A compensation that fails needs a human: log it, raise an alert
		// with the saga data, then keep undoing the remaining steps.
		log.Error("compensation failed",
			"index", idx,
			"name", current.Name,
			"err", compErr)
		alertService.Send("Saga compensation failed", map[string]interface{}{
			"step":  current.Name,
			"error": compErr.Error(),
			"data":  s.data,
		})
	}
	return errors.New("saga failed and compensated")
}
// PlaceOrder is the usage example for the order saga: it prices the purchase,
// runs the saga, and returns the order the saga created.
// Any error from pricing, the saga, or the final lookup is returned as is.
func PlaceOrder(
	ctx context.Context,
	userID int64,
	productID int64,
	quantity int,
) (*Order, error) {
	// Total = unit price × quantity.
	unitPrice, priceErr := productService.GetPrice(ctx, productID)
	if priceErr != nil {
		return nil, priceErr
	}
	sagaData := &OrderSagaData{
		UserID:    userID,
		ProductID: productID,
		Quantity:  quantity,
		Amount:    unitPrice.Mul(decimal.NewFromInt(int64(quantity))),
	}

	// Run the saga; the CreateOrder step stores the new ID in sagaData.
	if sagaErr := NewOrderSaga().Execute(ctx, sagaData); sagaErr != nil {
		return nil, sagaErr
	}

	// Load the order that was just created.
	created, getErr := orderService.Get(ctx, sagaData.OrderID)
	if getErr != nil {
		return nil, getErr
	}
	return created, nil
}
关键点:
每个步骤都要有补偿操作:
- CreateOrder -> CancelOrder
- DeductStock -> RestoreStock
- DeductBalance -> RefundBalance
补偿操作必须幂等:
- 可能被调用多次
- 每次调用结果相同
补偿失败需要人工介入:
- 记录详细日志
- 发送告警
- 保留现场数据
最终一致性:
- 不是立即一致
- 通过补偿最终达到一致
方案二:TCC(Try-Confirm-Cancel)
原理: 业务层面的两阶段提交,由 Try / Confirm / Cancel 三个操作组成(注意: 这不是 3PC 三阶段提交协议;Try 成功后,Confirm 与 Cancel 只会执行其中之一)
阶段 1: Try
- 预留资源(不真正执行)
- 冻结库存、冻结余额
阶段 2: Confirm
- 确认执行
- 真正扣减库存和余额
阶段 3: Cancel
- 取消
- 释放冻结的资源
接口定义:
type (
	// TCCService is implemented by every participant of a TCC transaction.
	// Try reserves resources, Confirm commits the reservation, and Cancel
	// releases it.
	TCCService interface {
		Try(ctx context.Context, tx *Transaction) error
		Confirm(ctx context.Context, tx *Transaction) error
		Cancel(ctx context.Context, tx *Transaction) error
	}

	// Transaction carries the business data shared by all participants of
	// one TCC transaction.
	Transaction struct {
		ID        string
		UserID    int64
		ProductID int64
		Quantity  int
		Amount    decimal.Decimal
		CreatedAt time.Time
	}
)
库存服务的 TCC 实现:
// StockTCCService is the stock participant of a TCC transaction. It freezes
// stock in Redis and settles the sale in the products table.
type StockTCCService struct {
db *sql.DB // products table updated by Confirm
redis *redis.Client // stock counters and per-transaction frozen records
}
// Try freezes tx.Quantity units of tx.ProductID: the units are taken out of
// the Redis stock counter and recorded under a per-transaction frozen key that
// expires after 10 minutes.
//
// Changes from the earlier version:
//   - Both keys are passed through KEYS, as Cancel already does, instead of
//     building the stock key inside the script from a bare product ID. Redis
//     requires every key a script touches to be declared in KEYS.
//   - Try is idempotent: if the frozen record for this transaction already
//     exists, it reports success without deducting a second time.
//   - A non-positive quantity is rejected; DECRBY with a negative amount would
//     add stock.
//   - The reply type is checked instead of asserted blindly.
//
// It returns "product not found", "stock not enough", or the Redis error.
func (s *StockTCCService) Try(
	ctx context.Context,
	tx *Transaction,
) error {
	if tx.Quantity <= 0 {
		return errors.New("quantity must be positive")
	}

	stockKey := fmt.Sprintf("stock:%d", tx.ProductID)
	frozenKey := fmt.Sprintf("stock_frozen:%d:%s", tx.ProductID, tx.ID)
	// One script keeps the check, the deduction and the record atomic.
	script := `
		local stockKey = KEYS[1]
		local frozenKey = KEYS[2]
		local quantity = tonumber(ARGV[1])
		if redis.call('EXISTS', frozenKey) == 1 then
			return 1 -- already frozen by an earlier Try of this transaction
		end
		local stock = redis.call('GET', stockKey)
		if not stock then
			return -1 -- product does not exist
		end
		stock = tonumber(stock)
		if stock >= quantity then
			-- enough stock: freeze it
			redis.call('DECRBY', stockKey, quantity)
			redis.call('SETEX', frozenKey, 600, quantity) -- expires after 10 minutes
			return 1
		else
			return 0 -- not enough stock
		end
	`
	result, err := s.redis.Eval(
		ctx,
		script,
		[]string{stockKey, frozenKey},
		tx.Quantity,
	).Result()
	if err != nil {
		return err
	}
	code, ok := result.(int64)
	if !ok {
		return fmt.Errorf("redis eval returned unexpected type %T", result)
	}

	switch code {
	case -1:
		return errors.New("product not found")
	case 0:
		return errors.New("stock not enough")
	case 1:
		log.Info("stock frozen",
			"txID", tx.ID,
			"productID", tx.ProductID,
			"quantity", tx.Quantity)
		return nil
	default:
		return errors.New("unknown error")
	}
}
// Confirm settles the frozen stock of tx: the sale is written to the products
// table (stock down, sold up) and the frozen record is then removed from Redis.
//
// The database is updated before the frozen record is deleted. If the update
// fails, the record is still there for the retry, for a later Cancel, or for
// manual repair; previously it was deleted first. A failure to delete the
// record is logged but does not fail the confirmation, because the record
// expires on its own.
//
// NOTE(review): calling Confirm again after a successful call deducts the
// database stock again — confirm whether callers guarantee at-most-once
// success.
func (s *StockTCCService) Confirm(
	ctx context.Context,
	tx *Transaction,
) error {
	key := fmt.Sprintf("stock_frozen:%d:%s", tx.ProductID, tx.ID)

	// Make the sale durable first.
	_, err := s.db.ExecContext(ctx, `
		UPDATE products
		SET stock = stock - ?,
		sold = sold + ?,
		updated_at = NOW()
		WHERE id = ?
	`, tx.Quantity, tx.Quantity, tx.ProductID)
	if err != nil {
		log.Error("confirm stock failed",
			"txID", tx.ID,
			"err", err)
		return err
	}

	// The Redis counter was already reduced in Try; only the record is left.
	if delErr := s.redis.Del(ctx, key).Err(); delErr != nil {
		log.Error("delete frozen record failed",
			"txID", tx.ID,
			"productID", tx.ProductID,
			"err", delErr)
	}

	log.Info("stock confirmed",
		"txID", tx.ID,
		"productID", tx.ProductID)
	return nil
}
// Cancel gives the stock frozen for tx back to the Redis stock counter and
// removes the frozen record, in a single Lua script. When no frozen record
// exists the script changes nothing and reports 0 released units. Only Redis
// errors are returned.
func (s *StockTCCService) Cancel(
	ctx context.Context,
	tx *Transaction,
) error {
	// KEYS[1] is the stock counter, KEYS[2] the per-transaction frozen record.
	keys := []string{
		fmt.Sprintf("stock:%d", tx.ProductID),
		fmt.Sprintf("stock_frozen:%d:%s", tx.ProductID, tx.ID),
	}
	const unfreezeScript = `
local stockKey = KEYS[1]
local frozenKey = KEYS[2]
local quantity = redis.call('GET', frozenKey)
if quantity then
-- 恢复库存
redis.call('INCRBY', stockKey, quantity)
redis.call('DEL', frozenKey)
return tonumber(quantity)
end
return 0
`
	raw, evalErr := s.redis.Eval(ctx, unfreezeScript, keys).Result()
	if evalErr != nil {
		return evalErr
	}

	log.Info("stock canceled",
		"txID", tx.ID,
		"productID", tx.ProductID,
		"quantity", raw.(int64))
	return nil
}
账户服务的 TCC 实现:
// AccountTCCService is the account participant of a TCC transaction. It moves
// money between the balance and frozen columns of the accounts table.
type AccountTCCService struct {
db *sql.DB // accounts table
}
// Try freezes tx.Amount for tx.UserID by moving it from balance to frozen,
// but only when the balance covers it.
//
// A non-positive amount is rejected: with a negative amount the guard
// "balance >= ?" is trivially true and "balance - ?" would credit the account.
//
// It returns "balance not enough" when no row matched (insufficient balance
// or unknown user), or the database error.
func (s *AccountTCCService) Try(
	ctx context.Context,
	tx *Transaction,
) error {
	if tx.Amount.Sign() <= 0 {
		return errors.New("amount must be positive")
	}

	// The balance check and the move happen in one statement, so concurrent
	// Try calls cannot overdraw the account.
	result, err := s.db.ExecContext(ctx, `
		UPDATE accounts
		SET balance = balance - ?,
		frozen = frozen + ?
		WHERE user_id = ?
		AND balance >= ?
	`, tx.Amount, tx.Amount, tx.UserID, tx.Amount)
	if err != nil {
		return err
	}
	affected, err := result.RowsAffected()
	if err != nil {
		return err
	}
	if affected == 0 {
		return errors.New("balance not enough")
	}
	log.Info("balance frozen",
		"txID", tx.ID,
		"userID", tx.UserID,
		"amount", tx.Amount)
	return nil
}
// Confirm finalises the charge for tx by removing tx.Amount from the frozen
// column; the balance itself was already reduced in Try. A database error is
// logged and returned.
func (s *AccountTCCService) Confirm(
	ctx context.Context,
	tx *Transaction,
) error {
	const settleSQL = `
UPDATE accounts
SET frozen = frozen - ?
WHERE user_id = ?
`
	if _, execErr := s.db.ExecContext(ctx, settleSQL, tx.Amount, tx.UserID); execErr != nil {
		log.Error("confirm balance failed",
			"txID", tx.ID,
			"err", execErr)
		return execErr
	}

	log.Info("balance confirmed",
		"txID", tx.ID,
		"userID", tx.UserID)
	return nil
}
// Cancel releases the amount frozen for tx by moving tx.Amount from frozen
// back to balance.
//
// The update is guarded with "AND frozen >= ?". Without the guard, a Cancel
// that arrives when nothing was frozen (an "empty rollback", for example when
// the coordinator cancels a participant whose Try never ran) would credit
// money the user never had frozen and drive frozen negative. When the guard
// matches no row, the call is treated as an empty rollback: it is logged and
// reported as success, so the coordinator does not keep retrying.
//
// NOTE(review): the guard looks at the account's total frozen amount, not at
// this transaction's share; a per-transaction freeze record would be needed
// for a full guarantee — confirm.
func (s *AccountTCCService) Cancel(
	ctx context.Context,
	tx *Transaction,
) error {
	result, err := s.db.ExecContext(ctx, `
		UPDATE accounts
		SET balance = balance + ?,
		frozen = frozen - ?
		WHERE user_id = ?
		AND frozen >= ?
	`, tx.Amount, tx.Amount, tx.UserID, tx.Amount)
	if err != nil {
		log.Error("cancel balance failed",
			"txID", tx.ID,
			"err", err)
		return err
	}
	affected, err := result.RowsAffected()
	if err != nil {
		log.Error("cancel balance failed",
			"txID", tx.ID,
			"err", err)
		return err
	}
	if affected == 0 {
		// Nothing was frozen for this amount: nothing to give back.
		log.Warn("cancel balance skipped, nothing frozen",
			"txID", tx.ID,
			"userID", tx.UserID)
		return nil
	}
	log.Info("balance canceled",
		"txID", tx.ID,
		"userID", tx.UserID)
	return nil
}
TCC 协调器:
// TCCCoordinator drives one TCC transaction across its participants.
type TCCCoordinator struct {
services []TCCService // participants, tried and confirmed in this order
}
// Execute runs the TCC transaction tx across all participants.
//
// Phase 1 calls Try on every service in order. If a Try fails, only the
// services whose Try already succeeded are canceled, in reverse order, and
// the Try error is returned. Previously every service was canceled, including
// ones that never reserved anything; with the account participant that
// credited money that had never been frozen.
//
// Phase 2 calls Confirm on every service. A failing Confirm is logged and
// retried in the background, and Execute still returns nil. The retry runs
// on a background context: the request context is usually canceled as soon as
// the caller returns, which would make every retry fail immediately.
func (tc *TCCCoordinator) Execute(
	ctx context.Context,
	tx *Transaction,
) error {
	log.Info("TCC transaction started", "txID", tx.ID)

	// Phase 1: Try.
	for i, service := range tc.services {
		if err := service.Try(ctx, tx); err != nil {
			log.Error("try failed",
				"txID", tx.ID,
				"service", i,
				"err", err)
			// Undo only what was actually reserved, newest first.
			log.Warn("canceling tried TCC services", "txID", tx.ID)
			for j := i - 1; j >= 0; j-- {
				if cancelErr := tc.services[j].Cancel(ctx, tx); cancelErr != nil {
					log.Error("cancel failed",
						"txID", tx.ID,
						"service", j,
						"err", cancelErr)
				}
			}
			return err
		}
	}
	log.Info("TCC try phase completed", "txID", tx.ID)

	// Phase 2: Confirm.
	for i, service := range tc.services {
		if err := service.Confirm(ctx, tx); err != nil {
			log.Error("confirm failed",
				"txID", tx.ID,
				"service", i,
				"err", err)
			// Retry asynchronously (up to 3 times), detached from the
			// request's lifetime.
			go tc.retryConfirm(context.Background(), tx, service, 3)
		}
	}
	log.Info("TCC transaction completed", "txID", tx.ID)
	return nil
}
// cancelAll calls Cancel on every registered service, in registration order,
// whether or not that service's Try ran. Failures are logged and do not stop
// the remaining cancellations.
func (tc *TCCCoordinator) cancelAll(
	ctx context.Context,
	tx *Transaction,
) {
	log.Warn("canceling all TCC services", "txID", tx.ID)
	for idx := range tc.services {
		cancelErr := tc.services[idx].Cancel(ctx, tx)
		if cancelErr == nil {
			continue
		}
		log.Error("cancel failed",
			"txID", tx.ID,
			"service", idx,
			"err", cancelErr)
	}
}
// retryConfirm retries service.Confirm up to maxRetries times, sleeping
// 1s, 2s, 4s, ... before each attempt. It returns as soon as one attempt
// succeeds; when all attempts fail it logs the exhaustion and raises an alert
// so the transaction can be handled manually.
func (tc *TCCCoordinator) retryConfirm(
	ctx context.Context,
	tx *Transaction,
	service TCCService,
	maxRetries int,
) {
	attempt := 0
	for attempt < maxRetries {
		// Exponential backoff before every attempt, including the first.
		time.Sleep(time.Duration(1<<uint(attempt)) * time.Second)

		confirmErr := service.Confirm(ctx, tx)
		if confirmErr == nil {
			log.Info("confirm retry succeeded",
				"txID", tx.ID,
				"attempt", attempt)
			return
		}
		log.Warn("confirm retry failed",
			"txID", tx.ID,
			"attempt", attempt,
			"err", confirmErr)
		attempt++
	}

	// Out of attempts: hand over to a human.
	log.Error("confirm retry exhausted", "txID", tx.ID)
	alertService.Send("TCC confirm failed", map[string]interface{}{
		"txID": tx.ID,
		"tx":   tx,
	})
}
使用示例:
// PlaceOrderWithTCC is the usage example for the TCC coordinator: it prices
// the purchase, runs the TCC transaction over the stock, account and points
// participants, and then stores the paid order.
//
// Changes from the earlier version:
//   - The order ID is taken from the insert (LastInsertId), as in the other
//     order flows. The transaction ID is a UUID string and cannot be assigned
//     to the integer Order.ID.
//   - If storing the order fails after the TCC transaction has been confirmed
//     (stock and money are already taken), the failure is logged and an alert
//     carrying the transaction is raised, and the returned error includes the
//     transaction ID, so the case can be reconciled.
func PlaceOrderWithTCC(
	ctx context.Context,
	userID int64,
	productID int64,
	quantity int,
) (*Order, error) {
	// Total = unit price × quantity.
	price, err := productService.GetPrice(ctx, productID)
	if err != nil {
		return nil, err
	}
	amount := price.Mul(decimal.NewFromInt(int64(quantity)))

	// Describe the transaction.
	tx := &Transaction{
		ID:        uuid.New().String(),
		UserID:    userID,
		ProductID: productID,
		Quantity:  quantity,
		Amount:    amount,
		CreatedAt: time.Now(),
	}

	// Run Try/Confirm across all participants.
	coordinator := &TCCCoordinator{
		services: []TCCService{
			stockTCCService,
			accountTCCService,
			pointsTCCService,
		},
	}
	if err := coordinator.Execute(ctx, tx); err != nil {
		return nil, err
	}

	// Store the order; resources are already committed at this point.
	order := &Order{
		UserID:      userID,
		ProductID:   productID,
		Quantity:    quantity,
		TotalAmount: amount,
		Status:      OrderStatusPaid,
		CreatedAt:   time.Now(),
	}
	result, err := db.ExecContext(ctx, `
		INSERT INTO orders (
		user_id, product_id, quantity,
		total_amount, status, created_at
		) VALUES (?, ?, ?, ?, ?, ?)
	`, order.UserID, order.ProductID, order.Quantity,
		order.TotalAmount, order.Status, order.CreatedAt)
	if err != nil {
		// Money and stock are taken but no order exists: needs reconciliation.
		log.Error("create order failed after TCC commit",
			"txID", tx.ID,
			"err", err)
		alertService.Send("order insert failed after TCC commit", map[string]interface{}{
			"txID":  tx.ID,
			"tx":    tx,
			"error": err.Error(),
		})
		return nil, fmt.Errorf("create order failed (txID %s): %w", tx.ID, err)
	}
	orderID, err := result.LastInsertId()
	if err != nil {
		// The row exists; report the problem but keep the order.
		log.Error("read order id failed",
			"txID", tx.ID,
			"err", err)
	}
	order.ID = orderID
	return order, nil
}
TCC vs Saga 对比:
| 特性 | Saga | TCC |
|---|---|---|
| 隔离性 | 弱(中间状态可见) | 强(Try 阶段锁资源) |
| 性能 | 好 | 较差 |
| 复杂度 | 低 | 高 |
| 一致性 | 最终一致性 | 最终一致性(Try 阶段预留资源,隔离性更好) |
| 实现难度 | 中等 | 困难 |
| 适用场景 | 不要求强隔离 | 要求强隔离 |
2.3 难点三:幂等性设计
问题场景
场景: 用户点击"提交订单"按钮
1. 网络抖动,用户以为没反应
2. 用户重复点击 3 次
3. 每次请求都到达服务器
4. 创建了 3 个订单!
5. 扣了 3 次钱!
6. 用户投诉!
或者:
1. 支付成功,回调通知
2. 网络超时,支付平台重试
3. 收到 2 次回调
4. 订单状态混乱
5. 可能重复发货
解决方案:幂等 Token
// IdempotentService issues and tracks one-time tokens in Redis under
// "idempotent_token:<token>" so that a repeated request can be detected.
type IdempotentService struct {
redis *redis.Client // token store
ttl time.Duration // NOTE(review): not read by any method shown here; presumably the intended token lifetime — confirm
}
// GenerateToken creates a new idempotency token for userID and registers it
// in Redis with the value "unused", so that only tokens issued by the server
// are accepted later.
//
// The token lives for s.ttl, or for 24 hours when s.ttl is not set (the
// lifetime that was previously hard-coded). On a Redis error an empty token
// is returned together with the error, so callers can never use a token that
// was not registered.
func (s *IdempotentService) GenerateToken(userID int64) (string, error) {
	lifetime := s.ttl
	if lifetime <= 0 {
		lifetime = 24 * time.Hour
	}

	token := fmt.Sprintf("%d_%s", userID, uuid.New().String())
	// Registering the token is what makes it valid.
	key := fmt.Sprintf("idempotent_token:%s", token)
	err := s.redis.Set(
		context.Background(),
		key,
		"unused",
		lifetime,
	).Err()
	if err != nil {
		return "", err
	}
	return token, nil
}
// ConsumeToken checks the state of token and, when it is still unused, claims
// it for processing, all in one Lua script.
//
// Results:
//   - (true, nil): the token was unused and is now marked "processing" for
//     300 seconds; the caller should handle the request.
//   - (false, nil): the token holds some other value, i.e. a stored result.
//   - (false, err): the token is missing or expired, is currently being
//     processed, or Redis failed.
func (s *IdempotentService) ConsumeToken(
	ctx context.Context,
	token string,
) (bool, error) {
	const claimScript = `
local key = KEYS[1]
local value = redis.call('GET', key)
if not value then
return -1 -- token 不存在或已过期
end
if value == 'unused' then
-- token 未使用,标记为 processing
redis.call('SET', key, 'processing', 'EX', 300)
return 1 -- 成功
elseif value == 'processing' then
return 0 -- 正在处理中
else
return 2 -- 已使用,返回结果
end
`
	redisKey := fmt.Sprintf("idempotent_token:%s", token)
	raw, evalErr := s.redis.Eval(ctx, claimScript, []string{redisKey}).Result()
	if evalErr != nil {
		return false, evalErr
	}

	// Script reply: 1 claimed, 2 already handled, 0 in progress, -1 unknown token.
	state := raw.(int64)
	if state == 1 {
		return true, nil
	}
	if state == 2 {
		return false, nil
	}
	if state == 0 {
		return false, errors.New("request is being processed")
	}
	if state == -1 {
		return false, errors.New("token invalid or expired")
	}
	return false, errors.New("unknown error")
}
// SaveResult stores the JSON encoding of result under the token's key for
// 24 hours, replacing the token's previous state. It returns the encoding
// error or the Redis error.
func (s *IdempotentService) SaveResult(
	ctx context.Context,
	token string,
	result interface{},
) error {
	encoded, marshalErr := json.Marshal(result)
	if marshalErr != nil {
		return marshalErr
	}

	redisKey := fmt.Sprintf("idempotent_token:%s", token)
	cmd := s.redis.Set(ctx, redisKey, encoded, 24*time.Hour)
	return cmd.Err()
}
// GetResult loads the value stored under the token's key and decodes it as
// JSON into a generic value (for a JSON object that is a
// map[string]interface{}). It returns the Redis error or the decoding error.
func (s *IdempotentService) GetResult(
	ctx context.Context,
	token string,
) (interface{}, error) {
	redisKey := fmt.Sprintf("idempotent_token:%s", token)
	stored, getErr := s.redis.Get(ctx, redisKey).Bytes()
	if getErr != nil {
		return nil, getErr
	}

	var decoded interface{}
	decodeErr := json.Unmarshal(stored, &decoded)
	return decoded, decodeErr
}
在订单接口中使用:
// CreateOrderRequest is the JSON body of the create-order endpoint.
type CreateOrderRequest struct {
UserID int64 `json:"user_id"`
ProductID int64 `json:"product_id"`
Quantity int `json:"quantity"`
IdempotentToken string `json:"idempotent_token"` // must be non-empty; checked by CreateOrderIdempotent
}
// CreateOrderIdempotent creates an order at most once per idempotency token.
//
// The first request with a token claims it, creates the order and stores the
// order as the token's result. A repeated request with the same token gets
// the stored order back instead of creating a second one.
//
// Changes from the earlier version:
//   - The stored result is decoded into an Order. GetResult returns generic
//     JSON data (a map), so asserting it to *Order panicked on every
//     repeated request.
//   - When order creation fails, the token is reset to "unused" instead of
//     being deleted. A deleted token is rejected as "token invalid or
//     expired", so the retry that the failure path is meant to allow could
//     never succeed.
//
// It returns an error when the token is empty, unknown, expired or currently
// being processed, when the stored result cannot be read, or when order
// creation fails.
func (s *OrderService) CreateOrderIdempotent(
	ctx context.Context,
	req *CreateOrderRequest,
) (*Order, error) {
	// 1. Check the idempotency token.
	if req.IdempotentToken == "" {
		return nil, errors.New("idempotent token required")
	}
	canProcess, err := s.idempotentService.ConsumeToken(
		ctx,
		req.IdempotentToken,
	)
	if err != nil {
		return nil, err
	}
	if !canProcess {
		// The token was already used: return the stored order.
		result, err := s.idempotentService.GetResult(
			ctx,
			req.IdempotentToken,
		)
		if err != nil {
			return nil, err
		}
		order, ok := result.(*Order)
		if !ok {
			// The result comes back as generic JSON data; rebuild the Order
			// by round-tripping it through JSON.
			raw, err := json.Marshal(result)
			if err != nil {
				return nil, fmt.Errorf("decode cached order failed: %w", err)
			}
			order = &Order{}
			if err := json.Unmarshal(raw, order); err != nil {
				return nil, fmt.Errorf("decode cached order failed: %w", err)
			}
		}
		log.Info("returning cached order",
			"token", req.IdempotentToken,
			"orderID", order.ID)
		return order, nil
	}
	log.Info("processing new order request",
		"token", req.IdempotentToken)

	// 2. Create the order.
	order, err := s.createOrder(ctx, req)
	if err != nil {
		// Creation failed: make the token usable again so the client can retry.
		key := fmt.Sprintf("idempotent_token:%s", req.IdempotentToken)
		if resetErr := s.redis.Set(ctx, key, "unused", 24*time.Hour).Err(); resetErr != nil {
			log.Error("reset idempotent token failed",
				"token", req.IdempotentToken,
				"err", resetErr)
		}
		return nil, err
	}

	// 3. Store the result for repeated requests. The order already exists,
	// so a failure here is only logged.
	err = s.idempotentService.SaveResult(
		ctx,
		req.IdempotentToken,
		order,
	)
	if err != nil {
		log.Error("save idempotent result failed",
			"token", req.IdempotentToken,
			"err", err)
	}
	return order, nil
}
前端使用:
// 1. Request an idempotency token.
// fetch() resolves to a Response object, so the body has to be parsed before
// the token can be used; sending the Response itself would serialize as {}.
const tokenResponse = await fetch('/api/token/generate', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ user_id: userId })
});
// NOTE(review): assumes the endpoint replies with JSON shaped like { "token": "..." } — confirm against the server.
const { token } = await tokenResponse.json();
// 2. Submit the order, carrying the token.
const orderResponse = await fetch('/api/orders', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    user_id: userId,
    product_id: productId,
    quantity: 1,
    idempotent_token: token
  })
});
const order = await orderResponse.json();
// Even if the user clicks several times, only one order is created,
// as long as every click reuses the same token.
2.4 难点四:订单状态机
完整的订单状态
// OrderStatus is the lifecycle state of an order, stored as its string name.
type OrderStatus string

// Main lifecycle states.
const (
	OrderStatusCreated   OrderStatus = "Created"   // created, waiting for payment
	OrderStatusPaid      OrderStatus = "Paid"      // paid
	OrderStatusShipped   OrderStatus = "Shipped"   // shipped
	OrderStatusDelivered OrderStatus = "Delivered" // delivered
	OrderStatusCompleted OrderStatus = "Completed" // completed
	OrderStatusCanceled  OrderStatus = "Canceled"  // canceled
)

// Exceptional states.
const (
	OrderStatusRefunding OrderStatus = "Refunding" // refund in progress
	OrderStatusRefunded  OrderStatus = "Refunded"  // refunded
	OrderStatusDisputed  OrderStatus = "Disputed"  // under dispute
)

// Intermediate fulfilment states.
const (
	OrderStatusPacking  OrderStatus = "Packing"  // being packed
	OrderStatusShipping OrderStatus = "Shipping" // out for delivery
)
状态转换规则
// OrderTransition describes one allowed move of the order state machine.
type OrderTransition struct {
// From lists the statuses from which this transition may start.
From []OrderStatus
// To is the status the order ends up in.
To OrderStatus
// Action is the side effect ApplyOrderTransition runs before it persists
// the new status; a non-nil error aborts the transition.
Action func(*Order) error
}
// orderTransitions is the table of allowed status changes. ApplyOrderTransition
// scans it in order and uses the first entry whose To matches the requested
// status and whose From contains the order's current status.
var orderTransitions = []OrderTransition{
	// Payment and cancellation of a freshly created order.
	{From: []OrderStatus{OrderStatusCreated}, To: OrderStatusPaid, Action: onOrderPaid},
	{From: []OrderStatus{OrderStatusCreated}, To: OrderStatusCanceled, Action: onOrderCanceled},

	// Fulfilment path.
	{From: []OrderStatus{OrderStatusPaid}, To: OrderStatusPacking, Action: onOrderPacking},
	{From: []OrderStatus{OrderStatusPacking}, To: OrderStatusShipped, Action: onOrderShipped},
	{From: []OrderStatus{OrderStatusShipped}, To: OrderStatusDelivered, Action: onOrderDelivered},
	{From: []OrderStatus{OrderStatusDelivered}, To: OrderStatusCompleted, Action: onOrderCompleted},

	// Refund path.
	{From: []OrderStatus{OrderStatusPaid, OrderStatusShipped}, To: OrderStatusRefunding, Action: onOrderRefunding},
	{From: []OrderStatus{OrderStatusRefunding}, To: OrderStatusRefunded, Action: onOrderRefunded},
}
// ApplyOrderTransition moves order orderID to status to, provided that
// orderTransitions allows that move from the order's current status.
//
// Steps:
//  1. Load the order, its version, and the fields the transition Actions
//     read (product, quantity, amount, tracking number).
//  2. Find the matching transition; return an "invalid transition" error when
//     there is none.
//  3. Run the transition's Action; its error aborts the transition.
//  4. Persist the new status with a compare-and-swap on version. When no row
//     is updated, "order version conflict" is returned.
//
// NOTE(review): the Action runs before the compare-and-swap, so when two
// callers race, both Actions run although only one update wins. Actions that
// move money or stock should be idempotent, or be triggered only after the
// update succeeded — confirm which guarantee callers rely on.
func ApplyOrderTransition(
	ctx context.Context,
	db *sql.DB,
	orderID int64,
	to OrderStatus,
) error {
	// 1. Load the order. The Actions receive this struct, so every field they
	// read must be populated here; otherwise they would act on zero values
	// (e.g. refund an amount of 0).
	var order Order
	var version int64
	err := db.QueryRowContext(ctx, `
		SELECT id, user_id, status, version,
		       product_id, quantity, total_amount,
		       COALESCE(tracking_number, '')
		FROM orders
		WHERE id = ?
	`, orderID).Scan(
		&order.ID, &order.UserID, &order.Status, &version,
		&order.ProductID, &order.Quantity, &order.TotalAmount,
		&order.TrackingNumber,
	)
	if err != nil {
		return err
	}

	// 2. Find the matching transition.
	found := findOrderTransition(order.Status, to)
	if found == nil {
		return fmt.Errorf(
			"invalid transition %s -> %s",
			order.Status,
			to,
		)
	}

	log.Info("applying order transition",
		"orderID", orderID,
		"from", order.Status,
		"to", to)

	// 3. Run the side effect.
	if err := found.Action(&order); err != nil {
		return fmt.Errorf("action failed: %w", err)
	}

	// 4. Persist the status (CAS): the update only applies when nobody else
	// changed the row since it was read.
	result, err := db.ExecContext(ctx, `
		UPDATE orders
		SET status = ?,
		    version = version + 1,
		    updated_at = NOW()
		WHERE id = ?
		  AND version = ?
	`, to, orderID, version)
	if err != nil {
		return err
	}

	affected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("read affected rows: %w", err)
	}
	if affected == 0 {
		return errors.New("order version conflict")
	}

	log.Info("order transition completed",
		"orderID", orderID,
		"newStatus", to)
	return nil
}

// findOrderTransition returns the first entry of orderTransitions that leads
// to status to and lists from among its source statuses, or nil if none does.
func findOrderTransition(from, to OrderStatus) *OrderTransition {
	for i := range orderTransitions {
		t := &orderTransitions[i]
		if t.To != to {
			continue
		}
		for _, src := range t.From {
			if src == from {
				return t
			}
		}
	}
	return nil
}
副作用实现
// onOrderPaid is the Action of the Created -> Paid transition. It logs the
// event, sends a notification to the order's user and asks the warehouse
// service to create a packing task. It always returns nil; the results of the
// service calls are not inspected.
func onOrderPaid(order *Order) error {
	orderID, userID := order.ID, order.UserID
	log.Info("order paid", "orderID", orderID)

	// Notify the user first, then kick off packing.
	notificationService.Send(userID, "Payment successful")
	warehouseService.CreatePackingTask(orderID)

	return nil
}
// onOrderShipped is the Action of the Packing -> Shipped transition. It logs
// the event, sends the user a message containing the order's tracking number
// and schedules automatic delivery confirmation seven days later. It always
// returns nil.
func onOrderShipped(order *Order) error {
	// Delay passed to the auto-confirm scheduler: 7 days.
	const autoConfirmAfter = 7 * 24 * time.Hour

	log.Info("order shipped", "orderID", order.ID)

	message := fmt.Sprintf("Order shipped, tracking: %s", order.TrackingNumber)
	notificationService.Send(order.UserID, message)

	deliveryService.ScheduleAutoConfirm(order.ID, autoConfirmAfter)
	return nil
}
// onOrderCompleted is the Action of the Delivered -> Completed transition. It
// logs the event, adds points equal to the integer part of the order total to
// the user's account and sends a review invitation. It always returns nil.
func onOrderCompleted(order *Order) error {
	log.Info("order completed", "orderID", order.ID)

	// Points earned: the total amount truncated to its integer part.
	pointsService.Add(order.UserID, int(order.TotalAmount.IntPart()))

	// Invite the user to review the order.
	reviewService.SendReviewInvitation(order.ID)
	return nil
}
// onOrderRefunded is the Action of the Refunding -> Refunded transition. In
// order, it logs the event, refunds the order total to the user's account,
// restores the product stock and deducts points equal to the integer part of
// the order total. It always returns nil; the results of the service calls are
// not inspected.
//
// NOTE(review): in the orderTransitions table of this file, Refunded is only
// reachable through Paid/Shipped -> Refunding, whereas points are added in
// onOrderCompleted. Confirm that deducting points here is intended.
func onOrderRefunded(order *Order) error {
	log.Info("order refunded", "orderID", order.ID)

	userID := order.UserID

	// Money back to the account.
	accountService.Refund(userID, order.TotalAmount)

	// Units back into stock.
	stockService.Restore(order.ProductID, order.Quantity)

	// Take back the points that correspond to the order total.
	pointsService.Deduct(userID, int(order.TotalAmount.IntPart()))

	return nil
}
三、数据库表设计
3.1 订单表
-- orders: one row per order, carrying product, payment, shipping, refund and
-- cancellation details together with the current status.
CREATE TABLE orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    -- UNIQUE already creates an index on order_no, so no additional
    -- secondary index is declared for this column.
    order_no VARCHAR(32) UNIQUE NOT NULL,
    user_id BIGINT NOT NULL,
    -- Product information
    product_id BIGINT NOT NULL,
    product_name VARCHAR(256) NOT NULL,
    product_price DECIMAL(10,2) NOT NULL,
    quantity INT NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    -- Status; version is incremented by every status update and is used as
    -- the compare-and-swap guard.
    status VARCHAR(32) NOT NULL,
    version BIGINT NOT NULL DEFAULT 0,
    -- Payment information
    payment_method VARCHAR(32),
    paid_at TIMESTAMP NULL DEFAULT NULL,
    payment_no VARCHAR(64),
    -- Shipping information
    shipping_address VARCHAR(512),
    shipping_method VARCHAR(32),
    tracking_number VARCHAR(64),
    shipped_at TIMESTAMP NULL DEFAULT NULL,
    delivered_at TIMESTAMP NULL DEFAULT NULL,
    -- Refund information
    refund_reason TEXT,
    refund_amount DECIMAL(10,2),
    refunded_at TIMESTAMP NULL DEFAULT NULL,
    -- Cancellation information
    cancel_reason TEXT,
    canceled_at TIMESTAMP NULL DEFAULT NULL,
    -- Timestamps. Optional TIMESTAMP columns in this table are declared
    -- NULL DEFAULT NULL explicitly so they stay nullable regardless of the
    -- server's explicit_defaults_for_timestamp setting.
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    completed_at TIMESTAMP NULL DEFAULT NULL,
    INDEX idx_user_status (user_id, status),
    INDEX idx_status_created (status, created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
3.2 订单详情表(一对多)
-- order_items: line items of an order; one order can have many rows here.
CREATE TABLE order_items (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
-- Order this item belongs to (no foreign key constraint is declared).
order_id BIGINT NOT NULL,
product_id BIGINT NOT NULL,
-- Product name and price are stored on the item row itself.
product_name VARCHAR(256) NOT NULL,
product_price DECIMAL(10,2) NOT NULL,
quantity INT NOT NULL,
-- presumably product_price * quantity — confirm against the writer
subtotal DECIMAL(10,2) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- Lookups by order and by product.
INDEX idx_order (order_id),
INDEX idx_product (product_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
3.3 订单日志表
-- order_logs: history of order status changes, one row per change.
CREATE TABLE order_logs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id BIGINT NOT NULL,
-- old_status is nullable; new_status is required.
old_status VARCHAR(32),
new_status VARCHAR(32) NOT NULL,
-- Who made the change and an optional free-text note.
operator VARCHAR(128),
remark TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- Lookups by order and by creation time.
INDEX idx_order (order_id),
INDEX idx_created (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
四、总结
4.1 为什么交易系统排第四?
比普通业务系统难得多:
- 钱的事不能错: 一分钱都不能多也不能少
- 并发冲突复杂: 超卖、重复扣款、状态混乱
- 分布式事务: Saga、TCC 实现复杂
- 幂等性要求高: 防止重复提交
但比前三者简单:
- 状态相对少: 订单就十几个状态
- 不需要资源调度: 不像调度器那么复杂
- 单机为主: 不像一致性协议需要多数派
- 同步为主: 不像异步系统需要处理背压
4.2 关键技术点
库存扣减:
- 悲观锁(简单但慢)
- 乐观锁(快但需要重试)
- Redis 原子扣减(最快)
- 预占库存(体验最好)
分布式事务:
- Saga(最终一致性)
- TCC(Try-Confirm-Cancel:先预留资源再确认,一致性强于 Saga,但本质上仍是最终一致)
- 补偿机制
幂等性:
- 幂等 Token
- 状态机校验
- 去重表
状态机:
- 表驱动
- CAS 更新
- 副作用分离
4.3 学习路径
- 理解基础: 数据库事务、ACID
- 学习模式: Saga、TCC、幂等性
- 看源码: 电商开源项目
- 压力测试: 模拟高并发场景
- 生产使用: 在真实项目中应用
交易系统是电商的核心,值得深入学习!
本系列文章
➤ [NO.1 调度器]
➤ [NO.2 一致性协议](Paxos / Raft)
➤ [NO.3 高性能异步系统](消息队列、回调、重试)
➤ NO.4 交易系统(钱的事不能错)
➤ [NO.5 普通业务系统](绝大多数人做的)