HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 系统设计实战

    • 系统设计面试教程
    • 系统设计方法论
    • 01-短链系统设计
    • 02 - 秒杀系统设计
    • 03 - IM 即时通讯系统设计
    • 04 - Feed 流系统设计
    • 05 - 分布式 ID 生成器设计
    • 06 - 限流系统设计
    • 第7章:搜索引擎设计
    • 08 - 推荐系统设计
    • 09 - 支付系统设计
    • 10 - 电商系统设计
    • 11 - 直播系统设计
    • 第12章:缓存系统设计
    • 第13章:消息队列设计
    • 第14章:分布式事务
    • 15 - 监控系统设计

第14章:分布式事务

面试频率: \ │ │ / | \ │ │ / | \ │ │ / | \ │ │ / | \ │ │ / | \ │ │ / CP | CA \ │ │ / | \ │ │ /|\ │ │ A P P │ │ │ │ AP: 最终一致性(常用) │ │ CP: 强一致性 │ │ CA: 不存在(网络一定会分区) │ └─────────────────────────────────┘

实际选择:

  • 金融系统:CP(强一致性)
  • 互联网系统:AP(最终一致性)

### BASE理论

BASE = Basically Available, Soft state, Eventually consistent

BA (Basically Available)

  • 基本可用:系统允许损失部分可用性
  • 例:响应时间增加、降级服务

S (Soft state)

  • 软状态:允许中间状态
  • 例:订单"处理中"状态

E (Eventually consistent)

  • 最终一致性:不保证实时一致,但最终会一致
  • 例:余额更新延迟几秒

对比: ACID(强一致性) vs BASE(最终一致性) 单机数据库 vs 分布式系统


---

## 2PC两阶段提交

### 2PC原理

两阶段提交(Two-Phase Commit)

角色:

  • 协调者(Coordinator):事务管理器
  • 参与者(Participant):各个资源管理器

流程: ┌────────────┐ │ Coordinator│ └─────┬──────┘ │ │ Phase 1: Prepare(准备阶段) ├──────> 参与者A: "能提交吗?" ├──────> 参与者B: "能提交吗?" ├──────> 参与者C: "能提交吗?" │ ↓ 收到所有YES │ │ Phase 2: Commit(提交阶段) ├──────> 参与者A: "提交!" ├──────> 参与者B: "提交!" └──────> 参与者C: "提交!"

详细流程:

  1. Prepare阶段

    • Coordinator发送prepare请求
    • Participant执行事务操作,但不提交
    • Participant锁定资源
    • Participant返回YES/NO
  2. Commit阶段

    • 如果所有Participant都返回YES → Coordinator发送commit → Participant提交事务
    • 如果任何Participant返回NO → Coordinator发送abort → Participant回滚事务

### 2PC实现

```go
package transaction

import (
	"context"
	"errors"
	"sync"
	"time"
)

// TwoPhaseCommit 两阶段提交协调器
type TwoPhaseCommit struct {
	participants []Participant
	timeout      time.Duration
}

// Participant 参与者接口
type Participant interface {
	Prepare(ctx context.Context, txID string) error
	Commit(ctx context.Context, txID string) error
	Abort(ctx context.Context, txID string) error
}

// Execute 执行2PC事务
func (tpc *TwoPhaseCommit) Execute(ctx context.Context, txID string) error {
	// Phase 1: Prepare
	if err := tpc.preparePhase(ctx, txID); err != nil {
		// Prepare失败,执行回滚
		tpc.abortPhase(ctx, txID)
		return err
	}

	// Phase 2: Commit
	if err := tpc.commitPhase(ctx, txID); err != nil {
		// Commit失败,尝试回滚(可能部分成功)
		tpc.abortPhase(ctx, txID)
		return err
	}

	return nil
}

// preparePhase Prepare阶段
func (tpc *TwoPhaseCommit) preparePhase(ctx context.Context, txID string) error {
	ctx, cancel := context.WithTimeout(ctx, tpc.timeout)
	defer cancel()

	errChan := make(chan error, len(tpc.participants))
	var wg sync.WaitGroup

	// 并发发送Prepare请求
	for _, p := range tpc.participants {
		wg.Add(1)
		go func(participant Participant) {
			defer wg.Done()
			err := participant.Prepare(ctx, txID)
			if err != nil {
				errChan <- err
			}
		}(p)
	}

	wg.Wait()
	close(errChan)

	// 检查是否所有Participant都Prepare成功
	for err := range errChan {
		if err != nil {
			return err
		}
	}

	return nil
}

// commitPhase Commit阶段
func (tpc *TwoPhaseCommit) commitPhase(ctx context.Context, txID string) error {
	ctx, cancel := context.WithTimeout(ctx, tpc.timeout)
	defer cancel()

	errChan := make(chan error, len(tpc.participants))
	var wg sync.WaitGroup

	for _, p := range tpc.participants {
		wg.Add(1)
		go func(participant Participant) {
			defer wg.Done()
			err := participant.Commit(ctx, txID)
			if err != nil {
				errChan <- err
			}
		}(p)
	}

	wg.Wait()
	close(errChan)

	// 收集错误
	var errs []error
	for err := range errChan {
		if err != nil {
			errs = append(errs, err)
		}
	}

	if len(errs) > 0 {
		return errors.New("commit phase failed")
	}

	return nil
}

// abortPhase 回滚阶段
func (tpc *TwoPhaseCommit) abortPhase(ctx context.Context, txID string) {
	ctx, cancel := context.WithTimeout(ctx, tpc.timeout)
	defer cancel()

	var wg sync.WaitGroup
	for _, p := range tpc.participants {
		wg.Add(1)
		go func(participant Participant) {
			defer wg.Done()
			participant.Abort(ctx, txID)
		}(p)
	}

	wg.Wait()
}

// 具体参与者实现示例:订单服务
type OrderService struct {
	db *sql.DB
}

func (os *OrderService) Prepare(ctx context.Context, txID string) error {
	// 1. 开启事务
	tx, err := os.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}

	// 2. 执行业务逻辑(但不提交)
	_, err = tx.ExecContext(ctx, 
		"INSERT INTO orders (order_id, status) VALUES (?, 'pending')", 
		txID)
	if err != nil {
		tx.Rollback()
		return err
	}

	// 3. 保存事务句柄,等待Commit/Abort
	saveTx(txID, tx)

	return nil
}

func (os *OrderService) Commit(ctx context.Context, txID string) error {
	tx := getTx(txID)
	if tx == nil {
		return errors.New("transaction not found")
	}

	return tx.Commit()
}

func (os *OrderService) Abort(ctx context.Context, txID string) error {
	tx := getTx(txID)
	if tx == nil {
		return nil
	}

	return tx.Rollback()
}

2PC的问题

问题1:同步阻塞
┌────────────┐
│Participant │
│ 等待Commit │ → 资源被锁定,无法处理其他请求
└────────────┘

问题2:单点故障
Coordinator宕机 → 所有Participant无限等待

问题3:数据不一致
Phase 2网络分区:
- 部分Participant收到Commit
- 部分Participant未收到
→ 数据不一致

问题4:太过保守
任何一个Participant失败 → 整个事务回滚

3PC三阶段提交

3PC原理

三阶段提交(Three-Phase Commit)
改进:增加超时机制 + Pre-Commit阶段

Phase 1: CanCommit(询问)
  Coordinator: "能提交吗?"
  Participant: "YES/NO"

Phase 2: PreCommit(预提交)
  如果所有YES:
    Coordinator: "预提交!"
    Participant: 执行事务,锁定资源,但不提交
  如果有NO:
    Coordinator: "Abort!"

Phase 3: DoCommit(提交)
  Coordinator: "正式提交!"
  Participant: 提交事务

超时机制:
- Participant在PreCommit后超时 → 自动提交
- 降低阻塞时间

3PC vs 2PC

特性2PC3PC
阶段数2个3个
超时无有
阻塞同步阻塞非阻塞
一致性强一致性可能不一致
性能较差稍好
复杂度低高

选择建议:

  • 实际生产很少用2PC/3PC
  • 原因:性能差、阻塞、单点故障
  • 更多使用:TCC、Saga、最终一致性

TCC补偿模式

TCC原理

TCC = Try-Confirm-Cancel

Try阶段:尝试执行,预留资源
Confirm阶段:确认执行,使用预留资源
Cancel阶段:取消执行,释放预留资源

示例:转账
┌─────────────────────────────────────────────────────┐
│                     转账 A → B                      │
├─────────────────────────────────────────────────────┤
│                                                     │
│  Try阶段:                                          │
│  ┌────────────┐              ┌────────────┐        │
│  │  账户A     │              │  账户B     │        │
│  │  余额:1000 │              │  余额:0    │        │
│  │  冻结:100  │ → 冻结100    │  预留:100  │        │
│  └────────────┘              └────────────┘        │
│                                                     │
│  Confirm阶段:                                      │
│  ┌────────────┐              ┌────────────┐        │
│  │  账户A     │              │  账户B     │        │
│  │  余额:900  │ → 扣除冻结   │  余额:100  │        │
│  │  冻结:0    │              │  预留:0    │        │
│  └────────────┘              └────────────┘        │
│                                                     │
│  Cancel阶段(失败时):                              │
│  ┌────────────┐              ┌────────────┐        │
│  │  账户A     │              │  账户B     │        │
│  │  余额:1000 │ → 解冻       │  余额:0    │        │
│  │  冻结:0    │              │  预留:0    │        │
│  └────────────┘              └────────────┘        │
└─────────────────────────────────────────────────────┘

TCC完整实现

package tcc

import (
	"context"
	"errors"
	"sync"
)

// TCCTransaction TCC事务
type TCCTransaction struct {
	id           string
	participants []*TCCParticipant
}

// TCCParticipant TCC参与者
type TCCParticipant struct {
	serviceName string
	tryFunc     func(ctx context.Context) error
	confirmFunc func(ctx context.Context) error
	cancelFunc  func(ctx context.Context) error
}

// TCCCoordinator TCC协调器
type TCCCoordinator struct {
	transactions sync.Map
}

func NewTCCCoordinator() *TCCCoordinator {
	return &TCCCoordinator{}
}

// Begin 开启TCC事务
func (tc *TCCCoordinator) Begin(txID string) *TCCTransaction {
	tx := &TCCTransaction{
		id:           txID,
		participants: make([]*TCCParticipant, 0),
	}
	tc.transactions.Store(txID, tx)
	return tx
}

// AddParticipant 添加参与者
func (tx *TCCTransaction) AddParticipant(p *TCCParticipant) {
	tx.participants = append(tx.participants, p)
}

// Execute 执行TCC事务
func (tc *TCCCoordinator) Execute(ctx context.Context, txID string) error {
	txInterface, ok := tc.transactions.Load(txID)
	if !ok {
		return errors.New("transaction not found")
	}

	tx := txInterface.(*TCCTransaction)

	// Phase 1: Try
	if err := tc.tryPhase(ctx, tx); err != nil {
		// Try失败,执行Cancel
		tc.cancelPhase(ctx, tx)
		return err
	}

	// Phase 2: Confirm
	if err := tc.confirmPhase(ctx, tx); err != nil {
		// Confirm失败,执行Cancel
		tc.cancelPhase(ctx, tx)
		return err
	}

	return nil
}

// tryPhase Try阶段
func (tc *TCCCoordinator) tryPhase(ctx context.Context, tx *TCCTransaction) error {
	errChan := make(chan error, len(tx.participants))
	var wg sync.WaitGroup

	for _, p := range tx.participants {
		wg.Add(1)
		go func(participant *TCCParticipant) {
			defer wg.Done()
			if err := participant.tryFunc(ctx); err != nil {
				errChan <- err
			}
		}(p)
	}

	wg.Wait()
	close(errChan)

	for err := range errChan {
		if err != nil {
			return err
		}
	}

	return nil
}

// confirmPhase Confirm阶段
func (tc *TCCCoordinator) confirmPhase(ctx context.Context, tx *TCCTransaction) error {
	errChan := make(chan error, len(tx.participants))
	var wg sync.WaitGroup

	for _, p := range tx.participants {
		wg.Add(1)
		go func(participant *TCCParticipant) {
			defer wg.Done()
			if err := participant.confirmFunc(ctx); err != nil {
				errChan <- err
			}
		}(p)
	}

	wg.Wait()
	close(errChan)

	for err := range errChan {
		if err != nil {
			return err
		}
	}

	return nil
}

// cancelPhase Cancel阶段
func (tc *TCCCoordinator) cancelPhase(ctx context.Context, tx *TCCTransaction) {
	var wg sync.WaitGroup

	for _, p := range tx.participants {
		wg.Add(1)
		go func(participant *TCCParticipant) {
			defer wg.Done()
			participant.cancelFunc(ctx)
		}(p)
	}

	wg.Wait()
}

// 使用示例:下单扣库存
type OrderTCCService struct {
	db *sql.DB
}

// Try: 创建订单(pending状态)
func (s *OrderTCCService) TryCreateOrder(ctx context.Context, order *Order) error {
	_, err := s.db.ExecContext(ctx,
		"INSERT INTO orders (order_id, user_id, amount, status) VALUES (?, ?, ?, 'try')",
		order.OrderID, order.UserID, order.Amount)
	return err
}

// Confirm: 确认订单
func (s *OrderTCCService) ConfirmOrder(ctx context.Context, orderID string) error {
	_, err := s.db.ExecContext(ctx,
		"UPDATE orders SET status = 'confirmed' WHERE order_id = ? AND status = 'try'",
		orderID)
	return err
}

// Cancel: 取消订单
func (s *OrderTCCService) CancelOrder(ctx context.Context, orderID string) error {
	_, err := s.db.ExecContext(ctx,
		"DELETE FROM orders WHERE order_id = ? AND status = 'try'",
		orderID)
	return err
}

// 库存TCC服务
type InventoryTCCService struct {
	db *sql.DB
}

// Try: 冻结库存
func (s *InventoryTCCService) TryFreezeInventory(ctx context.Context, productID string, quantity int) error {
	// 检查库存
	var available int
	err := s.db.QueryRowContext(ctx,
		"SELECT available FROM inventory WHERE product_id = ?", productID).Scan(&available)
	if err != nil {
		return err
	}

	if available < quantity {
		return errors.New("insufficient inventory")
	}

	// 冻结库存
	_, err = s.db.ExecContext(ctx,
		"UPDATE inventory SET available = available - ?, frozen = frozen + ? WHERE product_id = ?",
		quantity, quantity, productID)
	return err
}

// Confirm: 扣减库存
func (s *InventoryTCCService) ConfirmInventory(ctx context.Context, productID string, quantity int) error {
	_, err := s.db.ExecContext(ctx,
		"UPDATE inventory SET frozen = frozen - ? WHERE product_id = ?",
		quantity, productID)
	return err
}

// Cancel: 解冻库存
func (s *InventoryTCCService) CancelInventory(ctx context.Context, productID string, quantity int) error {
	_, err := s.db.ExecContext(ctx,
		"UPDATE inventory SET available = available + ?, frozen = frozen - ? WHERE product_id = ?",
		quantity, quantity, productID)
	return err
}

// 完整流程
func PlaceOrderWithTCC(order *Order) error {
	ctx := context.Background()
	txID := generateTxID()

	coordinator := NewTCCCoordinator()
	tx := coordinator.Begin(txID)

	orderService := &OrderTCCService{db: db}
	inventoryService := &InventoryTCCService{db: db}

	// 添加订单参与者
	tx.AddParticipant(&TCCParticipant{
		serviceName: "order",
		tryFunc: func(ctx context.Context) error {
			return orderService.TryCreateOrder(ctx, order)
		},
		confirmFunc: func(ctx context.Context) error {
			return orderService.ConfirmOrder(ctx, order.OrderID)
		},
		cancelFunc: func(ctx context.Context) error {
			return orderService.CancelOrder(ctx, order.OrderID)
		},
	})

	// 添加库存参与者
	tx.AddParticipant(&TCCParticipant{
		serviceName: "inventory",
		tryFunc: func(ctx context.Context) error {
			return inventoryService.TryFreezeInventory(ctx, order.ProductID, order.Quantity)
		},
		confirmFunc: func(ctx context.Context) error {
			return inventoryService.ConfirmInventory(ctx, order.ProductID, order.Quantity)
		},
		cancelFunc: func(ctx context.Context) error {
			return inventoryService.CancelInventory(ctx, order.ProductID, order.Quantity)
		},
	})

	// 执行TCC事务
	return coordinator.Execute(ctx, txID)
}

TCC注意事项

1. 幂等性
   - Try/Confirm/Cancel都可能重复调用
   - 必须保证幂等

2. 空回滚
   - Try超时,直接收到Cancel
   - Cancel需要判断Try是否执行

3. 悬挂
   - Try超时后Cancel执行
   - Try后续才到达
   - 需要记录Cancel状态,拒绝Try

4. 资源预留
   - Try阶段必须预留资源
   - 不能等到Confirm才检查

数据库设计:
CREATE TABLE tcc_transaction (
  tx_id VARCHAR(64) PRIMARY KEY,
  status ENUM('try', 'confirm', 'cancel'),
  create_time DATETIME,
  update_time DATETIME
);

-- 防止空回滚和悬挂

Saga模式

Saga原理

Saga模式 = 长事务拆分 + 补偿

特点:
- 每个子事务独立提交
- 失败时执行补偿事务

示例:订单流程
┌─────────────────────────────────────────────────┐
│            正向流程                              │
├─────────────────────────────────────────────────┤
│  T1: 创建订单 → T2: 扣库存 → T3: 扣款 → T4: 发货│
│      ↓ 提交       ↓ 提交      ↓ 提交    ↓ 提交 │
└─────────────────────────────────────────────────┘

如果T3失败:
┌─────────────────────────────────────────────────┐
│            补偿流程                              │
├─────────────────────────────────────────────────┤
│  T1: 创建订单 → T2: 扣库存 → T3: 扣款(失败)     │
│      ↓ 提交       ↓ 提交      ↓ 失败           │
│                                ↓                │
│              C2: 恢复库存 ← C1: 取消订单         │
│                  ↓ 补偿        ↓ 补偿           │
└─────────────────────────────────────────────────┘

Saga实现(协调)

package saga

import (
	"context"
	"errors"
)

// SagaStep Saga步骤
type SagaStep struct {
	name         string
	action       func(ctx context.Context) error  // 正向操作
	compensation func(ctx context.Context) error  // 补偿操作
}

// Saga事务
type Saga struct {
	steps          []*SagaStep
	executedSteps  []*SagaStep
}

func NewSaga() *Saga {
	return &Saga{
		steps:         make([]*SagaStep, 0),
		executedSteps: make([]*SagaStep, 0),
	}
}

// AddStep 添加步骤
func (s *Saga) AddStep(name string, action, compensation func(ctx context.Context) error) {
	s.steps = append(s.steps, &SagaStep{
		name:         name,
		action:       action,
		compensation: compensation,
	})
}

// Execute 执行Saga
func (s *Saga) Execute(ctx context.Context) error {
	// 正向执行所有步骤
	for _, step := range s.steps {
		if err := step.action(ctx); err != nil {
			// 失败,执行补偿
			s.compensate(ctx)
			return err
		}

		// 记录已执行步骤
		s.executedSteps = append(s.executedSteps, step)
	}

	return nil
}

// compensate 补偿
func (s *Saga) compensate(ctx context.Context) {
	// 逆序执行补偿
	for i := len(s.executedSteps) - 1; i >= 0; i-- {
		step := s.executedSteps[i]
		if err := step.compensation(ctx); err != nil {
			// 补偿失败,记录日志,人工介入
			log.Printf("Compensation failed for step %s: %v", step.name, err)
		}
	}
}

// 使用示例
func PlaceOrderWithSaga(order *Order) error {
	ctx := context.Background()
	saga := NewSaga()

	// Step 1: 创建订单
	saga.AddStep("create_order",
		func(ctx context.Context) error {
			return createOrder(ctx, order)
		},
		func(ctx context.Context) error {
			return cancelOrder(ctx, order.OrderID)
		})

	// Step 2: 扣减库存
	saga.AddStep("deduct_inventory",
		func(ctx context.Context) error {
			return deductInventory(ctx, order.ProductID, order.Quantity)
		},
		func(ctx context.Context) error {
			return restoreInventory(ctx, order.ProductID, order.Quantity)
		})

	// Step 3: 扣款
	saga.AddStep("deduct_balance",
		func(ctx context.Context) error {
			return deductBalance(ctx, order.UserID, order.Amount)
		},
		func(ctx context.Context) error {
			return refund(ctx, order.UserID, order.Amount)
		})

	// Step 4: 通知发货
	saga.AddStep("notify_shipping",
		func(ctx context.Context) error {
			return notifyShipping(ctx, order.OrderID)
		},
		func(ctx context.Context) error {
			return cancelShipping(ctx, order.OrderID)
		})

	return saga.Execute(ctx)
}

Saga vs TCC

特性SagaTCC
一致性最终一致性强一致性
实现复杂度低高
性能高(无锁)中(有锁)
补偿事后补偿预留资源
适用场景长事务短事务
隔离性弱强

选择建议:

  • 短事务、强一致性 → TCC
  • 长事务、最终一致性 → Saga

本地消息表

原理

本地消息表 = 本地事务 + 消息队列

流程:
┌─────────────────────────────────────────────┐
│  服务A                                      │
│  ┌─────────────────────────────┐           │
│  │ 开启事务                     │           │
│  │ 1. 执行业务(创建订单)       │           │
│  │ 2. 插入消息表(待发送)       │           │
│  │ 提交事务                     │           │
│  └─────────────────────────────┘           │
│         ↓                                   │
│  定时任务扫描消息表                          │
│         ↓                                   │
│  发送消息到MQ                               │
│         ↓                                   │
│  标记消息为已发送                            │
└─────────────────────────────────────────────┘
         ↓
┌─────────────────────────────────────────────┐
│  消息队列(Kafka)                          │
└─────────────────────────────────────────────┘
         ↓
┌─────────────────────────────────────────────┐
│  服务B                                      │
│  消费消息 → 扣减库存                        │
└─────────────────────────────────────────────┘

本地消息表实现

// 消息表
type LocalMessage struct {
	ID          int64
	MessageID   string
	Topic       string
	Payload     string
	Status      string  // pending/sent/failed
	RetryCount  int
	NextRetry   time.Time
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

// 创建订单 + 插入消息表(本地事务)
func CreateOrderWithMessage(order *Order) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// 1. 创建订单
	_, err = tx.Exec(
		"INSERT INTO orders (order_id, user_id, amount, status) VALUES (?, ?, ?, 'created')",
		order.OrderID, order.UserID, order.Amount)
	if err != nil {
		return err
	}

	// 2. 插入消息表
	message := &LocalMessage{
		MessageID: generateMessageID(),
		Topic:     "order-created",
		Payload:   marshalJSON(order),
		Status:    "pending",
		CreatedAt: time.Now(),
	}

	_, err = tx.Exec(
		"INSERT INTO local_messages (message_id, topic, payload, status, created_at) VALUES (?, ?, ?, ?, ?)",
		message.MessageID, message.Topic, message.Payload, message.Status, message.CreatedAt)
	if err != nil {
		return err
	}

	// 3. 提交事务
	return tx.Commit()
}

// 定时任务:扫描并发送消息
func MessageSender() {
	ticker := time.NewTicker(1 * time.Second)

	for range ticker.C {
		// 查询待发送消息
		rows, _ := db.Query(
			"SELECT id, message_id, topic, payload FROM local_messages WHERE status = 'pending' AND next_retry < NOW() LIMIT 100")

		for rows.Next() {
			var msg LocalMessage
			rows.Scan(&msg.ID, &msg.MessageID, &msg.Topic, &msg.Payload)

			// 发送到Kafka
			err := sendToKafka(msg.Topic, msg.MessageID, []byte(msg.Payload))
			if err != nil {
				// 发送失败,更新重试时间
				updateMessageStatus(msg.ID, "pending", msg.RetryCount+1)
				continue
			}

			// 发送成功,标记为已发送
			updateMessageStatus(msg.ID, "sent", msg.RetryCount)
		}

		rows.Close()
	}
}

func sendToKafka(topic, key string, value []byte) error {
	return producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: kafka.PartitionAny,
		},
		Key:   []byte(key),
		Value: value,
	}, nil)
}

func updateMessageStatus(id int64, status string, retryCount int) {
	nextRetry := time.Now().Add(time.Duration(math.Pow(2, float64(retryCount))) * time.Second)

	db.Exec(
		"UPDATE local_messages SET status = ?, retry_count = ?, next_retry = ?, updated_at = NOW() WHERE id = ?",
		status, retryCount, nextRetry, id)
}

可靠消息最终一致性

RocketMQ事务消息

RocketMQ事务消息流程:

1. 发送Half消息(半消息)
2. 执行本地事务
3. Commit/Rollback Half消息
4. 消费者消费消息

┌─────────────┐
│ Producer    │
└──────┬──────┘
       │
       │ 1. sendHalfMessage
       ├────────────────────────────> ┌─────────────┐
       │                               │  RocketMQ   │
       │ 2. 执行本地事务                │  (Half Msg) │
       │     成功                     └─────────────┘
       │
       │ 3. commitMessage
       ├────────────────────────────>
       │                              ┌─────────────┐
       │                              │  RocketMQ   │
       │                              │  (Normal)   │
       │                              └──────┬──────┘
       │                                     │
       │                                     ↓
       │                              ┌─────────────┐
       │                              │  Consumer   │
       │                              └─────────────┘

如果Producer宕机:
RocketMQ回查事务状态 → Producer检查本地事务 → Commit/Rollback

事务消息实现

// RocketMQ事务消息Producer
type TransactionProducer struct {
	producer *rocketmq.TransactionProducer
}

// 事务监听器
type OrderTransactionListener struct{}

// ExecuteLocalTransaction 执行本地事务
func (l *OrderTransactionListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
	// 解析消息
	var order Order
	json.Unmarshal(msg.Body, &order)

	// 执行本地事务
	err := createOrder(&order)
	if err != nil {
		return primitive.RollbackMessageState
	}

	return primitive.CommitMessageState
}

// CheckLocalTransaction 回查本地事务
func (l *OrderTransactionListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
	// 解析消息
	var order Order
	json.Unmarshal(msg.Body, &order)

	// 检查订单是否存在
	exists, _ := checkOrderExists(order.OrderID)
	if exists {
		return primitive.CommitMessageState
	}

	return primitive.RollbackMessageState
}

// 发送事务消息
func SendTransactionMessage(order *Order) error {
	// 创建事务Producer
	p, err := rocketmq.NewTransactionProducer(
		&OrderTransactionListener{},
		producer.WithNameServer([]string{"localhost:9876"}),
	)
	if err != nil {
		return err
	}

	p.Start()
	defer p.Shutdown()

	// 构造消息
	msg := primitive.NewMessage("order-created", marshalJSON(order))

	// 发送事务消息
	res, err := p.SendMessageInTransaction(context.Background(), msg)
	if err != nil {
		return err
	}

	log.Printf("Send transaction message: %s", res.MsgID)
	return nil
}

最大努力通知

原理

最大努力通知 = 尽力而为 + 定时重试

适用场景:
- 对一致性要求不高
- 允许短期不一致
- 例:支付通知、积分通知

流程:
1. 服务A执行操作
2. 同步调用服务B(失败不回滚)
3. 如果失败,异步重试(指数退避)
4. 重试N次后放弃

重试策略:
第1次:立即
第2次:1分钟后
第3次:5分钟后
第4次:30分钟后
第5次:2小时后
...
放弃:24小时后

实现

type NotificationService struct {
	db       *sql.DB
	maxRetry int
}

// 发送通知(最大努力)
func (ns *NotificationService) SendNotification(event *Event) error {
	// 1. 保存通知记录
	notification := &Notification{
		ID:         generateID(),
		EventType:  event.Type,
		Payload:    marshalJSON(event),
		Status:     "pending",
		RetryCount: 0,
		CreatedAt:  time.Now(),
	}

	ns.db.Exec(
		"INSERT INTO notifications (id, event_type, payload, status, retry_count, created_at) VALUES (?, ?, ?, ?, ?, ?)",
		notification.ID, notification.EventType, notification.Payload, notification.Status, notification.RetryCount, notification.CreatedAt)

	// 2. 尝试发送
	return ns.trySend(notification)
}

func (ns *NotificationService) trySend(notification *Notification) error {
	// 发送HTTP请求
	err := sendHTTPNotification(notification.Payload)
	if err == nil {
		// 成功
		ns.db.Exec("UPDATE notifications SET status = 'success' WHERE id = ?", notification.ID)
		return nil
	}

	// 失败,异步重试
	go ns.scheduleRetry(notification)

	return err
}

func (ns *NotificationService) scheduleRetry(notification *Notification) {
	if notification.RetryCount >= ns.maxRetry {
		// 超过最大重试次数,放弃
		ns.db.Exec("UPDATE notifications SET status = 'failed' WHERE id = ?", notification.ID)
		return
	}

	// 指数退避
	delay := time.Duration(math.Pow(2, float64(notification.RetryCount))) * time.Minute
	time.Sleep(delay)

	// 更新重试次数
	notification.RetryCount++
	ns.db.Exec("UPDATE notifications SET retry_count = ? WHERE id = ?", notification.RetryCount, notification.ID)

	// 重试
	ns.trySend(notification)
}

监控告警

分布式事务监控

# Prometheus指标
distributed_transaction_metrics:
  # TCC事务
  - tcc_transaction_total
  - tcc_transaction_duration_seconds
  - tcc_transaction_success_total
  - tcc_transaction_failure_total
  - tcc_compensation_total

  # Saga事务
  - saga_transaction_total
  - saga_step_duration_seconds
  - saga_compensation_triggered_total

  # 本地消息表
  - local_message_pending_total
  - local_message_send_duration_seconds
  - local_message_retry_count

告警规则

groups:
  - name: transaction_alerts
    rules:
      # 补偿率过高
      - alert: HighCompensationRate
        expr: |
          rate(tcc_compensation_total[5m]) /
          rate(tcc_transaction_total[5m]) > 0.1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "TCC补偿率超过10%"

      # 消息积压
      - alert: MessageBacklog
        expr: local_message_pending_total > 1000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "本地消息表积压超过1000条"

面试问答

什么是分布式事务?为什么需要?

单机事务:

BEGIN;
UPDATE account SET balance = balance - 100 WHERE id = 1;
UPDATE account SET balance = balance + 100 WHERE id = 2;
COMMIT;

数据库保证ACID

分布式事务:

服务A: 创建订单
服务B: 扣减库存
服务C: 扣款

如何保证三个操作要么都成功,要么都失败?

需要分布式事务的原因:

  • 微服务拆分
  • 数据库分库分表
  • 跨系统调用

CAP定理是什么?

CAP = Consistency + Availability + Partition tolerance

C (一致性): 所有节点看到相同数据
A (可用性): 系统一直可用
P (分区容错性): 网络分区时仍能工作

不可能同时满足三个,只能选两个:
- CP: 牺牲可用性,保证一致性(金融系统)
- AP: 牺牲一致性,保证可用性(互联网系统)
- CA: 不存在(网络一定会分区)

实际选择:

  • 银行转账 → CP(强一致性)
  • 微博点赞 → AP(最终一致性)

2PC有什么问题?

问题1:同步阻塞

Prepare阶段,Participant锁定资源
等待Coordinator的Commit/Abort
→ 资源长时间锁定,吞吐量低

问题2:单点故障

Coordinator宕机 → Participant无限等待

问题3:数据不一致

Phase 2: Commit阶段网络分区
部分Participant收到Commit → 提交
部分Participant未收到 → 无法提交
→ 数据不一致

问题4:太过保守

任何一个Participant失败 → 整个事务回滚

TCC和2PC有什么区别?

特性2PCTCC
阶段Prepare + CommitTry + Confirm + Cancel
资源锁定数据库锁业务逻辑锁定
性能差(同步阻塞)好(业务控制)
实现复杂度低高
隔离性数据库保证业务保证

TCC优势:

  • 业务层面控制
  • 不依赖数据库锁
  • 性能更好

TCC劣势:

  • 实现复杂
  • 需要编写Try/Confirm/Cancel三个方法

Saga和TCC如何选择?

TCC:

  • 适合短事务
  • 需要强一致性
  • 资源预留

示例:转账(需要立即锁定资金)

Saga:

  • 适合长事务
  • 最终一致性
  • 事后补偿

示例:订单流程(创建订单→扣库存→支付→发货)

选择建议:

  • 核心业务、强一致性 → TCC
  • 长流程、最终一致性 → Saga

本地消息表如何保证消息一定发送?

1. 本地事务保证消息一定写入
   BEGIN;
   - 执行业务
   - 插入消息表
   COMMIT;

2. 定时任务扫描未发送消息
   SELECT * FROM messages WHERE status = 'pending'

3. 发送到MQ后标记为已发送
   UPDATE messages SET status = 'sent'

4. 失败重试(指数退避)
   第1次:立即
   第2次:1秒后
   第3次:2秒后
   第4次:4秒后
   ...

优点:

  • 消息一定会发送
  • 不丢失

缺点:

  • 需要定时任务
  • 有延迟

如何保证分布式事务的幂等性?

方案1:唯一键约束

CREATE TABLE orders (
  order_id VARCHAR(64) PRIMARY KEY
);
-- 重复INSERT会失败

方案2:状态机

-- 只允许pending → confirmed
UPDATE orders 
SET status = 'confirmed' 
WHERE order_id = ? AND status = 'pending'

方案3:版本号

UPDATE orders 
SET amount = ?, version = version + 1
WHERE order_id = ? AND version = ?

方案4:分布式锁

lock := redis.SetNX("order:"+orderID, 1, 10*time.Second)
if !lock {
  return errors.New("duplicate request")
}
defer redis.Del("order:"+orderID)

分布式事务如何做补偿?

补偿原则:

正向操作 → 补偿操作

创建订单 → 取消订单
扣减库存 → 恢复库存
扣款 → 退款
发送消息 → 无需补偿(幂等)

补偿实现:

// Saga补偿
saga := NewSaga()
saga.AddStep("create_order",
  action: createOrder,
  compensation: cancelOrder)  // 补偿

saga.AddStep("deduct_inventory",
  action: deductInventory,
  compensation: restoreInventory)  // 补偿

if err := saga.Execute(); err != nil {
  // 自动执行补偿
}

注意事项:

  • 补偿操作必须幂等
  • 补偿可能失败,需要重试
  • 记录补偿日志

如何设计一个下单流程的分布式事务?

场景:下单 → 扣库存 → 扣款 → 通知发货

方案1:TCC(强一致性)

Try:
  - 创建订单(pending)
  - 冻结库存
  - 冻结余额
  - 预留发货单

Confirm:
  - 订单confirmed
  - 扣减库存
  - 扣减余额
  - 通知发货

Cancel:
  - 取消订单
  - 解冻库存
  - 解冻余额
  - 取消发货

方案2:Saga(最终一致性,推荐)

Step1: 创建订单
  补偿: 取消订单

Step2: 扣减库存
  补偿: 恢复库存

Step3: 扣款
  补偿: 退款

Step4: 通知发货
  补偿: 取消发货

方案3:本地消息表

1. 本地事务(创建订单 + 插入消息)
2. 发送消息到MQ
3. 库存服务消费消息 → 扣库存
4. 支付服务消费消息 → 扣款
5. 物流服务消费消息 → 发货

选择:

  • 核心订单 → Saga
  • 积分、优惠券 → 最大努力通知

分布式事务性能如何优化?

优化1:减少参与者

不要:订单服务 → 库存服务 → 支付服务 → 物流服务
改为:订单服务 → 库存服务,异步通知支付和物流

优化2:异步化

核心流程同步,非核心流程异步
同步:创建订单、扣库存、扣款
异步:发送短信、积分、优惠券

优化3:降级

高峰期:跳过非核心步骤
- 跳过积分
- 跳过优惠券
- 保证核心流程

优化4:缓存

库存缓存:减少库存服务调用

优化5:批量

批量确认、批量补偿

Prev
第13章:消息队列设计
Next
15 - 监控系统设计