第14章:分布式事务
面试频率: \ │ │ / | \ │ │ / | \ │ │ / | \ │ │ / | \ │ │ / | \ │ │ / CP | CA \ │ │ / | \ │ │ /|\ │ │ A P P │ │ │ │ AP: 最终一致性(常用) │ │ CP: 强一致性 │ │ CA: 不存在(网络一定会分区) │ └─────────────────────────────────┘
实际选择:
- 金融系统:CP(强一致性)
- 互联网系统:AP(最终一致性)
### BASE理论
BASE = Basically Available, Soft state, Eventually consistent
BA (Basically Available)
- 基本可用:系统允许损失部分可用性
- 例:响应时间增加、降级服务
S (Soft state)
- 软状态:允许中间状态
- 例:订单"处理中"状态
E (Eventually consistent)
- 最终一致性:不保证实时一致,但最终会一致
- 例:余额更新延迟几秒
对比: ACID(强一致性) vs BASE(最终一致性) 单机数据库 vs 分布式系统
---
## 2PC两阶段提交
### 2PC原理
两阶段提交(Two-Phase Commit)
角色:
- 协调者(Coordinator):事务管理器
- 参与者(Participant):各个资源管理器
流程: ┌────────────┐ │ Coordinator│ └─────┬──────┘ │ │ Phase 1: Prepare(准备阶段) ├──────> 参与者A: "能提交吗?" ├──────> 参与者B: "能提交吗?" ├──────> 参与者C: "能提交吗?" │ ↓ 收到所有YES │ │ Phase 2: Commit(提交阶段) ├──────> 参与者A: "提交!" ├──────> 参与者B: "提交!" └──────> 参与者C: "提交!"
详细流程:
Prepare阶段
- Coordinator发送prepare请求
- Participant执行事务操作,但不提交
- Participant锁定资源
- Participant返回YES/NO
Commit阶段
- 如果所有Participant都返回YES → Coordinator发送commit → Participant提交事务
- 如果任何Participant返回NO → Coordinator发送abort → Participant回滚事务
### 2PC实现
```go
package transaction
import (
"context"
"errors"
"sync"
"time"
)
// TwoPhaseCommit 两阶段提交协调器
type TwoPhaseCommit struct {
participants []Participant
timeout time.Duration
}
// Participant 参与者接口
type Participant interface {
Prepare(ctx context.Context, txID string) error
Commit(ctx context.Context, txID string) error
Abort(ctx context.Context, txID string) error
}
// Execute 执行2PC事务
func (tpc *TwoPhaseCommit) Execute(ctx context.Context, txID string) error {
// Phase 1: Prepare
if err := tpc.preparePhase(ctx, txID); err != nil {
// Prepare失败,执行回滚
tpc.abortPhase(ctx, txID)
return err
}
// Phase 2: Commit
if err := tpc.commitPhase(ctx, txID); err != nil {
// Commit失败,尝试回滚(可能部分成功)
tpc.abortPhase(ctx, txID)
return err
}
return nil
}
// preparePhase Prepare阶段
func (tpc *TwoPhaseCommit) preparePhase(ctx context.Context, txID string) error {
ctx, cancel := context.WithTimeout(ctx, tpc.timeout)
defer cancel()
errChan := make(chan error, len(tpc.participants))
var wg sync.WaitGroup
// 并发发送Prepare请求
for _, p := range tpc.participants {
wg.Add(1)
go func(participant Participant) {
defer wg.Done()
err := participant.Prepare(ctx, txID)
if err != nil {
errChan <- err
}
}(p)
}
wg.Wait()
close(errChan)
// 检查是否所有Participant都Prepare成功
for err := range errChan {
if err != nil {
return err
}
}
return nil
}
// commitPhase Commit阶段
func (tpc *TwoPhaseCommit) commitPhase(ctx context.Context, txID string) error {
ctx, cancel := context.WithTimeout(ctx, tpc.timeout)
defer cancel()
errChan := make(chan error, len(tpc.participants))
var wg sync.WaitGroup
for _, p := range tpc.participants {
wg.Add(1)
go func(participant Participant) {
defer wg.Done()
err := participant.Commit(ctx, txID)
if err != nil {
errChan <- err
}
}(p)
}
wg.Wait()
close(errChan)
// 收集错误
var errs []error
for err := range errChan {
if err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.New("commit phase failed")
}
return nil
}
// abortPhase 回滚阶段
func (tpc *TwoPhaseCommit) abortPhase(ctx context.Context, txID string) {
ctx, cancel := context.WithTimeout(ctx, tpc.timeout)
defer cancel()
var wg sync.WaitGroup
for _, p := range tpc.participants {
wg.Add(1)
go func(participant Participant) {
defer wg.Done()
participant.Abort(ctx, txID)
}(p)
}
wg.Wait()
}
// 具体参与者实现示例:订单服务
type OrderService struct {
db *sql.DB
}
func (os *OrderService) Prepare(ctx context.Context, txID string) error {
// 1. 开启事务
tx, err := os.db.BeginTx(ctx, nil)
if err != nil {
return err
}
// 2. 执行业务逻辑(但不提交)
_, err = tx.ExecContext(ctx,
"INSERT INTO orders (order_id, status) VALUES (?, 'pending')",
txID)
if err != nil {
tx.Rollback()
return err
}
// 3. 保存事务句柄,等待Commit/Abort
saveTx(txID, tx)
return nil
}
func (os *OrderService) Commit(ctx context.Context, txID string) error {
tx := getTx(txID)
if tx == nil {
return errors.New("transaction not found")
}
return tx.Commit()
}
func (os *OrderService) Abort(ctx context.Context, txID string) error {
tx := getTx(txID)
if tx == nil {
return nil
}
return tx.Rollback()
}
2PC的问题
问题1:同步阻塞
┌────────────┐
│Participant │
│ 等待Commit │ → 资源被锁定,无法处理其他请求
└────────────┘
问题2:单点故障
Coordinator宕机 → 所有Participant无限等待
问题3:数据不一致
Phase 2网络分区:
- 部分Participant收到Commit
- 部分Participant未收到
→ 数据不一致
问题4:太过保守
任何一个Participant失败 → 整个事务回滚
3PC三阶段提交
3PC原理
三阶段提交(Three-Phase Commit)
改进:增加超时机制 + Pre-Commit阶段
Phase 1: CanCommit(询问)
Coordinator: "能提交吗?"
Participant: "YES/NO"
Phase 2: PreCommit(预提交)
如果所有YES:
Coordinator: "预提交!"
Participant: 执行事务,锁定资源,但不提交
如果有NO:
Coordinator: "Abort!"
Phase 3: DoCommit(提交)
Coordinator: "正式提交!"
Participant: 提交事务
超时机制:
- Participant在PreCommit后超时 → 自动提交
- 降低阻塞时间
3PC vs 2PC
| 特性 | 2PC | 3PC |
|---|---|---|
| 阶段数 | 2个 | 3个 |
| 超时 | 无 | 有 |
| 阻塞 | 同步阻塞 | 非阻塞 |
| 一致性 | 强一致性 | 可能不一致 |
| 性能 | 较差 | 稍好 |
| 复杂度 | 低 | 高 |
选择建议:
- 实际生产很少用2PC/3PC
- 原因:性能差、阻塞、单点故障
- 更多使用:TCC、Saga、最终一致性
TCC补偿模式
TCC原理
TCC = Try-Confirm-Cancel
Try阶段:尝试执行,预留资源
Confirm阶段:确认执行,使用预留资源
Cancel阶段:取消执行,释放预留资源
示例:转账
┌─────────────────────────────────────────────────────┐
│ 转账 A → B │
├─────────────────────────────────────────────────────┤
│ │
│ Try阶段: │
│ ┌────────────┐ ┌────────────┐ │
│ │ 账户A │ │ 账户B │ │
│ │ 余额:1000 │ │ 余额:0 │ │
│ │ 冻结:100 │ → 冻结100 │ 预留:100 │ │
│ └────────────┘ └────────────┘ │
│ │
│ Confirm阶段: │
│ ┌────────────┐ ┌────────────┐ │
│ │ 账户A │ │ 账户B │ │
│ │ 余额:900 │ → 扣除冻结 │ 余额:100 │ │
│ │ 冻结:0 │ │ 预留:0 │ │
│ └────────────┘ └────────────┘ │
│ │
│ Cancel阶段(失败时): │
│ ┌────────────┐ ┌────────────┐ │
│ │ 账户A │ │ 账户B │ │
│ │ 余额:1000 │ → 解冻 │ 余额:0 │ │
│ │ 冻结:0 │ │ 预留:0 │ │
│ └────────────┘ └────────────┘ │
└─────────────────────────────────────────────────────┘
TCC完整实现
package tcc
import (
"context"
"errors"
"sync"
)
// TCCTransaction TCC事务
type TCCTransaction struct {
id string
participants []*TCCParticipant
}
// TCCParticipant TCC参与者
type TCCParticipant struct {
serviceName string
tryFunc func(ctx context.Context) error
confirmFunc func(ctx context.Context) error
cancelFunc func(ctx context.Context) error
}
// TCCCoordinator TCC协调器
type TCCCoordinator struct {
transactions sync.Map
}
func NewTCCCoordinator() *TCCCoordinator {
return &TCCCoordinator{}
}
// Begin 开启TCC事务
func (tc *TCCCoordinator) Begin(txID string) *TCCTransaction {
tx := &TCCTransaction{
id: txID,
participants: make([]*TCCParticipant, 0),
}
tc.transactions.Store(txID, tx)
return tx
}
// AddParticipant 添加参与者
func (tx *TCCTransaction) AddParticipant(p *TCCParticipant) {
tx.participants = append(tx.participants, p)
}
// Execute 执行TCC事务
func (tc *TCCCoordinator) Execute(ctx context.Context, txID string) error {
txInterface, ok := tc.transactions.Load(txID)
if !ok {
return errors.New("transaction not found")
}
tx := txInterface.(*TCCTransaction)
// Phase 1: Try
if err := tc.tryPhase(ctx, tx); err != nil {
// Try失败,执行Cancel
tc.cancelPhase(ctx, tx)
return err
}
// Phase 2: Confirm
if err := tc.confirmPhase(ctx, tx); err != nil {
// Confirm失败,执行Cancel
tc.cancelPhase(ctx, tx)
return err
}
return nil
}
// tryPhase Try阶段
func (tc *TCCCoordinator) tryPhase(ctx context.Context, tx *TCCTransaction) error {
errChan := make(chan error, len(tx.participants))
var wg sync.WaitGroup
for _, p := range tx.participants {
wg.Add(1)
go func(participant *TCCParticipant) {
defer wg.Done()
if err := participant.tryFunc(ctx); err != nil {
errChan <- err
}
}(p)
}
wg.Wait()
close(errChan)
for err := range errChan {
if err != nil {
return err
}
}
return nil
}
// confirmPhase Confirm阶段
func (tc *TCCCoordinator) confirmPhase(ctx context.Context, tx *TCCTransaction) error {
errChan := make(chan error, len(tx.participants))
var wg sync.WaitGroup
for _, p := range tx.participants {
wg.Add(1)
go func(participant *TCCParticipant) {
defer wg.Done()
if err := participant.confirmFunc(ctx); err != nil {
errChan <- err
}
}(p)
}
wg.Wait()
close(errChan)
for err := range errChan {
if err != nil {
return err
}
}
return nil
}
// cancelPhase Cancel阶段
func (tc *TCCCoordinator) cancelPhase(ctx context.Context, tx *TCCTransaction) {
var wg sync.WaitGroup
for _, p := range tx.participants {
wg.Add(1)
go func(participant *TCCParticipant) {
defer wg.Done()
participant.cancelFunc(ctx)
}(p)
}
wg.Wait()
}
// 使用示例:下单扣库存
type OrderTCCService struct {
db *sql.DB
}
// Try: 创建订单(pending状态)
func (s *OrderTCCService) TryCreateOrder(ctx context.Context, order *Order) error {
_, err := s.db.ExecContext(ctx,
"INSERT INTO orders (order_id, user_id, amount, status) VALUES (?, ?, ?, 'try')",
order.OrderID, order.UserID, order.Amount)
return err
}
// Confirm: 确认订单
func (s *OrderTCCService) ConfirmOrder(ctx context.Context, orderID string) error {
_, err := s.db.ExecContext(ctx,
"UPDATE orders SET status = 'confirmed' WHERE order_id = ? AND status = 'try'",
orderID)
return err
}
// Cancel: 取消订单
func (s *OrderTCCService) CancelOrder(ctx context.Context, orderID string) error {
_, err := s.db.ExecContext(ctx,
"DELETE FROM orders WHERE order_id = ? AND status = 'try'",
orderID)
return err
}
// 库存TCC服务
type InventoryTCCService struct {
db *sql.DB
}
// Try: 冻结库存
func (s *InventoryTCCService) TryFreezeInventory(ctx context.Context, productID string, quantity int) error {
// 检查库存
var available int
err := s.db.QueryRowContext(ctx,
"SELECT available FROM inventory WHERE product_id = ?", productID).Scan(&available)
if err != nil {
return err
}
if available < quantity {
return errors.New("insufficient inventory")
}
// 冻结库存
_, err = s.db.ExecContext(ctx,
"UPDATE inventory SET available = available - ?, frozen = frozen + ? WHERE product_id = ?",
quantity, quantity, productID)
return err
}
// Confirm: 扣减库存
func (s *InventoryTCCService) ConfirmInventory(ctx context.Context, productID string, quantity int) error {
_, err := s.db.ExecContext(ctx,
"UPDATE inventory SET frozen = frozen - ? WHERE product_id = ?",
quantity, productID)
return err
}
// Cancel: 解冻库存
func (s *InventoryTCCService) CancelInventory(ctx context.Context, productID string, quantity int) error {
_, err := s.db.ExecContext(ctx,
"UPDATE inventory SET available = available + ?, frozen = frozen - ? WHERE product_id = ?",
quantity, quantity, productID)
return err
}
// 完整流程
func PlaceOrderWithTCC(order *Order) error {
ctx := context.Background()
txID := generateTxID()
coordinator := NewTCCCoordinator()
tx := coordinator.Begin(txID)
orderService := &OrderTCCService{db: db}
inventoryService := &InventoryTCCService{db: db}
// 添加订单参与者
tx.AddParticipant(&TCCParticipant{
serviceName: "order",
tryFunc: func(ctx context.Context) error {
return orderService.TryCreateOrder(ctx, order)
},
confirmFunc: func(ctx context.Context) error {
return orderService.ConfirmOrder(ctx, order.OrderID)
},
cancelFunc: func(ctx context.Context) error {
return orderService.CancelOrder(ctx, order.OrderID)
},
})
// 添加库存参与者
tx.AddParticipant(&TCCParticipant{
serviceName: "inventory",
tryFunc: func(ctx context.Context) error {
return inventoryService.TryFreezeInventory(ctx, order.ProductID, order.Quantity)
},
confirmFunc: func(ctx context.Context) error {
return inventoryService.ConfirmInventory(ctx, order.ProductID, order.Quantity)
},
cancelFunc: func(ctx context.Context) error {
return inventoryService.CancelInventory(ctx, order.ProductID, order.Quantity)
},
})
// 执行TCC事务
return coordinator.Execute(ctx, txID)
}
TCC注意事项
1. 幂等性
- Try/Confirm/Cancel都可能重复调用
- 必须保证幂等
2. 空回滚
- Try超时,直接收到Cancel
- Cancel需要判断Try是否执行
3. 悬挂
- Try超时后Cancel执行
- Try后续才到达
- 需要记录Cancel状态,拒绝Try
4. 资源预留
- Try阶段必须预留资源
- 不能等到Confirm才检查
数据库设计:
CREATE TABLE tcc_transaction (
tx_id VARCHAR(64) PRIMARY KEY,
status ENUM('try', 'confirm', 'cancel'),
create_time DATETIME,
update_time DATETIME
);
-- 防止空回滚和悬挂
Saga模式
Saga原理
Saga模式 = 长事务拆分 + 补偿
特点:
- 每个子事务独立提交
- 失败时执行补偿事务
示例:订单流程
┌─────────────────────────────────────────────────┐
│ 正向流程 │
├─────────────────────────────────────────────────┤
│ T1: 创建订单 → T2: 扣库存 → T3: 扣款 → T4: 发货│
│ ↓ 提交 ↓ 提交 ↓ 提交 ↓ 提交 │
└─────────────────────────────────────────────────┘
如果T3失败:
┌─────────────────────────────────────────────────┐
│ 补偿流程 │
├─────────────────────────────────────────────────┤
│ T1: 创建订单 → T2: 扣库存 → T3: 扣款(失败) │
│ ↓ 提交 ↓ 提交 ↓ 失败 │
│ ↓ │
│ C2: 恢复库存 ← C1: 取消订单 │
│ ↓ 补偿 ↓ 补偿 │
└─────────────────────────────────────────────────┘
Saga实现(协调)
package saga
import (
"context"
"errors"
)
// SagaStep Saga步骤
type SagaStep struct {
name string
action func(ctx context.Context) error // 正向操作
compensation func(ctx context.Context) error // 补偿操作
}
// Saga事务
type Saga struct {
steps []*SagaStep
executedSteps []*SagaStep
}
func NewSaga() *Saga {
return &Saga{
steps: make([]*SagaStep, 0),
executedSteps: make([]*SagaStep, 0),
}
}
// AddStep 添加步骤
func (s *Saga) AddStep(name string, action, compensation func(ctx context.Context) error) {
s.steps = append(s.steps, &SagaStep{
name: name,
action: action,
compensation: compensation,
})
}
// Execute 执行Saga
func (s *Saga) Execute(ctx context.Context) error {
// 正向执行所有步骤
for _, step := range s.steps {
if err := step.action(ctx); err != nil {
// 失败,执行补偿
s.compensate(ctx)
return err
}
// 记录已执行步骤
s.executedSteps = append(s.executedSteps, step)
}
return nil
}
// compensate 补偿
func (s *Saga) compensate(ctx context.Context) {
// 逆序执行补偿
for i := len(s.executedSteps) - 1; i >= 0; i-- {
step := s.executedSteps[i]
if err := step.compensation(ctx); err != nil {
// 补偿失败,记录日志,人工介入
log.Printf("Compensation failed for step %s: %v", step.name, err)
}
}
}
// 使用示例
func PlaceOrderWithSaga(order *Order) error {
ctx := context.Background()
saga := NewSaga()
// Step 1: 创建订单
saga.AddStep("create_order",
func(ctx context.Context) error {
return createOrder(ctx, order)
},
func(ctx context.Context) error {
return cancelOrder(ctx, order.OrderID)
})
// Step 2: 扣减库存
saga.AddStep("deduct_inventory",
func(ctx context.Context) error {
return deductInventory(ctx, order.ProductID, order.Quantity)
},
func(ctx context.Context) error {
return restoreInventory(ctx, order.ProductID, order.Quantity)
})
// Step 3: 扣款
saga.AddStep("deduct_balance",
func(ctx context.Context) error {
return deductBalance(ctx, order.UserID, order.Amount)
},
func(ctx context.Context) error {
return refund(ctx, order.UserID, order.Amount)
})
// Step 4: 通知发货
saga.AddStep("notify_shipping",
func(ctx context.Context) error {
return notifyShipping(ctx, order.OrderID)
},
func(ctx context.Context) error {
return cancelShipping(ctx, order.OrderID)
})
return saga.Execute(ctx)
}
Saga vs TCC
| 特性 | Saga | TCC |
|---|---|---|
| 一致性 | 最终一致性 | 强一致性 |
| 实现复杂度 | 低 | 高 |
| 性能 | 高(无锁) | 中(有锁) |
| 补偿 | 事后补偿 | 预留资源 |
| 适用场景 | 长事务 | 短事务 |
| 隔离性 | 弱 | 强 |
选择建议:
- 短事务、强一致性 → TCC
- 长事务、最终一致性 → Saga
本地消息表
原理
本地消息表 = 本地事务 + 消息队列
流程:
┌─────────────────────────────────────────────┐
│ 服务A │
│ ┌─────────────────────────────┐ │
│ │ 开启事务 │ │
│ │ 1. 执行业务(创建订单) │ │
│ │ 2. 插入消息表(待发送) │ │
│ │ 提交事务 │ │
│ └─────────────────────────────┘ │
│ ↓ │
│ 定时任务扫描消息表 │
│ ↓ │
│ 发送消息到MQ │
│ ↓ │
│ 标记消息为已发送 │
└─────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────┐
│ 消息队列(Kafka) │
└─────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────┐
│ 服务B │
│ 消费消息 → 扣减库存 │
└─────────────────────────────────────────────┘
本地消息表实现
// 消息表
type LocalMessage struct {
ID int64
MessageID string
Topic string
Payload string
Status string // pending/sent/failed
RetryCount int
NextRetry time.Time
CreatedAt time.Time
UpdatedAt time.Time
}
// 创建订单 + 插入消息表(本地事务)
func CreateOrderWithMessage(order *Order) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 1. 创建订单
_, err = tx.Exec(
"INSERT INTO orders (order_id, user_id, amount, status) VALUES (?, ?, ?, 'created')",
order.OrderID, order.UserID, order.Amount)
if err != nil {
return err
}
// 2. 插入消息表
message := &LocalMessage{
MessageID: generateMessageID(),
Topic: "order-created",
Payload: marshalJSON(order),
Status: "pending",
CreatedAt: time.Now(),
}
_, err = tx.Exec(
"INSERT INTO local_messages (message_id, topic, payload, status, created_at) VALUES (?, ?, ?, ?, ?)",
message.MessageID, message.Topic, message.Payload, message.Status, message.CreatedAt)
if err != nil {
return err
}
// 3. 提交事务
return tx.Commit()
}
// 定时任务:扫描并发送消息
func MessageSender() {
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
// 查询待发送消息
rows, _ := db.Query(
"SELECT id, message_id, topic, payload FROM local_messages WHERE status = 'pending' AND next_retry < NOW() LIMIT 100")
for rows.Next() {
var msg LocalMessage
rows.Scan(&msg.ID, &msg.MessageID, &msg.Topic, &msg.Payload)
// 发送到Kafka
err := sendToKafka(msg.Topic, msg.MessageID, []byte(msg.Payload))
if err != nil {
// 发送失败,更新重试时间
updateMessageStatus(msg.ID, "pending", msg.RetryCount+1)
continue
}
// 发送成功,标记为已发送
updateMessageStatus(msg.ID, "sent", msg.RetryCount)
}
rows.Close()
}
}
func sendToKafka(topic, key string, value []byte) error {
return producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Key: []byte(key),
Value: value,
}, nil)
}
func updateMessageStatus(id int64, status string, retryCount int) {
nextRetry := time.Now().Add(time.Duration(math.Pow(2, float64(retryCount))) * time.Second)
db.Exec(
"UPDATE local_messages SET status = ?, retry_count = ?, next_retry = ?, updated_at = NOW() WHERE id = ?",
status, retryCount, nextRetry, id)
}
可靠消息最终一致性
RocketMQ事务消息
RocketMQ事务消息流程:
1. 发送Half消息(半消息)
2. 执行本地事务
3. Commit/Rollback Half消息
4. 消费者消费消息
┌─────────────┐
│ Producer │
└──────┬──────┘
│
│ 1. sendHalfMessage
├────────────────────────────> ┌─────────────┐
│ │ RocketMQ │
│ 2. 执行本地事务 │ (Half Msg) │
│ 成功 └─────────────┘
│
│ 3. commitMessage
├────────────────────────────>
│ ┌─────────────┐
│ │ RocketMQ │
│ │ (Normal) │
│ └──────┬──────┘
│ │
│ ↓
│ ┌─────────────┐
│ │ Consumer │
│ └─────────────┘
如果Producer宕机:
RocketMQ回查事务状态 → Producer检查本地事务 → Commit/Rollback
事务消息实现
// RocketMQ事务消息Producer
type TransactionProducer struct {
producer *rocketmq.TransactionProducer
}
// 事务监听器
type OrderTransactionListener struct{}
// ExecuteLocalTransaction 执行本地事务
func (l *OrderTransactionListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
// 解析消息
var order Order
json.Unmarshal(msg.Body, &order)
// 执行本地事务
err := createOrder(&order)
if err != nil {
return primitive.RollbackMessageState
}
return primitive.CommitMessageState
}
// CheckLocalTransaction 回查本地事务
func (l *OrderTransactionListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
// 解析消息
var order Order
json.Unmarshal(msg.Body, &order)
// 检查订单是否存在
exists, _ := checkOrderExists(order.OrderID)
if exists {
return primitive.CommitMessageState
}
return primitive.RollbackMessageState
}
// 发送事务消息
func SendTransactionMessage(order *Order) error {
// 创建事务Producer
p, err := rocketmq.NewTransactionProducer(
&OrderTransactionListener{},
producer.WithNameServer([]string{"localhost:9876"}),
)
if err != nil {
return err
}
p.Start()
defer p.Shutdown()
// 构造消息
msg := primitive.NewMessage("order-created", marshalJSON(order))
// 发送事务消息
res, err := p.SendMessageInTransaction(context.Background(), msg)
if err != nil {
return err
}
log.Printf("Send transaction message: %s", res.MsgID)
return nil
}
最大努力通知
原理
最大努力通知 = 尽力而为 + 定时重试
适用场景:
- 对一致性要求不高
- 允许短期不一致
- 例:支付通知、积分通知
流程:
1. 服务A执行操作
2. 同步调用服务B(失败不回滚)
3. 如果失败,异步重试(指数退避)
4. 重试N次后放弃
重试策略:
第1次:立即
第2次:1分钟后
第3次:5分钟后
第4次:30分钟后
第5次:2小时后
...
放弃:24小时后
实现
type NotificationService struct {
db *sql.DB
maxRetry int
}
// 发送通知(最大努力)
func (ns *NotificationService) SendNotification(event *Event) error {
// 1. 保存通知记录
notification := &Notification{
ID: generateID(),
EventType: event.Type,
Payload: marshalJSON(event),
Status: "pending",
RetryCount: 0,
CreatedAt: time.Now(),
}
ns.db.Exec(
"INSERT INTO notifications (id, event_type, payload, status, retry_count, created_at) VALUES (?, ?, ?, ?, ?, ?)",
notification.ID, notification.EventType, notification.Payload, notification.Status, notification.RetryCount, notification.CreatedAt)
// 2. 尝试发送
return ns.trySend(notification)
}
func (ns *NotificationService) trySend(notification *Notification) error {
// 发送HTTP请求
err := sendHTTPNotification(notification.Payload)
if err == nil {
// 成功
ns.db.Exec("UPDATE notifications SET status = 'success' WHERE id = ?", notification.ID)
return nil
}
// 失败,异步重试
go ns.scheduleRetry(notification)
return err
}
func (ns *NotificationService) scheduleRetry(notification *Notification) {
if notification.RetryCount >= ns.maxRetry {
// 超过最大重试次数,放弃
ns.db.Exec("UPDATE notifications SET status = 'failed' WHERE id = ?", notification.ID)
return
}
// 指数退避
delay := time.Duration(math.Pow(2, float64(notification.RetryCount))) * time.Minute
time.Sleep(delay)
// 更新重试次数
notification.RetryCount++
ns.db.Exec("UPDATE notifications SET retry_count = ? WHERE id = ?", notification.RetryCount, notification.ID)
// 重试
ns.trySend(notification)
}
监控告警
分布式事务监控
# Prometheus指标
distributed_transaction_metrics:
# TCC事务
- tcc_transaction_total
- tcc_transaction_duration_seconds
- tcc_transaction_success_total
- tcc_transaction_failure_total
- tcc_compensation_total
# Saga事务
- saga_transaction_total
- saga_step_duration_seconds
- saga_compensation_triggered_total
# 本地消息表
- local_message_pending_total
- local_message_send_duration_seconds
- local_message_retry_count
告警规则
groups:
- name: transaction_alerts
rules:
# 补偿率过高
- alert: HighCompensationRate
expr: |
rate(tcc_compensation_total[5m]) /
rate(tcc_transaction_total[5m]) > 0.1
for: 10m
labels:
severity: warning
annotations:
summary: "TCC补偿率超过10%"
# 消息积压
- alert: MessageBacklog
expr: local_message_pending_total > 1000
for: 5m
labels:
severity: critical
annotations:
summary: "本地消息表积压超过1000条"
面试问答
什么是分布式事务?为什么需要?
单机事务:
BEGIN;
UPDATE account SET balance = balance - 100 WHERE id = 1;
UPDATE account SET balance = balance + 100 WHERE id = 2;
COMMIT;
数据库保证ACID
分布式事务:
服务A: 创建订单
服务B: 扣减库存
服务C: 扣款
如何保证三个操作要么都成功,要么都失败?
需要分布式事务的原因:
- 微服务拆分
- 数据库分库分表
- 跨系统调用
CAP定理是什么?
CAP = Consistency + Availability + Partition tolerance
C (一致性): 所有节点看到相同数据
A (可用性): 系统一直可用
P (分区容错性): 网络分区时仍能工作
不可能同时满足三个,只能选两个:
- CP: 牺牲可用性,保证一致性(金融系统)
- AP: 牺牲一致性,保证可用性(互联网系统)
- CA: 不存在(网络一定会分区)
实际选择:
- 银行转账 → CP(强一致性)
- 微博点赞 → AP(最终一致性)
2PC有什么问题?
问题1:同步阻塞
Prepare阶段,Participant锁定资源
等待Coordinator的Commit/Abort
→ 资源长时间锁定,吞吐量低
问题2:单点故障
Coordinator宕机 → Participant无限等待
问题3:数据不一致
Phase 2: Commit阶段网络分区
部分Participant收到Commit → 提交
部分Participant未收到 → 无法提交
→ 数据不一致
问题4:太过保守
任何一个Participant失败 → 整个事务回滚
TCC和2PC有什么区别?
| 特性 | 2PC | TCC |
|---|---|---|
| 阶段 | Prepare + Commit | Try + Confirm + Cancel |
| 资源锁定 | 数据库锁 | 业务逻辑锁定 |
| 性能 | 差(同步阻塞) | 好(业务控制) |
| 实现复杂度 | 低 | 高 |
| 隔离性 | 数据库保证 | 业务保证 |
TCC优势:
- 业务层面控制
- 不依赖数据库锁
- 性能更好
TCC劣势:
- 实现复杂
- 需要编写Try/Confirm/Cancel三个方法
Saga和TCC如何选择?
TCC:
- 适合短事务
- 需要强一致性
- 资源预留
示例:转账(需要立即锁定资金)
Saga:
- 适合长事务
- 最终一致性
- 事后补偿
示例:订单流程(创建订单→扣库存→支付→发货)
选择建议:
- 核心业务、强一致性 → TCC
- 长流程、最终一致性 → Saga
本地消息表如何保证消息一定发送?
1. 本地事务保证消息一定写入
BEGIN;
- 执行业务
- 插入消息表
COMMIT;
2. 定时任务扫描未发送消息
SELECT * FROM messages WHERE status = 'pending'
3. 发送到MQ后标记为已发送
UPDATE messages SET status = 'sent'
4. 失败重试(指数退避)
第1次:立即
第2次:1秒后
第3次:2秒后
第4次:4秒后
...
优点:
- 消息一定会发送
- 不丢失
缺点:
- 需要定时任务
- 有延迟
如何保证分布式事务的幂等性?
方案1:唯一键约束
CREATE TABLE orders (
order_id VARCHAR(64) PRIMARY KEY
);
-- 重复INSERT会失败
方案2:状态机
-- 只允许pending → confirmed
UPDATE orders
SET status = 'confirmed'
WHERE order_id = ? AND status = 'pending'
方案3:版本号
UPDATE orders
SET amount = ?, version = version + 1
WHERE order_id = ? AND version = ?
方案4:分布式锁
lock := redis.SetNX("order:"+orderID, 1, 10*time.Second)
if !lock {
return errors.New("duplicate request")
}
defer redis.Del("order:"+orderID)
分布式事务如何做补偿?
补偿原则:
正向操作 → 补偿操作
创建订单 → 取消订单
扣减库存 → 恢复库存
扣款 → 退款
发送消息 → 无需补偿(幂等)
补偿实现:
// Saga补偿
saga := NewSaga()
saga.AddStep("create_order",
action: createOrder,
compensation: cancelOrder) // 补偿
saga.AddStep("deduct_inventory",
action: deductInventory,
compensation: restoreInventory) // 补偿
if err := saga.Execute(); err != nil {
// 自动执行补偿
}
注意事项:
- 补偿操作必须幂等
- 补偿可能失败,需要重试
- 记录补偿日志
如何设计一个下单流程的分布式事务?
场景:下单 → 扣库存 → 扣款 → 通知发货
方案1:TCC(强一致性)
Try:
- 创建订单(pending)
- 冻结库存
- 冻结余额
- 预留发货单
Confirm:
- 订单confirmed
- 扣减库存
- 扣减余额
- 通知发货
Cancel:
- 取消订单
- 解冻库存
- 解冻余额
- 取消发货
方案2:Saga(最终一致性,推荐)
Step1: 创建订单
补偿: 取消订单
Step2: 扣减库存
补偿: 恢复库存
Step3: 扣款
补偿: 退款
Step4: 通知发货
补偿: 取消发货
方案3:本地消息表
1. 本地事务(创建订单 + 插入消息)
2. 发送消息到MQ
3. 库存服务消费消息 → 扣库存
4. 支付服务消费消息 → 扣款
5. 物流服务消费消息 → 发货
选择:
- 核心订单 → Saga
- 积分、优惠券 → 最大努力通知
分布式事务性能如何优化?
优化1:减少参与者
不要:订单服务 → 库存服务 → 支付服务 → 物流服务
改为:订单服务 → 库存服务,异步通知支付和物流
优化2:异步化
核心流程同步,非核心流程异步
同步:创建订单、扣库存、扣款
异步:发送短信、积分、优惠券
优化3:降级
高峰期:跳过非核心步骤
- 跳过积分
- 跳过优惠券
- 保证核心流程
优化4:缓存
库存缓存:减少库存服务调用
优化5:批量
批量确认、批量补偿