HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 微服务架构实战

    • 微服务架构设计手册
    • 第1章:微服务架构概述
    • 第2章:服务拆分与边界
    • 第3章:服务间通信
    • 第4章:数据一致性方案

第4章:数据一致性方案

分布式事务挑战

单体应用的事务

数据库事务(ACID):

// 单体应用:一个数据库事务
func (s *OrderService) CreateOrder(order Order) error {
    tx, _ := s.db.Begin()
    defer tx.Rollback()

    // 1. 创建订单
    if err := tx.Exec("INSERT INTO orders ..."); err != nil {
        return err
    }

    // 2. 扣减库存
    if err := tx.Exec("UPDATE inventory SET stock = stock - ? WHERE product_id = ?"); err != nil {
        return err
    }

    // 3. 扣减余额
    if err := tx.Exec("UPDATE accounts SET balance = balance - ? WHERE user_id = ?"); err != nil {
        return err
    }

    // 全部成功才提交
    return tx.Commit()
}

// 特点:
//  ACID保证
//  原子性(要么全成功,要么全失败)
//  实现简单

微服务的事务挑战

问题场景:

┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│Order Service │    │Inventory     │    │Payment       │
│(订单数据库)   │    │Service       │    │Service       │
│              │    │(库存数据库)   │    │(支付数据库)   │
└──────────────┘    └──────────────┘    └──────────────┘

创建订单流程:
1. Order Service创建订单
2. Inventory Service扣减库存
3. Payment Service扣减余额

问题:
 3个独立数据库,无法用一个事务
 如果步骤2失败,步骤1已执行,如何回滚?
 如果步骤3超时,不知道是否成功?

核心挑战:

1. 原子性:如何保证多个服务的操作要么全成功,要么全失败?
2. 一致性:如何保证最终数据一致?
3. 隔离性:并发操作如何处理?
4. 持久性:操作是否已持久化?

两阶段提交(2PC)

原理

Two-Phase Commit:协调者协调所有参与者完成分布式事务

角色:

Coordinator(协调者):事务协调者
Participants(参与者):各个服务

阶段:

阶段1:准备阶段(Prepare)

Coordinator → Participant1: 准备提交?
Coordinator → Participant2: 准备提交?
Coordinator → Participant3: 准备提交?

Participant1 → Coordinator: Yes(锁定资源)
Participant2 → Coordinator: Yes(锁定资源)
Participant3 → Coordinator: No(失败)

阶段2:提交阶段(Commit/Abort)

如果所有Participant都返回Yes:
Coordinator → 所有Participant: Commit

如果有任何Participant返回No:
Coordinator → 所有Participant: Abort(回滚)

代码实现

Coordinator(协调者):

package main

import (
    "context"
    "errors"
    "log"
    "time"
)

// 2PC协调者
type TwoPhaseCoordinator struct {
    participants []Participant
}

// 参与者接口
type Participant interface {
    Prepare(ctx context.Context, txID string) error
    Commit(ctx context.Context, txID string) error
    Abort(ctx context.Context, txID string) error
}

// 执行分布式事务
func (c *TwoPhaseCoordinator) Execute(txID string) error {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // ========== 阶段1:准备阶段 ==========
    log.Printf("[2PC] Phase 1: Prepare (txID: %s)", txID)

    for i, participant := range c.participants {
        if err := participant.Prepare(ctx, txID); err != nil {
            log.Printf("[2PC] Participant %d prepare failed: %v", i, err)

            // 有一个失败,全部回滚
            c.abortAll(ctx, txID)
            return err
        }

        log.Printf("[2PC] Participant %d prepared", i)
    }

    log.Printf("[2PC] All participants prepared")

    // ========== 阶段2:提交阶段 ==========
    log.Printf("[2PC] Phase 2: Commit")

    for i, participant := range c.participants {
        if err := participant.Commit(ctx, txID); err != nil {
            log.Printf("[2PC] Participant %d commit failed: %v", i, err)
            // 提交阶段失败,事务状态不确定!
            return err
        }

        log.Printf("[2PC] Participant %d committed", i)
    }

    log.Printf("[2PC] Transaction committed successfully")
    return nil
}

// 回滚所有参与者
func (c *TwoPhaseCoordinator) abortAll(ctx context.Context, txID string) {
    log.Printf("[2PC] Aborting all participants")

    for i, participant := range c.participants {
        if err := participant.Abort(ctx, txID); err != nil {
            log.Printf("[2PC] Participant %d abort failed: %v", i, err)
        }
    }
}

Participant(参与者)示例:

// Order Service参与者
type OrderServiceParticipant struct {
    db *sql.DB
}

func (p *OrderServiceParticipant) Prepare(ctx context.Context, txID string) error {
    // 1. 开启本地事务
    tx, err := p.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }

    // 2. 执行业务逻辑(创建订单)
    _, err = tx.ExecContext(ctx, `
        INSERT INTO orders (id, user_id, amount, status, tx_id)
        VALUES (?, ?, ?, 'PREPARED', ?)
    `, generateID(), "USER-001", 100.0, txID)

    if err != nil {
        tx.Rollback()
        return err
    }

    // 3. 不提交,保持锁定状态
    // 将事务句柄保存起来,等待Commit/Abort
    saveTx(txID, tx)

    log.Printf("[OrderService] Prepared (txID: %s)", txID)
    return nil
}

func (p *OrderServiceParticipant) Commit(ctx context.Context, txID string) error {
    // 1. 获取事务句柄
    tx := getTx(txID)
    if tx == nil {
        return errors.New("transaction not found")
    }

    // 2. 更新状态为COMMITTED
    _, err := tx.ExecContext(ctx, `
        UPDATE orders SET status = 'COMMITTED' WHERE tx_id = ?
    `, txID)

    if err != nil {
        tx.Rollback()
        return err
    }

    // 3. 提交事务
    if err := tx.Commit(); err != nil {
        return err
    }

    log.Printf("[OrderService] Committed (txID: %s)", txID)
    return nil
}

func (p *OrderServiceParticipant) Abort(ctx context.Context, txID string) error {
    // 1. 获取事务句柄
    tx := getTx(txID)
    if tx == nil {
        return errors.New("transaction not found")
    }

    // 2. 回滚事务
    if err := tx.Rollback(); err != nil {
        return err
    }

    log.Printf("[OrderService] Aborted (txID: %s)", txID)
    return nil
}

使用示例:

func main() {
    // 创建参与者
    orderService := &OrderServiceParticipant{db: orderDB}
    inventoryService := &InventoryServiceParticipant{db: inventoryDB}
    paymentService := &PaymentServiceParticipant{db: paymentDB}

    // 创建协调者
    coordinator := &TwoPhaseCoordinator{
        participants: []Participant{
            orderService,
            inventoryService,
            paymentService,
        },
    }

    // 执行分布式事务
    txID := "TX-" + generateID()
    if err := coordinator.Execute(txID); err != nil {
        log.Printf("Transaction failed: %v", err)
    } else {
        log.Printf("Transaction succeeded")
    }
}

2PC的问题

1. 同步阻塞

准备阶段:所有参与者锁定资源,等待Coordinator指令
问题:资源被长时间锁定,其他事务无法访问

2. 单点故障

如果Coordinator故障:
- 准备阶段:参与者一直等待,资源被锁定
- 提交阶段:参与者不知道提交还是回滚

3. 数据不一致

场景:
1. Coordinator发送Commit给Participant1(成功)
2. Coordinator故障,未发送给Participant2/3

结果:
Participant1已提交
Participant2/3不知道是提交还是回滚
→ 数据不一致!

4. 性能差

两次网络往返
资源长时间锁定
不适合高并发场景

三阶段提交(3PC)

原理

改进2PC的阻塞问题,增加超时机制

阶段:

阶段1:CanCommit(询问)

Coordinator → Participant: 能否提交?
Participant → Coordinator: Yes/No

特点:不锁定资源,只询问

阶段2:PreCommit(预提交)

如果所有Participant都Yes:
Coordinator → Participant: PreCommit
Participant锁定资源,执行但不提交

如果有No:
Coordinator → Participant: Abort

阶段3:DoCommit(提交)

Coordinator → Participant: DoCommit
Participant提交事务

超时机制:
如果Participant长时间未收到DoCommit,自动提交
(假设Coordinator已决定提交)

3PC vs 2PC

对比维度2PC3PC
阶段数23
超时机制
阻塞问题严重有所改善
数据一致性可能不一致仍可能不一致
性能差更差

结论:

 2PC/3PC都不推荐用于微服务
 性能差、阻塞、单点故障
 推荐:Saga、TCC、本地消息表

TCC补偿模式

原理

TCC(Try-Confirm-Cancel):业务层面的分布式事务

三个阶段:

1. Try(尝试)

预留资源
不是真正执行,而是检查并冻结资源

示例:
订单服务:创建订单(状态:待确认)
库存服务:冻结库存(减少可用库存,不减少总库存)
支付服务:冻结余额

2. Confirm(确认)

真正执行业务
使用Try阶段冻结的资源

示例:
订单服务:订单状态改为已确认
库存服务:扣减库存(真正减少)
支付服务:扣减余额

3. Cancel(取消)

释放Try阶段冻结的资源

示例:
订单服务:删除订单
库存服务:释放冻结的库存
支付服务:释放冻结的余额

代码实现

TCC协调者:

package main

import (
    "context"
    "errors"
    "log"
)

// TCC参与者接口
type TCCParticipant interface {
    Try(ctx context.Context, txID string) error
    Confirm(ctx context.Context, txID string) error
    Cancel(ctx context.Context, txID string) error
}

// TCC协调者
type TCCCoordinator struct {
    participants []TCCParticipant
}

// 执行TCC事务
func (c *TCCCoordinator) Execute(ctx context.Context, txID string) error {
    // ========== 阶段1:Try ==========
    log.Printf("[TCC] Phase 1: Try (txID: %s)", txID)

    for i, participant := range c.participants {
        if err := participant.Try(ctx, txID); err != nil {
            log.Printf("[TCC] Participant %d try failed: %v", i, err)

            // Try失败,执行Cancel
            c.cancelAll(ctx, txID)
            return err
        }

        log.Printf("[TCC] Participant %d try succeeded", i)
    }

    log.Printf("[TCC] All participants try succeeded")

    // ========== 阶段2:Confirm ==========
    log.Printf("[TCC] Phase 2: Confirm")

    for i, participant := range c.participants {
        if err := participant.Confirm(ctx, txID); err != nil {
            log.Printf("[TCC] Participant %d confirm failed: %v", i, err)

            // Confirm失败,执行Cancel(补偿)
            c.cancelAll(ctx, txID)
            return err
        }

        log.Printf("[TCC] Participant %d confirmed", i)
    }

    log.Printf("[TCC] Transaction confirmed successfully")
    return nil
}

// 取消所有参与者
func (c *TCCCoordinator) cancelAll(ctx context.Context, txID string) {
    log.Printf("[TCC] Canceling all participants")

    for i, participant := range c.participants {
        if err := participant.Cancel(ctx, txID); err != nil {
            log.Printf("[TCC] Participant %d cancel failed: %v", i, err)
            // Cancel失败需要重试,直到成功
        }
    }
}

库存服务TCC实现:

// 库存TCC参与者
type InventoryTCCParticipant struct {
    db *sql.DB
}

// Try:冻结库存
func (p *InventoryTCCParticipant) Try(ctx context.Context, txID string) error {
    tx, _ := p.db.BeginTx(ctx, nil)
    defer tx.Rollback()

    // 1. 检查库存是否足够
    var availableStock int
    err := tx.QueryRowContext(ctx, `
        SELECT stock - frozen_stock FROM inventory WHERE product_id = ?
    `, "PROD-001").Scan(&availableStock)

    if err != nil {
        return err
    }

    if availableStock < 10 {
        return errors.New("insufficient stock")
    }

    // 2. 冻结库存(增加frozen_stock)
    _, err = tx.ExecContext(ctx, `
        UPDATE inventory
        SET frozen_stock = frozen_stock + ?
        WHERE product_id = ?
    `, 10, "PROD-001")

    if err != nil {
        return err
    }

    // 3. 记录冻结记录
    _, err = tx.ExecContext(ctx, `
        INSERT INTO frozen_stock_records (tx_id, product_id, quantity, status)
        VALUES (?, ?, ?, 'FROZEN')
    `, txID, "PROD-001", 10)

    if err != nil {
        return err
    }

    tx.Commit()
    log.Printf("[Inventory] Stock frozen (txID: %s)", txID)
    return nil
}

// Confirm:真正扣减库存
func (p *InventoryTCCParticipant) Confirm(ctx context.Context, txID string) error {
    tx, _ := p.db.BeginTx(ctx, nil)
    defer tx.Rollback()

    // 1. 获取冻结数量
    var quantity int
    err := tx.QueryRowContext(ctx, `
        SELECT quantity FROM frozen_stock_records
        WHERE tx_id = ? AND status = 'FROZEN'
    `, txID).Scan(&quantity)

    if err != nil {
        return err
    }

    // 2. 扣减总库存,减少冻结库存
    _, err = tx.ExecContext(ctx, `
        UPDATE inventory
        SET stock = stock - ?,
            frozen_stock = frozen_stock - ?
        WHERE product_id = ?
    `, quantity, quantity, "PROD-001")

    if err != nil {
        return err
    }

    // 3. 更新冻结记录状态
    _, err = tx.ExecContext(ctx, `
        UPDATE frozen_stock_records
        SET status = 'CONFIRMED'
        WHERE tx_id = ?
    `, txID)

    if err != nil {
        return err
    }

    tx.Commit()
    log.Printf("[Inventory] Stock deducted (txID: %s)", txID)
    return nil
}

// Cancel:释放冻结的库存
func (p *InventoryTCCParticipant) Cancel(ctx context.Context, txID string) error {
    tx, _ := p.db.BeginTx(ctx, nil)
    defer tx.Rollback()

    // 1. 获取冻结数量
    var quantity int
    err := tx.QueryRowContext(ctx, `
        SELECT quantity FROM frozen_stock_records
        WHERE tx_id = ? AND status = 'FROZEN'
    `, txID).Scan(&quantity)

    if err != nil {
        return err
    }

    // 2. 释放冻结库存
    _, err = tx.ExecContext(ctx, `
        UPDATE inventory
        SET frozen_stock = frozen_stock - ?
        WHERE product_id = ?
    `, quantity, "PROD-001")

    if err != nil {
        return err
    }

    // 3. 更新冻结记录状态
    _, err = tx.ExecContext(ctx, `
        UPDATE frozen_stock_records
        SET status = 'CANCELLED'
        WHERE tx_id = ?
    `, txID)

    if err != nil {
        return err
    }

    tx.Commit()
    log.Printf("[Inventory] Stock released (txID: %s)", txID)
    return nil
}

支付服务TCC实现:

// 支付TCC参与者
type PaymentTCCParticipant struct {
    db *sql.DB
}

// Try:冻结余额
func (p *PaymentTCCParticipant) Try(ctx context.Context, txID string) error {
    tx, _ := p.db.BeginTx(ctx, nil)
    defer tx.Rollback()

    // 1. 检查余额
    var availableBalance float64
    err := tx.QueryRowContext(ctx, `
        SELECT balance - frozen_balance FROM accounts WHERE user_id = ?
    `, "USER-001").Scan(&availableBalance)

    if err != nil {
        return err
    }

    if availableBalance < 100.0 {
        return errors.New("insufficient balance")
    }

    // 2. 冻结余额
    _, err = tx.ExecContext(ctx, `
        UPDATE accounts
        SET frozen_balance = frozen_balance + ?
        WHERE user_id = ?
    `, 100.0, "USER-001")

    if err != nil {
        return err
    }

    // 3. 记录冻结
    _, err = tx.ExecContext(ctx, `
        INSERT INTO frozen_balance_records (tx_id, user_id, amount, status)
        VALUES (?, ?, ?, 'FROZEN')
    `, txID, "USER-001", 100.0)

    if err != nil {
        return err
    }

    tx.Commit()
    log.Printf("[Payment] Balance frozen (txID: %s)", txID)
    return nil
}

// Confirm:扣减余额
func (p *PaymentTCCParticipant) Confirm(ctx context.Context, txID string) error {
    tx, _ := p.db.BeginTx(ctx, nil)
    defer tx.Rollback()

    // 1. 获取冻结金额
    var amount float64
    err := tx.QueryRowContext(ctx, `
        SELECT amount FROM frozen_balance_records
        WHERE tx_id = ? AND status = 'FROZEN'
    `, txID).Scan(&amount)

    if err != nil {
        return err
    }

    // 2. 扣减余额
    _, err = tx.ExecContext(ctx, `
        UPDATE accounts
        SET balance = balance - ?,
            frozen_balance = frozen_balance - ?
        WHERE user_id = ?
    `, amount, amount, "USER-001")

    if err != nil {
        return err
    }

    // 3. 更新记录
    _, err = tx.ExecContext(ctx, `
        UPDATE frozen_balance_records
        SET status = 'CONFIRMED'
        WHERE tx_id = ?
    `, txID)

    if err != nil {
        return err
    }

    tx.Commit()
    log.Printf("[Payment] Balance deducted (txID: %s)", txID)
    return nil
}

// Cancel:释放余额
func (p *PaymentTCCParticipant) Cancel(ctx context.Context, txID string) error {
    tx, _ := p.db.BeginTx(ctx, nil)
    defer tx.Rollback()

    // 1. 获取冻结金额
    var amount float64
    err := tx.QueryRowContext(ctx, `
        SELECT amount FROM frozen_balance_records
        WHERE tx_id = ? AND status = 'FROZEN'
    `, txID).Scan(&amount)

    if err != nil {
        return err
    }

    // 2. 释放余额
    _, err = tx.ExecContext(ctx, `
        UPDATE accounts
        SET frozen_balance = frozen_balance - ?
        WHERE user_id = ?
    `, amount, "USER-001")

    if err != nil {
        return err
    }

    // 3. 更新记录
    _, err = tx.ExecContext(ctx, `
        UPDATE frozen_balance_records
        SET status = 'CANCELLED'
        WHERE tx_id = ?
    `, txID)

    if err != nil {
        return err
    }

    tx.Commit()
    log.Printf("[Payment] Balance released (txID: %s)", txID)
    return nil
}

TCC特点

优点:

 无阻塞:Try阶段不锁定资源太久
 性能好:不需要长时间持有锁
 业务控制:业务层面实现补偿

缺点:

 侵入性强:需要实现Try/Confirm/Cancel三个方法
 复杂度高:业务代码复杂
 幂等性:Confirm/Cancel必须幂等(可能重试)

适用场景:

 金融系统(严格一致性要求)
 订单系统
 交易系统

Saga模式

原理

Saga:长事务,将大事务拆分为多个本地事务,通过补偿机制保证最终一致性

核心思想:

T1 → T2 → T3 → ... → Tn

如果Ti失败:
执行补偿:C(Ti-1) → C(Ti-2) → ... → C(T1)

两种实现方式

1. Choreography(编舞模式)

定义:去中心化,服务通过事件协作

流程:

Order Service → 创建订单 → 发布OrderCreated事件
                                ↓
Inventory Service → 监听事件 → 扣减库存 → 发布StockDeducted事件
                                              ↓
Payment Service → 监听事件 → 扣款 → 发布PaymentCompleted事件
                                        ↓
Order Service → 监听事件 → 订单完成

失败场景:
Payment Service扣款失败 → 发布PaymentFailed事件
                              ↓
Inventory Service → 监听事件 → 补偿(恢复库存)
                                  ↓
Order Service → 监听事件 → 取消订单

代码实现:

// Order Service
type OrderService struct {
    eventBus EventBus
}

// 创建订单
func (s *OrderService) CreateOrder(req CreateOrderRequest) (*Order, error) {
    // 1. 创建订单
    order := &Order{
        ID:     generateID(),
        UserID: req.UserID,
        Items:  req.Items,
        Status: OrderStatusPending,
    }

    if err := s.orderRepo.Save(order); err != nil {
        return nil, err
    }

    // 2. 发布事件
    s.eventBus.Publish(OrderCreatedEvent{
        OrderID: order.ID,
        UserID:  order.UserID,
        Items:   order.Items,
        Amount:  calculateTotal(order.Items),
    })

    return order, nil
}

// 监听库存扣减失败事件
func (s *OrderService) HandleStockDeductionFailed(event StockDeductionFailedEvent) {
    // 取消订单
    order, _ := s.orderRepo.FindByID(event.OrderID)
    order.Status = OrderStatusCancelled

    s.orderRepo.Update(order)
    log.Printf("Order cancelled: %s", event.OrderID)
}

// 监听支付完成事件
func (s *OrderService) HandlePaymentCompleted(event PaymentCompletedEvent) {
    // 订单完成
    order, _ := s.orderRepo.FindByID(event.OrderID)
    order.Status = OrderStatusCompleted

    s.orderRepo.Update(order)
    log.Printf("Order completed: %s", event.OrderID)
}
// Inventory Service
type InventoryService struct {
    eventBus EventBus
}

// 监听订单创建事件
func (s *InventoryService) HandleOrderCreated(event OrderCreatedEvent) {
    // 扣减库存
    for _, item := range event.Items {
        if err := s.inventoryRepo.DeductStock(item.ProductID, item.Quantity); err != nil {
            // 扣减失败,发布失败事件
            s.eventBus.Publish(StockDeductionFailedEvent{
                OrderID:   event.OrderID,
                ProductID: item.ProductID,
                Reason:    err.Error(),
            })
            return
        }
    }

    // 扣减成功,发布成功事件
    s.eventBus.Publish(StockDeductedEvent{
        OrderID: event.OrderID,
        Items:   event.Items,
    })

    log.Printf("Stock deducted for order: %s", event.OrderID)
}

// 监听支付失败事件
func (s *InventoryService) HandlePaymentFailed(event PaymentFailedEvent) {
    // 补偿:恢复库存
    order, _ := s.getOrderItems(event.OrderID)

    for _, item := range order.Items {
        s.inventoryRepo.RestoreStock(item.ProductID, item.Quantity)
    }

    log.Printf("Stock restored for order: %s", event.OrderID)
}
// Payment Service
type PaymentService struct {
    eventBus EventBus
}

// 监听库存扣减成功事件
func (s *PaymentService) HandleStockDeducted(event StockDeductedEvent) {
    // 扣款
    if err := s.paymentRepo.Deduct(event.OrderID, event.Amount); err != nil {
        // 扣款失败,发布失败事件
        s.eventBus.Publish(PaymentFailedEvent{
            OrderID: event.OrderID,
            Reason:  err.Error(),
        })
        return
    }

    // 扣款成功,发布成功事件
    s.eventBus.Publish(PaymentCompletedEvent{
        OrderID: event.OrderID,
        Amount:  event.Amount,
    })

    log.Printf("Payment completed for order: %s", event.OrderID)
}

特点:

 去中心化,无单点故障
 服务自治
 复杂度高(事件依赖复杂)
 难以追踪(分散在各个服务)

2. Orchestration(编排模式)

定义:中心化,由Orchestrator协调所有服务

流程:

                 ┌──────────────┐
                 │ Orchestrator │
                 └──────────────┘
                    ↓    ↓    ↓
         ┌──────────┼────┼────┼──────────┐
         ↓          ↓    ↓    ↓          ↓
    Order      Inventory Payment    Notification
    Service    Service   Service    Service

Orchestrator控制流程:
1. 调用Order Service创建订单
2. 调用Inventory Service扣减库存
3. 调用Payment Service扣款
4. 调用Notification Service发送通知

如果步骤3失败:
- 调用Inventory Service恢复库存
- 调用Order Service取消订单

代码实现:

// Saga Orchestrator
type OrderSagaOrchestrator struct {
    orderService     *OrderService
    inventoryService *InventoryService
    paymentService   *PaymentService
}

// 执行订单Saga
func (o *OrderSagaOrchestrator) Execute(req CreateOrderRequest) error {
    sagaID := generateID()

    // 1. 创建订单
    order, err := o.orderService.CreateOrder(sagaID, req)
    if err != nil {
        log.Printf("[Saga] Step 1 failed: create order")
        return err
    }

    log.Printf("[Saga] Step 1 completed: order created (%s)", order.ID)

    // 2. 扣减库存
    if err := o.inventoryService.DeductStock(sagaID, req.Items); err != nil {
        log.Printf("[Saga] Step 2 failed: deduct stock, compensating...")

        // 补偿:取消订单
        o.orderService.CancelOrder(sagaID, order.ID)
        return err
    }

    log.Printf("[Saga] Step 2 completed: stock deducted")

    // 3. 扣款
    if err := o.paymentService.Deduct(sagaID, order.ID, order.Amount); err != nil {
        log.Printf("[Saga] Step 3 failed: payment, compensating...")

        // 补偿:恢复库存
        o.inventoryService.RestoreStock(sagaID, req.Items)

        // 补偿:取消订单
        o.orderService.CancelOrder(sagaID, order.ID)

        return err
    }

    log.Printf("[Saga] Step 3 completed: payment succeeded")

    // 4. 订单完成
    o.orderService.CompleteOrder(sagaID, order.ID)

    log.Printf("[Saga] Transaction completed (sagaID: %s)", sagaID)
    return nil
}

Order Service:

type OrderService struct {
    db *sql.DB
}

// 创建订单
func (s *OrderService) CreateOrder(sagaID string, req CreateOrderRequest) (*Order, error) {
    order := &Order{
        ID:     generateID(),
        UserID: req.UserID,
        Items:  req.Items,
        Amount: calculateTotal(req.Items),
        Status: OrderStatusPending,
        SagaID: sagaID,
    }

    _, err := s.db.Exec(`
        INSERT INTO orders (id, user_id, amount, status, saga_id)
        VALUES (?, ?, ?, ?, ?)
    `, order.ID, order.UserID, order.Amount, order.Status, sagaID)

    if err != nil {
        return nil, err
    }

    return order, nil
}

// 取消订单(补偿)
func (s *OrderService) CancelOrder(sagaID, orderID string) error {
    _, err := s.db.Exec(`
        UPDATE orders SET status = 'CANCELLED'
        WHERE id = ? AND saga_id = ?
    `, orderID, sagaID)

    log.Printf("[OrderService] Order cancelled (compensation): %s", orderID)
    return err
}

// 完成订单
func (s *OrderService) CompleteOrder(sagaID, orderID string) error {
    _, err := s.db.Exec(`
        UPDATE orders SET status = 'COMPLETED'
        WHERE id = ? AND saga_id = ?
    `, orderID, sagaID)

    log.Printf("[OrderService] Order completed: %s", orderID)
    return err
}

Inventory Service:

type InventoryService struct {
    db *sql.DB
}

// 扣减库存
func (s *InventoryService) DeductStock(sagaID string, items []OrderItem) error {
    tx, _ := s.db.Begin()
    defer tx.Rollback()

    for _, item := range items {
        // 检查库存
        var stock int
        err := tx.QueryRow(`
            SELECT stock FROM inventory WHERE product_id = ?
        `, item.ProductID).Scan(&stock)

        if err != nil {
            return err
        }

        if stock < item.Quantity {
            return errors.New("insufficient stock")
        }

        // 扣减库存
        _, err = tx.Exec(`
            UPDATE inventory SET stock = stock - ?
            WHERE product_id = ?
        `, item.Quantity, item.ProductID)

        if err != nil {
            return err
        }

        // 记录Saga日志
        _, err = tx.Exec(`
            INSERT INTO saga_inventory_log (saga_id, product_id, quantity, operation)
            VALUES (?, ?, ?, 'DEDUCT')
        `, sagaID, item.ProductID, item.Quantity)
    }

    return tx.Commit()
}

// 恢复库存(补偿)
func (s *InventoryService) RestoreStock(sagaID string, items []OrderItem) error {
    tx, _ := s.db.Begin()
    defer tx.Rollback()

    for _, item := range items {
        // 恢复库存
        _, err := tx.Exec(`
            UPDATE inventory SET stock = stock + ?
            WHERE product_id = ?
        `, item.Quantity, item.ProductID)

        if err != nil {
            return err
        }

        // 记录补偿日志
        _, err = tx.Exec(`
            INSERT INTO saga_inventory_log (saga_id, product_id, quantity, operation)
            VALUES (?, ?, ?, 'RESTORE')
        `, sagaID, item.ProductID, item.Quantity)
    }

    log.Printf("[InventoryService] Stock restored (compensation)")
    return tx.Commit()
}

特点:

 中心化控制,易于追踪
 流程清晰
 易于实现复杂补偿逻辑
 Orchestrator是单点
 服务耦合到Orchestrator

Choreography vs Orchestration

维度ChoreographyOrchestration
控制方式去中心化中心化
协调机制事件驱动协调者控制
单点故障无Orchestrator是单点
流程追踪困难容易
服务自治高低
复杂度事件依赖复杂Orchestrator逻辑复杂
适用场景简单流程复杂流程

本地消息表

原理

核心思想:利用本地事务保证消息可靠发送

流程:

1. Order Service:
   BEGIN TRANSACTION
     - 创建订单
     - 插入消息表(OrderCreated事件)
   COMMIT

2. 定时任务:
   - 扫描消息表中未发送的消息
   - 发送到消息队列
   - 标记为已发送

3. Inventory Service:
   - 消费消息
   - 扣减库存

代码实现

Order Service(生产者):

// Order Service
type OrderService struct {
    db       *sql.DB
    eventBus *KafkaProducer
}

// 创建订单
func (s *OrderService) CreateOrder(req CreateOrderRequest) (*Order, error) {
    tx, _ := s.db.Begin()
    defer tx.Rollback()

    // 1. 创建订单
    order := &Order{
        ID:     generateID(),
        UserID: req.UserID,
        Items:  req.Items,
        Amount: calculateTotal(req.Items),
        Status: OrderStatusPending,
    }

    _, err := tx.Exec(`
        INSERT INTO orders (id, user_id, amount, status)
        VALUES (?, ?, ?, ?)
    `, order.ID, order.UserID, order.Amount, order.Status)

    if err != nil {
        return nil, err
    }

    // 2. 插入消息表(本地事务保证)
    event := OrderCreatedEvent{
        OrderID: order.ID,
        UserID:  order.UserID,
        Items:   order.Items,
        Amount:  order.Amount,
    }

    eventData, _ := json.Marshal(event)

    _, err = tx.Exec(`
        INSERT INTO local_message_table (id, event_type, event_data, status)
        VALUES (?, ?, ?, 'PENDING')
    `, generateID(), "OrderCreated", eventData)

    if err != nil {
        return nil, err
    }

    // 3. 提交事务(订单和消息要么都成功,要么都失败)
    if err := tx.Commit(); err != nil {
        return nil, err
    }

    log.Printf("Order created with local message: %s", order.ID)
    return order, nil
}

消息发送器(定时任务):

// 消息发送器
type MessageSender struct {
    db       *sql.DB
    eventBus *KafkaProducer
}

// 定时扫描并发送消息
func (s *MessageSender) Start() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        s.sendPendingMessages()
    }
}

func (s *MessageSender) sendPendingMessages() {
    // 1. 查询未发送的消息
    rows, err := s.db.Query(`
        SELECT id, event_type, event_data
        FROM local_message_table
        WHERE status = 'PENDING'
        LIMIT 100
    `)

    if err != nil {
        log.Printf("Query failed: %v", err)
        return
    }
    defer rows.Close()

    // 2. 发送每条消息
    for rows.Next() {
        var id, eventType string
        var eventData []byte

        rows.Scan(&id, &eventType, &eventData)

        // 发送到Kafka
        if err := s.eventBus.PublishRaw(eventType, eventData); err != nil {
            log.Printf("Failed to publish message %s: %v", id, err)
            continue
        }

        // 3. 标记为已发送
        _, err := s.db.Exec(`
            UPDATE local_message_table
            SET status = 'SENT', sent_at = NOW()
            WHERE id = ?
        `, id)

        if err != nil {
            log.Printf("Failed to update message status: %v", err)
        } else {
            log.Printf("Message sent: %s", id)
        }
    }
}

Inventory Service(消费者):

// Inventory Service
type InventoryService struct {
    db              *sql.DB
    processedEvents map[string]bool  // 幂等性:记录已处理的事件
}

// 监听订单创建事件
func (s *InventoryService) HandleOrderCreated(event OrderCreatedEvent) error {
    // 幂等性检查
    if s.isEventProcessed(event.OrderID) {
        log.Printf("Event already processed: %s", event.OrderID)
        return nil
    }

    tx, _ := s.db.Begin()
    defer tx.Rollback()

    // 1. 扣减库存
    for _, item := range event.Items {
        _, err := tx.Exec(`
            UPDATE inventory SET stock = stock - ?
            WHERE product_id = ?
        `, item.Quantity, item.ProductID)

        if err != nil {
            return err
        }
    }

    // 2. 记录已处理(幂等性)
    _, err := tx.Exec(`
        INSERT INTO processed_events (event_id, event_type, processed_at)
        VALUES (?, ?, NOW())
    `, event.OrderID, "OrderCreated")

    if err != nil {
        return err
    }

    tx.Commit()

    s.markEventProcessed(event.OrderID)
    log.Printf("Stock deducted for order: %s", event.OrderID)
    return nil
}

func (s *InventoryService) isEventProcessed(eventID string) bool {
    var count int
    s.db.QueryRow(`
        SELECT COUNT(*) FROM processed_events WHERE event_id = ?
    `, eventID).Scan(&count)

    return count > 0
}

本地消息表特点

优点:

 实现简单
 利用本地事务保证可靠性
 不依赖外部分布式事务组件
 性能好

缺点:

 最终一致性(有延迟)
 需要定时任务扫描
 需要实现幂等性

适用场景:

 高并发场景
 可接受最终一致性
 事件驱动架构

最佳实践

方案选择

场景推荐方案原因
金融交易TCC严格一致性要求
订单系统Saga(Orchestration)流程清晰,易于追踪
事件通知本地消息表 + 消息队列高性能,最终一致性
简单流程Saga(Choreography)服务自治,去中心化
强一致性不用微服务(或用2PC)2PC性能差,不推荐

设计原则

1. 幂等性

// 每个操作必须幂等
func (s *Service) HandleEvent(event Event) error {
    // 1. 幂等性检查
    if s.isProcessed(event.ID) {
        return nil  // 已处理,直接返回
    }

    // 2. 执行业务逻辑
    if err := s.doSomething(); err != nil {
        return err
    }

    // 3. 标记已处理
    s.markProcessed(event.ID)
    return nil
}

2. 超时和重试

// 超时控制
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// 重试
err := retry.Do(
    func() error {
        return callService(ctx)
    },
    retry.Attempts(3),
    retry.Delay(time.Second),
)

3. 补偿操作必须成功

// 补偿操作必须重试直到成功
func (s *Service) Compensate(ctx context.Context) error {
    for {
        err := s.doCompensate()
        if err == nil {
            return nil
        }

        log.Printf("Compensation failed: %v, retrying...", err)
        time.Sleep(5 * time.Second)

        // 检查context是否取消
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
    }
}

4. 监控和告警

// 记录每个步骤
func (o *Orchestrator) Execute() error {
    metrics.SagaStarted.Inc()

    if err := o.step1(); err != nil {
        metrics.SagaFailed.WithLabelValues("step1").Inc()
        return err
    }

    if err := o.step2(); err != nil {
        metrics.SagaFailed.WithLabelValues("step2").Inc()
        o.compensateStep1()  // 补偿
        return err
    }

    metrics.SagaCompleted.Inc()
    return nil
}

面试问答

什么是分布式事务?有哪些解决方案?

答案:

定义:

分布式事务:跨多个数据库/服务的事务操作
目标:保证多个操作的原子性和一致性

挑战:

单体:一个数据库事务(ACID)
微服务:多个独立数据库,无法用一个事务

示例:
订单服务创建订单 → 库存服务扣库存 → 支付服务扣款
如何保证三个操作的一致性?

解决方案:

方案一致性性能复杂度推荐度
2PC强一致差高不推荐
3PC强一致差高不推荐
TCC最终一致好高金融场景
Saga最终一致好中推荐
本地消息表最终一致好低推荐

实践建议:

1. 优先避免分布式事务(服务拆分时注意边界)
2. 可接受最终一致性 → Saga或本地消息表
3. 严格一致性 → TCC(代价:复杂度高)
4. 不要用2PC/3PC(性能差、阻塞)

Saga的Choreography和Orchestration如何选择?

答案:

Choreography(编舞):

定义:去中心化,服务通过事件协作

优点:
 去中心化,无单点故障
 服务自治
 松耦合

缺点:
 事件依赖复杂
 难以追踪整个流程
 测试困难

适用场景:
 简单流程(2-3个服务)
 服务高度自治
 事件驱动架构

Orchestration(编排):

定义:中心化,Orchestrator协调所有服务

优点:
 流程清晰,易于追踪
 易于实现复杂补偿逻辑
 集中管理

缺点:
 Orchestrator是单点
 服务耦合到Orchestrator

适用场景:
 复杂流程(3+服务)
 需要集中管理和监控
 频繁变化的业务流程

选择建议:

简单流程(如订单通知):
→ Choreography

复杂流程(如订单-库存-支付-物流):
→ Orchestration

混合使用:
核心流程用Orchestration
非关键流程用Choreography

如何保证分布式事务的幂等性?

答案:

问题:

网络不稳定,消息可能重复
Saga补偿可能重试
如何保证操作只执行一次?

方案1:唯一ID + 数据库唯一约束

// 使用唯一事件ID
type Event struct {
    EventID string  // 全局唯一
    OrderID string
}

// 数据库唯一约束
CREATE TABLE processed_events (
    event_id VARCHAR(64) PRIMARY KEY,
    processed_at TIMESTAMP
);

// 处理事件
func HandleEvent(event Event) error {
    // 插入processed_events(唯一约束)
    _, err := db.Exec(`
        INSERT INTO processed_events (event_id, processed_at)
        VALUES (?, NOW())
    `, event.EventID)

    if isDuplicateKeyError(err) {
        // 已处理过,直接返回
        return nil
    }

    // 执行业务逻辑
    doBusinessLogic(event)
    return nil
}

方案2:版本号

// 乐观锁
func UpdateStock(productID string, quantity int, version int) error {
    result, err := db.Exec(`
        UPDATE inventory
        SET stock = stock - ?, version = version + 1
        WHERE product_id = ? AND version = ?
    `, quantity, productID, version)

    if err != nil {
        return err
    }

    affected, _ := result.RowsAffected()
    if affected == 0 {
        return errors.New("version conflict")
    }

    return nil
}

方案3:状态机

// 订单状态机
const (
    OrderStatusPending   = "PENDING"
    OrderStatusProcessing = "PROCESSING"
    OrderStatusCompleted = "COMPLETED"
)

func ProcessOrder(orderID string) error {
    // 只有PENDING状态才能转为PROCESSING
    result, err := db.Exec(`
        UPDATE orders
        SET status = 'PROCESSING'
        WHERE id = ? AND status = 'PENDING'
    `, orderID)

    if err != nil {
        return err
    }

    affected, _ := result.RowsAffected()
    if affected == 0 {
        // 已经处理过或状态不对
        return nil
    }

    // 执行业务逻辑
    doBusinessLogic(orderID)

    // 更新为COMPLETED
    db.Exec(`
        UPDATE orders SET status = 'COMPLETED' WHERE id = ?
    `, orderID)

    return nil
}

本地消息表的可靠性如何保证?

答案:

核心机制:

1. 本地事务保证

// 订单和消息在同一个事务
tx.Begin()
  tx.Exec("INSERT INTO orders ...")
  tx.Exec("INSERT INTO local_message_table ...")
tx.Commit()

// 要么都成功,要么都失败

2. 定时扫描发送

func sendPendingMessages() {
    // 每秒扫描一次
    ticker := time.NewTicker(1 * time.Second)

    for range ticker.C {
        messages := queryPendingMessages()

        for _, msg := range messages {
            // 发送到Kafka
            kafka.Publish(msg)

            // 标记为已发送
            updateMessageStatus(msg.ID, "SENT")
        }
    }
}

3. 消费幂等

func HandleMessage(event Event) error {
    // 检查是否已处理
    if isProcessed(event.ID) {
        return nil
    }

    // 处理业务逻辑
    doBusinessLogic(event)

    // 标记已处理
    markProcessed(event.ID)
    return nil
}

4. 失败重试

// Kafka消费失败,自动重试
consumer.Consume(func(msg Message) error {
    if err := handle(msg); err != nil {
        // 返回错误,消息不确认,会自动重试
        return err
    }

    // 确认消息
    msg.Ack()
    return nil
})

可靠性保证:

1. 本地事务 → 消息一定会写入消息表
2. 定时扫描 → 消息一定会发送(最终)
3. Kafka持久化 → 消息不会丢失
4. 消费幂等 → 重复消费不影响结果

最终一致性:有延迟(秒级),但一定会达成一致

微服务数据一致性的最佳实践是什么?

答案:

1. 优先避免分布式事务

原则:服务拆分时注意边界

反例:
Order Service管订单
Inventory Service管库存
→ 创建订单必须扣库存(分布式事务)

正例:
Order Service管订单+库存
→ 一个服务,本地事务

结论:合理拆分服务,避免频繁跨服务事务

2. 接受最终一致性

大多数场景不需要强一致性

示例:
订单创建后,发送确认邮件
→ 不需要强一致,最终发送即可

3. 使用Saga或本地消息表

必须跨服务事务时:

简单流程:
→ 本地消息表 + 消息队列

复杂流程:
→ Saga(Orchestration)

金融场景:
→ TCC

4. 保证幂等性

// 所有操作必须幂等
func HandleEvent(event Event) error {
    if isProcessed(event.ID) {
        return nil  // 已处理
    }

    doBusinessLogic(event)
    markProcessed(event.ID)
    return nil
}

5. 监控和告警

监控指标:
- Saga成功率
- 补偿次数
- 消息积压
- 处理延迟

告警:
- Saga失败超过阈值
- 消息堆积
- 补偿失败

参考资料

  • Saga Pattern
  • Chris Richardson - Microservices Patterns
  • Martin Fowler - Event Sourcing
  • 阿里云 - 分布式事务解决方案
  • Seata - 分布式事务框架
Prev
第3章:服务间通信