HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于

【编程难度第五名】普通业务系统 - 中高级开发者的主战场

本系列文章

➤ [NO.1 调度器]

➤ [NO.2 一致性协议](Paxos / Raft)

➤ [NO.3 高性能异步系统](消息队列、回调、重试)

➤ [NO.4 交易系统](钱的事不能错)

➤ NO.5 普通业务系统(绝大多数人做的)

一、什么是普通业务系统?

1.1 定义和范围

代表系统:

  • 登录注册系统: 用户认证、权限管理
  • 内容管理系统: 文章、评论、点赞
  • 推荐系统: Feed 流、个性化推荐
  • 搜索系统: 全文搜索、筛选排序
  • IM 系统: 聊天、消息推送

特点:

  • 并发量适中(1000-10000 QPS)
  • 延迟要求不高(100-500ms)
  • 数据量适中(百万-千万级)
  • 以 CRUD 为主,但有一定业务逻辑
  • 需要考虑缓存、分页、搜索等

与其他系统的差异:

CRUD:
- 纯增删改查
- 无复杂逻辑
- 无缓存优化

普通业务系统:
- CRUD + 业务逻辑
- 需要缓存
- 需要优化性能
- 需要考虑并发

复杂系统(交易/异步/调度/一致性):
- 分布式事务
- 高并发(10 万+ QPS)
- 强一致性
- 状态机复杂

二、核心技术点

2.1 权限系统(RBAC)

什么是 RBAC?

RBAC: Role-Based Access Control(基于角色的访问控制)

用户 -> 角色 -> 权限 -> 资源

示例:
- 用户: 张三
- 角色: 编辑
- 权限: 文章管理(读、写、删除)
- 资源: /api/articles/*

数据库设计

-- 用户表
CREATE TABLE users (
    id          BIGINT PRIMARY KEY AUTO_INCREMENT,
    username    VARCHAR(64) UNIQUE NOT NULL,
    password    VARCHAR(128) NOT NULL,  -- bcrypt hash
    email       VARCHAR(128),
    phone       VARCHAR(32),
    status      VARCHAR(32) NOT NULL DEFAULT 'Active',
    created_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

    INDEX idx_username (username),
    INDEX idx_email (email)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 角色表
CREATE TABLE roles (
    id          BIGINT PRIMARY KEY AUTO_INCREMENT,
    name        VARCHAR(64) UNIQUE NOT NULL,
    description VARCHAR(256),
    created_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_name (name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 权限表
CREATE TABLE permissions (
    id          BIGINT PRIMARY KEY AUTO_INCREMENT,
    name        VARCHAR(64) UNIQUE NOT NULL,
    resource    VARCHAR(128) NOT NULL,  -- 资源路径,如 /api/articles
    action      VARCHAR(32) NOT NULL,   -- 操作,如 read/write/delete
    description VARCHAR(256),
    created_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_name (name),
    INDEX idx_resource (resource)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 用户-角色关联表
CREATE TABLE user_roles (
    user_id     BIGINT NOT NULL,
    role_id     BIGINT NOT NULL,
    created_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    PRIMARY KEY (user_id, role_id),
    INDEX idx_user (user_id),
    INDEX idx_role (role_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 角色-权限关联表
CREATE TABLE role_permissions (
    role_id         BIGINT NOT NULL,
    permission_id   BIGINT NOT NULL,
    created_at      TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    PRIMARY KEY (role_id, permission_id),
    INDEX idx_role (role_id),
    INDEX idx_permission (permission_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

代码实现

type RBACService struct {
    db    *sql.DB
    redis *redis.Client
}

// 检查用户是否有权限
func (s *RBACService) HasPermission(
    ctx context.Context,
    userID int64,
    resource string,
    action string,
) (bool, error) {
    // 1. 从缓存获取用户权限
    cacheKey := fmt.Sprintf("user_permissions:%d", userID)
    cached, err := s.redis.Get(ctx, cacheKey).Result()

    if err == nil {
        // 命中缓存
        var permissions []Permission
        json.Unmarshal([]byte(cached), &permissions)

        return s.checkPermission(permissions, resource, action), nil
    }

    // 2. 从数据库查询
    permissions, err := s.getUserPermissions(ctx, userID)
    if err != nil {
        return false, err
    }

    // 3. 写入缓存(1 小时)
    data, _ := json.Marshal(permissions)
    s.redis.Set(ctx, cacheKey, data, 1*time.Hour)

    // 4. 检查权限
    return s.checkPermission(permissions, resource, action), nil
}

func (s *RBACService) getUserPermissions(
    ctx context.Context,
    userID int64,
) ([]Permission, error) {
    rows, err := s.db.QueryContext(ctx, `
        SELECT DISTINCT p.id, p.name, p.resource, p.action
        FROM permissions p
        JOIN role_permissions rp ON p.id = rp.permission_id
        JOIN roles r ON rp.role_id = r.id
        JOIN user_roles ur ON r.id = ur.role_id
        WHERE ur.user_id = ?
    `, userID)

    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var permissions []Permission
    for rows.Next() {
        var p Permission
        if err := rows.Scan(&p.ID, &p.Name, &p.Resource, &p.Action); err != nil {
            return nil, err
        }
        permissions = append(permissions, p)
    }

    return permissions, nil
}

func (s *RBACService) checkPermission(
    permissions []Permission,
    resource string,
    action string,
) bool {
    for _, p := range permissions {
        // 支持通配符
        // 例如: /api/articles/* 匹配 /api/articles/123
        if matchPattern(p.Resource, resource) && p.Action == action {
            return true
        }

        // 支持 * 操作(所有操作)
        if matchPattern(p.Resource, resource) && p.Action == "*" {
            return true
        }
    }

    return false
}

func matchPattern(pattern, resource string) bool {
    // 简单实现:支持 * 通配符
    if pattern == "*" {
        return true
    }

    if strings.HasSuffix(pattern, "/*") {
        prefix := strings.TrimSuffix(pattern, "/*")
        return strings.HasPrefix(resource, prefix)
    }

    return pattern == resource
}

// 分配角色给用户
func (s *RBACService) AssignRole(
    ctx context.Context,
    userID int64,
    roleID int64,
) error {
    _, err := s.db.ExecContext(ctx, `
        INSERT INTO user_roles (user_id, role_id)
        VALUES (?, ?)
        ON DUPLICATE KEY UPDATE user_id = user_id
    `, userID, roleID)

    if err != nil {
        return err
    }

    // 清除用户权限缓存
    cacheKey := fmt.Sprintf("user_permissions:%d", userID)
    s.redis.Del(ctx, cacheKey)

    return nil
}

// 创建角色
func (s *RBACService) CreateRole(
    ctx context.Context,
    name string,
    description string,
    permissionIDs []int64,
) (int64, error) {
    // 开启事务
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return 0, err
    }
    defer tx.Rollback()

    // 1. 创建角色
    result, err := tx.ExecContext(ctx, `
        INSERT INTO roles (name, description)
        VALUES (?, ?)
    `, name, description)

    if err != nil {
        return 0, err
    }

    roleID, _ := result.LastInsertId()

    // 2. 分配权限
    for _, permID := range permissionIDs {
        _, err = tx.ExecContext(ctx, `
            INSERT INTO role_permissions (role_id, permission_id)
            VALUES (?, ?)
        `, roleID, permID)

        if err != nil {
            return 0, err
        }
    }

    // 提交事务
    if err := tx.Commit(); err != nil {
        return 0, err
    }

    return roleID, nil
}

中间件集成

// 权限检查中间件
func PermissionMiddleware(rbac *RBACService) gin.HandlerFunc {
    return func(c *gin.Context) {
        // 1. 获取用户 ID(从 JWT token 或 session)
        userID, exists := c.Get("user_id")
        if !exists {
            c.JSON(401, gin.H{"error": "Unauthorized"})
            c.Abort()
            return
        }

        // 2. 获取资源和操作
        resource := c.Request.URL.Path
        action := strings.ToLower(c.Request.Method)

        // HTTP Method -> Action 映射
        actionMap := map[string]string{
            "get":    "read",
            "post":   "write",
            "put":    "write",
            "patch":  "write",
            "delete": "delete",
        }

        action = actionMap[action]

        // 3. 检查权限
        hasPermission, err := rbac.HasPermission(
            c.Request.Context(),
            userID.(int64),
            resource,
            action,
        )

        if err != nil {
            c.JSON(500, gin.H{"error": "Internal server error"})
            c.Abort()
            return
        }

        if !hasPermission {
            c.JSON(403, gin.H{"error": "Forbidden"})
            c.Abort()
            return
        }

        c.Next()
    }
}

// 使用示例
func main() {
    r := gin.Default()

    // 应用权限中间件
    r.Use(PermissionMiddleware(rbacService))

    r.GET("/api/articles", listArticles)
    r.POST("/api/articles", createArticle)
    r.DELETE("/api/articles/:id", deleteArticle)

    r.Run(":8080")
}

2.2 缓存策略

Cache-Aside 模式(最常用)

type ArticleService struct {
    db    *sql.DB
    redis *redis.Client
}

// 读取文章(带缓存)
func (s *ArticleService) GetArticle(
    ctx context.Context,
    id int64,
) (*Article, error) {
    // 1. 查缓存
    cacheKey := fmt.Sprintf("article:%d", id)
    cached, err := s.redis.Get(ctx, cacheKey).Result()

    if err == nil {
        // 命中缓存
        var article Article
        json.Unmarshal([]byte(cached), &article)

        log.Debug("cache hit", "articleID", id)
        return &article, nil
    }

    log.Debug("cache miss", "articleID", id)

    // 2. 查数据库
    var article Article
    err = s.db.QueryRowContext(ctx, `
        SELECT id, title, content, author_id, view_count, created_at
        FROM articles
        WHERE id = ?
    `, id).Scan(
        &article.ID,
        &article.Title,
        &article.Content,
        &article.AuthorID,
        &article.ViewCount,
        &article.CreatedAt,
    )

    if err == sql.ErrNoRows {
        // 缓存空值,防止缓存穿透
        s.redis.Set(ctx, cacheKey, "null", 5*time.Minute)
        return nil, errors.New("article not found")
    }

    if err != nil {
        return nil, err
    }

    // 3. 写入缓存
    data, _ := json.Marshal(article)
    s.redis.Set(ctx, cacheKey, data, 1*time.Hour)

    return &article, nil
}

// 更新文章(删除缓存)
func (s *ArticleService) UpdateArticle(
    ctx context.Context,
    article *Article,
) error {
    // 1. 更新数据库
    _, err := s.db.ExecContext(ctx, `
        UPDATE articles
        SET title = ?,
            content = ?,
            updated_at = NOW()
        WHERE id = ?
    `, article.Title, article.Content, article.ID)

    if err != nil {
        return err
    }

    // 2. 删除缓存(让下次读取时重新加载)
    cacheKey := fmt.Sprintf("article:%d", article.ID)
    s.redis.Del(ctx, cacheKey)

    log.Info("article updated, cache deleted", "articleID", article.ID)

    return nil
}

缓存穿透解决方案

问题: 大量请求不存在的 key,绕过缓存直接打到数据库

// 方案 1: 缓存空值
func (s *ArticleService) GetArticleSafe(
    ctx context.Context,
    id int64,
) (*Article, error) {
    cacheKey := fmt.Sprintf("article:%d", id)
    cached, err := s.redis.Get(ctx, cacheKey).Result()

    if err == nil {
        if cached == "null" {
            // 空值缓存
            return nil, errors.New("article not found")
        }

        var article Article
        json.Unmarshal([]byte(cached), &article)
        return &article, nil
    }

    // 查数据库
    var article Article
    err = s.db.QueryRowContext(ctx, `
        SELECT id, title, content FROM articles WHERE id = ?
    `, id).Scan(&article.ID, &article.Title, &article.Content)

    if err == sql.ErrNoRows {
        // 缓存空值(短时间)
        s.redis.Set(ctx, cacheKey, "null", 5*time.Minute)
        return nil, errors.New("article not found")
    }

    if err != nil {
        return nil, err
    }

    data, _ := json.Marshal(article)
    s.redis.Set(ctx, cacheKey, data, 1*time.Hour)

    return &article, nil
}
// 方案 2: 布隆过滤器
type BloomFilter struct {
    redis *redis.Client
    key   string
}

func (bf *BloomFilter) Add(id int64) error {
    // 使用 Redis 的 Bloom Filter(需要 RedisBloom 模块)
    return bf.redis.Do(
        context.Background(),
        "BF.ADD",
        bf.key,
        id,
    ).Err()
}

func (bf *BloomFilter) Exists(id int64) (bool, error) {
    result, err := bf.redis.Do(
        context.Background(),
        "BF.EXISTS",
        bf.key,
        id,
    ).Int()

    return result == 1, err
}

func (s *ArticleService) GetArticleWithBloom(
    ctx context.Context,
    id int64,
) (*Article, error) {
    // 1. 布隆过滤器检查
    exists, _ := s.bloomFilter.Exists(id)
    if !exists {
        // 一定不存在,直接返回
        return nil, errors.New("article not found")
    }

    // 2. 可能存在,查缓存和数据库
    return s.GetArticle(ctx, id)
}

缓存雪崩解决方案

问题: 大量缓存同时过期,请求全部打到数据库

// 方案: 随机过期时间
func (s *ArticleService) SetCacheWithRandomTTL(
    ctx context.Context,
    key string,
    value interface{},
    baseTTL time.Duration,
) error {
    // 在基础 TTL 上增加随机时间(0-30 分钟)
    randomExtra := time.Duration(rand.Intn(1800)) * time.Second
    ttl := baseTTL + randomExtra

    data, _ := json.Marshal(value)
    return s.redis.Set(ctx, key, data, ttl).Err()
}

// 使用示例
func (s *ArticleService) GetArticleWithRandomTTL(
    ctx context.Context,
    id int64,
) (*Article, error) {
    cacheKey := fmt.Sprintf("article:%d", id)
    cached, err := s.redis.Get(ctx, cacheKey).Result()

    if err == nil {
        var article Article
        json.Unmarshal([]byte(cached), &article)
        return &article, nil
    }

    // 查数据库
    var article Article
    err = s.db.QueryRowContext(ctx, `...`).Scan(...)

    if err != nil {
        return nil, err
    }

    // 缓存(随机 TTL)
    s.SetCacheWithRandomTTL(ctx, cacheKey, article, 1*time.Hour)

    return &article, nil
}

2.3 分页查询

Offset 分页(最常用)

type PageRequest struct {
    Page     int `form:"page" binding:"min=1"`
    PageSize int `form:"page_size" binding:"min=1,max=100"`
}

type PageResponse struct {
    Total    int64       `json:"total"`
    Page     int         `json:"page"`
    PageSize int         `json:"page_size"`
    Data     interface{} `json:"data"`
}

func (s *ArticleService) ListArticles(
    ctx context.Context,
    req PageRequest,
) (*PageResponse, error) {
    // 1. 计算偏移量
    offset := (req.Page - 1) * req.PageSize

    // 2. 查询总数(可以缓存)
    var total int64
    err := s.db.QueryRowContext(ctx, `
        SELECT COUNT(*) FROM articles WHERE status = 'Published'
    `).Scan(&total)

    if err != nil {
        return nil, err
    }

    // 3. 查询数据
    rows, err := s.db.QueryContext(ctx, `
        SELECT id, title, author_id, view_count, created_at
        FROM articles
        WHERE status = 'Published'
        ORDER BY created_at DESC
        LIMIT ? OFFSET ?
    `, req.PageSize, offset)

    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var articles []Article
    for rows.Next() {
        var a Article
        rows.Scan(&a.ID, &a.Title, &a.AuthorID, &a.ViewCount, &a.CreatedAt)
        articles = append(articles, a)
    }

    return &PageResponse{
        Total:    total,
        Page:     req.Page,
        PageSize: req.PageSize,
        Data:     articles,
    }, nil
}

优点: 简单,支持跳页 缺点: 深度分页性能差(OFFSET 10000 很慢)


Cursor 分页(更好的性能)

type CursorRequest struct {
    Cursor   int64 `form:"cursor"`     // 上一页最后一条的 ID
    PageSize int   `form:"page_size" binding:"min=1,max=100"`
}

type CursorResponse struct {
    NextCursor int64       `json:"next_cursor"`
    HasMore    bool        `json:"has_more"`
    Data       interface{} `json:"data"`
}

func (s *ArticleService) ListArticlesCursor(
    ctx context.Context,
    req CursorRequest,
) (*CursorResponse, error) {
    // 查询数据(多查一条判断是否还有更多)
    rows, err := s.db.QueryContext(ctx, `
        SELECT id, title, author_id, view_count, created_at
        FROM articles
        WHERE status = 'Published'
          AND id < ?
        ORDER BY id DESC
        LIMIT ?
    `, req.Cursor, req.PageSize+1)

    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var articles []Article
    for rows.Next() {
        var a Article
        rows.Scan(&a.ID, &a.Title, &a.AuthorID, &a.ViewCount, &a.CreatedAt)
        articles = append(articles, a)
    }

    // 判断是否还有更多
    hasMore := len(articles) > req.PageSize
    if hasMore {
        articles = articles[:req.PageSize]
    }

    // 下一页的 cursor
    var nextCursor int64
    if len(articles) > 0 {
        nextCursor = articles[len(articles)-1].ID
    }

    return &CursorResponse{
        NextCursor: nextCursor,
        HasMore:    hasMore,
        Data:       articles,
    }, nil
}

优点: 性能好,适合深度分页 缺点: 不支持跳页


2.4 全文搜索

使用 Elasticsearch

type SearchService struct {
    es *elasticsearch.Client
}

// 索引文章到 ES
func (s *SearchService) IndexArticle(article *Article) error {
    doc := map[string]interface{}{
        "id":         article.ID,
        "title":      article.Title,
        "content":    article.Content,
        "author_id":  article.AuthorID,
        "view_count": article.ViewCount,
        "created_at": article.CreatedAt,
    }

    data, _ := json.Marshal(doc)

    req := esapi.IndexRequest{
        Index:      "articles",
        DocumentID: strconv.FormatInt(article.ID, 10),
        Body:       bytes.NewReader(data),
        Refresh:    "true",
    }

    res, err := req.Do(context.Background(), s.es)
    if err != nil {
        return err
    }
    defer res.Body.Close()

    if res.IsError() {
        return fmt.Errorf("index failed: %s", res.String())
    }

    return nil
}

// 搜索文章
func (s *SearchService) SearchArticles(
    keyword string,
    page int,
    pageSize int,
) ([]Article, int64, error) {
    // 构建查询
    query := map[string]interface{}{
        "query": map[string]interface{}{
            "multi_match": map[string]interface{}{
                "query":  keyword,
                "fields": []string{"title^3", "content"},  // title 权重更高
            },
        },
        "from": (page - 1) * pageSize,
        "size": pageSize,
        "sort": []map[string]interface{}{
            {"_score": "desc"},
            {"created_at": "desc"},
        },
    }

    data, _ := json.Marshal(query)

    res, err := s.es.Search(
        s.es.Search.WithContext(context.Background()),
        s.es.Search.WithIndex("articles"),
        s.es.Search.WithBody(bytes.NewReader(data)),
    )

    if err != nil {
        return nil, 0, err
    }
    defer res.Body.Close()

    // 解析结果
    var result map[string]interface{}
    json.NewDecoder(res.Body).Decode(&result)

    hits := result["hits"].(map[string]interface{})
    total := int64(hits["total"].(map[string]interface{})["value"].(float64))

    var articles []Article
    for _, hit := range hits["hits"].([]interface{}) {
        h := hit.(map[string]interface{})
        source := h["_source"].(map[string]interface{})

        article := Article{
            ID:        int64(source["id"].(float64)),
            Title:     source["title"].(string),
            Content:   source["content"].(string),
            AuthorID:  int64(source["author_id"].(float64)),
            ViewCount: int(source["view_count"].(float64)),
        }

        articles = append(articles, article)
    }

    return articles, total, nil
}

2.5 消息队列(异步处理)

使用 Redis Streams

type MessageQueue struct {
    redis *redis.Client
}

// 发送消息
func (mq *MessageQueue) Publish(
    ctx context.Context,
    topic string,
    message interface{},
) error {
    data, _ := json.Marshal(message)

    return mq.redis.XAdd(ctx, &redis.XAddArgs{
        Stream: topic,
        Values: map[string]interface{}{
            "data": data,
        },
    }).Err()
}

// 消费消息
func (mq *MessageQueue) Subscribe(
    ctx context.Context,
    topic string,
    group string,
    consumer string,
    handler func(interface{}) error,
) error {
    // 创建消费者组
    mq.redis.XGroupCreateMkStream(ctx, topic, group, "0")

    for {
        // 读取消息
        streams, err := mq.redis.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    group,
            Consumer: consumer,
            Streams:  []string{topic, ">"},
            Count:    10,
            Block:    5 * time.Second,
        }).Result()

        if err != nil {
            if err == redis.Nil {
                continue
            }
            return err
        }

        for _, stream := range streams {
            for _, message := range stream.Messages {
                data := message.Values["data"].(string)

                var msg interface{}
                json.Unmarshal([]byte(data), &msg)

                // 处理消息
                err := handler(msg)
                if err != nil {
                    log.Error("handle message failed",
                        "messageID", message.ID,
                        "err", err)
                    continue
                }

                // ACK
                mq.redis.XAck(ctx, topic, group, message.ID)
            }
        }
    }
}

// 使用示例
func main() {
    mq := &MessageQueue{redis: redisClient}

    // 发布消息
    go func() {
        for {
            mq.Publish(context.Background(), "email", map[string]interface{}{
                "to":      "user@example.com",
                "subject": "Welcome",
                "body":    "Welcome to our platform!",
            })
            time.Sleep(1 * time.Second)
        }
    }()

    // 消费消息
    mq.Subscribe(
        context.Background(),
        "email",
        "email-workers",
        "worker-1",
        func(msg interface{}) error {
            // 发送邮件
            log.Info("sending email", "msg", msg)
            return nil
        },
    )
}

三、完整案例:博客系统

3.1 需求

  • 用户注册、登录
  • 发布文章、编辑、删除
  • 评论、点赞
  • 文章搜索
  • Feed 流推荐

3.2 数据库设计

-- 用户表
CREATE TABLE users (
    id          BIGINT PRIMARY KEY AUTO_INCREMENT,
    username    VARCHAR(64) UNIQUE NOT NULL,
    email       VARCHAR(128) UNIQUE NOT NULL,
    password    VARCHAR(128) NOT NULL,
    avatar      VARCHAR(256),
    bio         TEXT,
    created_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_username (username),
    INDEX idx_email (email)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 文章表
CREATE TABLE articles (
    id          BIGINT PRIMARY KEY AUTO_INCREMENT,
    title       VARCHAR(256) NOT NULL,
    content     TEXT NOT NULL,
    author_id   BIGINT NOT NULL,
    status      VARCHAR(32) NOT NULL DEFAULT 'Draft',
    view_count  INT NOT NULL DEFAULT 0,
    like_count  INT NOT NULL DEFAULT 0,
    created_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

    INDEX idx_author (author_id),
    INDEX idx_status_created (status, created_at),
    FULLTEXT idx_content (title, content)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 评论表
CREATE TABLE comments (
    id          BIGINT PRIMARY KEY AUTO_INCREMENT,
    article_id  BIGINT NOT NULL,
    user_id     BIGINT NOT NULL,
    content     TEXT NOT NULL,
    created_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_article (article_id),
    INDEX idx_user (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 点赞表
CREATE TABLE likes (
    id          BIGINT PRIMARY KEY AUTO_INCREMENT,
    article_id  BIGINT NOT NULL,
    user_id     BIGINT NOT NULL,
    created_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    UNIQUE KEY uk_article_user (article_id, user_id),
    INDEX idx_article (article_id),
    INDEX idx_user (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

3.3 核心功能实现

发布文章

func (s *ArticleService) PublishArticle(
    ctx context.Context,
    userID int64,
    title string,
    content string,
) (*Article, error) {
    // 1. 创建文章
    result, err := s.db.ExecContext(ctx, `
        INSERT INTO articles (title, content, author_id, status)
        VALUES (?, ?, ?, 'Published')
    `, title, content, userID)

    if err != nil {
        return nil, err
    }

    articleID, _ := result.LastInsertId()

    article := &Article{
        ID:        articleID,
        Title:     title,
        Content:   content,
        AuthorID:  userID,
        Status:    "Published",
        CreatedAt: time.Now(),
    }

    // 2. 索引到 ES(异步)
    go s.searchService.IndexArticle(article)

    // 3. 推送到粉丝 Feed(异步)
    go s.feedService.PushToFollowers(userID, articleID)

    log.Info("article published",
        "articleID", articleID,
        "authorID", userID)

    return article, nil
}

点赞文章

func (s *ArticleService) LikeArticle(
    ctx context.Context,
    userID int64,
    articleID int64,
) error {
    // 1. 检查是否已点赞(Redis)
    likeKey := fmt.Sprintf("like:%d:%d", articleID, userID)
    exists, _ := s.redis.Exists(ctx, likeKey).Result()

    if exists > 0 {
        return errors.New("already liked")
    }

    // 2. 插入点赞记录
    _, err := s.db.ExecContext(ctx, `
        INSERT INTO likes (article_id, user_id)
        VALUES (?, ?)
    `, articleID, userID)

    if err != nil {
        // 可能重复点赞(并发)
        if strings.Contains(err.Error(), "Duplicate") {
            return errors.New("already liked")
        }
        return err
    }

    // 3. 增加文章点赞数
    s.db.ExecContext(ctx, `
        UPDATE articles
        SET like_count = like_count + 1
        WHERE id = ?
    `, articleID)

    // 4. 缓存点赞状态(1 天)
    s.redis.Set(ctx, likeKey, 1, 24*time.Hour)

    // 5. 删除文章缓存
    s.redis.Del(ctx, fmt.Sprintf("article:%d", articleID))

    log.Info("article liked",
        "articleID", articleID,
        "userID", userID)

    return nil
}

Feed 流推荐

type FeedService struct {
    db    *sql.DB
    redis *redis.Client
}

// 推送文章到粉丝 Feed
func (s *FeedService) PushToFollowers(
    authorID int64,
    articleID int64,
) error {
    // 1. 查询粉丝列表
    rows, err := s.db.Query(`
        SELECT user_id FROM follows WHERE author_id = ?
    `, authorID)

    if err != nil {
        return err
    }
    defer rows.Close()

    // 2. 推送到每个粉丝的 Feed
    for rows.Next() {
        var followerID int64
        rows.Scan(&followerID)

        // 使用 Redis Sorted Set(按时间排序)
        feedKey := fmt.Sprintf("feed:%d", followerID)
        score := float64(time.Now().Unix())

        s.redis.ZAdd(context.Background(), feedKey, &redis.Z{
            Score:  score,
            Member: articleID,
        })

        // 只保留最新 1000 条
        s.redis.ZRemRangeByRank(context.Background(), feedKey, 0, -1001)
    }

    return nil
}

// 获取用户 Feed
func (s *FeedService) GetUserFeed(
    userID int64,
    page int,
    pageSize int,
) ([]Article, error) {
    feedKey := fmt.Sprintf("feed:%d", userID)

    // 从 Redis 获取文章 ID 列表(倒序)
    start := int64((page - 1) * pageSize)
    stop := start + int64(pageSize) - 1

    articleIDs, err := s.redis.ZRevRange(
        context.Background(),
        feedKey,
        start,
        stop,
    ).Result()

    if err != nil {
        return nil, err
    }

    // 批量查询文章
    var articles []Article
    for _, idStr := range articleIDs {
        articleID, _ := strconv.ParseInt(idStr, 10, 64)
        article, err := articleService.GetArticle(context.Background(), articleID)
        if err == nil {
            articles = append(articles, *article)
        }
    }

    return articles, nil
}

四、性能优化技巧

4.1 数据库优化

-- 1. 添加索引
CREATE INDEX idx_articles_author_status ON articles(author_id, status);
CREATE INDEX idx_articles_created ON articles(created_at DESC);

-- 2. 优化查询
-- 差的查询
SELECT * FROM articles;  -- 避免 SELECT *

-- 好的查询
SELECT id, title, author_id, created_at
FROM articles
WHERE status = 'Published'
ORDER BY created_at DESC
LIMIT 20;

-- 3. 使用 EXPLAIN 分析
EXPLAIN SELECT * FROM articles WHERE author_id = 123;

4.2 N+1 查询问题

//  差的做法(N+1 查询)
func listArticlesWithAuthor(ctx context.Context) ([]ArticleWithAuthor, error) {
    // 1. 查询文章列表
    rows, _ := db.Query("SELECT id, title, author_id FROM articles LIMIT 20")
    defer rows.Close()

    var results []ArticleWithAuthor
    for rows.Next() {
        var article Article
        rows.Scan(&article.ID, &article.Title, &article.AuthorID)

        // 2. 为每篇文章查询作者(N 次查询!)
        var author User
        db.QueryRow(
            "SELECT id, username FROM users WHERE id = ?",
            article.AuthorID,
        ).Scan(&author.ID, &author.Username)

        results = append(results, ArticleWithAuthor{
            Article: article,
            Author:  author,
        })
    }

    return results, nil
}

//  好的做法(2 次查询)
func listArticlesWithAuthorOptimized(ctx context.Context) ([]ArticleWithAuthor, error) {
    // 1. 查询文章列表
    rows, _ := db.Query("SELECT id, title, author_id FROM articles LIMIT 20")
    defer rows.Close()

    var articles []Article
    authorIDs := make(map[int64]bool)

    for rows.Next() {
        var article Article
        rows.Scan(&article.ID, &article.Title, &article.AuthorID)
        articles = append(articles, article)
        authorIDs[article.AuthorID] = true
    }

    // 2. 批量查询作者(1 次查询)
    ids := make([]int64, 0, len(authorIDs))
    for id := range authorIDs {
        ids = append(ids, id)
    }

    query := fmt.Sprintf(
        "SELECT id, username FROM users WHERE id IN (%s)",
        strings.Repeat("?,", len(ids)-1)+"?",
    )

    args := make([]interface{}, len(ids))
    for i, id := range ids {
        args[i] = id
    }

    rows2, _ := db.Query(query, args...)
    defer rows2.Close()

    authors := make(map[int64]User)
    for rows2.Next() {
        var author User
        rows2.Scan(&author.ID, &author.Username)
        authors[author.ID] = author
    }

    // 3. 组装结果
    var results []ArticleWithAuthor
    for _, article := range articles {
        results = append(results, ArticleWithAuthor{
            Article: article,
            Author:  authors[article.AuthorID],
        })
    }

    return results, nil
}

五、总结

5.1 为什么普通业务系统排第五?

比 CRUD 难:

  • 有业务逻辑
  • 需要缓存优化
  • 需要考虑性能
  • 需要权限控制

比复杂系统简单:

  • 无分布式事务
  • 并发量适中
  • 延迟要求不高
  • 无复杂状态机

5.2 关键技术点

必备技能:

  • RBAC 权限系统
  • Redis 缓存
  • 分页查询
  • 全文搜索(ES)
  • 消息队列

性能优化:

  • 数据库索引
  • 查询优化
  • 缓存策略
  • 批量查询

最佳实践:

  • 缓存穿透/雪崩/击穿
  • N+1 查询问题
  • 深度分页优化
  • 异步处理

5.3 学习路径

  1. 掌握基础: SQL、Redis、HTTP
  2. 学习框架: Gin/Echo、GORM
  3. 实践项目: 博客、论坛、电商
  4. 性能优化: 缓存、索引、分页
  5. 生产经验: 监控、日志、错误处理

普通业务系统是中高级开发者的主战场!

本系列文章

➤ [NO.1 调度器]

➤ [NO.2 一致性协议](Paxos / Raft)

➤ [NO.3 高性能异步系统](消息队列、回调、重试)

➤ [NO.4 交易系统](钱的事不能错)

➤ NO.5 普通业务系统(绝大多数人做的)