第8章:CQRS与Event Sourcing
什么是CQRS
定义
CQRS(Command Query Responsibility Segregation):命令查询职责分离
核心思想:将写操作(Command)和读操作(Query)分离到不同的模型
传统架构 vs CQRS
传统架构:
┌────────┐ ┌─────────────┐ ┌──────────┐
│ Client │─────→│ Application │─────→│ Database │
└────────┘ │ Service │ └──────────┘
└─────────────┘
↓
同一个模型处理读写
问题:
读写需求不同,强制使用同一模型
复杂查询影响写操作性能
难以独立优化和扩展
CQRS架构:
┌─────────────────────┐
│ Client │
└─────────────────────┘
↙ ↘
写请求 读请求
↓ ↓
┌──────────────────┐ ┌──────────────────┐
│ Command Side │ │ Query Side │
│ (写模型) │ │ (读模型) │
│ - Command │ │ - Query │
│ - Domain Model │ │ - DTO │
│ - Write DB │ │ - Read DB │
└──────────────────┘ └──────────────────┘
↓ ↑
┌────────┐ │
│ Event │──────────────────┘
│ Bus │ 同步/异步更新
└────────┘
优势:
读写独立扩展
读模型针对查询优化
写模型专注业务逻辑
性能提升
CQRS的核心概念
1. Command(命令)
定义:改变系统状态的操作
特征:
- 有副作用(修改数据)
- 返回值简单(成功/失败)
- 异步执行
代码示例:
// 创建订单命令
type CreateOrderCommand struct {
OrderID string
UserID string
Items []OrderItem
Address Address
}
// 命令处理器
type CreateOrderCommandHandler struct {
orderRepo OrderRepository
eventBus EventBus
}
func (h *CreateOrderCommandHandler) Handle(cmd CreateOrderCommand) error {
// 1. 创建聚合
order, err := NewOrder(cmd.OrderID, cmd.UserID, cmd.Items, cmd.Address)
if err != nil {
return err
}
// 2. 持久化
if err := h.orderRepo.Save(order); err != nil {
return err
}
// 3. 发布事件
for _, event := range order.GetEvents() {
h.eventBus.Publish(event)
}
// 4. 返回简单结果
return nil
}
2. Query(查询)
定义:获取系统状态的操作
特征:
- 无副作用(不修改数据)
- 返回复杂DTO
- 同步执行
- 针对查询优化
代码示例:
// 订单查询DTO
type OrderDTO struct {
OrderID string `json:"order_id"`
UserName string `json:"user_name"`
Items []OrderItemDTO `json:"items"`
TotalPrice float64 `json:"total_price"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
// 查询处理器
type OrderQueryHandler struct {
readDB *sql.DB // 读模型数据库
}
func (h *OrderQueryHandler) GetOrderDetail(orderID string) (*OrderDTO, error) {
// 直接查询优化后的读模型
query := `
SELECT o.id, u.name, o.total_price, o.status, o.created_at
FROM orders_view o
JOIN users u ON o.user_id = u.id
WHERE o.id = ?
`
var dto OrderDTO
err := h.readDB.QueryRow(query, orderID).Scan(
&dto.OrderID,
&dto.UserName,
&dto.TotalPrice,
&dto.Status,
&dto.CreatedAt,
)
return &dto, err
}
// 列表查询(带分页)
func (h *OrderQueryHandler) ListOrders(userID string, page, pageSize int) ([]OrderDTO, error) {
query := `
SELECT id, total_price, status, created_at
FROM orders_view
WHERE user_id = ?
ORDER BY created_at DESC
LIMIT ? OFFSET ?
`
rows, err := h.readDB.Query(query, userID, pageSize, (page-1)*pageSize)
if err != nil {
return nil, err
}
defer rows.Close()
var orders []OrderDTO
for rows.Next() {
var dto OrderDTO
rows.Scan(&dto.OrderID, &dto.TotalPrice, &dto.Status, &dto.CreatedAt)
orders = append(orders, dto)
}
return orders, nil
}
Event Sourcing
定义
Event Sourcing(事件溯源):不存储当前状态,而是存储导致状态变化的所有事件
传统存储 vs Event Sourcing
传统存储(State-Based):
数据库存储当前状态:
accounts 表:
| id | balance | status |
|-----|---------|--------|
| 101 | 1000 | ACTIVE |
操作:
1. 存款500 → balance = 1500(更新)
2. 取款200 → balance = 1300(更新)
问题:
无法追溯历史
无法重现状态变化过程
审计困难
Event Sourcing:
存储事件序列:
account_events 表:
| id | account_id | event_type | amount | timestamp |
|----|------------|------------------|--------|------------|
| 1 | 101 | AccountCreated | 1000 | 2024-01-01 |
| 2 | 101 | MoneyDeposited | 500 | 2024-01-02 |
| 3 | 101 | MoneyWithdrawn | 200 | 2024-01-03 |
当前状态 = 重放所有事件:
AccountCreated(1000) + MoneyDeposited(500) - MoneyWithdrawn(200) = 1300
优势:
完整历史记录
可重现任意时刻状态
天然审计日志
支持时间旅行
Event Sourcing核心概念
1. 事件(Event)
定义:已发生的事实,不可变
// 事件接口
type Event interface {
AggregateID() string
EventType() string
OccurredAt() time.Time
Version() int
}
// 账户创建事件
type AccountCreatedEvent struct {
accountID string
initialBalance Money
occurredAt time.Time
version int
}
// 存款事件
type MoneyDepositedEvent struct {
accountID string
amount Money
occurredAt time.Time
version int
}
// 取款事件
type MoneyWithdrawnEvent struct {
accountID string
amount Money
occurredAt time.Time
version int
}
2. 事件存储(Event Store)
接口定义:
type EventStore interface {
// 保存事件
SaveEvents(aggregateID string, events []Event, expectedVersion int) error
// 加载事件
LoadEvents(aggregateID string) ([]Event, error)
// 加载指定版本之后的事件
LoadEventsSince(aggregateID string, version int) ([]Event, error)
}
// 实现(MySQL)
type MySQLEventStore struct {
db *sql.DB
}
func (es *MySQLEventStore) SaveEvents(aggregateID string, events []Event, expectedVersion int) error {
tx, _ := es.db.Begin()
defer tx.Rollback()
// 1. 检查版本冲突(乐观锁)
var currentVersion int
err := tx.QueryRow(
"SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = ?",
aggregateID,
).Scan(¤tVersion)
if currentVersion != expectedVersion {
return errors.New("version conflict")
}
// 2. 保存事件
for _, event := range events {
eventData, _ := json.Marshal(event)
_, err := tx.Exec(`
INSERT INTO events (aggregate_id, event_type, event_data, version, created_at)
VALUES (?, ?, ?, ?, ?)
`, aggregateID, event.EventType(), eventData, event.Version(), event.OccurredAt())
if err != nil {
return err
}
}
return tx.Commit()
}
func (es *MySQLEventStore) LoadEvents(aggregateID string) ([]Event, error) {
rows, err := es.db.Query(`
SELECT event_type, event_data, version, created_at
FROM events
WHERE aggregate_id = ?
ORDER BY version ASC
`, aggregateID)
if err != nil {
return nil, err
}
defer rows.Close()
var events []Event
for rows.Next() {
var eventType string
var eventData []byte
var version int
var createdAt time.Time
rows.Scan(&eventType, &eventData, &version, &createdAt)
// 反序列化事件
event := es.deserializeEvent(eventType, eventData)
events = append(events, event)
}
return events, nil
}
3. 聚合(Event-Sourced Aggregate)
实现:
// 账户聚合
type Account struct {
id string
balance Money
status AccountStatus
version int
uncommittedEvents []Event
}
// 从事件重建聚合
func LoadAccount(eventStore EventStore, accountID string) (*Account, error) {
events, err := eventStore.LoadEvents(accountID)
if err != nil {
return nil, err
}
account := &Account{id: accountID}
// 重放事件
for _, event := range events {
account.Apply(event)
}
return account, nil
}
// 应用事件(更新状态)
func (a *Account) Apply(event Event) {
switch e := event.(type) {
case *AccountCreatedEvent:
a.balance = e.initialBalance
a.status = AccountStatusActive
case *MoneyDepositedEvent:
a.balance = a.balance.Add(e.amount)
case *MoneyWithdrawnEvent:
a.balance = a.balance.Subtract(e.amount)
case *AccountClosedEvent:
a.status = AccountStatusClosed
}
a.version = event.Version()
}
// 业务方法(产生事件)
func (a *Account) Deposit(amount Money) error {
if a.status != AccountStatusActive {
return errors.New("account is not active")
}
event := &MoneyDepositedEvent{
accountID: a.id,
amount: amount,
occurredAt: time.Now(),
version: a.version + 1,
}
// 1. 应用事件(更新状态)
a.Apply(event)
// 2. 记录未提交事件
a.uncommittedEvents = append(a.uncommittedEvents, event)
return nil
}
func (a *Account) Withdraw(amount Money) error {
if a.status != AccountStatusActive {
return errors.New("account is not active")
}
if a.balance.LessThan(amount) {
return errors.New("insufficient balance")
}
event := &MoneyWithdrawnEvent{
accountID: a.id,
amount: amount,
occurredAt: time.Now(),
version: a.version + 1,
}
a.Apply(event)
a.uncommittedEvents = append(a.uncommittedEvents, event)
return nil
}
// 获取未提交事件
func (a *Account) GetUncommittedEvents() []Event {
return a.uncommittedEvents
}
func (a *Account) ClearUncommittedEvents() {
a.uncommittedEvents = nil
}
4. 快照(Snapshot)
问题:事件过多时,重放性能下降
解决:定期保存快照
type Snapshot struct {
AggregateID string
Version int
State interface{}
CreatedAt time.Time
}
type SnapshotStore interface {
SaveSnapshot(snapshot Snapshot) error
LoadSnapshot(aggregateID string) (*Snapshot, error)
}
// 从快照加载聚合
func LoadAccountWithSnapshot(eventStore EventStore, snapshotStore SnapshotStore, accountID string) (*Account, error) {
// 1. 加载快照
snapshot, err := snapshotStore.LoadSnapshot(accountID)
var account *Account
var fromVersion int
if err == nil && snapshot != nil {
// 从快照恢复
account = snapshot.State.(*Account)
fromVersion = snapshot.Version
} else {
// 从头开始
account = &Account{id: accountID}
fromVersion = 0
}
// 2. 加载快照之后的事件
events, _ := eventStore.LoadEventsSince(accountID, fromVersion)
// 3. 重放事件
for _, event := range events {
account.Apply(event)
}
return account, nil
}
// 定期保存快照
func (repo *AccountRepository) SaveWithSnapshot(account *Account) error {
// 1. 保存事件
err := repo.eventStore.SaveEvents(
account.id,
account.GetUncommittedEvents(),
account.version,
)
if err != nil {
return err
}
// 2. 每100个事件保存一次快照
if account.version%100 == 0 {
snapshot := Snapshot{
AggregateID: account.id,
Version: account.version,
State: account,
CreatedAt: time.Now(),
}
repo.snapshotStore.SaveSnapshot(snapshot)
}
account.ClearUncommittedEvents()
return nil
}
CQRS与ES结合
架构
┌──────────────────────────────────────────┐
│ Client │
└──────────────────────────────────────────┘
↓ Command ↑ Query
┌──────────────────┐ ┌──────────────────┐
│ Command Side │ │ Query Side │
│ (Event Sourcing)│ │ (Read Model) │
└──────────────────┘ └──────────────────┘
↓ ↑
┌──────────────────┐ │
│ Event Store │───────────┘
│ (Events DB) │ Projection
└──────────────────┘ (实时/异步更新)
代码示例
Command Side(事件溯源):
// 应用服务
type AccountApplicationService struct {
eventStore EventStore
eventBus EventBus
}
func (app *AccountApplicationService) CreateAccount(cmd CreateAccountCommand) error {
// 1. 创建聚合
account := NewAccount(cmd.AccountID, cmd.InitialBalance)
// 2. 保存事件
events := account.GetUncommittedEvents()
err := app.eventStore.SaveEvents(account.id, events, 0)
if err != nil {
return err
}
// 3. 发布事件(更新读模型)
for _, event := range events {
app.eventBus.Publish(event)
}
return nil
}
func (app *AccountApplicationService) Deposit(cmd DepositCommand) error {
// 1. 加载聚合(重放事件)
account, err := LoadAccount(app.eventStore, cmd.AccountID)
if err != nil {
return err
}
// 2. 执行业务逻辑
if err := account.Deposit(cmd.Amount); err != nil {
return err
}
// 3. 保存事件
events := account.GetUncommittedEvents()
err = app.eventStore.SaveEvents(account.id, events, account.version-len(events))
if err != nil {
return err
}
// 4. 发布事件
for _, event := range events {
app.eventBus.Publish(event)
}
return nil
}
Query Side(读模型):
// 读模型
type AccountReadModel struct {
ID string
Balance float64
Status string
TransactionCount int
LastTransaction time.Time
}
// 投影(Projection)
type AccountProjection struct {
db *sql.DB
}
// 监听事件,更新读模型
func (proj *AccountProjection) Handle(event Event) error {
switch e := event.(type) {
case *AccountCreatedEvent:
return proj.handleAccountCreated(e)
case *MoneyDepositedEvent:
return proj.handleMoneyDeposited(e)
case *MoneyWithdrawnEvent:
return proj.handleMoneyWithdrawn(e)
}
return nil
}
func (proj *AccountProjection) handleAccountCreated(event *AccountCreatedEvent) error {
_, err := proj.db.Exec(`
INSERT INTO account_read_model (id, balance, status, transaction_count, last_transaction)
VALUES (?, ?, ?, ?, ?)
`, event.accountID, event.initialBalance.Amount, "ACTIVE", 0, event.occurredAt)
return err
}
func (proj *AccountProjection) handleMoneyDeposited(event *MoneyDepositedEvent) error {
_, err := proj.db.Exec(`
UPDATE account_read_model
SET balance = balance + ?,
transaction_count = transaction_count + 1,
last_transaction = ?
WHERE id = ?
`, event.amount.Amount, event.occurredAt, event.accountID)
return err
}
// 查询服务
type AccountQueryService struct {
db *sql.DB
}
func (qs *AccountQueryService) GetAccount(accountID string) (*AccountReadModel, error) {
var model AccountReadModel
err := qs.db.QueryRow(`
SELECT id, balance, status, transaction_count, last_transaction
FROM account_read_model
WHERE id = ?
`, accountID).Scan(
&model.ID,
&model.Balance,
&model.Status,
&model.TransactionCount,
&model.LastTransaction,
)
return &model, err
}
func (qs *AccountQueryService) GetTopAccounts(limit int) ([]AccountReadModel, error) {
rows, err := qs.db.Query(`
SELECT id, balance, status, transaction_count
FROM account_read_model
ORDER BY balance DESC
LIMIT ?
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var accounts []AccountReadModel
for rows.Next() {
var model AccountReadModel
rows.Scan(&model.ID, &model.Balance, &model.Status, &model.TransactionCount)
accounts = append(accounts, model)
}
return accounts, nil
}
实战案例
银行账户系统
完整实现:
package main
import (
"encoding/json"
"errors"
"time"
)
// ========== 事件定义 ==========
type Event interface {
AggregateID() string
Version() int
OccurredAt() time.Time
}
type BaseEvent struct {
aggregateID string
version int
occurredAt time.Time
}
func (e BaseEvent) AggregateID() string { return e.aggregateID }
func (e BaseEvent) Version() int { return e.version }
func (e BaseEvent) OccurredAt() time.Time { return e.occurredAt }
type AccountCreatedEvent struct {
BaseEvent
InitialBalance float64
}
type MoneyDepositedEvent struct {
BaseEvent
Amount float64
}
type MoneyWithdrawnEvent struct {
BaseEvent
Amount float64
}
// ========== 聚合根 ==========
type AccountStatus int
const (
AccountStatusActive AccountStatus = iota
AccountStatusClosed
)
type Account struct {
id string
balance float64
status AccountStatus
version int
uncommittedEvents []Event
}
func NewAccount(id string, initialBalance float64) *Account {
account := &Account{
id: id,
balance: 0,
status: AccountStatusActive,
version: 0,
}
event := &AccountCreatedEvent{
BaseEvent: BaseEvent{
aggregateID: id,
version: 1,
occurredAt: time.Now(),
},
InitialBalance: initialBalance,
}
account.Apply(event)
account.uncommittedEvents = append(account.uncommittedEvents, event)
return account
}
func (a *Account) Apply(event Event) {
switch e := event.(type) {
case *AccountCreatedEvent:
a.balance = e.InitialBalance
a.status = AccountStatusActive
case *MoneyDepositedEvent:
a.balance += e.Amount
case *MoneyWithdrawnEvent:
a.balance -= e.Amount
}
a.version = event.Version()
}
func (a *Account) Deposit(amount float64) error {
if amount <= 0 {
return errors.New("amount must be positive")
}
event := &MoneyDepositedEvent{
BaseEvent: BaseEvent{
aggregateID: a.id,
version: a.version + 1,
occurredAt: time.Now(),
},
Amount: amount,
}
a.Apply(event)
a.uncommittedEvents = append(a.uncommittedEvents, event)
return nil
}
func (a *Account) Withdraw(amount float64) error {
if amount <= 0 {
return errors.New("amount must be positive")
}
if a.balance < amount {
return errors.New("insufficient balance")
}
event := &MoneyWithdrawnEvent{
BaseEvent: BaseEvent{
aggregateID: a.id,
version: a.version + 1,
occurredAt: time.Now(),
},
Amount: amount,
}
a.Apply(event)
a.uncommittedEvents = append(a.uncommittedEvents, event)
return nil
}
func (a *Account) GetUncommittedEvents() []Event {
return a.uncommittedEvents
}
func (a *Account) ClearEvents() {
a.uncommittedEvents = nil
}
// ========== 完整应用 ==========
func main() {
// 1. 创建账户
account := NewAccount("ACC-001", 1000.0)
fmt.Printf("Account created: balance=%.2f\n", account.balance)
// 2. 存款
account.Deposit(500)
fmt.Printf("After deposit: balance=%.2f\n", account.balance)
// 3. 取款
account.Withdraw(200)
fmt.Printf("After withdrawal: balance=%.2f\n", account.balance)
// 4. 查看事件
fmt.Println("\nEvents:")
for _, event := range account.GetUncommittedEvents() {
eventJSON, _ := json.MarshalIndent(event, "", " ")
fmt.Println(string(eventJSON))
}
// 5. 重放事件(重建状态)
fmt.Println("\nRebuilding from events:")
rebuiltAccount := &Account{id: "ACC-001"}
for _, event := range account.GetUncommittedEvents() {
rebuiltAccount.Apply(event)
}
fmt.Printf("Rebuilt balance: %.2f\n", rebuiltAccount.balance)
}
常见问题
1. 事件存储性能优化
问题:重放大量事件性能差
解决:
- 快照
每N个事件保存一次快照
加载时:快照 + 增量事件
- 缓存聚合
type CachedAccountRepository struct {
cache *Cache
eventStore EventStore
}
func (repo *CachedAccountRepository) Load(id string) (*Account, error) {
// 先查缓存
if cached, ok := repo.cache.Get(id); ok {
return cached.(*Account), nil
}
// 缓存未命中,从事件重建
account, _ := LoadAccount(repo.eventStore, id)
// 放入缓存
repo.cache.Set(id, account, 10*time.Minute)
return account, nil
}
2. 读模型同步延迟
问题:写入后立即查询,可能读到旧数据
解决:
- 同步更新
func (app *AccountApp) Deposit(cmd DepositCommand) error {
// 1. 保存事件
account, _ := LoadAccount(app.eventStore, cmd.AccountID)
account.Deposit(cmd.Amount)
app.eventStore.SaveEvents(account.id, account.GetUncommittedEvents(), account.version-1)
// 2. 同步更新读模型
for _, event := range account.GetUncommittedEvents() {
app.projection.Handle(event) // 同步处理
}
return nil
}
- 版本号校验
func (qs *AccountQueryService) GetAccount(accountID string, minVersion int) (*AccountReadModel, error) {
for i := 0; i < 10; i++ {
model, _ := qs.loadAccount(accountID)
if model.Version >= minVersion {
return model, nil
}
time.Sleep(100 * time.Millisecond) // 等待投影完成
}
return nil, errors.New("timeout waiting for projection")
}
3. 事件版本演进
问题:事件结构变化,历史事件如何处理
解决:
// V1事件
type MoneyDepositedEventV1 struct {
Amount float64
}
// V2事件(增加货币字段)
type MoneyDepositedEventV2 struct {
Amount float64
Currency string
}
// 事件升级器
type EventUpgrader struct{}
func (eu *EventUpgrader) Upgrade(event Event) Event {
switch e := event.(type) {
case *MoneyDepositedEventV1:
return &MoneyDepositedEventV2{
Amount: e.Amount,
Currency: "CNY", // 默认值
}
}
return event
}
// 加载事件时自动升级
func (es *EventStore) LoadEvents(aggregateID string) ([]Event, error) {
rawEvents, _ := es.loadRawEvents(aggregateID)
upgrader := &EventUpgrader{}
upgradedEvents := make([]Event, 0)
for _, event := range rawEvents {
upgraded := upgrader.Upgrade(event)
upgradedEvents = append(upgradedEvents, upgraded)
}
return upgradedEvents, nil
}
面试问答
CQRS的优缺点是什么?
答案:
优点:
读写独立扩展
读模型针对查询优化(视图、冗余)
写模型专注业务逻辑
性能提升(读写分离)
最终一致性(解耦)
缺点:
架构复杂度高
数据同步延迟(最终一致性)
学习成本高
代码量增加
适用场景:
适合:
读写比例悬殊(10:1以上)
复杂查询需求(多维度、聚合)
高并发读写
不适合:
简单CRUD应用
强一致性要求
小团队、短期项目
Event Sourcing vs 传统存储有什么区别?
答案:
| 维度 | 传统存储 | Event Sourcing |
|---|---|---|
| 存储内容 | 当前状态 | 事件序列 |
| 数据可追溯 | ||
| 审计日志 | 需单独实现 | 天然支持 |
| 时间旅行 | (重放到任意时刻) | |
| 性能 | 读快,写快 | 读慢(需重放),写快 |
| 复杂度 | 低 | 高 |
示例:
传统存储:
balance = 1300(只知道当前值,不知道怎么来的)
Event Sourcing:
balance = 1000(初始)+ 500(存款)- 200(取款)= 1300
完整历史可追溯!
如何解决Event Sourcing的性能问题?
答案:
1. 快照(Snapshot)
策略:每100个事件保存一次快照
加载:快照(版本100)+ 增量事件(101-150)
性能对比:
无快照:重放1000个事件(慢)
有快照:加载快照 + 重放100个事件(快10倍)
2. 缓存聚合
type CachedRepository struct {
cache *LRUCache
eventStore EventStore
}
func (repo *CachedRepository) Load(id string) (*Account, error) {
// 先查缓存
if cached, ok := repo.cache.Get(id); ok {
return cached.(*Account), nil
}
// 从事件重建
account, _ := LoadAccount(repo.eventStore, id)
// 缓存
repo.cache.Set(id, account)
return account, nil
}
3. 异步投影
事件写入后异步更新读模型
读操作直接查询优化后的读模型(快)
CQRS和Event Sourcing必须一起使用吗?
答案:
不是必须的,但经常一起使用
CQRS without ES:
Command Side: 传统ORM存储
Query Side: 优化的读模型
适用:
- 需要读写分离
- 不需要完整事件历史
ES without CQRS:
所有操作都通过事件溯源
读取也是重放事件
适用:
- 审计要求高
- 读写模式相似
CQRS + ES(最强组合):
Command Side: Event Sourcing
Query Side: 优化读模型
适用:
- 复杂业务逻辑
- 高并发读写
- 完整审计需求
如何处理跨聚合的一致性?
答案:
问题:
转账操作涉及两个账户(两个聚合)
如何保证一致性?
方案1:Saga模式
type TransferSaga struct {
eventBus EventBus
}
// 1. 扣款
func (saga *TransferSaga) DebitAccount(fromAccountID string, amount float64) error {
// 发布扣款命令
saga.eventBus.Publish(DebitAccountCommand{
AccountID: fromAccountID,
Amount: amount,
})
return nil
}
// 2. 监听扣款成功事件,执行入账
func (saga *TransferSaga) OnMoneyDebited(event MoneyDebitedEvent) {
// 发布入账命令
saga.eventBus.Publish(CreditAccountCommand{
AccountID: event.ToAccountID,
Amount: event.Amount,
})
}
// 3. 监听入账失败事件,回滚扣款
func (saga *TransferSaga) OnCreditFailed(event CreditFailedEvent) {
// 发布补偿命令(回滚)
saga.eventBus.Publish(RefundAccountCommand{
AccountID: event.FromAccountID,
Amount: event.Amount,
})
}
方案2:Process Manager
type TransferProcessManager struct {
state TransferState
eventBus EventBus
}
func (pm *TransferProcessManager) Handle(event Event) {
switch pm.state {
case StateInitiated:
if _, ok := event.(*MoneyDebitedEvent); ok {
pm.state = StateDebited
pm.creditToAccount()
}
case StateDebited:
if _, ok := event.(*MoneyCreditedEvent); ok {
pm.state = StateCompleted
} else if _, ok := event.(*CreditFailedEvent); ok {
pm.state = StateFailed
pm.refundFromAccount()
}
}
}