HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 分布式架构模式

    • 分布式架构模式手册
    • 第1章:分布式一致性
    • 第2章:分布式锁
    • 第3章:分布式协调
    • 第4章:服务发现与注册
    • 第5章:负载均衡
    • 第6章:熔断降级
    • 第7章:DDD领域驱动设计
    • 第8章:CQRS与Event Sourcing

第8章:CQRS与Event Sourcing

什么是CQRS

定义

CQRS(Command Query Responsibility Segregation):命令查询职责分离

核心思想:将写操作(Command)和读操作(Query)分离到不同的模型

传统架构 vs CQRS

传统架构:

┌────────┐      ┌─────────────┐      ┌──────────┐
│ Client │─────→│ Application │─────→│ Database │
└────────┘      │   Service   │      └──────────┘
                └─────────────┘
                       ↓
              同一个模型处理读写

问题:
 读写需求不同,强制使用同一模型
 复杂查询影响写操作性能
 难以独立优化和扩展

CQRS架构:

              ┌─────────────────────┐
              │      Client         │
              └─────────────────────┘
                  ↙             ↘
          写请求                   读请求
            ↓                       ↓
┌──────────────────┐    ┌──────────────────┐
│  Command Side    │    │   Query Side     │
│  (写模型)         │    │   (读模型)        │
│  - Command       │    │   - Query        │
│  - Domain Model  │    │   - DTO          │
│  - Write DB      │    │   - Read DB      │
└──────────────────┘    └──────────────────┘
         ↓                      ↑
    ┌────────┐                  │
    │ Event  │──────────────────┘
    │  Bus   │  同步/异步更新
    └────────┘

优势:
 读写独立扩展
 读模型针对查询优化
 写模型专注业务逻辑
 性能提升

CQRS的核心概念

1. Command(命令)

定义:改变系统状态的操作

特征:

  • 有副作用(修改数据)
  • 返回值简单(成功/失败)
  • 异步执行

代码示例:

// 创建订单命令
type CreateOrderCommand struct {
    OrderID  string
    UserID   string
    Items    []OrderItem
    Address  Address
}

// 命令处理器
type CreateOrderCommandHandler struct {
    orderRepo OrderRepository
    eventBus  EventBus
}

func (h *CreateOrderCommandHandler) Handle(cmd CreateOrderCommand) error {
    // 1. 创建聚合
    order, err := NewOrder(cmd.OrderID, cmd.UserID, cmd.Items, cmd.Address)
    if err != nil {
        return err
    }

    // 2. 持久化
    if err := h.orderRepo.Save(order); err != nil {
        return err
    }

    // 3. 发布事件
    for _, event := range order.GetEvents() {
        h.eventBus.Publish(event)
    }

    // 4. 返回简单结果
    return nil
}

2. Query(查询)

定义:获取系统状态的操作

特征:

  • 无副作用(不修改数据)
  • 返回复杂DTO
  • 同步执行
  • 针对查询优化

代码示例:

// 订单查询DTO
type OrderDTO struct {
    OrderID      string        `json:"order_id"`
    UserName     string        `json:"user_name"`
    Items        []OrderItemDTO `json:"items"`
    TotalPrice   float64       `json:"total_price"`
    Status       string        `json:"status"`
    CreatedAt    time.Time     `json:"created_at"`
}

// 查询处理器
type OrderQueryHandler struct {
    readDB *sql.DB  // 读模型数据库
}

func (h *OrderQueryHandler) GetOrderDetail(orderID string) (*OrderDTO, error) {
    // 直接查询优化后的读模型
    query := `
        SELECT o.id, u.name, o.total_price, o.status, o.created_at
        FROM orders_view o
        JOIN users u ON o.user_id = u.id
        WHERE o.id = ?
    `

    var dto OrderDTO
    err := h.readDB.QueryRow(query, orderID).Scan(
        &dto.OrderID,
        &dto.UserName,
        &dto.TotalPrice,
        &dto.Status,
        &dto.CreatedAt,
    )

    return &dto, err
}

// 列表查询(带分页)
func (h *OrderQueryHandler) ListOrders(userID string, page, pageSize int) ([]OrderDTO, error) {
    query := `
        SELECT id, total_price, status, created_at
        FROM orders_view
        WHERE user_id = ?
        ORDER BY created_at DESC
        LIMIT ? OFFSET ?
    `

    rows, err := h.readDB.Query(query, userID, pageSize, (page-1)*pageSize)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var orders []OrderDTO
    for rows.Next() {
        var dto OrderDTO
        rows.Scan(&dto.OrderID, &dto.TotalPrice, &dto.Status, &dto.CreatedAt)
        orders = append(orders, dto)
    }

    return orders, nil
}

Event Sourcing

定义

Event Sourcing(事件溯源):不存储当前状态,而是存储导致状态变化的所有事件

传统存储 vs Event Sourcing

传统存储(State-Based):

数据库存储当前状态:

accounts 表:
| id  | balance | status |
|-----|---------|--------|
| 101 | 1000    | ACTIVE |

操作:
1. 存款500 → balance = 1500(更新)
2. 取款200 → balance = 1300(更新)

问题:
 无法追溯历史
 无法重现状态变化过程
 审计困难

Event Sourcing:

存储事件序列:

account_events 表:
| id | account_id | event_type       | amount | timestamp  |
|----|------------|------------------|--------|------------|
| 1  | 101        | AccountCreated   | 1000   | 2024-01-01 |
| 2  | 101        | MoneyDeposited   | 500    | 2024-01-02 |
| 3  | 101        | MoneyWithdrawn   | 200    | 2024-01-03 |

当前状态 = 重放所有事件:
AccountCreated(1000) + MoneyDeposited(500) - MoneyWithdrawn(200) = 1300

优势:
 完整历史记录
 可重现任意时刻状态
 天然审计日志
 支持时间旅行

Event Sourcing核心概念

1. 事件(Event)

定义:已发生的事实,不可变

// 事件接口
type Event interface {
    AggregateID() string
    EventType() string
    OccurredAt() time.Time
    Version() int
}

// 账户创建事件
type AccountCreatedEvent struct {
    accountID   string
    initialBalance Money
    occurredAt  time.Time
    version     int
}

// 存款事件
type MoneyDepositedEvent struct {
    accountID  string
    amount     Money
    occurredAt time.Time
    version    int
}

// 取款事件
type MoneyWithdrawnEvent struct {
    accountID  string
    amount     Money
    occurredAt time.Time
    version    int
}

2. 事件存储(Event Store)

接口定义:

type EventStore interface {
    // 保存事件
    SaveEvents(aggregateID string, events []Event, expectedVersion int) error

    // 加载事件
    LoadEvents(aggregateID string) ([]Event, error)

    // 加载指定版本之后的事件
    LoadEventsSince(aggregateID string, version int) ([]Event, error)
}

// 实现(MySQL)
type MySQLEventStore struct {
    db *sql.DB
}

func (es *MySQLEventStore) SaveEvents(aggregateID string, events []Event, expectedVersion int) error {
    tx, _ := es.db.Begin()
    defer tx.Rollback()

    // 1. 检查版本冲突(乐观锁)
    var currentVersion int
    err := tx.QueryRow(
        "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = ?",
        aggregateID,
    ).Scan(&currentVersion)

    if currentVersion != expectedVersion {
        return errors.New("version conflict")
    }

    // 2. 保存事件
    for _, event := range events {
        eventData, _ := json.Marshal(event)

        _, err := tx.Exec(`
            INSERT INTO events (aggregate_id, event_type, event_data, version, created_at)
            VALUES (?, ?, ?, ?, ?)
        `, aggregateID, event.EventType(), eventData, event.Version(), event.OccurredAt())

        if err != nil {
            return err
        }
    }

    return tx.Commit()
}

func (es *MySQLEventStore) LoadEvents(aggregateID string) ([]Event, error) {
    rows, err := es.db.Query(`
        SELECT event_type, event_data, version, created_at
        FROM events
        WHERE aggregate_id = ?
        ORDER BY version ASC
    `, aggregateID)

    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var events []Event
    for rows.Next() {
        var eventType string
        var eventData []byte
        var version int
        var createdAt time.Time

        rows.Scan(&eventType, &eventData, &version, &createdAt)

        // 反序列化事件
        event := es.deserializeEvent(eventType, eventData)
        events = append(events, event)
    }

    return events, nil
}

3. 聚合(Event-Sourced Aggregate)

实现:

// 账户聚合
type Account struct {
    id              string
    balance         Money
    status          AccountStatus
    version         int
    uncommittedEvents []Event
}

// 从事件重建聚合
func LoadAccount(eventStore EventStore, accountID string) (*Account, error) {
    events, err := eventStore.LoadEvents(accountID)
    if err != nil {
        return nil, err
    }

    account := &Account{id: accountID}

    // 重放事件
    for _, event := range events {
        account.Apply(event)
    }

    return account, nil
}

// 应用事件(更新状态)
func (a *Account) Apply(event Event) {
    switch e := event.(type) {
    case *AccountCreatedEvent:
        a.balance = e.initialBalance
        a.status = AccountStatusActive

    case *MoneyDepositedEvent:
        a.balance = a.balance.Add(e.amount)

    case *MoneyWithdrawnEvent:
        a.balance = a.balance.Subtract(e.amount)

    case *AccountClosedEvent:
        a.status = AccountStatusClosed
    }

    a.version = event.Version()
}

// 业务方法(产生事件)
func (a *Account) Deposit(amount Money) error {
    if a.status != AccountStatusActive {
        return errors.New("account is not active")
    }

    event := &MoneyDepositedEvent{
        accountID:  a.id,
        amount:     amount,
        occurredAt: time.Now(),
        version:    a.version + 1,
    }

    // 1. 应用事件(更新状态)
    a.Apply(event)

    // 2. 记录未提交事件
    a.uncommittedEvents = append(a.uncommittedEvents, event)

    return nil
}

func (a *Account) Withdraw(amount Money) error {
    if a.status != AccountStatusActive {
        return errors.New("account is not active")
    }

    if a.balance.LessThan(amount) {
        return errors.New("insufficient balance")
    }

    event := &MoneyWithdrawnEvent{
        accountID:  a.id,
        amount:     amount,
        occurredAt: time.Now(),
        version:    a.version + 1,
    }

    a.Apply(event)
    a.uncommittedEvents = append(a.uncommittedEvents, event)

    return nil
}

// 获取未提交事件
func (a *Account) GetUncommittedEvents() []Event {
    return a.uncommittedEvents
}

func (a *Account) ClearUncommittedEvents() {
    a.uncommittedEvents = nil
}

4. 快照(Snapshot)

问题:事件过多时,重放性能下降

解决:定期保存快照

type Snapshot struct {
    AggregateID string
    Version     int
    State       interface{}
    CreatedAt   time.Time
}

type SnapshotStore interface {
    SaveSnapshot(snapshot Snapshot) error
    LoadSnapshot(aggregateID string) (*Snapshot, error)
}

// 从快照加载聚合
func LoadAccountWithSnapshot(eventStore EventStore, snapshotStore SnapshotStore, accountID string) (*Account, error) {
    // 1. 加载快照
    snapshot, err := snapshotStore.LoadSnapshot(accountID)

    var account *Account
    var fromVersion int

    if err == nil && snapshot != nil {
        // 从快照恢复
        account = snapshot.State.(*Account)
        fromVersion = snapshot.Version
    } else {
        // 从头开始
        account = &Account{id: accountID}
        fromVersion = 0
    }

    // 2. 加载快照之后的事件
    events, _ := eventStore.LoadEventsSince(accountID, fromVersion)

    // 3. 重放事件
    for _, event := range events {
        account.Apply(event)
    }

    return account, nil
}

// 定期保存快照
func (repo *AccountRepository) SaveWithSnapshot(account *Account) error {
    // 1. 保存事件
    err := repo.eventStore.SaveEvents(
        account.id,
        account.GetUncommittedEvents(),
        account.version,
    )
    if err != nil {
        return err
    }

    // 2. 每100个事件保存一次快照
    if account.version%100 == 0 {
        snapshot := Snapshot{
            AggregateID: account.id,
            Version:     account.version,
            State:       account,
            CreatedAt:   time.Now(),
        }
        repo.snapshotStore.SaveSnapshot(snapshot)
    }

    account.ClearUncommittedEvents()
    return nil
}

CQRS与ES结合

架构

┌──────────────────────────────────────────┐
│              Client                      │
└──────────────────────────────────────────┘
         ↓ Command          ↑ Query
┌──────────────────┐  ┌──────────────────┐
│  Command Side    │  │   Query Side     │
│  (Event Sourcing)│  │   (Read Model)   │
└──────────────────┘  └──────────────────┘
         ↓                     ↑
┌──────────────────┐           │
│   Event Store    │───────────┘
│   (Events DB)    │  Projection
└──────────────────┘  (实时/异步更新)

代码示例

Command Side(事件溯源):

// 应用服务
type AccountApplicationService struct {
    eventStore EventStore
    eventBus   EventBus
}

func (app *AccountApplicationService) CreateAccount(cmd CreateAccountCommand) error {
    // 1. 创建聚合
    account := NewAccount(cmd.AccountID, cmd.InitialBalance)

    // 2. 保存事件
    events := account.GetUncommittedEvents()
    err := app.eventStore.SaveEvents(account.id, events, 0)
    if err != nil {
        return err
    }

    // 3. 发布事件(更新读模型)
    for _, event := range events {
        app.eventBus.Publish(event)
    }

    return nil
}

func (app *AccountApplicationService) Deposit(cmd DepositCommand) error {
    // 1. 加载聚合(重放事件)
    account, err := LoadAccount(app.eventStore, cmd.AccountID)
    if err != nil {
        return err
    }

    // 2. 执行业务逻辑
    if err := account.Deposit(cmd.Amount); err != nil {
        return err
    }

    // 3. 保存事件
    events := account.GetUncommittedEvents()
    err = app.eventStore.SaveEvents(account.id, events, account.version-len(events))
    if err != nil {
        return err
    }

    // 4. 发布事件
    for _, event := range events {
        app.eventBus.Publish(event)
    }

    return nil
}

Query Side(读模型):

// 读模型
type AccountReadModel struct {
    ID             string
    Balance        float64
    Status         string
    TransactionCount int
    LastTransaction time.Time
}

// 投影(Projection)
type AccountProjection struct {
    db *sql.DB
}

// 监听事件,更新读模型
func (proj *AccountProjection) Handle(event Event) error {
    switch e := event.(type) {
    case *AccountCreatedEvent:
        return proj.handleAccountCreated(e)

    case *MoneyDepositedEvent:
        return proj.handleMoneyDeposited(e)

    case *MoneyWithdrawnEvent:
        return proj.handleMoneyWithdrawn(e)
    }

    return nil
}

func (proj *AccountProjection) handleAccountCreated(event *AccountCreatedEvent) error {
    _, err := proj.db.Exec(`
        INSERT INTO account_read_model (id, balance, status, transaction_count, last_transaction)
        VALUES (?, ?, ?, ?, ?)
    `, event.accountID, event.initialBalance.Amount, "ACTIVE", 0, event.occurredAt)

    return err
}

func (proj *AccountProjection) handleMoneyDeposited(event *MoneyDepositedEvent) error {
    _, err := proj.db.Exec(`
        UPDATE account_read_model
        SET balance = balance + ?,
            transaction_count = transaction_count + 1,
            last_transaction = ?
        WHERE id = ?
    `, event.amount.Amount, event.occurredAt, event.accountID)

    return err
}

// 查询服务
type AccountQueryService struct {
    db *sql.DB
}

func (qs *AccountQueryService) GetAccount(accountID string) (*AccountReadModel, error) {
    var model AccountReadModel

    err := qs.db.QueryRow(`
        SELECT id, balance, status, transaction_count, last_transaction
        FROM account_read_model
        WHERE id = ?
    `, accountID).Scan(
        &model.ID,
        &model.Balance,
        &model.Status,
        &model.TransactionCount,
        &model.LastTransaction,
    )

    return &model, err
}

func (qs *AccountQueryService) GetTopAccounts(limit int) ([]AccountReadModel, error) {
    rows, err := qs.db.Query(`
        SELECT id, balance, status, transaction_count
        FROM account_read_model
        ORDER BY balance DESC
        LIMIT ?
    `, limit)

    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var accounts []AccountReadModel
    for rows.Next() {
        var model AccountReadModel
        rows.Scan(&model.ID, &model.Balance, &model.Status, &model.TransactionCount)
        accounts = append(accounts, model)
    }

    return accounts, nil
}

实战案例

银行账户系统

完整实现:

package main

import (
    "encoding/json"
    "errors"
    "time"
)

// ========== 事件定义 ==========

type Event interface {
    AggregateID() string
    Version() int
    OccurredAt() time.Time
}

type BaseEvent struct {
    aggregateID string
    version     int
    occurredAt  time.Time
}

func (e BaseEvent) AggregateID() string { return e.aggregateID }
func (e BaseEvent) Version() int        { return e.version }
func (e BaseEvent) OccurredAt() time.Time { return e.occurredAt }

type AccountCreatedEvent struct {
    BaseEvent
    InitialBalance float64
}

type MoneyDepositedEvent struct {
    BaseEvent
    Amount float64
}

type MoneyWithdrawnEvent struct {
    BaseEvent
    Amount float64
}

// ========== 聚合根 ==========

type AccountStatus int

const (
    AccountStatusActive AccountStatus = iota
    AccountStatusClosed
)

type Account struct {
    id               string
    balance          float64
    status           AccountStatus
    version          int
    uncommittedEvents []Event
}

func NewAccount(id string, initialBalance float64) *Account {
    account := &Account{
        id:      id,
        balance: 0,
        status:  AccountStatusActive,
        version: 0,
    }

    event := &AccountCreatedEvent{
        BaseEvent: BaseEvent{
            aggregateID: id,
            version:     1,
            occurredAt:  time.Now(),
        },
        InitialBalance: initialBalance,
    }

    account.Apply(event)
    account.uncommittedEvents = append(account.uncommittedEvents, event)

    return account
}

func (a *Account) Apply(event Event) {
    switch e := event.(type) {
    case *AccountCreatedEvent:
        a.balance = e.InitialBalance
        a.status = AccountStatusActive

    case *MoneyDepositedEvent:
        a.balance += e.Amount

    case *MoneyWithdrawnEvent:
        a.balance -= e.Amount
    }

    a.version = event.Version()
}

func (a *Account) Deposit(amount float64) error {
    if amount <= 0 {
        return errors.New("amount must be positive")
    }

    event := &MoneyDepositedEvent{
        BaseEvent: BaseEvent{
            aggregateID: a.id,
            version:     a.version + 1,
            occurredAt:  time.Now(),
        },
        Amount: amount,
    }

    a.Apply(event)
    a.uncommittedEvents = append(a.uncommittedEvents, event)

    return nil
}

func (a *Account) Withdraw(amount float64) error {
    if amount <= 0 {
        return errors.New("amount must be positive")
    }

    if a.balance < amount {
        return errors.New("insufficient balance")
    }

    event := &MoneyWithdrawnEvent{
        BaseEvent: BaseEvent{
            aggregateID: a.id,
            version:     a.version + 1,
            occurredAt:  time.Now(),
        },
        Amount: amount,
    }

    a.Apply(event)
    a.uncommittedEvents = append(a.uncommittedEvents, event)

    return nil
}

func (a *Account) GetUncommittedEvents() []Event {
    return a.uncommittedEvents
}

func (a *Account) ClearEvents() {
    a.uncommittedEvents = nil
}

// ========== 完整应用 ==========

func main() {
    // 1. 创建账户
    account := NewAccount("ACC-001", 1000.0)
    fmt.Printf("Account created: balance=%.2f\n", account.balance)

    // 2. 存款
    account.Deposit(500)
    fmt.Printf("After deposit: balance=%.2f\n", account.balance)

    // 3. 取款
    account.Withdraw(200)
    fmt.Printf("After withdrawal: balance=%.2f\n", account.balance)

    // 4. 查看事件
    fmt.Println("\nEvents:")
    for _, event := range account.GetUncommittedEvents() {
        eventJSON, _ := json.MarshalIndent(event, "", "  ")
        fmt.Println(string(eventJSON))
    }

    // 5. 重放事件(重建状态)
    fmt.Println("\nRebuilding from events:")
    rebuiltAccount := &Account{id: "ACC-001"}
    for _, event := range account.GetUncommittedEvents() {
        rebuiltAccount.Apply(event)
    }
    fmt.Printf("Rebuilt balance: %.2f\n", rebuiltAccount.balance)
}

常见问题

1. 事件存储性能优化

问题:重放大量事件性能差

解决:

  1. 快照
每N个事件保存一次快照
加载时:快照 + 增量事件
  1. 缓存聚合
type CachedAccountRepository struct {
    cache      *Cache
    eventStore EventStore
}

func (repo *CachedAccountRepository) Load(id string) (*Account, error) {
    // 先查缓存
    if cached, ok := repo.cache.Get(id); ok {
        return cached.(*Account), nil
    }

    // 缓存未命中,从事件重建
    account, _ := LoadAccount(repo.eventStore, id)

    // 放入缓存
    repo.cache.Set(id, account, 10*time.Minute)

    return account, nil
}

2. 读模型同步延迟

问题:写入后立即查询,可能读到旧数据

解决:

  1. 同步更新
func (app *AccountApp) Deposit(cmd DepositCommand) error {
    // 1. 保存事件
    account, _ := LoadAccount(app.eventStore, cmd.AccountID)
    account.Deposit(cmd.Amount)
    app.eventStore.SaveEvents(account.id, account.GetUncommittedEvents(), account.version-1)

    // 2. 同步更新读模型
    for _, event := range account.GetUncommittedEvents() {
        app.projection.Handle(event)  // 同步处理
    }

    return nil
}
  1. 版本号校验
func (qs *AccountQueryService) GetAccount(accountID string, minVersion int) (*AccountReadModel, error) {
    for i := 0; i < 10; i++ {
        model, _ := qs.loadAccount(accountID)

        if model.Version >= minVersion {
            return model, nil
        }

        time.Sleep(100 * time.Millisecond)  // 等待投影完成
    }

    return nil, errors.New("timeout waiting for projection")
}

3. 事件版本演进

问题:事件结构变化,历史事件如何处理

解决:

// V1事件
type MoneyDepositedEventV1 struct {
    Amount float64
}

// V2事件(增加货币字段)
type MoneyDepositedEventV2 struct {
    Amount   float64
    Currency string
}

// 事件升级器
type EventUpgrader struct{}

func (eu *EventUpgrader) Upgrade(event Event) Event {
    switch e := event.(type) {
    case *MoneyDepositedEventV1:
        return &MoneyDepositedEventV2{
            Amount:   e.Amount,
            Currency: "CNY",  // 默认值
        }
    }

    return event
}

// 加载事件时自动升级
func (es *EventStore) LoadEvents(aggregateID string) ([]Event, error) {
    rawEvents, _ := es.loadRawEvents(aggregateID)

    upgrader := &EventUpgrader{}
    upgradedEvents := make([]Event, 0)

    for _, event := range rawEvents {
        upgraded := upgrader.Upgrade(event)
        upgradedEvents = append(upgradedEvents, upgraded)
    }

    return upgradedEvents, nil
}

面试问答

CQRS的优缺点是什么?

答案:

优点:

 读写独立扩展
 读模型针对查询优化(视图、冗余)
 写模型专注业务逻辑
 性能提升(读写分离)
 最终一致性(解耦)

缺点:

 架构复杂度高
 数据同步延迟(最终一致性)
 学习成本高
 代码量增加

适用场景:

适合:
 读写比例悬殊(10:1以上)
 复杂查询需求(多维度、聚合)
 高并发读写

不适合:
 简单CRUD应用
 强一致性要求
 小团队、短期项目

Event Sourcing vs 传统存储有什么区别?

答案:

维度传统存储Event Sourcing
存储内容当前状态事件序列
数据可追溯
审计日志需单独实现天然支持
时间旅行(重放到任意时刻)
性能读快,写快读慢(需重放),写快
复杂度低高

示例:

传统存储:
balance = 1300(只知道当前值,不知道怎么来的)

Event Sourcing:
balance = 1000(初始)+ 500(存款)- 200(取款)= 1300
完整历史可追溯!

如何解决Event Sourcing的性能问题?

答案:

1. 快照(Snapshot)

策略:每100个事件保存一次快照
加载:快照(版本100)+ 增量事件(101-150)

性能对比:
无快照:重放1000个事件(慢)
有快照:加载快照 + 重放100个事件(快10倍)

2. 缓存聚合

type CachedRepository struct {
    cache      *LRUCache
    eventStore EventStore
}

func (repo *CachedRepository) Load(id string) (*Account, error) {
    // 先查缓存
    if cached, ok := repo.cache.Get(id); ok {
        return cached.(*Account), nil
    }

    // 从事件重建
    account, _ := LoadAccount(repo.eventStore, id)

    // 缓存
    repo.cache.Set(id, account)

    return account, nil
}

3. 异步投影

事件写入后异步更新读模型
读操作直接查询优化后的读模型(快)

CQRS和Event Sourcing必须一起使用吗?

答案:

不是必须的,但经常一起使用

CQRS without ES:

Command Side: 传统ORM存储
Query Side: 优化的读模型

适用:
- 需要读写分离
- 不需要完整事件历史

ES without CQRS:

所有操作都通过事件溯源
读取也是重放事件

适用:
- 审计要求高
- 读写模式相似

CQRS + ES(最强组合):

Command Side: Event Sourcing
Query Side: 优化读模型

适用:
- 复杂业务逻辑
- 高并发读写
- 完整审计需求

如何处理跨聚合的一致性?

答案:

问题:

转账操作涉及两个账户(两个聚合)
如何保证一致性?

方案1:Saga模式

type TransferSaga struct {
    eventBus EventBus
}

// 1. 扣款
func (saga *TransferSaga) DebitAccount(fromAccountID string, amount float64) error {
    // 发布扣款命令
    saga.eventBus.Publish(DebitAccountCommand{
        AccountID: fromAccountID,
        Amount:    amount,
    })

    return nil
}

// 2. 监听扣款成功事件,执行入账
func (saga *TransferSaga) OnMoneyDebited(event MoneyDebitedEvent) {
    // 发布入账命令
    saga.eventBus.Publish(CreditAccountCommand{
        AccountID: event.ToAccountID,
        Amount:    event.Amount,
    })
}

// 3. 监听入账失败事件,回滚扣款
func (saga *TransferSaga) OnCreditFailed(event CreditFailedEvent) {
    // 发布补偿命令(回滚)
    saga.eventBus.Publish(RefundAccountCommand{
        AccountID: event.FromAccountID,
        Amount:    event.Amount,
    })
}

方案2:Process Manager

type TransferProcessManager struct {
    state  TransferState
    eventBus EventBus
}

func (pm *TransferProcessManager) Handle(event Event) {
    switch pm.state {
    case StateInitiated:
        if _, ok := event.(*MoneyDebitedEvent); ok {
            pm.state = StateDebited
            pm.creditToAccount()
        }

    case StateDebited:
        if _, ok := event.(*MoneyCreditedEvent); ok {
            pm.state = StateCompleted
        } else if _, ok := event.(*CreditFailedEvent); ok {
            pm.state = StateFailed
            pm.refundFromAccount()
        }
    }
}

参考资料

  • Martin Fowler - CQRS
  • Greg Young - Event Sourcing
  • Microsoft - CQRS Pattern
  • Axon Framework
  • EventStore
Prev
第7章:DDD领域驱动设计