第4章:数据一致性方案
分布式事务挑战
单体应用的事务
数据库事务(ACID):
// 单体应用:一个数据库事务
func (s *OrderService) CreateOrder(order Order) error {
tx, _ := s.db.Begin()
defer tx.Rollback()
// 1. 创建订单
if err := tx.Exec("INSERT INTO orders ..."); err != nil {
return err
}
// 2. 扣减库存
if err := tx.Exec("UPDATE inventory SET stock = stock - ? WHERE product_id = ?"); err != nil {
return err
}
// 3. 扣减余额
if err := tx.Exec("UPDATE accounts SET balance = balance - ? WHERE user_id = ?"); err != nil {
return err
}
// 全部成功才提交
return tx.Commit()
}
// 特点:
// ACID保证
// 原子性(要么全成功,要么全失败)
// 实现简单
微服务的事务挑战
问题场景:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│Order Service │ │Inventory │ │Payment │
│(订单数据库) │ │Service │ │Service │
│ │ │(库存数据库) │ │(支付数据库) │
└──────────────┘ └──────────────┘ └──────────────┘
创建订单流程:
1. Order Service创建订单
2. Inventory Service扣减库存
3. Payment Service扣减余额
问题:
3个独立数据库,无法用一个事务
如果步骤2失败,步骤1已执行,如何回滚?
如果步骤3超时,不知道是否成功?
核心挑战:
1. 原子性:如何保证多个服务的操作要么全成功,要么全失败?
2. 一致性:如何保证最终数据一致?
3. 隔离性:并发操作如何处理?
4. 持久性:操作是否已持久化?
两阶段提交(2PC)
原理
Two-Phase Commit:协调者协调所有参与者完成分布式事务
角色:
Coordinator(协调者):事务协调者
Participants(参与者):各个服务
阶段:
阶段1:准备阶段(Prepare)
Coordinator → Participant1: 准备提交?
Coordinator → Participant2: 准备提交?
Coordinator → Participant3: 准备提交?
Participant1 → Coordinator: Yes(锁定资源)
Participant2 → Coordinator: Yes(锁定资源)
Participant3 → Coordinator: No(失败)
阶段2:提交阶段(Commit/Abort)
如果所有Participant都返回Yes:
Coordinator → 所有Participant: Commit
如果有任何Participant返回No:
Coordinator → 所有Participant: Abort(回滚)
代码实现
Coordinator(协调者):
package main
import (
"context"
"errors"
"log"
"time"
)
// 2PC协调者
type TwoPhaseCoordinator struct {
participants []Participant
}
// 参与者接口
type Participant interface {
Prepare(ctx context.Context, txID string) error
Commit(ctx context.Context, txID string) error
Abort(ctx context.Context, txID string) error
}
// 执行分布式事务
func (c *TwoPhaseCoordinator) Execute(txID string) error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// ========== 阶段1:准备阶段 ==========
log.Printf("[2PC] Phase 1: Prepare (txID: %s)", txID)
for i, participant := range c.participants {
if err := participant.Prepare(ctx, txID); err != nil {
log.Printf("[2PC] Participant %d prepare failed: %v", i, err)
// 有一个失败,全部回滚
c.abortAll(ctx, txID)
return err
}
log.Printf("[2PC] Participant %d prepared", i)
}
log.Printf("[2PC] All participants prepared")
// ========== 阶段2:提交阶段 ==========
log.Printf("[2PC] Phase 2: Commit")
for i, participant := range c.participants {
if err := participant.Commit(ctx, txID); err != nil {
log.Printf("[2PC] Participant %d commit failed: %v", i, err)
// 提交阶段失败,事务状态不确定!
return err
}
log.Printf("[2PC] Participant %d committed", i)
}
log.Printf("[2PC] Transaction committed successfully")
return nil
}
// 回滚所有参与者
func (c *TwoPhaseCoordinator) abortAll(ctx context.Context, txID string) {
log.Printf("[2PC] Aborting all participants")
for i, participant := range c.participants {
if err := participant.Abort(ctx, txID); err != nil {
log.Printf("[2PC] Participant %d abort failed: %v", i, err)
}
}
}
Participant(参与者)示例:
// Order Service参与者
type OrderServiceParticipant struct {
db *sql.DB
}
func (p *OrderServiceParticipant) Prepare(ctx context.Context, txID string) error {
// 1. 开启本地事务
tx, err := p.db.BeginTx(ctx, nil)
if err != nil {
return err
}
// 2. 执行业务逻辑(创建订单)
_, err = tx.ExecContext(ctx, `
INSERT INTO orders (id, user_id, amount, status, tx_id)
VALUES (?, ?, ?, 'PREPARED', ?)
`, generateID(), "USER-001", 100.0, txID)
if err != nil {
tx.Rollback()
return err
}
// 3. 不提交,保持锁定状态
// 将事务句柄保存起来,等待Commit/Abort
saveTx(txID, tx)
log.Printf("[OrderService] Prepared (txID: %s)", txID)
return nil
}
func (p *OrderServiceParticipant) Commit(ctx context.Context, txID string) error {
// 1. 获取事务句柄
tx := getTx(txID)
if tx == nil {
return errors.New("transaction not found")
}
// 2. 更新状态为COMMITTED
_, err := tx.ExecContext(ctx, `
UPDATE orders SET status = 'COMMITTED' WHERE tx_id = ?
`, txID)
if err != nil {
tx.Rollback()
return err
}
// 3. 提交事务
if err := tx.Commit(); err != nil {
return err
}
log.Printf("[OrderService] Committed (txID: %s)", txID)
return nil
}
func (p *OrderServiceParticipant) Abort(ctx context.Context, txID string) error {
// 1. 获取事务句柄
tx := getTx(txID)
if tx == nil {
return errors.New("transaction not found")
}
// 2. 回滚事务
if err := tx.Rollback(); err != nil {
return err
}
log.Printf("[OrderService] Aborted (txID: %s)", txID)
return nil
}
使用示例:
func main() {
// 创建参与者
orderService := &OrderServiceParticipant{db: orderDB}
inventoryService := &InventoryServiceParticipant{db: inventoryDB}
paymentService := &PaymentServiceParticipant{db: paymentDB}
// 创建协调者
coordinator := &TwoPhaseCoordinator{
participants: []Participant{
orderService,
inventoryService,
paymentService,
},
}
// 执行分布式事务
txID := "TX-" + generateID()
if err := coordinator.Execute(txID); err != nil {
log.Printf("Transaction failed: %v", err)
} else {
log.Printf("Transaction succeeded")
}
}
2PC的问题
1. 同步阻塞
准备阶段:所有参与者锁定资源,等待Coordinator指令
问题:资源被长时间锁定,其他事务无法访问
2. 单点故障
如果Coordinator故障:
- 准备阶段:参与者一直等待,资源被锁定
- 提交阶段:参与者不知道提交还是回滚
3. 数据不一致
场景:
1. Coordinator发送Commit给Participant1(成功)
2. Coordinator故障,未发送给Participant2/3
结果:
Participant1已提交
Participant2/3不知道是提交还是回滚
→ 数据不一致!
4. 性能差
两次网络往返
资源长时间锁定
不适合高并发场景
三阶段提交(3PC)
原理
改进2PC的阻塞问题,增加超时机制
阶段:
阶段1:CanCommit(询问)
Coordinator → Participant: 能否提交?
Participant → Coordinator: Yes/No
特点:不锁定资源,只询问
阶段2:PreCommit(预提交)
如果所有Participant都Yes:
Coordinator → Participant: PreCommit
Participant锁定资源,执行但不提交
如果有No:
Coordinator → Participant: Abort
阶段3:DoCommit(提交)
Coordinator → Participant: DoCommit
Participant提交事务
超时机制:
如果Participant长时间未收到DoCommit,自动提交
(假设Coordinator已决定提交)
3PC vs 2PC
| 对比维度 | 2PC | 3PC |
|---|---|---|
| 阶段数 | 2 | 3 |
| 超时机制 | ||
| 阻塞问题 | 严重 | 有所改善 |
| 数据一致性 | 可能不一致 | 仍可能不一致 |
| 性能 | 差 | 更差 |
结论:
2PC/3PC都不推荐用于微服务
性能差、阻塞、单点故障
推荐:Saga、TCC、本地消息表
TCC补偿模式
原理
TCC(Try-Confirm-Cancel):业务层面的分布式事务
三个阶段:
1. Try(尝试)
预留资源
不是真正执行,而是检查并冻结资源
示例:
订单服务:创建订单(状态:待确认)
库存服务:冻结库存(减少可用库存,不减少总库存)
支付服务:冻结余额
2. Confirm(确认)
真正执行业务
使用Try阶段冻结的资源
示例:
订单服务:订单状态改为已确认
库存服务:扣减库存(真正减少)
支付服务:扣减余额
3. Cancel(取消)
释放Try阶段冻结的资源
示例:
订单服务:删除订单
库存服务:释放冻结的库存
支付服务:释放冻结的余额
代码实现
TCC协调者:
package main
import (
"context"
"errors"
"log"
)
// TCC参与者接口
type TCCParticipant interface {
Try(ctx context.Context, txID string) error
Confirm(ctx context.Context, txID string) error
Cancel(ctx context.Context, txID string) error
}
// TCC协调者
type TCCCoordinator struct {
participants []TCCParticipant
}
// 执行TCC事务
func (c *TCCCoordinator) Execute(ctx context.Context, txID string) error {
// ========== 阶段1:Try ==========
log.Printf("[TCC] Phase 1: Try (txID: %s)", txID)
for i, participant := range c.participants {
if err := participant.Try(ctx, txID); err != nil {
log.Printf("[TCC] Participant %d try failed: %v", i, err)
// Try失败,执行Cancel
c.cancelAll(ctx, txID)
return err
}
log.Printf("[TCC] Participant %d try succeeded", i)
}
log.Printf("[TCC] All participants try succeeded")
// ========== 阶段2:Confirm ==========
log.Printf("[TCC] Phase 2: Confirm")
for i, participant := range c.participants {
if err := participant.Confirm(ctx, txID); err != nil {
log.Printf("[TCC] Participant %d confirm failed: %v", i, err)
// Confirm失败,执行Cancel(补偿)
c.cancelAll(ctx, txID)
return err
}
log.Printf("[TCC] Participant %d confirmed", i)
}
log.Printf("[TCC] Transaction confirmed successfully")
return nil
}
// 取消所有参与者
func (c *TCCCoordinator) cancelAll(ctx context.Context, txID string) {
log.Printf("[TCC] Canceling all participants")
for i, participant := range c.participants {
if err := participant.Cancel(ctx, txID); err != nil {
log.Printf("[TCC] Participant %d cancel failed: %v", i, err)
// Cancel失败需要重试,直到成功
}
}
}
库存服务TCC实现:
// 库存TCC参与者
type InventoryTCCParticipant struct {
db *sql.DB
}
// Try:冻结库存
func (p *InventoryTCCParticipant) Try(ctx context.Context, txID string) error {
tx, _ := p.db.BeginTx(ctx, nil)
defer tx.Rollback()
// 1. 检查库存是否足够
var availableStock int
err := tx.QueryRowContext(ctx, `
SELECT stock - frozen_stock FROM inventory WHERE product_id = ?
`, "PROD-001").Scan(&availableStock)
if err != nil {
return err
}
if availableStock < 10 {
return errors.New("insufficient stock")
}
// 2. 冻结库存(增加frozen_stock)
_, err = tx.ExecContext(ctx, `
UPDATE inventory
SET frozen_stock = frozen_stock + ?
WHERE product_id = ?
`, 10, "PROD-001")
if err != nil {
return err
}
// 3. 记录冻结记录
_, err = tx.ExecContext(ctx, `
INSERT INTO frozen_stock_records (tx_id, product_id, quantity, status)
VALUES (?, ?, ?, 'FROZEN')
`, txID, "PROD-001", 10)
if err != nil {
return err
}
tx.Commit()
log.Printf("[Inventory] Stock frozen (txID: %s)", txID)
return nil
}
// Confirm:真正扣减库存
func (p *InventoryTCCParticipant) Confirm(ctx context.Context, txID string) error {
tx, _ := p.db.BeginTx(ctx, nil)
defer tx.Rollback()
// 1. 获取冻结数量
var quantity int
err := tx.QueryRowContext(ctx, `
SELECT quantity FROM frozen_stock_records
WHERE tx_id = ? AND status = 'FROZEN'
`, txID).Scan(&quantity)
if err != nil {
return err
}
// 2. 扣减总库存,减少冻结库存
_, err = tx.ExecContext(ctx, `
UPDATE inventory
SET stock = stock - ?,
frozen_stock = frozen_stock - ?
WHERE product_id = ?
`, quantity, quantity, "PROD-001")
if err != nil {
return err
}
// 3. 更新冻结记录状态
_, err = tx.ExecContext(ctx, `
UPDATE frozen_stock_records
SET status = 'CONFIRMED'
WHERE tx_id = ?
`, txID)
if err != nil {
return err
}
tx.Commit()
log.Printf("[Inventory] Stock deducted (txID: %s)", txID)
return nil
}
// Cancel:释放冻结的库存
func (p *InventoryTCCParticipant) Cancel(ctx context.Context, txID string) error {
tx, _ := p.db.BeginTx(ctx, nil)
defer tx.Rollback()
// 1. 获取冻结数量
var quantity int
err := tx.QueryRowContext(ctx, `
SELECT quantity FROM frozen_stock_records
WHERE tx_id = ? AND status = 'FROZEN'
`, txID).Scan(&quantity)
if err != nil {
return err
}
// 2. 释放冻结库存
_, err = tx.ExecContext(ctx, `
UPDATE inventory
SET frozen_stock = frozen_stock - ?
WHERE product_id = ?
`, quantity, "PROD-001")
if err != nil {
return err
}
// 3. 更新冻结记录状态
_, err = tx.ExecContext(ctx, `
UPDATE frozen_stock_records
SET status = 'CANCELLED'
WHERE tx_id = ?
`, txID)
if err != nil {
return err
}
tx.Commit()
log.Printf("[Inventory] Stock released (txID: %s)", txID)
return nil
}
支付服务TCC实现:
// 支付TCC参与者
type PaymentTCCParticipant struct {
db *sql.DB
}
// Try:冻结余额
func (p *PaymentTCCParticipant) Try(ctx context.Context, txID string) error {
tx, _ := p.db.BeginTx(ctx, nil)
defer tx.Rollback()
// 1. 检查余额
var availableBalance float64
err := tx.QueryRowContext(ctx, `
SELECT balance - frozen_balance FROM accounts WHERE user_id = ?
`, "USER-001").Scan(&availableBalance)
if err != nil {
return err
}
if availableBalance < 100.0 {
return errors.New("insufficient balance")
}
// 2. 冻结余额
_, err = tx.ExecContext(ctx, `
UPDATE accounts
SET frozen_balance = frozen_balance + ?
WHERE user_id = ?
`, 100.0, "USER-001")
if err != nil {
return err
}
// 3. 记录冻结
_, err = tx.ExecContext(ctx, `
INSERT INTO frozen_balance_records (tx_id, user_id, amount, status)
VALUES (?, ?, ?, 'FROZEN')
`, txID, "USER-001", 100.0)
if err != nil {
return err
}
tx.Commit()
log.Printf("[Payment] Balance frozen (txID: %s)", txID)
return nil
}
// Confirm:扣减余额
func (p *PaymentTCCParticipant) Confirm(ctx context.Context, txID string) error {
tx, _ := p.db.BeginTx(ctx, nil)
defer tx.Rollback()
// 1. 获取冻结金额
var amount float64
err := tx.QueryRowContext(ctx, `
SELECT amount FROM frozen_balance_records
WHERE tx_id = ? AND status = 'FROZEN'
`, txID).Scan(&amount)
if err != nil {
return err
}
// 2. 扣减余额
_, err = tx.ExecContext(ctx, `
UPDATE accounts
SET balance = balance - ?,
frozen_balance = frozen_balance - ?
WHERE user_id = ?
`, amount, amount, "USER-001")
if err != nil {
return err
}
// 3. 更新记录
_, err = tx.ExecContext(ctx, `
UPDATE frozen_balance_records
SET status = 'CONFIRMED'
WHERE tx_id = ?
`, txID)
if err != nil {
return err
}
tx.Commit()
log.Printf("[Payment] Balance deducted (txID: %s)", txID)
return nil
}
// Cancel:释放余额
func (p *PaymentTCCParticipant) Cancel(ctx context.Context, txID string) error {
tx, _ := p.db.BeginTx(ctx, nil)
defer tx.Rollback()
// 1. 获取冻结金额
var amount float64
err := tx.QueryRowContext(ctx, `
SELECT amount FROM frozen_balance_records
WHERE tx_id = ? AND status = 'FROZEN'
`, txID).Scan(&amount)
if err != nil {
return err
}
// 2. 释放余额
_, err = tx.ExecContext(ctx, `
UPDATE accounts
SET frozen_balance = frozen_balance - ?
WHERE user_id = ?
`, amount, "USER-001")
if err != nil {
return err
}
// 3. 更新记录
_, err = tx.ExecContext(ctx, `
UPDATE frozen_balance_records
SET status = 'CANCELLED'
WHERE tx_id = ?
`, txID)
if err != nil {
return err
}
tx.Commit()
log.Printf("[Payment] Balance released (txID: %s)", txID)
return nil
}
TCC特点
优点:
无阻塞:Try阶段不锁定资源太久
性能好:不需要长时间持有锁
业务控制:业务层面实现补偿
缺点:
侵入性强:需要实现Try/Confirm/Cancel三个方法
复杂度高:业务代码复杂
幂等性:Confirm/Cancel必须幂等(可能重试)
适用场景:
金融系统(严格一致性要求)
订单系统
交易系统
Saga模式
原理
Saga:长事务,将大事务拆分为多个本地事务,通过补偿机制保证最终一致性
核心思想:
T1 → T2 → T3 → ... → Tn
如果Ti失败:
执行补偿:C(Ti-1) → C(Ti-2) → ... → C(T1)
两种实现方式
1. Choreography(编舞模式)
定义:去中心化,服务通过事件协作
流程:
Order Service → 创建订单 → 发布OrderCreated事件
↓
Inventory Service → 监听事件 → 扣减库存 → 发布StockDeducted事件
↓
Payment Service → 监听事件 → 扣款 → 发布PaymentCompleted事件
↓
Order Service → 监听事件 → 订单完成
失败场景:
Payment Service扣款失败 → 发布PaymentFailed事件
↓
Inventory Service → 监听事件 → 补偿(恢复库存)
↓
Order Service → 监听事件 → 取消订单
代码实现:
// Order Service
type OrderService struct {
eventBus EventBus
}
// 创建订单
func (s *OrderService) CreateOrder(req CreateOrderRequest) (*Order, error) {
// 1. 创建订单
order := &Order{
ID: generateID(),
UserID: req.UserID,
Items: req.Items,
Status: OrderStatusPending,
}
if err := s.orderRepo.Save(order); err != nil {
return nil, err
}
// 2. 发布事件
s.eventBus.Publish(OrderCreatedEvent{
OrderID: order.ID,
UserID: order.UserID,
Items: order.Items,
Amount: calculateTotal(order.Items),
})
return order, nil
}
// 监听库存扣减失败事件
func (s *OrderService) HandleStockDeductionFailed(event StockDeductionFailedEvent) {
// 取消订单
order, _ := s.orderRepo.FindByID(event.OrderID)
order.Status = OrderStatusCancelled
s.orderRepo.Update(order)
log.Printf("Order cancelled: %s", event.OrderID)
}
// 监听支付完成事件
func (s *OrderService) HandlePaymentCompleted(event PaymentCompletedEvent) {
// 订单完成
order, _ := s.orderRepo.FindByID(event.OrderID)
order.Status = OrderStatusCompleted
s.orderRepo.Update(order)
log.Printf("Order completed: %s", event.OrderID)
}
// Inventory Service
type InventoryService struct {
eventBus EventBus
}
// 监听订单创建事件
func (s *InventoryService) HandleOrderCreated(event OrderCreatedEvent) {
// 扣减库存
for _, item := range event.Items {
if err := s.inventoryRepo.DeductStock(item.ProductID, item.Quantity); err != nil {
// 扣减失败,发布失败事件
s.eventBus.Publish(StockDeductionFailedEvent{
OrderID: event.OrderID,
ProductID: item.ProductID,
Reason: err.Error(),
})
return
}
}
// 扣减成功,发布成功事件
s.eventBus.Publish(StockDeductedEvent{
OrderID: event.OrderID,
Items: event.Items,
})
log.Printf("Stock deducted for order: %s", event.OrderID)
}
// 监听支付失败事件
func (s *InventoryService) HandlePaymentFailed(event PaymentFailedEvent) {
// 补偿:恢复库存
order, _ := s.getOrderItems(event.OrderID)
for _, item := range order.Items {
s.inventoryRepo.RestoreStock(item.ProductID, item.Quantity)
}
log.Printf("Stock restored for order: %s", event.OrderID)
}
// Payment Service
type PaymentService struct {
eventBus EventBus
}
// 监听库存扣减成功事件
func (s *PaymentService) HandleStockDeducted(event StockDeductedEvent) {
// 扣款
if err := s.paymentRepo.Deduct(event.OrderID, event.Amount); err != nil {
// 扣款失败,发布失败事件
s.eventBus.Publish(PaymentFailedEvent{
OrderID: event.OrderID,
Reason: err.Error(),
})
return
}
// 扣款成功,发布成功事件
s.eventBus.Publish(PaymentCompletedEvent{
OrderID: event.OrderID,
Amount: event.Amount,
})
log.Printf("Payment completed for order: %s", event.OrderID)
}
特点:
去中心化,无单点故障
服务自治
复杂度高(事件依赖复杂)
难以追踪(分散在各个服务)
2. Orchestration(编排模式)
定义:中心化,由Orchestrator协调所有服务
流程:
┌──────────────┐
│ Orchestrator │
└──────────────┘
↓ ↓ ↓
┌──────────┼────┼────┼──────────┐
↓ ↓ ↓ ↓ ↓
Order Inventory Payment Notification
Service Service Service Service
Orchestrator控制流程:
1. 调用Order Service创建订单
2. 调用Inventory Service扣减库存
3. 调用Payment Service扣款
4. 调用Notification Service发送通知
如果步骤3失败:
- 调用Inventory Service恢复库存
- 调用Order Service取消订单
代码实现:
// Saga Orchestrator
type OrderSagaOrchestrator struct {
orderService *OrderService
inventoryService *InventoryService
paymentService *PaymentService
}
// 执行订单Saga
func (o *OrderSagaOrchestrator) Execute(req CreateOrderRequest) error {
sagaID := generateID()
// 1. 创建订单
order, err := o.orderService.CreateOrder(sagaID, req)
if err != nil {
log.Printf("[Saga] Step 1 failed: create order")
return err
}
log.Printf("[Saga] Step 1 completed: order created (%s)", order.ID)
// 2. 扣减库存
if err := o.inventoryService.DeductStock(sagaID, req.Items); err != nil {
log.Printf("[Saga] Step 2 failed: deduct stock, compensating...")
// 补偿:取消订单
o.orderService.CancelOrder(sagaID, order.ID)
return err
}
log.Printf("[Saga] Step 2 completed: stock deducted")
// 3. 扣款
if err := o.paymentService.Deduct(sagaID, order.ID, order.Amount); err != nil {
log.Printf("[Saga] Step 3 failed: payment, compensating...")
// 补偿:恢复库存
o.inventoryService.RestoreStock(sagaID, req.Items)
// 补偿:取消订单
o.orderService.CancelOrder(sagaID, order.ID)
return err
}
log.Printf("[Saga] Step 3 completed: payment succeeded")
// 4. 订单完成
o.orderService.CompleteOrder(sagaID, order.ID)
log.Printf("[Saga] Transaction completed (sagaID: %s)", sagaID)
return nil
}
Order Service:
type OrderService struct {
db *sql.DB
}
// 创建订单
func (s *OrderService) CreateOrder(sagaID string, req CreateOrderRequest) (*Order, error) {
order := &Order{
ID: generateID(),
UserID: req.UserID,
Items: req.Items,
Amount: calculateTotal(req.Items),
Status: OrderStatusPending,
SagaID: sagaID,
}
_, err := s.db.Exec(`
INSERT INTO orders (id, user_id, amount, status, saga_id)
VALUES (?, ?, ?, ?, ?)
`, order.ID, order.UserID, order.Amount, order.Status, sagaID)
if err != nil {
return nil, err
}
return order, nil
}
// 取消订单(补偿)
func (s *OrderService) CancelOrder(sagaID, orderID string) error {
_, err := s.db.Exec(`
UPDATE orders SET status = 'CANCELLED'
WHERE id = ? AND saga_id = ?
`, orderID, sagaID)
log.Printf("[OrderService] Order cancelled (compensation): %s", orderID)
return err
}
// 完成订单
func (s *OrderService) CompleteOrder(sagaID, orderID string) error {
_, err := s.db.Exec(`
UPDATE orders SET status = 'COMPLETED'
WHERE id = ? AND saga_id = ?
`, orderID, sagaID)
log.Printf("[OrderService] Order completed: %s", orderID)
return err
}
Inventory Service:
type InventoryService struct {
db *sql.DB
}
// 扣减库存
func (s *InventoryService) DeductStock(sagaID string, items []OrderItem) error {
tx, _ := s.db.Begin()
defer tx.Rollback()
for _, item := range items {
// 检查库存
var stock int
err := tx.QueryRow(`
SELECT stock FROM inventory WHERE product_id = ?
`, item.ProductID).Scan(&stock)
if err != nil {
return err
}
if stock < item.Quantity {
return errors.New("insufficient stock")
}
// 扣减库存
_, err = tx.Exec(`
UPDATE inventory SET stock = stock - ?
WHERE product_id = ?
`, item.Quantity, item.ProductID)
if err != nil {
return err
}
// 记录Saga日志
_, err = tx.Exec(`
INSERT INTO saga_inventory_log (saga_id, product_id, quantity, operation)
VALUES (?, ?, ?, 'DEDUCT')
`, sagaID, item.ProductID, item.Quantity)
}
return tx.Commit()
}
// 恢复库存(补偿)
func (s *InventoryService) RestoreStock(sagaID string, items []OrderItem) error {
tx, _ := s.db.Begin()
defer tx.Rollback()
for _, item := range items {
// 恢复库存
_, err := tx.Exec(`
UPDATE inventory SET stock = stock + ?
WHERE product_id = ?
`, item.Quantity, item.ProductID)
if err != nil {
return err
}
// 记录补偿日志
_, err = tx.Exec(`
INSERT INTO saga_inventory_log (saga_id, product_id, quantity, operation)
VALUES (?, ?, ?, 'RESTORE')
`, sagaID, item.ProductID, item.Quantity)
}
log.Printf("[InventoryService] Stock restored (compensation)")
return tx.Commit()
}
特点:
中心化控制,易于追踪
流程清晰
易于实现复杂补偿逻辑
Orchestrator是单点
服务耦合到Orchestrator
Choreography vs Orchestration
| 维度 | Choreography | Orchestration |
|---|---|---|
| 控制方式 | 去中心化 | 中心化 |
| 协调机制 | 事件驱动 | 协调者控制 |
| 单点故障 | 无 | Orchestrator是单点 |
| 流程追踪 | 困难 | 容易 |
| 服务自治 | 高 | 低 |
| 复杂度 | 事件依赖复杂 | Orchestrator逻辑复杂 |
| 适用场景 | 简单流程 | 复杂流程 |
本地消息表
原理
核心思想:利用本地事务保证消息可靠发送
流程:
1. Order Service:
BEGIN TRANSACTION
- 创建订单
- 插入消息表(OrderCreated事件)
COMMIT
2. 定时任务:
- 扫描消息表中未发送的消息
- 发送到消息队列
- 标记为已发送
3. Inventory Service:
- 消费消息
- 扣减库存
代码实现
Order Service(生产者):
// Order Service
type OrderService struct {
db *sql.DB
eventBus *KafkaProducer
}
// 创建订单
func (s *OrderService) CreateOrder(req CreateOrderRequest) (*Order, error) {
tx, _ := s.db.Begin()
defer tx.Rollback()
// 1. 创建订单
order := &Order{
ID: generateID(),
UserID: req.UserID,
Items: req.Items,
Amount: calculateTotal(req.Items),
Status: OrderStatusPending,
}
_, err := tx.Exec(`
INSERT INTO orders (id, user_id, amount, status)
VALUES (?, ?, ?, ?)
`, order.ID, order.UserID, order.Amount, order.Status)
if err != nil {
return nil, err
}
// 2. 插入消息表(本地事务保证)
event := OrderCreatedEvent{
OrderID: order.ID,
UserID: order.UserID,
Items: order.Items,
Amount: order.Amount,
}
eventData, _ := json.Marshal(event)
_, err = tx.Exec(`
INSERT INTO local_message_table (id, event_type, event_data, status)
VALUES (?, ?, ?, 'PENDING')
`, generateID(), "OrderCreated", eventData)
if err != nil {
return nil, err
}
// 3. 提交事务(订单和消息要么都成功,要么都失败)
if err := tx.Commit(); err != nil {
return nil, err
}
log.Printf("Order created with local message: %s", order.ID)
return order, nil
}
消息发送器(定时任务):
// 消息发送器
type MessageSender struct {
db *sql.DB
eventBus *KafkaProducer
}
// 定时扫描并发送消息
func (s *MessageSender) Start() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
s.sendPendingMessages()
}
}
func (s *MessageSender) sendPendingMessages() {
// 1. 查询未发送的消息
rows, err := s.db.Query(`
SELECT id, event_type, event_data
FROM local_message_table
WHERE status = 'PENDING'
LIMIT 100
`)
if err != nil {
log.Printf("Query failed: %v", err)
return
}
defer rows.Close()
// 2. 发送每条消息
for rows.Next() {
var id, eventType string
var eventData []byte
rows.Scan(&id, &eventType, &eventData)
// 发送到Kafka
if err := s.eventBus.PublishRaw(eventType, eventData); err != nil {
log.Printf("Failed to publish message %s: %v", id, err)
continue
}
// 3. 标记为已发送
_, err := s.db.Exec(`
UPDATE local_message_table
SET status = 'SENT', sent_at = NOW()
WHERE id = ?
`, id)
if err != nil {
log.Printf("Failed to update message status: %v", err)
} else {
log.Printf("Message sent: %s", id)
}
}
}
Inventory Service(消费者):
// Inventory Service
type InventoryService struct {
db *sql.DB
processedEvents map[string]bool // 幂等性:记录已处理的事件
}
// 监听订单创建事件
func (s *InventoryService) HandleOrderCreated(event OrderCreatedEvent) error {
// 幂等性检查
if s.isEventProcessed(event.OrderID) {
log.Printf("Event already processed: %s", event.OrderID)
return nil
}
tx, _ := s.db.Begin()
defer tx.Rollback()
// 1. 扣减库存
for _, item := range event.Items {
_, err := tx.Exec(`
UPDATE inventory SET stock = stock - ?
WHERE product_id = ?
`, item.Quantity, item.ProductID)
if err != nil {
return err
}
}
// 2. 记录已处理(幂等性)
_, err := tx.Exec(`
INSERT INTO processed_events (event_id, event_type, processed_at)
VALUES (?, ?, NOW())
`, event.OrderID, "OrderCreated")
if err != nil {
return err
}
tx.Commit()
s.markEventProcessed(event.OrderID)
log.Printf("Stock deducted for order: %s", event.OrderID)
return nil
}
func (s *InventoryService) isEventProcessed(eventID string) bool {
var count int
s.db.QueryRow(`
SELECT COUNT(*) FROM processed_events WHERE event_id = ?
`, eventID).Scan(&count)
return count > 0
}
本地消息表特点
优点:
实现简单
利用本地事务保证可靠性
不依赖外部分布式事务组件
性能好
缺点:
最终一致性(有延迟)
需要定时任务扫描
需要实现幂等性
适用场景:
高并发场景
可接受最终一致性
事件驱动架构
最佳实践
方案选择
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 金融交易 | TCC | 严格一致性要求 |
| 订单系统 | Saga(Orchestration) | 流程清晰,易于追踪 |
| 事件通知 | 本地消息表 + 消息队列 | 高性能,最终一致性 |
| 简单流程 | Saga(Choreography) | 服务自治,去中心化 |
| 强一致性 | 不用微服务(或用2PC) | 2PC性能差,不推荐 |
设计原则
1. 幂等性
// 每个操作必须幂等
func (s *Service) HandleEvent(event Event) error {
// 1. 幂等性检查
if s.isProcessed(event.ID) {
return nil // 已处理,直接返回
}
// 2. 执行业务逻辑
if err := s.doSomething(); err != nil {
return err
}
// 3. 标记已处理
s.markProcessed(event.ID)
return nil
}
2. 超时和重试
// 超时控制
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 重试
err := retry.Do(
func() error {
return callService(ctx)
},
retry.Attempts(3),
retry.Delay(time.Second),
)
3. 补偿操作必须成功
// 补偿操作必须重试直到成功
func (s *Service) Compensate(ctx context.Context) error {
for {
err := s.doCompensate()
if err == nil {
return nil
}
log.Printf("Compensation failed: %v, retrying...", err)
time.Sleep(5 * time.Second)
// 检查context是否取消
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
}
4. 监控和告警
// 记录每个步骤
func (o *Orchestrator) Execute() error {
metrics.SagaStarted.Inc()
if err := o.step1(); err != nil {
metrics.SagaFailed.WithLabelValues("step1").Inc()
return err
}
if err := o.step2(); err != nil {
metrics.SagaFailed.WithLabelValues("step2").Inc()
o.compensateStep1() // 补偿
return err
}
metrics.SagaCompleted.Inc()
return nil
}
面试问答
什么是分布式事务?有哪些解决方案?
答案:
定义:
分布式事务:跨多个数据库/服务的事务操作
目标:保证多个操作的原子性和一致性
挑战:
单体:一个数据库事务(ACID)
微服务:多个独立数据库,无法用一个事务
示例:
订单服务创建订单 → 库存服务扣库存 → 支付服务扣款
如何保证三个操作的一致性?
解决方案:
| 方案 | 一致性 | 性能 | 复杂度 | 推荐度 |
|---|---|---|---|---|
| 2PC | 强一致 | 差 | 高 | 不推荐 |
| 3PC | 强一致 | 差 | 高 | 不推荐 |
| TCC | 最终一致 | 好 | 高 | 金融场景 |
| Saga | 最终一致 | 好 | 中 | 推荐 |
| 本地消息表 | 最终一致 | 好 | 低 | 推荐 |
实践建议:
1. 优先避免分布式事务(服务拆分时注意边界)
2. 可接受最终一致性 → Saga或本地消息表
3. 严格一致性 → TCC(代价:复杂度高)
4. 不要用2PC/3PC(性能差、阻塞)
Saga的Choreography和Orchestration如何选择?
答案:
Choreography(编舞):
定义:去中心化,服务通过事件协作
优点:
去中心化,无单点故障
服务自治
松耦合
缺点:
事件依赖复杂
难以追踪整个流程
测试困难
适用场景:
简单流程(2-3个服务)
服务高度自治
事件驱动架构
Orchestration(编排):
定义:中心化,Orchestrator协调所有服务
优点:
流程清晰,易于追踪
易于实现复杂补偿逻辑
集中管理
缺点:
Orchestrator是单点
服务耦合到Orchestrator
适用场景:
复杂流程(3+服务)
需要集中管理和监控
频繁变化的业务流程
选择建议:
简单流程(如订单通知):
→ Choreography
复杂流程(如订单-库存-支付-物流):
→ Orchestration
混合使用:
核心流程用Orchestration
非关键流程用Choreography
如何保证分布式事务的幂等性?
答案:
问题:
网络不稳定,消息可能重复
Saga补偿可能重试
如何保证操作只执行一次?
方案1:唯一ID + 数据库唯一约束
// 使用唯一事件ID
type Event struct {
EventID string // 全局唯一
OrderID string
}
// 数据库唯一约束
CREATE TABLE processed_events (
event_id VARCHAR(64) PRIMARY KEY,
processed_at TIMESTAMP
);
// 处理事件
func HandleEvent(event Event) error {
// 插入processed_events(唯一约束)
_, err := db.Exec(`
INSERT INTO processed_events (event_id, processed_at)
VALUES (?, NOW())
`, event.EventID)
if isDuplicateKeyError(err) {
// 已处理过,直接返回
return nil
}
// 执行业务逻辑
doBusinessLogic(event)
return nil
}
方案2:版本号
// 乐观锁
func UpdateStock(productID string, quantity int, version int) error {
result, err := db.Exec(`
UPDATE inventory
SET stock = stock - ?, version = version + 1
WHERE product_id = ? AND version = ?
`, quantity, productID, version)
if err != nil {
return err
}
affected, _ := result.RowsAffected()
if affected == 0 {
return errors.New("version conflict")
}
return nil
}
方案3:状态机
// 订单状态机
const (
OrderStatusPending = "PENDING"
OrderStatusProcessing = "PROCESSING"
OrderStatusCompleted = "COMPLETED"
)
func ProcessOrder(orderID string) error {
// 只有PENDING状态才能转为PROCESSING
result, err := db.Exec(`
UPDATE orders
SET status = 'PROCESSING'
WHERE id = ? AND status = 'PENDING'
`, orderID)
if err != nil {
return err
}
affected, _ := result.RowsAffected()
if affected == 0 {
// 已经处理过或状态不对
return nil
}
// 执行业务逻辑
doBusinessLogic(orderID)
// 更新为COMPLETED
db.Exec(`
UPDATE orders SET status = 'COMPLETED' WHERE id = ?
`, orderID)
return nil
}
本地消息表的可靠性如何保证?
答案:
核心机制:
1. 本地事务保证
// 订单和消息在同一个事务
tx.Begin()
tx.Exec("INSERT INTO orders ...")
tx.Exec("INSERT INTO local_message_table ...")
tx.Commit()
// 要么都成功,要么都失败
2. 定时扫描发送
func sendPendingMessages() {
// 每秒扫描一次
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
messages := queryPendingMessages()
for _, msg := range messages {
// 发送到Kafka
kafka.Publish(msg)
// 标记为已发送
updateMessageStatus(msg.ID, "SENT")
}
}
}
3. 消费幂等
func HandleMessage(event Event) error {
// 检查是否已处理
if isProcessed(event.ID) {
return nil
}
// 处理业务逻辑
doBusinessLogic(event)
// 标记已处理
markProcessed(event.ID)
return nil
}
4. 失败重试
// Kafka消费失败,自动重试
consumer.Consume(func(msg Message) error {
if err := handle(msg); err != nil {
// 返回错误,消息不确认,会自动重试
return err
}
// 确认消息
msg.Ack()
return nil
})
可靠性保证:
1. 本地事务 → 消息一定会写入消息表
2. 定时扫描 → 消息一定会发送(最终)
3. Kafka持久化 → 消息不会丢失
4. 消费幂等 → 重复消费不影响结果
最终一致性:有延迟(秒级),但一定会达成一致
微服务数据一致性的最佳实践是什么?
答案:
1. 优先避免分布式事务
原则:服务拆分时注意边界
反例:
Order Service管订单
Inventory Service管库存
→ 创建订单必须扣库存(分布式事务)
正例:
Order Service管订单+库存
→ 一个服务,本地事务
结论:合理拆分服务,避免频繁跨服务事务
2. 接受最终一致性
大多数场景不需要强一致性
示例:
订单创建后,发送确认邮件
→ 不需要强一致,最终发送即可
3. 使用Saga或本地消息表
必须跨服务事务时:
简单流程:
→ 本地消息表 + 消息队列
复杂流程:
→ Saga(Orchestration)
金融场景:
→ TCC
4. 保证幂等性
// 所有操作必须幂等
func HandleEvent(event Event) error {
if isProcessed(event.ID) {
return nil // 已处理
}
doBusinessLogic(event)
markProcessed(event.ID)
return nil
}
5. 监控和告警
监控指标:
- Saga成功率
- 补偿次数
- 消息积压
- 处理延迟
告警:
- Saga失败超过阈值
- 消息堆积
- 补偿失败